Учебник

Стриминг: как AI-бот отвечает мгновенно, а не за 10 секунд

Ваш чат-бот или ассистент заставляет пользователей ждать 8-12 секунд - они уходят или перезагружают страницу. Разбираем, как включить стриминг (ответ по мере генерации) для OpenAI и Claude без изменения стоимости. Конкретный код для FastAPI, Telegram-бота и браузера - внедрите сами или с менеджером за 2-4 часа.

Макс Космов··8 мин чтения

Представьте: ваш клиент задаёт вопрос в чат-боте на сайте, а ответ приходит через 10 секунд. Он думает, что сайт завис, закрывает вкладку и уходит к конкуренту. Знакомо?

Стриминг решает это: первое слово появляется через 0.3 секунды, текст печатается постепенно. Пользователь видит, что система работает, и остаётся. Стоимость запроса та же - меняется только способ доставки.

Разберём на примере турагентства. У вас на сайте чат-бот подбирает туры. Без стриминга клиент ждёт 8 секунд и уходит. Со стримингом - текст появляется сразу, клиент видит «Ищу варианты...» и остаётся. Результат: больше заявок, меньше отказов.

Что такое стриминг и зачем он бизнесу

SSE (Server-Sent Events) - простой протокол, при котором сервер отправляет данные порциями по мере готовности, а не одним большим ответом.

Без стриминга: отправили запрос -> ждём 8-12 секунд -> получили весь ответ. Пользователь нервничает, нажимает кнопку повторно, уходит.

Со стримингом: отправили -> через 0.2-0.4 секунды появляется первое слово -> текст печатается. Субъективно быстро, даже если общее время генерации то же.

Дополнительные плюсы:

  • Можно прервать генерацию, если ответ пошёл не туда - экономия токенов.
  • Показывать прогресс для длинных задач.
  • Реагировать на первые слова, не дожидаясь конца.

Стриминг нужен в продуктах, где пользователь взаимодействует с моделью напрямую: чат-боты, ассистенты, генерация текста. Не нужен при офлайн-обработке данных (Batch API) и в автоматических конвейерах, где результат идёт сразу в базу.

Стоимость не меняется - токены считаются те же.

OpenAI: stream=True - одна строка

Включить стриминг в OpenAI SDK - одна строка. Вместо ожидания полного ответа вы получаете поток маленьких объектов (чанков, кусочков данных).

Этот код включает потоковую передачу: каждый фрагмент выводится на экран сразу по мере генерации.

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
 model="gpt-4o",
 messages=[{"role": "user", "content": "Напиши короткое эссе о Python"}],
 stream=True
)

for chunk in stream:
 delta = chunk.choices[0].delta
 if delta.content is not None:
 print(delta.content, end="", flush=True)
 
 finish = chunk.choices[0].finish_reason
 if finish is not None:
 print() # новая строка в конце
 break

Каждый chunk - небольшой объект с полем delta.content. Большую часть времени это 1-4 токена. В конце finish_reason становится 'stop' (нормальное завершение) или 'length' (обрезано по лимиту токенов).

flush=True нужен, чтобы символы выводились сразу. В браузерном интерфейсе или через API это не требуется.

OpenAI Stream helper: удобнее для боевой среды

OpenAI SDK предоставляет менеджер контекста (конструкция Python, которая автоматически управляет ресурсами), который сам собирает финальное сообщение.

Этот вариант удобнее для боевой среды: text_stream отдаёт уже готовые строки без разбора дельт вручную, а get_final_completion() возвращает полный объект ответа с данными о потраченных токенах.

with client.chat.completions.stream(
 model="gpt-4o",
 messages=[{"role": "user", "content": "Расскажи про квантовые компьютеры"}]
) as stream:
 for text in stream.text_stream:
 print(text, end="", flush=True)
 
 # После завершения доступен полный объект
 final = stream.get_final_completion()
 print(f"\nTokens: {final.usage.total_tokens}")

