Streaming API в LLM: потоковый вывод в Python и Node.js

Streaming API в LLM: потоковый вывод в Python и Node.js

TL;DR: Streaming позволяет получать ответ от LLM по мере генерации — первый токен появляется через ~200ms вместо ожидания полного ответа. Реализуется одним параметром stream=True в Python или stream: true в Node.js через OpenAI-совместимый API. Работает с любой моделью на OfoxAI: GPT-5.4, Claude Opus 4.7, Gemini 3.1 Pro.

Почему без streaming плохо

Пользователь нажимает кнопку и 8 секунд смотрит на пустой экран. Потом весь текст появляется разом. Это стандартное поведение без streaming.

С streaming первые слова появляются через ~200ms. Субъективно приложение кажется в 3–5 раз быстрее — даже если суммарное время генерации одинаковое.

Технически это Server-Sent Events (SSE) — стандартный HTTP-механизм. Сервер держит соединение открытым и отправляет данные чанками по мере генерации. Каждый чанк содержит несколько токенов.

Как выглядит SSE-поток изнутри

Сырой ответ от API:

data: {"choices":[{"delta":{"content":"Привет"},"finish_reason":null}]}
data: {"choices":[{"delta":{"content":", мир"},"finish_reason":null}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]

delta.content содержит новый кусок текста. Поток заканчивается маркером [DONE].

OpenAI SDK скрывает эту сложность — вы просто итерируетесь по объекту stream. Понимать формат полезно при отладке или при работе с raw HTTP.

Python: минимальный пример

Установка:

pip install openai

Код:

from openai import OpenAI

client = OpenAI(
    api_key="sk-your-ofoxai-key",
    base_url="https://api.ofox.ai/v1"
)

stream = client.chat.completions.create(
    model="anthropic/claude-opus-4.7",
    messages=[{"role": "user", "content": "Объясни, что такое RAG"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

Два момента, которые часто упускают:

flush=True — без него Python буферизует вывод и вы увидите текст блоками, а не по токенам. if content: — последний чанк содержит finish_reason="stop" и пустой delta.content, без проверки получите None в выводе.

Модель меняется в одну строку — openai/gpt-5.4, google/gemini-3.1-pro-preview, anthropic/claude-sonnet-4.6. Полный список на ofox.ai/en/models.

Python async: для FastAPI и asyncio-приложений

Если ваш backend асинхронный, используйте AsyncOpenAI:

from openai import AsyncOpenAI
import asyncio

client = AsyncOpenAI(
    api_key="sk-your-ofoxai-key",
    base_url="https://api.ofox.ai/v1"
)

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="anthropic/claude-opus-4.7",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)

asyncio.run(stream_response("Что такое vector database?"))

AsyncOpenAI использует httpx с настоящим async I/O — не просто обёртка над синхронным клиентом. При высокой нагрузке разница ощутима.

Node.js: минимальный пример

npm install openai
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "sk-your-ofoxai-key",
  baseURL: "https://api.ofox.ai/v1",
});

const stream = await client.chat.completions.create({
  model: "anthropic/claude-opus-4.7",
  messages: [{ role: "user", content: "Объясни, что такое RAG" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) process.stdout.write(content);
}

for await...of работает с async iterables в Node.js 18+. Никаких дополнительных библиотек.

Опциональная цепочка ?. на chunk.choices[0]?.delta?.content защищает от редких случаев, когда choices пустой — например, при content filtering.

Обработка ошибок

Ошибки при streaming бывают двух типов: до начала потока и в середине.

До начала (неверный API key, модель не найдена) — SDK бросает исключение при вызове .create(). В середине потока (сетевой обрыв, rate limit 429) — исключение бросается внутри цикла. Один try/except ловит оба случая:

try:
    stream = client.chat.completions.create(
        model="anthropic/claude-opus-4.7",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
except Exception as e:
    print(f"Ошибка: {e}")

Для production добавьте retry с экспоненциальным backoff. С tenacity:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def create_stream(prompt):
    return client.chat.completions.create(
        model="anthropic/claude-opus-4.7",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

Streaming в FastAPI

Большинство приложений проксируют streaming через свой backend. Минимальный FastAPI-эндпоинт:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(prompt: str):
    async def generate():
        stream = await client.chat.completions.create(
            model="anthropic/claude-opus-4.7",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

На фронтенде читайте через fetch с ReadableStream или EventSource. CORS-заголовки нужны, если фронтенд на другом домене.

Подсчёт токенов при streaming

Неочевидный момент: при streaming usage в финальном чанке появляется только если явно запросить:

stream = client.chat.completions.create(
    model="anthropic/claude-opus-4.7",
    messages=[{"role": "user", "content": prompt}],
    stream=True,
    stream_options={"include_usage": True}
)

for chunk in stream:
    if chunk.usage:
        print(f"Токены: {chunk.usage.total_tokens}")
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

Параметр stream_options={"include_usage": True} поддерживается в OpenAI SDK 2.x и работает через OfoxAI для всех моделей.

Когда streaming не нужен

Streaming добавляет сложность. Не стоит его использовать, если ответ нужен целиком перед обработкой (парсинг JSON, structured output), для batch-задач без UI или когда ответы очень короткие — меньше 50 токенов, разница незаметна.

Для structured output (JSON mode) используйте обычный запрос без stream=True — парсить JSON из потока значительно сложнее.

Итог

Streaming — одна строка кода (stream=True), которая меняет UX. Через OfoxAI он работает одинаково для GPT-5.4, Claude Opus 4.7 и Gemini 3.1 Pro — меняете только model. Ключ на ofox.ai.