Представьте: ваш клиент задаёт вопрос в чат-боте на сайте, а ответ приходит через 10 секунд. Он думает, что сайт завис, закрывает вкладку и уходит к конкуренту. Знакомо?
Стриминг решает это: первое слово появляется через 0.3 секунды, текст печатается постепенно. Пользователь видит, что система работает, и остаётся. Стоимость запроса та же - меняется только способ доставки.
Разберём на примере турагентства. У вас на сайте чат-бот подбирает туры. Без стриминга клиент ждёт 8 секунд и уходит. Со стримингом - текст появляется сразу, клиент видит «Ищу варианты...» и остаётся. Результат: больше заявок, меньше отказов.
Что такое стриминг и зачем он бизнесу
SSE (Server-Sent Events) - простой протокол, при котором сервер отправляет данные порциями по мере готовности, а не одним большим ответом.
Без стриминга: отправили запрос -> ждём 8-12 секунд -> получили весь ответ. Пользователь нервничает, нажимает кнопку повторно, уходит.
Со стримингом: отправили -> через 0.2-0.4 секунды появляется первое слово -> текст печатается. Субъективно быстро, даже если общее время генерации то же.
Дополнительные плюсы:
- Можно прервать генерацию, если ответ пошёл не туда - экономия токенов.
- Показывать прогресс для длинных задач.
- Реагировать на первые слова, не дожидаясь конца.
Стриминг нужен в продуктах, где пользователь взаимодействует с моделью напрямую: чат-боты, ассистенты, генерация текста. Не нужен при офлайн-обработке данных (Batch API) и в автоматических конвейерах, где результат идёт сразу в базу.
Стоимость не меняется - токены считаются те же.
OpenAI: stream=True - одна строка
Включить стриминг в OpenAI SDK - одна строка. Вместо ожидания полного ответа вы получаете поток маленьких объектов (чанков, кусочков данных).
Этот код включает потоковую передачу: каждый фрагмент выводится на экран сразу по мере генерации.
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Напиши короткое эссе о Python"}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content is not None:
print(delta.content, end="", flush=True)
finish = chunk.choices[0].finish_reason
if finish is not None:
print() # новая строка в конце
break
Каждый chunk - небольшой объект с полем delta.content. Большую часть времени это 1-4 токена. В конце finish_reason становится 'stop' (нормальное завершение) или 'length' (обрезано по лимиту токенов).
flush=True нужен, чтобы символы выводились сразу. В браузерном интерфейсе или через API это не требуется.
OpenAI Stream helper: удобнее для боевой среды
OpenAI SDK предоставляет менеджер контекста (конструкция Python, которая автоматически управляет ресурсами), который сам собирает финальное сообщение.
Этот вариант удобнее для боевой среды: text_stream отдаёт уже готовые строки без разбора дельт вручную, а get_final_completion() возвращает полный объект ответа с данными о потраченных токенах.
with client.chat.completions.stream(
model="gpt-4o",
messages=[{"role": "user", "content": "Расскажи про квантовые компьютеры"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# После завершения доступен полный объект
final = stream.get_final_completion()
print(f"\nTokens: {final.usage.total_tokens}")
text_stream - итератор по строкам текста, уже собранным из дельт. get_final_completion() возвращает полный объект с usage (данными по расходу токенов) - удобно для логирования стоимости каждого запроса.
Данные по токенам при стриминге доступны только в последнем чанке или через get_final_completion(). Чтобы получить их в сыром режиме, добавьте stream_options={"include_usage": True} в параметры запроса.
Claude streaming - аналогично, но события называются иначе
Anthropic SDK (библиотека для работы с Claude) устроен аналогично.
Этот код стримит ответ Claude и в конце получает полный объект с данными по потраченным токенам.
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Объясни рекурсию"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# Финальный объект
message = stream.get_final_message()
print(f"\nInput: {message.usage.input_tokens}, Output: {message.usage.output_tokens}")
Для тех, кто хочет явно контролировать каждое событие - низкоуровневый вариант с перечнем типов событий:
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Привет"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_stop":
break
Типы событий Claude: message_start (начало, метаданные модели), content_block_start (начало блока текста), content_block_delta (порция текста), message_delta (данные по токенам в конце), message_stop (конец генерации).
Сборка полного ответа из потока
Для сохранения полного ответа накапливайте буфер параллельно с выводом.
Этот код одновременно выводит текст на экран и собирает его в переменную - чтобы потом использовать полный ответ в коде (сохранить в базу, передать дальше).
full_text = ""
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": "Составь план статьи о Python"}]
) as stream:
for text in stream.text_stream:
full_text += text
print(text, end="", flush=True)
print(f"\nПолный ответ ({len(full_text)} символов):")
print(full_text)
При вызове функций (tool use) стриминг сложнее: Claude может отдавать несколько блоков подряд - сначала текст, потом вызов функции. Нужно отслеживать тип каждого блока и собирать аргументы функции из событий типа input_json_delta.
FastAPI: StreamingResponse - прокидываем стриминг через свой API
FastAPI - популярный Python-фреймворк для создания HTTP API. Часто нужно прокинуть стриминг от языковой модели к клиенту через собственный API: браузер или мобильное приложение обращается к вашему серверу, тот - к OpenAI, и передаёт ответ обратно потоком.
Этот код создаёт FastAPI-эндпоинт, который принимает запрос пользователя, обращается к GPT-4o с потоковой передачей и отправляет каждый фрагмент клиенту сразу по мере получения.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio
app = FastAPI()
aclient = AsyncOpenAI()
async def generate_stream(prompt: str):
async with aclient.chat.completions.stream(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
# Формат SSE: data: <text>\n\n
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
@app.get("/stream")
async def stream_endpoint(prompt: str):
return StreamingResponse(
generate_stream(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # для nginx: отключить буферизацию
}
)
Заголовок X-Accel-Buffering: no критичен, если перед FastAPI стоит nginx (популярный веб-сервер, часто используемый как прокси). Без этого заголовка nginx буферизует (накапливает) ответ, и клиент получит всё разом в конце - эффект стриминга пропадёт.
На клиенте (в браузерном JavaScript) стрим обрабатывается через EventSource или через fetch с чтением ReadableStream.
Telegram-бот со стримингом - имитация печатания
Telegram не поддерживает настоящий SSE, но можно имитировать эффект: отправить пустое сообщение и редактировать его по мере генерации.
Этот код отправляет в Telegram пустое сообщение и затем редактирует его каждую секунду, добавляя новые фрагменты - создавая эффект печатания в реальном времени.
import asyncio
from telegram import Bot
import anthropic
async def stream_to_telegram(bot: Bot, chat_id: int, prompt: str):
client = anthropic.AsyncAnthropic()
# Отправляем пустое сообщение и получаем message_id
msg = await bot.send_message(chat_id, "...")
buffer = ""
last_update = 0
async with client.messages.stream(
model="claude-haiku-4-5", # Haiku быстрее для интерактивных сценариев
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
buffer += text
now = asyncio.get_event_loop().time()
# Обновляем не чаще 1 раза в секунду (rate limit Telegram: ~30 edit/мин)
if now - last_update >= 1.0 and buffer:
try:
await bot.edit_message_text(
chat_id=chat_id,
message_id=msg.message_id,
text=buffer
)
last_update = now
except Exception:
pass # сообщение не изменилось - telegram вернёт ошибку
# Финальное обновление с полным текстом
await bot.edit_message_text(
chat_id=chat_id,
message_id=msg.message_id,
text=buffer
)
Telegram Bot API ограничивает: не более 30 editMessageText в минуту на бот глобально, и не чаще 1 раза в секунду для конкретного чата. Задержка между обновлениями в 1 секунду - разумный баланс между плавностью и соблюдением лимитов.
Для быстрых ответов в боте используйте claude-haiku-4-5 - первый токен за 0.2-0.3 секунды против 0.5-0.8 секунды у Sonnet.
Отмена и таймауты - не тратим токены впустую
При долгой генерации пользователь может закрыть браузер или нажать «стоп». Нужно корректно прерывать стрим.
Этот код ограничивает время стриминга: если генерация длится дольше 10 секунд, поток обрывается с сообщением пользователю.
import httpx
from openai import AsyncOpenAI
aclient = AsyncOpenAI(timeout=httpx.Timeout(30.0, connect=5.0))
async def stream_with_cancel(prompt: str, max_seconds: float = 10.0):
try:
async with asyncio.timeout(max_seconds):
async with aclient.chat.completions.stream(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
yield text
except asyncio.TimeoutError:
yield "\n[Превышено время ожидания]"
except asyncio.CancelledError:
# Клиент отключился
pass
httpx.Timeout(30.0, connect=5.0) - 30 секунд на чтение стрима, 5 секунд на установку соединения. asyncio.timeout() позволяет прервать весь стрим по лимиту времени.
При отключении клиента FastAPI и ASGI (интерфейс для асинхронных Python-серверов) автоматически отменяют выполнение через CancelledError. Поймайте его явно, чтобы закрыть стрим корректно и освободить ресурсы.
Стриминг в браузере: EventSource и fetch
Когда ваш FastAPI-сервис отдаёт стрим, браузер должен его принять. Два способа на JavaScript.
EventSource - простой стандартный способ для SSE. Браузер сам переподключается при обрыве:
const source = new EventSource('/stream?prompt=Привет');
source.onmessage = (event) => {
if (event.data === '[DONE]') {
source.close();
return;
}
document.getElementById('output').textContent += event.data;
};
Через fetch с ReadableStream - более гибкий способ, позволяет передавать заголовки (например, авторизационный токен):
const response = await fetch('/stream', {
method: 'POST',
headers: { 'Authorization': 'Bearer ' + token },
body: JSON.stringify({ prompt: 'Привет' })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
document.getElementById('output').textContent += text;
}
EventSource не поддерживает POST-запросы и кастомные заголовки - для авторизованных API используйте fetch.
Стриминг в синхронном Flask
Если приложение написано на Flask (популярный синхронный Python-фреймворк) и нет возможности перейти на asyncio, стриминг работает через функцию-генератор (функцию, которая выдаёт данные по частям через yield).
Этот код добавляет стриминговый эндпоинт в Flask: Flask вызывает генератор и отправляет каждый фрагмент клиенту сразу, не накапливая весь ответ в памяти.
from flask import Flask, Response, stream_with_context
from openai import OpenAI
app = Flask(__name__)
client = OpenAI()
def generate(prompt):
with client.chat.completions.stream(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
@app.route("/stream")
def stream_endpoint():
prompt = request.args.get("prompt", "Привет")
return Response(
stream_with_context(generate(prompt)),
mimetype="text/event-stream",
headers={"X-Accel-Buffering": "no"}
)
stream_with_context нужен в Flask, чтобы генератор имел доступ к контексту запроса во время выполнения. Без него Flask завершит контекст запроса до того, как генератор отдаст все данные.
Типичные подводные камни
Nginx буферизует ответ - самая частая проблема при развёртывании. Без X-Accel-Buffering: no пользователь не увидит стриминг. Проверьте этот заголовок первым делом, если стриминг работал локально, но сломался после выкатки.
Стрим обрывается без ошибки - причина часто в overloaded_error у Claude (код 529) или в сетевом таймауте. Добавьте retry-логику с повторным открытием стрима. В продакшне логируйте все обрывы и отслеживайте их частоту.
Токены при стриминге не считаются - данные по расходу токенов доступны только в конце стрима. Используйте get_final_completion() или get_final_message() после завершения, или добавьте stream_options={"include_usage": True} для OpenAI.
Частые вопросы
Можно ли стримить ответы с вызовом функций?
Да, но сложнее. Аргументы функции приходят как input_json_delta (Claude) или tool_calls[i].function.arguments чанками (OpenAI). Нужно собирать JSON вручную. SDK-хелперы частично упрощают это.
Как реализовать стриминг в синхронном коде?
Используйте синхронные клиенты OpenAI и Anthropic (не Async-версии) с with client.chat.completions.stream(). Работает в обычных скриптах и Flask без asyncio.
Влияет ли стриминг на стоимость?
Нет. Токены считаются те же. Стриминг меняет только способ доставки ответа клиенту - порциями вместо одного куска.
Почему стрим обрывается?
Причин несколько: сетевой таймаут, перегрузка сервера (особенно overloaded_error у Claude), nginx буферизует ответ. Проверьте X-Accel-Buffering: no и увеличьте httpx timeout. Для продакшна добавьте retry при обрыве.
Что делать дальше
- Возьмите код для вашего сценария (FastAPI, Telegram-бот или Flask) из статьи.
- Вставьте свой API-ключ OpenAI или Anthropic.
- Запустите локально - проверьте, что стриминг работает.
- Выкатите на сервер - не забудьте заголовок
X-Accel-Buffering: no.
Всё. Ваши клиенты больше не ждут по 10 секунд.
AI Компас (t.me/kosmoslab_ai) - канал для предпринимателей в РФ и СНГ, которые применяют AI в своём бизнесе без программиста. Разбираем инструменты и схемы - без курсов и теории.