text_stream - итератор по строкам текста, уже собранным из дельт. get_final_completion() возвращает полный объект с usage (данными по расходу токенов) - удобно для логирования стоимости каждого запроса.

Данные по токенам при стриминге доступны только в последнем чанке или через get_final_completion(). Чтобы получить их в сыром режиме, добавьте stream_options={"include_usage": True} в параметры запроса.

Claude streaming - аналогично, но события называются иначе

Anthropic SDK (библиотека для работы с Claude) устроен аналогично.

Этот код стримит ответ Claude и в конце получает полный объект с данными по потраченным токенам.

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
 model="claude-sonnet-4-6",
 max_tokens=1024,
 messages=[{"role": "user", "content": "Объясни рекурсию"}]
) as stream:
 for text in stream.text_stream:
 print(text, end="", flush=True)
 
 # Финальный объект
 message = stream.get_final_message()
 print(f"\nInput: {message.usage.input_tokens}, Output: {message.usage.output_tokens}")

Для тех, кто хочет явно контролировать каждое событие - низкоуровневый вариант с перечнем типов событий:

with client.messages.stream(
 model="claude-sonnet-4-6",
 max_tokens=1024,
 messages=[{"role": "user", "content": "Привет"}]
) as stream:
 for event in stream:
 if event.type == "content_block_delta":
 print(event.delta.text, end="", flush=True)
 elif event.type == "message_stop":
 break

Типы событий Claude: message_start (начало, метаданные модели), content_block_start (начало блока текста), content_block_delta (порция текста), message_delta (данные по токенам в конце), message_stop (конец генерации).

Сборка полного ответа из потока

Для сохранения полного ответа накапливайте буфер параллельно с выводом.

Этот код одновременно выводит текст на экран и собирает его в переменную - чтобы потом использовать полный ответ в коде (сохранить в базу, передать дальше).

full_text = ""

with client.messages.stream(
 model="claude-sonnet-4-6",
 max_tokens=2048,
 messages=[{"role": "user", "content": "Составь план статьи о Python"}]
) as stream:
 for text in stream.text_stream:
 full_text += text
 print(text, end="", flush=True)

print(f"\nПолный ответ ({len(full_text)} символов):")
print(full_text)

При вызове функций (tool use) стриминг сложнее: Claude может отдавать несколько блоков подряд - сначала текст, потом вызов функции. Нужно отслеживать тип каждого блока и собирать аргументы функции из событий типа input_json_delta.

FastAPI: StreamingResponse - прокидываем стриминг через свой API

FastAPI - популярный Python-фреймворк для создания HTTP API. Часто нужно прокинуть стриминг от языковой модели к клиенту через собственный API: браузер или мобильное приложение обращается к вашему серверу, тот - к OpenAI, и передаёт ответ обратно потоком.

Этот код создаёт FastAPI-эндпоинт, который принимает запрос пользователя, обращается к GPT-4o с потоковой передачей и отправляет каждый фрагмент клиенту сразу по мере получения.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio

app = FastAPI()
aclient = AsyncOpenAI()

async def generate_stream(prompt: str):
 async with aclient.chat.completions.stream(
 model="gpt-4o",
 messages=[{"role": "user", "content": prompt}]
 ) as stream:
 async for text in stream.text_stream:
 # Формат SSE: data: <text>\n\n
 yield f"data: {text}\n\n"
 yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_endpoint(prompt: str):
 return StreamingResponse(
 generate_stream(prompt),
 media_type="text/event-stream",
 headers={
 "Cache-Control": "no-cache",
 "X-Accel-Buffering": "no" # для nginx: отключить буферизацию
 }
 )

Заголовок X-Accel-Buffering: no критичен, если перед FastAPI стоит nginx (популярный веб-сервер, часто используемый как прокси). Без этого заголовка nginx буферизует (накапливает) ответ, и клиент получит всё разом в конце - эффект стриминга пропадёт.

На клиенте (в браузерном JavaScript) стрим обрабатывается через EventSource или через fetch с чтением ReadableStream.

Telegram-бот со стримингом - имитация печатания

Telegram не поддерживает настоящий SSE, но можно имитировать эффект: отправить пустое сообщение и редактировать его по мере генерации.

Этот код отправляет в Telegram пустое сообщение и затем редактирует его каждую секунду, добавляя новые фрагменты - создавая эффект печатания в реальном времени.

import asyncio
from telegram import Bot
import anthropic

async def stream_to_telegram(bot: Bot, chat_id: int, prompt: str):
 client = anthropic.AsyncAnthropic()
 
 # Отправляем пустое сообщение и получаем message_id
 msg = await bot.send_message(chat_id, "...")
 
 buffer = ""
 last_update = 0
 
 async with client.messages.stream(
 model="claude-haiku-4-5", # Haiku быстрее для интерактивных сценариев
 max_tokens=1024,
 messages=[{"role": "user", "content": prompt}]
 ) as stream:
 async for text in stream.text_stream:
 buffer += text
 now = asyncio.get_event_loop().time()
 
 # Обновляем не чаще 1 раза в секунду (rate limit Telegram: ~30 edit/мин)
 if now - last_update >= 1.0 and buffer:
 try:
 await bot.edit_message_text(
 chat_id=chat_id,
 message_id=msg.message_id,
 text=buffer
 )
 last_update = now
 except Exception:
 pass # сообщение не изменилось - telegram вернёт ошибку
 
 # Финальное обновление с полным текстом
 await bot.edit_message_text(
 chat_id=chat_id,
 message_id=msg.message_id,
 text=buffer
 )

Telegram Bot API ограничивает: не более 30 editMessageText в минуту на бот глобально, и не чаще 1 раза в секунду для конкретного чата. Задержка между обновлениями в 1 секунду - разумный баланс между плавностью и соблюдением лимитов.

Для быстрых ответов в боте используйте claude-haiku-4-5 - первый токен за 0.2-0.3 секунды против 0.5-0.8 секунды у Sonnet.

Отмена и таймауты - не тратим токены впустую

При долгой генерации пользователь может закрыть браузер или нажать «стоп». Нужно корректно прерывать стрим.

Этот код ограничивает время стриминга: если генерация длится дольше 10 секунд, поток обрывается с сообщением пользователю.

import httpx
from openai import AsyncOpenAI

aclient = AsyncOpenAI(timeout=httpx.Timeout(30.0, connect=5.0))

async def stream_with_cancel(prompt: str, max_seconds: float = 10.0):
 try:
 async with asyncio.timeout(max_seconds):
 async with aclient.chat.completions.stream(
 model="gpt-4o",
 messages=[{"role": "user", "content": prompt}]
 ) as stream:
 async for text in stream.text_stream:
 yield text
 except asyncio.TimeoutError:
 yield "\n[Превышено время ожидания]"
 except asyncio.CancelledError:
 # Клиент отключился
 pass

httpx.Timeout(30.0, connect=5.0) - 30 секунд на чтение стрима, 5 секунд на установку соединения. asyncio.timeout() позволяет прервать весь стрим по лимиту времени.

При отключении клиента FastAPI и ASGI (интерфейс для асинхронных Python-серверов) автоматически отменяют выполнение через CancelledError. Поймайте его явно, чтобы закрыть стрим корректно и освободить ресурсы.

Стриминг в браузере: EventSource и fetch

Когда ваш FastAPI-сервис отдаёт стрим, браузер должен его принять. Два способа на JavaScript.

EventSource - простой стандартный способ для SSE. Браузер сам переподключается при обрыве:

const source = new EventSource('/stream?prompt=Привет');
source.onmessage = (event) => {
 if (event.data === '[DONE]') {
 source.close();
 return;
 }
 document.getElementById('output').textContent += event.data;
};

