28
0
0
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники
Назад

От стримов к вебсокетам: как я боролся с буферизацией и наконец победил

Время чтения 64 минуты
Нет времени читать?
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники
28
0
0
Нет времени читать?
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники

Перефразируя Эдгара Дега:
«Искусство — это не то, что ты делаешь,
а то, что ты заставляешь машину сделать».

 

Привет. Меня зовут Николай Пискунов, я руководитель направления Big Data и эксперт курса Cloud DevSecOps по безопасной разработке от Академии вАЙТИ. Продолжаю цикл статей о клиенте для облачного сервиса Ollama. В первой части я рассказал о начале работы над клиентом и оставил пасхалку про сломанный стриминг. Во второй мы переехали с PostgreSQL на MongoDB, добавили анализ проектов с уважением к .gitignore и оптимизировали фронтенд.

Я обещал рассказать о поддержке системных промтов и сравнении ответов разных моделей. Но, как это часто бывает в разработке, планы пришлось скорректировать: проблема стриминга, о которой я упоминал ещё в первой статье, вконец меня доконала. Сегодня я расскажу, почему Server-Sent Events (SSE) оказались неподходящим выбором, как вебсокеты исправили ситуацию и как я поигрался с форматированием ответов от ассистента.

От стримов к вебсокетам: как я боролся с буферизацией и наконец победил

Та самая пасхалка: напоминание о проблеме

В первой статье я упомянул, что «стриминг в этом проекте работает с ошибкой». Проблема была в кастомном хуке useEventStream, который использовал fetch вместо нативного EventSource. Причина была уважительной: EventSource поддерживает только GET-запросы, а нам нужен был POST для передачи больших промтов. В результате последний чанк мог теряться, соединение закрывалось преждевременно, а иногда я получал обрывки ответа.

Почему стримы не работают

Давайте честно посмотрим на доступные варианты организации стриминга от ИИ-модели.

Server-Sent Events (SSE) — то, с чего мы начинали. Это простой и элегантный протокол, когда он подходит, но в нашем случае оказался не самым удачным.

Плюсы SSE:

  • простота реализации на сервере;
  • работает поверх обычного HTTP;
  • автоматическое переподключение в браузере.

Минусы, которые стали критическими:

  • Только однонаправленная связь (сервер → клиент). Для передачи промта нам пришлось делать отдельный POST-запрос.
  • Nginx и другие прокси любят буферизировать ответы. Мы выкручивались заголовками X-Accel-Buffering: no, но это работало не везде.
  • Проблемы с корпоративными прокси и фаерволами, которые могут резать долгие соединения.
  • Ограничение на количество одновременных соединений в браузере (обычно шесть на домен).

Reactive Spring (WebFlux). Я рассматривал возможность переписать бэкенд на реактивный стек. Звучало заманчиво: неблокирующий I/O, backpressure, нативная поддержка SSE через Flux.

Плюсы WebFlux:

  • отличная масштабируемость;
  • реактивные драйверы MongoDB;
  • контроль нагрузки через backpressure.

Но минусы перевесили:

  • Проект уже написан на Spring MVC. Переход на WebFlux потребовал бы практически полной переработки бэкенда.
  • Смешивать MVC и WebFlux в одном приложении — плохая практика.
  • Команда (читай: я) гораздо лучше знакома с классическим стеком.
  • Крутая кривая обучения и сложность отладки реактивного кода.

WebSocket — компромиссный вариант. Когда я начал изучать WebSocket, понял: это именно то, что нужно. Да, протокол сложнее, но он решает все наши проблемы.

Плюсы WebSocket, которые меня убедили:

  • Полнодуплексная связь. Клиент может отправлять сообщения в любое время, не делая отдельные HTTP-запросы.
  • Меньше проблем с прокси. WebSocket спроектирован с учётом инфраструктурных особенностей.
  • Единый канал. Всё общение идёт через одно соединение: и отправка промта, и получение стрима, и ошибки, и служебные сообщения.
  • Лучше для реального времени. Никаких костылей с буферизацией и тайм-аутами.
  • Можно использовать STOMP. Этот подпротокол даёт структурированные сообщения с маршрутизацией, делая код похожим на обычные REST-контроллеры.