Через fetch с ReadableStream - более гибкий способ, позволяет передавать заголовки (например, авторизационный токен):

const response = await fetch('/stream', {
 method: 'POST',
 headers: { 'Authorization': 'Bearer ' + token },
 body: JSON.stringify({ prompt: 'Привет' })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
 const { done, value } = await reader.read();
 if (done) break;
 const text = decoder.decode(value);
 document.getElementById('output').textContent += text;
}

EventSource не поддерживает POST-запросы и кастомные заголовки - для авторизованных API используйте fetch.

Стриминг в синхронном Flask

Если приложение написано на Flask (популярный синхронный Python-фреймворк) и нет возможности перейти на asyncio, стриминг работает через функцию-генератор (функцию, которая выдаёт данные по частям через yield).

Этот код добавляет стриминговый эндпоинт в Flask: Flask вызывает генератор и отправляет каждый фрагмент клиенту сразу, не накапливая весь ответ в памяти.

from flask import Flask, Response, stream_with_context
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()

def generate(prompt):
 with client.chat.completions.stream(
 model="gpt-4o",
 messages=[{"role": "user", "content": prompt}]
 ) as stream:
 for text in stream.text_stream:
 yield f"data: {text}\n\n"
 yield "data: [DONE]\n\n"

@app.route("/stream")
def stream_endpoint():
 prompt = request.args.get("prompt", "Привет")
 return Response(
 stream_with_context(generate(prompt)),
 mimetype="text/event-stream",
 headers={"X-Accel-Buffering": "no"}
 )

stream_with_context нужен в Flask, чтобы генератор имел доступ к контексту запроса во время выполнения. Без него Flask завершит контекст запроса до того, как генератор отдаст все данные.

Типичные подводные камни

Nginx буферизует ответ - самая частая проблема при развёртывании. Без X-Accel-Buffering: no пользователь не увидит стриминг. Проверьте этот заголовок первым делом, если стриминг работал локально, но сломался после выкатки.

Стрим обрывается без ошибки - причина часто в overloaded_error у Claude (код 529) или в сетевом таймауте. Добавьте retry-логику с повторным открытием стрима. В продакшне логируйте все обрывы и отслеживайте их частоту.

Токены при стриминге не считаются - данные по расходу токенов доступны только в конце стрима. Используйте get_final_completion() или get_final_message() после завершения, или добавьте stream_options={"include_usage": True} для OpenAI.

Частые вопросы

Можно ли стримить ответы с вызовом функций?

Да, но сложнее. Аргументы функции приходят как input_json_delta (Claude) или tool_calls[i].function.arguments чанками (OpenAI). Нужно собирать JSON вручную. SDK-хелперы частично упрощают это.

Как реализовать стриминг в синхронном коде?

Используйте синхронные клиенты OpenAI и Anthropic (не Async-версии) с with client.chat.completions.stream(). Работает в обычных скриптах и Flask без asyncio.

Влияет ли стриминг на стоимость?

Нет. Токены считаются те же. Стриминг меняет только способ доставки ответа клиенту - порциями вместо одного куска.

Почему стрим обрывается?

Причин несколько: сетевой таймаут, перегрузка сервера (особенно overloaded_error у Claude), nginx буферизует ответ. Проверьте X-Accel-Buffering: no и увеличьте httpx timeout. Для продакшна добавьте retry при обрыве.

Что делать дальше

  1. Возьмите код для вашего сценария (FastAPI, Telegram-бот или Flask) из статьи.
  2. Вставьте свой API-ключ OpenAI или Anthropic.
  3. Запустите локально - проверьте, что стриминг работает.
  4. Выкатите на сервер - не забудьте заголовок X-Accel-Buffering: no.

Всё. Ваши клиенты больше не ждут по 10 секунд.

AI Компас (t.me/kosmoslab_ai) - канал для предпринимателей в РФ и СНГ, которые применяют AI в своём бизнесе без программиста. Разбираем инструменты и схемы - без курсов и теории.