Минусы, с которыми пришлось смириться:

  • более сложный протокол, нужно управлять соединениями;
  • требуется обрабатывать переподключения и потерю связи;
  • немного выше накладные расходы на поддержание соединения.

Но для нашего сценария плюсы очевидно перевешивают.

Переходим на вебсокеты

Выбрав WebSocket, я решил не изобретать велосипед и использовать STOMP — проверенный протокол поверх вебсокетов, который отлично интегрируется со Spring.

Убираем старый код в бэкенде

Первым делом я удалил старый эндпоинт /api/chat/stream из ChatController. Теперь он отвечает только за очистку истории. Весь стриминг переехал в новый ChatWebSocketController:


java
@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatWebSocketController {
 
    private final ChatService chatService;
 
    @MessageMapping("/chat.send")
    public void sendMessage(@Valid @Payload ChatMessagePayload payload) {
        new Thread(() -> {
            try {
                chatService.processAndStreamChat(payload);
            } catch (Exception e) {
                log.error("Error processing chat message", e);
            }
        }).start();
    }
 
    @MessageExceptionHandler
    @SendToUser("/topic/errors")
    public String handleException(Exception exception) {
        return "Ошибка на стороне сервера: " + exception.getMessage();
    }
}

 

Обратите внимание: стартует новый поток для обработки сообщения, чтобы не блокировать обработчик STOMP. В продакшне я планирую заменить это пулом потоков, но пока работает и так.

Конфигурация вебсокетов оказалась простой:


java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
 
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("http://localhost:5173")
                .withSockJS();
    }
 
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue", "/user");
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }
}

 

Клиенты подключаются к /ws через SockJS (фолбэк для старых браузеров), отправляют сообщения в /app/chat.send, а получают ответы в топике /topic/chat/{sessionId}.

Адаптируем сервис под вебсокеты

Сервисный слой изменился минимально. Вместо SseEmitter теперь используется SimpMessagingTemplate для отправки сообщений:


java
public void processAndStreamChat(ChatMessagePayload payload) {
    String destination = "/topic/chat/" + payload.getSessionId();
    
    try {
        // ... подготовка запроса к Ollama Cloud ...
       
        restClient.post()
                .uri("/v1/chat/completions")
                .body(requestBody)
                .exchange((clientRequest, clientResponse) -> {
                    // ... чтение стрима ...
                    
                    while ((line = reader.readLine()) != null) {
                        if (line.startsWith("data: ") && !line.equals("data: [DONE]")) {
                            String text = parseChunk(line);
                            if (!text.isEmpty()) {
                                messagingTemplate.convertAndSend(destination,
                                        new ChatResponsePayload(text, false));
                            }
                        }
                    }
                    
                    messagingTemplate.convertAndSend(destination,
                            new ChatResponsePayload("", true));
                    return null;
                });
    } catch (Exception e) {
        messagingTemplate.convertAndSend(destination,
                new ChatResponsePayload("Ошибка сервера: " + e.getMessage(), true));
    }
}

 

Добавляем на фронтенде новый хук

На клиенте я написал хук useWebSocket, который использует @stomp/stompjs и sockjs-client. Вот его ключевые части:


typescript
export const useWebSocket = ({ sessionId, onMessageReceived, onError }) => {
    const [isConnected, setIsConnected] = useState(false);
    const stompClientRef = useRef<Client | null>(null);
 
    const connect = useCallback(() => {
        const socket = new SockJS(`${baseUrl}/ws`);
        const client = new Client({
            webSocketFactory: () => socket,
            reconnectDelay: 5000,
            heartbeatIncoming: 4000,
            heartbeatOutgoing: 4000,
            
            onConnect: () => {
                setIsConnected(true);
                
                client.subscribe(`/topic/chat/${sessionId}`, (message) => {
                    const response = JSON.parse(message.body);
                    onMessageReceived?.(response.message, response.complete);
                });
                
                client.subscribe('/user/topic/errors', (message) => {
                    onError?.(message.body);
                });
            },
            
            onStompError: (frame) => {
                console.error('STOMP error', frame);
                handleReconnect();
            },
            
            onWebSocketError: (event) => {
                console.error('WebSocket error', event);
                handleReconnect();
            }
        });
        
        client.activate();
        stompClientRef.current = client;
    }, [sessionId]);
 
    const sendMessage = useCallback((message: string, model: string, temperature: number) => {
        if (stompClientRef.current?.connected) {
            stompClientRef.current.publish({
                destination: '/app/chat.send',
                body: JSON.stringify({ sessionId, message, model, temperature })
            });
            return true;
        }
        return false;
    }, [sessionId]);
 
    return { isConnected, sendMessage, disconnect };
};

 

Что здесь важно:

  • автоматическое переподключение с экспоненциальной задержкой;
  • heartbeat для обнаружения мёртвых соединений;
  • отдельный канал для ошибок (/user/topic/errors);
  • обработка всех типов ошибок: STOMP-протокол, WebSocket, тайм-ауты.

Что изменилось для пользователя

В интерфейсе появился индикатор состояния подключения:


tsx
<div className={`connection-status ${isConnected ? 'connected' : 'disconnected'}`}>
    {isConnected ? '🟢 Онлайн' : (isConnecting ? '🟡 Подключение...' : '🔴 Офлайн')}
</div>

 

Теперь пользователь всегда видит, работает ли соединение. Если связь потерялась, хук автоматически переподключается, и чат продолжается с того же места.

Добавляем форматирование ответов

Параллельно с переходом на вебсокеты я занялся давно назревшей проблемой: ответы от нейросети приходили как простой текст, хотя модели часто возвращают размеченный Markdown с блоками кода.

Одно дело, когда ассистент отвечает текстом, и совсем другое — когда выдаёт многострочные примеры кода. Без подсветки синтаксиса и нормального форматирования это выглядело ужасно. Решить проблему помог переход от разметки Markdown на HTML с подсветкой.

Я добавил в проект несколько зависимостей:


json
"dependencies": {
    "marked": "^17.0.3",
    "marked-highlight": "^2.2.3",
    "highlight.js": "^11.11.1",
    "dompurify": "^3.3.1"
}

 

И написал утилиту markdownToHtml:


typescript
import { marked } from 'marked';
import { markedHighlight } from 'marked-highlight';
import DOMPurify from 'dompurify';
import hljs from 'highlight.js';
import 'highlight.js/styles/vs.css';
 
marked.use(
    markedHighlight({
        langPrefix: 'hljs language-',
        highlight(code, lang) {
            const language = hljs.getLanguage(lang) ? lang : 'plaintext';
            return hljs.highlight(code, { language }).value;
        }
    })
);
 
marked.setOptions({ breaks: true, gfm: true });
 
export const markdownToHtml = (markdown: string): string => {
    if (!markdown) return '';
    
    try {
        const rawHtml = marked.parse(markdown) as string;
        return DOMPurify.sanitize(rawHtml, {
            USE_PROFILES: { html: true },
            ALLOWED_ATTR: ['class', 'style']
        });
    } catch (error) {
        console.error('Error parsing Markdown:', error);
        return DOMPurify.sanitize(markdown);
    }
};

 

Теперь каждый ответ ассистента проходит через этот конвейер. Блоки кода получают подсветку синтаксиса с нумерацией строк.

Доработка компонента MessageList

В компоненте MessageList я добавил постобработку HTML: для каждого блока pre определяем язык по классу и добавляем атрибут data-language:


typescript
const processHtmlContent = useCallback((html: string): string => {
    const tempDiv = document.createElement('div');
    tempDiv.innerHTML = html;
 
    const preElements = tempDiv.querySelectorAll('pre');
    preElements.forEach(pre => {
        const codeElement = pre.querySelector('code');
        if (codeElement) {
            const classList = codeElement.className.split(' ');
            const languageClass = classList.find(c => c.startsWith('language-'));
            const language = languageClass ? languageClass.replace('language-', '') : 'plaintext';
            
            pre.setAttribute('data-language', language);
            
            const code = codeElement.textContent || '';
            const lines = code.split('n');
            codeElement.innerHTML = lines
                .map(line => `<span class="line">${line || ' '}</span>`)
                .join('n');
        }
    });
 
    return tempDiv.innerHTML;
}, []);

 

В CSS добавил стили для красивого отображения:


css
.formatted-content pre {
    position: relative;
    background-color: #0d1117;
    border-radius: 8px;
    padding: 1em 0 1em 1em;
    margin: 1.5em 0;
    overflow-x: auto;
    border: 1px solid #30363d;
    font-family: 'SF Mono', monospace;
    font-size: 14px;
}
 
.formatted-content pre::before {
    content: attr(data-language);
    position: absolute;
    top: 0.5em;
    right: 1em;
    color: #8b949e;
    font-size: 12px;
    background-color: #1f2937;
    padding: 0.2em 0.8em;
    border-radius: 20px;
    border: 1px solid #30363d;
}
 
.formatted-content pre .copy-code-btn {
    position: absolute;
    top: 0.5em;
    right: 5em;
    background-color: #1f2937;
    color: #e6edf3;
    border: 1px solid #30363d;
    border-radius: 6px;
    padding: 0.2em 0.8em;
    font-size: 12px;
    cursor: pointer;
    opacity: 0;
    transition: opacity 0.2s ease;
}
 
.formatted-content pre:hover .copy-code-btn {
    opacity: 1;
}

 

Модальное окно для длинных сообщений

Для очень длинных ответов оставил возможность открыть сообщение в модальном окне с полным форматированием. Это полезно, когда ассистент выдаёт несколько сотен строк кода.

Что дальше?

Я обещал во второй статье рассказать о поддержке системных промтов и сравнении ответов разных моделей. Эти планы никуда не делись, просто пришлось сперва закрыть технический долг.

В ближайших планах:

  • Системные промты. Возможность задать контекст для ассистента, который будет учитываться во всех ответах в рамках сессии.
  • Сравнение моделей. Отправлять один запрос сразу нескольким моделям и видеть их ответы рядом. Это поможет выбирать лучшую модель под конкретную задачу.
  • История сессий. Улучшить управление историей: возможность переименовывать сессии, удалять отдельные сообщения, экспортировать диалоги.

Выводы

Переход на вебсокеты оказался правильным решением. Да, пришлось переписать часть кода и добавить новую логику управления соединениями, но результат того стоил:

  • стриминг работает стабильно, без потери чанков;
  • единый канал для всего общения упростил архитектуру;
  • появилась возможность легко добавлять новые типы сообщений (уведомления, статусы, команды);
  • интерфейс стал информативнее с индикатором состояния.

А форматирование ответов с подсветкой кода сделало использование чата по-настоящему приятным. Теперь код от ассистента выглядит так, как и должен выглядеть — с подсветкой, нумерацией строк и возможностью скопировать одним кликом.

Проект живёт и развивается на GitVerse в ветке ws-chat. Заходите, ставьте звёздочки, создавайте issue и пул-реквесты. Вместе мы сделаем его ещё лучше!

P. S. Если ваш стриминг всё ещё не работает, попробуйте вебсокеты. И не забудьте о heartbeat.

Комментарии0
Тоже интересно
Комментировать
Поделиться
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники