DEV-AI

ChatGPT affiche les tokens un par un. Votre API FastAPI renvoie tout d'un coup après 8 secondes d'attente. Ce n'est pas une limitation du LLM — c'est un problème d'architecture. Ce guide vous explique exactement comment implémenter le streaming en production avec FastAPI.

FastAPI streaming LLM Python 2026 : SSE, WebSocket et StreamingResponse

À qui s'adresse cet article ?

TL;DR

20 avril 2026 · ~35 min de lecture · FastAPI 0.115 · Python 3.11+
<500ms
TTFT cible
3
backends couverts
SSE+WS
Protocoles
10 lignes
SSE minimal

1. Pourquoi le streaming change tout pour les LLM

Un LLM comme GPT-4 ou Llama 3 génère du texte token par token, de gauche à droite — c'est une propriété fondamentale de l'architecture Transformer autorégressif. Générer 200 tokens à 30 tok/s prend 6,7 secondes. Sans streaming, l'utilisateur attend 6,7 secondes face à un écran vide, puis reçoit la réponse complète d'un coup. C'est l'expérience d'une API Flask classique.

Avec le streaming, l'utilisateur voit le premier token en 100–300 ms (le TTFT, Time To First Token), et lit la réponse au fur et à mesure de sa génération. L'expérience perçue est radicalement différente — même si le temps total reste identique. C'est pourquoi ChatGPT, Claude, Gemini et toutes les interfaces LLM sérieuses utilisent le streaming.

1.1 La physique du streaming LLM

Deux phases se succèdent dans chaque génération LLM :

Phase 1 : Prefill (prompt processing)

  • Encodage de tous les tokens du prompt en parallèle
  • Remplissage du KV Cache
  • Durée : proportionnelle à la longueur du prompt
  • Exemple : prompt de 1 000 tokens → 200–500 ms prefill
  • C'est ici que se joue le TTFT

Phase 2 : Decode (génération token par token)

  • Génération d'un token à la fois (autorégression)
  • Chaque token est conditionné sur tous les précédents
  • Débit : 20–80 tok/s selon GPU/quantization
  • 200 tokens @ 30 tok/s → 6,7 secondes
  • C'est ici que le streaming a le plus d'impact UX

Sans streaming, le serveur attend la fin de la phase decode avant d'envoyer quoi que ce soit. Avec streaming, chaque token généré est poussé immédiatement vers le client — dès la fin du prefill. L'implémentation correcte de ce pipeline fait toute la différence entre une API "lente" et une API "réactive".

2. SSE vs WebSocket vs Long Polling — lequel choisir ?

Trois mécanismes HTTP permettent de pousser des données en continu du serveur vers le client. Chacun a ses cas d'usage.

Critère SSE
text/event-stream
WebSocket
ws:// / wss://
Long Polling
HTTP classique
Direction Serveur → Client Bidirectionnel Serveur → Client
Protocole HTTP/1.1, HTTP/2 Upgrade WebSocket HTTP classique
Reconnexion auto ✓ Native (EventSource) ✗ Manuelle ✗ Manuelle
Overhead par message Faible Très faible (frames) Élevé (new request)
Interruption mid-stream ✗ Fermer connexion ✓ Message "stop" ✗ Abandon requête
Support NGINX ⚠ proxy_buffering off ⚠ proxy_read_timeout ✓ Standard
Idéal pour LLM Chat simple, API publique Chat interactif, stop Non recommandé

Règle pratique : utilisez SSE pour 90 % des cas — chat IA classique, résumé, génération de texte. Utilisez WebSocket uniquement si l'utilisateur doit pouvoir interrompre la génération ou envoyer des messages pendant que le modèle répond (ex : système de conversation voix/texte temps réel).

3. FastAPI StreamingResponse — le mécanisme fondamental

StreamingResponse est la classe de base pour tout streaming dans FastAPI. Elle prend en entrée un générateur async (ou synchrone) et envoie chaque chunk produit immédiatement au client, sans attendre la fin du générateur.

3.1 StreamingResponse minimal — texte brut

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def token_generator(text: str):
    # Simule la génération token par token
    for word in text.split():
        yield word + " "
        await asyncio.sleep(0.05)  # simule latence decode

@app.get("/stream")
async def stream_text():
    return StreamingResponse(
        token_generator("Bonjour ceci est un test de streaming avec FastAPI"),
        media_type="text/plain"
    )

3.2 Format SSE (Server-Sent Events) — le standard pour les LLM

Les APIs LLM (OpenAI, Anthropic, Mistral) utilisent toutes le format SSE. Ce n'est pas arbitraire : SSE est un standard W3C (EventSource API) qui définit un protocole précis au-dessus de HTTP. Le navigateur gère nativement la reconnexion et le parsing. Chaque message SSE suit le format :

# Format SSE — chaque message se termine par \n\n
data: {"token": "Bonjour"}\n\n
data: {"token": " monde"}\n\n
data: [DONE]\n\n   # signal de fin (convention OpenAI)
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class ChatRequest(BaseModel):
    messages: list[dict]
    model: str = "gpt-4o-mini"

async def sse_generator(token_source):
    # Wrapper générique : transforme n'importe quel générateur en SSE
    async for token in token_source:
        if token:
            payload = json.dumps({"token": token}, ensure_ascii=False)
            yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(body: ChatRequest):
    return StreamingResponse(
        sse_generator(my_llm_stream(body)),  # my_llm_stream = générateur async
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # désactive le buffering NGINX
            "Connection": "keep-alive",
        }
    )

Le header X-Accel-Buffering: no est critique pour NGINX — sans lui, NGINX bufférise toute la réponse avant de l'envoyer au client, annulant entièrement l'effet du streaming. Nous y reviendrons en section 10.

3.3 Client JavaScript — EventSource API

// Côté client — EventSource pour SSE GET
const source = new EventSource('/chat/stream?prompt=...');
source.onmessage = (e) => {
  if (e.data === '[DONE]') { source.close(); return; }
  const { token } = JSON.parse(e.data);
  document.getElementById('output').textContent += token;
};

// Pour un endpoint POST (SSE via fetch — plus flexible)
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages: [{role: 'user', content: 'Bonjour'}] })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const text = decoder.decode(value);
  // parser les lignes "data: {...}\n\n"
  text.split('\n').filter(l => l.startsWith('data: ')).forEach(line => {
    const data = line.slice(6);
    if (data !== '[DONE]') output.textContent += JSON.parse(data).token;
  });
}

StreamingResponse vs SSE structuré : StreamingResponse est la base générique — elle streame n'importe quel générateur. FastAPI documente aujourd'hui le support SSE et propose une voie dédiée pour structurer des événements SSE de manière plus sémantique ; StreamingResponse reste la base générique, toujours pertinente pour comprendre les mécanismes sous-jacents et pour les cas simples.

À retenir

4. Streaming OpenAI avec FastAPI

Le SDK OpenAI Python (v1.x) supporte nativement le streaming async. Les exemples ci-dessous utilisent l'API Chat Completions pour sa compatibilité large et sa simplicité. Pour un nouveau projet, OpenAI recommande désormais la Responses API pour le streaming — nous y reviendrons en section 4.2.

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json, os

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    yield
    await app.state.openai.close()

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    messages: list[dict]
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024

async def openai_stream_generator(client: AsyncOpenAI, body: ChatRequest):
    try:
        stream = await client.chat.completions.create(
            model=body.model,
            messages=body.messages,
            temperature=body.temperature,
            max_tokens=body.max_tokens,
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content is not None:
                payload = json.dumps(
                    {"token": delta.content, "model": chunk.model},
                    ensure_ascii=False
                )
                yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        error_payload = json.dumps({"error": str(e)})
        yield f"data: {error_payload}\n\n"
        yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(body: ChatRequest, request: Request):
    return StreamingResponse(
        openai_stream_generator(request.app.state.openai, body),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        }
    )

Point clé : le client AsyncOpenAI est créé une seule fois dans le lifespan et partagé via app.state. Créer un nouveau client à chaque requête ouvre de nouvelles connexions TCP et épuise le pool de connexions en quelques centaines de requêtes/s.

4.1 Streaming avec comptage de tokens (usage stats)

L'API OpenAI n'envoie les statistiques d'usage (prompt_tokens, completion_tokens) que dans le dernier chunk quand stream_options={"include_usage": True} est activé. Voici comment les capturer sans bloquer le stream :

async def openai_stream_with_usage(client: AsyncOpenAI, body: ChatRequest):
    token_count = 0
    stream = await client.chat.completions.create(
        model=body.model,
        messages=body.messages,
        stream=True,
        stream_options={"include_usage": True},  # usage dans dernier chunk
    )
    async for chunk in stream:
        # Dernier chunk : usage seulement, pas de content
        if chunk.usage:
            meta = {"usage": {
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
            }}
            yield f"data: {json.dumps(meta)}\n\n"
            continue
        delta = chunk.choices[0].delta
        if delta.content:
            token_count += 1
            yield f"data: {json.dumps({'token': delta.content})}\n\n"
    yield "data: [DONE]\n\n"

OpenAI Responses API (nouveau, 2025) : les exemples ci-dessus utilisent l'API Chat Completions, qui reste pleinement supportée. Pour un nouveau projet OpenAI, la Responses API (client.responses.create()) est désormais recommandée par OpenAI — elle unifie streaming, tool use et gestion d'état de manière plus propre. La migration est simple : le format SSE côté FastAPI reste identique, seul le SDK d'appel change. Consultez la doc Responses API pour les détails.

5. Streaming Ollama en local avec FastAPI

Ollama expose deux interfaces pour le streaming : la bibliothèque Python (pip install ollama) et l'API HTTP REST (POST /api/chat). Les deux permettent le streaming async — choisissez en fonction de votre infrastructure.

5.1 Via la bibliothèque Python ollama (recommandé)

import ollama
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json

async def ollama_stream_generator(model: str, messages: list):
    client = ollama.AsyncClient()  # host="http://localhost:11434" par défaut
    try:
        async for chunk in await client.chat(
            model=model,
            messages=messages,
            stream=True,
        ):
            token = chunk["message"]["content"]
            if token:
                payload = json.dumps({"token": token}, ensure_ascii=False)
                yield f"data: {payload}\n\n"
            # Dernier chunk : done=True, stats de génération
            if chunk.get("done"):
                stats = {
                    "eval_count": chunk.get("eval_count"),  # tokens générés
                    "eval_duration": chunk.get("eval_duration"),  # ns
                    "prompt_eval_count": chunk.get("prompt_eval_count"),
                }
                yield f"data: {json.dumps({'stats': stats})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
        yield "data: [DONE]\n\n"

@app.post("/ollama/stream")
async def ollama_chat_stream(body: ChatRequest):
    return StreamingResponse(
        ollama_stream_generator(body.model, body.messages),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
    )

5.2 Via l'API HTTP Ollama avec httpx (alternative flexible)

Utiliser httpx.AsyncClient directement donne plus de contrôle sur les timeouts, la gestion des connexions, et permet d'utiliser Ollama hébergé sur un serveur distant.

import httpx, json

OLLAMA_URL = "http://localhost:11434/api/chat"

async def ollama_httpx_generator(model: str, messages: list):
    payload = {"model": model, "messages": messages, "stream": True}
    timeout = httpx.Timeout(connect=5.0, read=120.0, write=10.0)

    async with httpx.AsyncClient(timeout=timeout) as client:
        async with client.stream("POST", OLLAMA_URL, json=payload) as resp:
            async for line in resp.aiter_lines():
                if not line:
                    continue
                chunk = json.loads(line)
                token = chunk.get("message", {}).get("content", "")
                if token:
                    yield f"data: {json.dumps({'token': token})}\n\n"
                if chunk.get("done"):
                    break
    yield "data: [DONE]\n\n"

Timeout httpx : le paramètre read=120.0 définit le délai maximal entre deux chunks. Pour un modèle lent sur CPU qui génère 2 tok/s, 120s est raisonnable. Trop court = timeout intempestif mid-stream. Trop long = ressources bloquées pour des clients déconnectés.

À retenir — Ollama

6. Streaming Hugging Face — TextIteratorStreamer

Hugging Face Transformers génère du texte de manière synchrone bloquante via model.generate(). Pour streamer les tokens, il faut utiliser TextIteratorStreamer qui intercepte les tokens au fur et à mesure de leur génération et les expose via un itérateur — dans un thread séparé pour ne pas bloquer la boucle asyncio.

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import torch, json, asyncio
from threading import Thread

@asynccontextmanager
async def lifespan(app: FastAPI):
    model_id = "Qwen/Qwen2.5-1.5B-Instruct"  # ~3 Go VRAM
    app.state.tokenizer = AutoTokenizer.from_pretrained(model_id)
    app.state.model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        device_map="auto",  # GPU si disponible, sinon CPU
    )
    app.state.model.eval()
    yield

app = FastAPI(lifespan=lifespan)

async def hf_stream_generator(tokenizer, model, prompt: str, max_new_tokens=512):
    # Formater le prompt avec le template de chat du modèle
    messages = [{"role": "user", "content": prompt}]
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer([text], return_tensors="pt").to(model.device)

    streamer = TextIteratorStreamer(
        tokenizer,
        skip_prompt=True,      # n'émet pas les tokens du prompt
        skip_special_tokens=True  # pas de <|im_end|> etc.
    )

    generate_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "temperature": 0.7,
        "top_p": 0.9,
    }

    # model.generate() bloque — l'exécuter dans un thread séparé
    thread = Thread(target=model.generate, kwargs=generate_kwargs, daemon=True)
    thread.start()

    loop = asyncio.get_event_loop()
    for token_text in streamer:
        if token_text:
            payload = json.dumps({"token": token_text}, ensure_ascii=False)
            yield f"data: {payload}\n\n"
        # yield du contrôle à la boucle asyncio entre chaque token
        await asyncio.sleep(0)
    yield "data: [DONE]\n\n"

@app.post("/hf/stream")
async def hf_stream(prompt: str, request: Request):
    return StreamingResponse(
        hf_stream_generator(
            request.app.state.tokenizer,
            request.app.state.model,
            prompt
        ),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
    )

Pattern thread ↔ asyncio : dans ce contexte, await asyncio.sleep(0) est un checkpoint utile entre deux itérations de la boucle synchrone — il rend la main à la boucle asyncio pour qu'elle puisse traiter d'autres coroutines en attente. Ce n'est pas une règle universelle, mais une bonne pratique dans ce pattern spécifique où une boucle synchrone tourne dans une coroutine.

7. WebSocket bidirectionnel — interruption mid-stream

SSE ne permet pas à l'utilisateur d'envoyer un signal "stop" sans fermer la connexion. WebSocket résout ce problème : le client peut envoyer {"action": "stop"} pendant que le serveur génère, et le serveur peut interrompre proprement la génération.

from fastapi import WebSocket, WebSocketDisconnect
import asyncio, json

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    stop_event = asyncio.Event()

    async def listen_for_stop():
        # Écoute les messages du client en parallèle
        try:
            while True:
                msg = await websocket.receive_text()
                data = json.loads(msg)
                if data.get("action") == "stop":
                    stop_event.set()
        except WebSocketDisconnect:
            stop_event.set()

    try:
        while True:
            msg = await websocket.receive_text()
            request_data = json.loads(msg)

            if request_data.get("action") != "chat":
                continue

            stop_event.clear()
            listener_task = asyncio.create_task(listen_for_stop())

            # Génération avec vérification du stop à chaque token
            async for chunk in await ollama.AsyncClient().chat(
                model=request_data["model"],
                messages=request_data["messages"],
                stream=True,
            ):
                if stop_event.is_set():
                    await websocket.send_json({"type": "stopped"})
                    break
                token = chunk["message"]["content"]
                if token:
                    await websocket.send_json({"type": "token", "data": token})

            listener_task.cancel()
            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        pass

8. Gestion des erreurs, timeouts et déconnexions client

Le streaming introduit des cas d'erreur spécifiques absents des requêtes classiques : le client peut se déconnecter en cours de génération, le LLM peut dépasser son timeout, ou le générateur peut lever une exception au milieu d'un stream déjà commencé.

8.1 Détecter la déconnexion client

from fastapi import Request
import asyncio

async def stream_with_disconnect_check(request: Request, llm_generator):
    async for chunk in llm_generator:
        # Vérifier la déconnexion à chaque token
        if await request.is_disconnected():
            # Libérer les ressources GPU/mémoire proprement
            break
        yield chunk

@app.post("/chat/stream")
async def chat_stream(body: ChatRequest, request: Request):
    generator = openai_stream_generator(request.app.state.openai, body)
    return StreamingResponse(
        stream_with_disconnect_check(request, generator),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"},
    )

8.2 Timeout global sur un stream

async def stream_with_timeout(generator, timeout_seconds: float = 60.0):
    try:
        async with asyncio.timeout(timeout_seconds):  # Python 3.11+
            async for chunk in generator:
                yield chunk
    except asyncio.TimeoutError:
        error = json.dumps({"error": "timeout: génération trop longue"})
        yield f"data: {error}\n\n"
        yield "data: [DONE]\n\n"

9. Métriques — TTFT, TPS et Prometheus

Deux métriques sont fondamentales pour surveiller un endpoint de streaming LLM en production : le TTFT (Time to First Token) qui mesure la latence perçue, et le TPS (Tokens Per Second) qui mesure le débit de génération.

import time
from prometheus_client import Histogram, Counter, make_asgi_app

# Métriques Prometheus
TTFT_HISTOGRAM = Histogram(
    "llm_time_to_first_token_seconds",
    "Délai jusqu'au premier token streamé",
    buckets=[.05, .1, .25, .5, 1, 2, 5],
    labelnames=["model"],
)
TPS_HISTOGRAM = Histogram(
    "llm_tokens_per_second",
    "Débit de génération en tokens/s",
    buckets=[5, 10, 20, 40, 60, 80, 120],
    labelnames=["model"],
)
STREAM_ERRORS = Counter(
    "llm_stream_errors_total",
    "Nombre d'erreurs de streaming",
    labelnames=["model", "error_type"],
)

async def instrumented_stream(generator, model: str):
    start = time.perf_counter()
    first_token_time = None
    token_count = 0

    try:
        async for chunk in generator:
            if first_token_time is None:
                first_token_time = time.perf_counter() - start
                TTFT_HISTOGRAM.labels(model=model).observe(first_token_time)
            token_count += 1
            yield chunk
    except Exception as e:
        STREAM_ERRORS.labels(model=model, error_type=type(e).__name__).inc()
        raise
    finally:
        total = time.perf_counter() - start
        if token_count > 0 and total > 0:
            tps = token_count / total
            TPS_HISTOGRAM.labels(model=model).observe(tps)

# Exposer /metrics pour Prometheus
app.mount("/metrics", make_asgi_app())

À retenir — Métriques

10. HTTP chunked encoding & asyncio internals — ce qui se passe vraiment

Pour comprendre pourquoi le streaming fonctionne (ou échoue) en production, il faut descendre au niveau du protocole HTTP et de la boucle asyncio. Ces deux couches interagissent de manière non évidente, et la plupart des bugs de streaming viennent d'une incompréhension de l'une d'elles.

10.1 HTTP/1.1 Transfer-Encoding: chunked — le protocole wire

Quand FastAPI retourne une StreamingResponse, Uvicorn encode la réponse en chunked transfer encoding (HTTP/1.1 RFC 7230, §4.1). Ce mécanisme permet d'envoyer une réponse dont la taille n'est pas connue à l'avance. Chaque chunk est précédé de sa taille en hexadécimal :

# Ce que Uvicorn envoie sur le socket TCP pour chaque token SSE
HTTP/1.1 200 OK\r\n
Content-Type: text/event-stream\r\n
Transfer-Encoding: chunked\r\n
X-Accel-Buffering: no\r\n
\r\n
1e\r\n                          # taille du chunk en hex (30 octets)
data: {"token": "Bonjour"}\n\n\r\n  # chunk data
1c\r\n                          # chunk suivant
data: {"token": " monde"}\n\n\r\n
e\r\n                           # signal de fin
data: [DONE]\n\n\r\n
0\r\n                           # chunk terminal = fin du stream
\r\n

En pratique, chaque yield dans votre générateur async devient un chunk de réponse côté HTTP/1.1 avec Transfer-Encoding: chunked. Uvicorn appelle send() sur le socket après le yield — sans buffering supplémentaire côté Python. C'est pourquoi le streaming fonctionne naturellement sans proxy : le noyau OS transmet chaque chunk dès qu'il arrive dans le buffer d'envoi (sous réserve des buffers OS et des proxies intermédiaires — cf. section NGINX).

Sous HTTP/2 (activable avec uvicorn --http h2), le chunked encoding disparaît : chaque chunk devient une DATA frame dans le stream HTTP/2. L'effet est identique pour le streaming, mais HTTP/2 permet la multiplexion — plusieurs streams LLM simultanés sur une seule connexion TCP, sans head-of-line blocking.

10.2 La boucle asyncio et les générateurs async — pourquoi await / yield redonnent la main à l'event loop

La boucle d'événements asyncio est single-threaded et coopérative : une coroutine s'exécute jusqu'au prochain await ou yield, puis rend la main à la boucle pour qu'elle exécute d'autres coroutines. C'est ce qui permet à FastAPI de servir des centaines de connexions SSE simultanées avec un seul thread.

# Séquence d'exécution asyncio lors d'un stream
#
# Event loop tick 1 :
async def mon_generateur():
    chunk = await llm_client.get_next_token()  # ← suspend, rend la main à l'event loop
    yield chunk                                   # ← Uvicorn envoie le chunk, suspend à nouveau
#
# Pendant que le LLM génère le token suivant (IO await),
# l'event loop peut traiter d'autres requêtes.
#
# PROBLÈME avec HuggingFace CPU-bound :
for token in text_iterator_streamer:  # ← boucle synchrone ! bloque l'event loop
    yield format_sse(token)
    # Aucun await → l'event loop est bloquée pendant toute la durée d'un token

# SOLUTION :
for token in text_iterator_streamer:
    yield format_sse(token)
    await asyncio.sleep(0)  # ← cède le contrôle, permet à l'event loop de respirer

Le await asyncio.sleep(0) est un checkpoint asyncio — il ne dort pas réellement (délai = 0), mais il suspend la coroutine et met la tâche en fin de file, permettant à la boucle d'événements de traiter les autres coroutines en attente. Sans ce checkpoint, une génération HuggingFace de 200 tokens bloque FastAPI pendant toute la durée — aucune autre requête ne peut être servie.

10.3 Back-pressure — que se passe-t-il quand le client est lent ?

La back-pressure est le mécanisme par lequel un consommateur lent signale au producteur de ralentir. Dans un stream LLM, le "consommateur" est le client HTTP — si sa connexion réseau est lente (3G, latence élevée), son buffer TCP se remplit et il signale au noyau OS de ne plus envoyer.

Couche Mécanisme de back-pressure Comportement FastAPI
TCP Fenêtre de congestion TCP (rwnd) send() bloque quand le buffer OS est plein
asyncio Transport write buffer (64 Ko par défaut) drain() bloque la coroutine si buffer plein
Uvicorn/ASGI await sur chaque send de chunk Le générateur est suspendu si le client ne consomme pas
LLM Pas de back-pressure native Le modèle continue de générer même si client déconnecté

La conséquence pratique : si un client 3G reçoit des tokens à 5 tok/s alors que le LLM en génère 40 tok/s, les chunks s'accumulent dans les buffers TCP. Le générateur asyncio est suspendu au niveau de l'await Uvicorn — le LLM continue quand même de générer dans son propre thread/processus. Pour économiser les ressources GPU, il faut vérifier la déconnexion (section 8.1) et annuler la génération proprement.

11. vLLM et serving haute performance — PagedAttention et continuous batching

Pour des APIs LLM qui doivent servir des centaines de requêtes simultanées, les solutions basées sur HuggingFace model.generate() ou Ollama montrent leurs limites. vLLM (UC Berkeley, 2023) introduit deux innovations fondamentales qui changent la physique du serving LLM : PagedAttention et le continuous batching.

11.1 PagedAttention — fragmenter le KV Cache comme une mémoire virtuelle

Le KV Cache (Key-Value Cache) stocke les vecteurs K et V de l'attention pour chaque token déjà traité. Sans optimisation, chaque requête alloue un bloc contigu de VRAM proportionnel à la longueur maximale du contexte — même si la réponse finale est courte. Pour Llama 3.1 8B avec un contexte de 8K tokens, cela représente ~2 Go par requête. Avec 10 requêtes simultanées : 20 Go rien que pour le KV Cache.

PagedAttention fragmente le KV Cache en blocs de taille fixe (typiquement 16 tokens), alloués dynamiquement — exactement comme la mémoire virtuelle d'un OS pagine la RAM physique. Un bloc n'est alloué que quand les tokens correspondants sont réellement générés. La fragmentation interne est réduite à 4 % au lieu de 60–80 % avec l'allocation contigue.

11.2 Continuous batching — éliminer l'inefficacité du batch statique

Le batch statique classique (HuggingFace) traite N requêtes simultanément, mais doit attendre que la requête la plus longue soit terminée avant d'accepter de nouvelles requêtes dans le batch. Si une requête génère 10 tokens et une autre 500 tokens, le batch de 10 tokens "bloque" les slots GPU pendant 490 tokens inutiles.

Le continuous batching (vLLM, TGI) insère de nouvelles requêtes dans le batch en cours de génération, dès qu'un slot se libère. Résultat mesuré par l'équipe Berkeley : 23× plus de débit qu'HuggingFace sur Llama 13B pour 100 requêtes simultanées.

11.3 Intégrer vLLM avec FastAPI — API OpenAI-compatible

vLLM expose nativement une API compatible OpenAI — ce qui signifie que votre code FastAPI qui utilise déjà AsyncOpenAI fonctionne sans modification :

# Lancer vLLM (interface moderne recommandée)
pip install vllm
vllm serve meta-llama/Llama-3.1-8B-Instruct \
    --tensor-parallel-size 2 \     # répartir sur 2 GPUs
    --max-model-len 8192 \
    --port 8001
# Dans FastAPI — pointer vers vLLM au lieu d'OpenAI
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.openai = AsyncOpenAI(
        api_key="token-vllm",        # vLLM accepte n'importe quel token
        base_url="http://localhost:8001/v1",  # ← vLLM local
    )
    yield

# Le reste du code de streaming est identique à la section 4
# vLLM gère le continuous batching, PagedAttention, et le streaming en interne

11.4 Benchmarks comparatifs — HuggingFace vs Ollama vs vLLM

Backend TPS (tok/s) 1 req TPS 10 req simultanées TTFT moyen VRAM overhead
HuggingFace generate() 35–45 35–45 (séquentiel) 150–300 ms Élevé (blocs contigus)
Ollama (llama.cpp) 40–60 File d'attente 100–200 ms Faible (GGUF mmap)
vLLM (PagedAttention) 55–80 400–900 (batché) 50–150 ms Optimal (pages 16 tok)

Ordres de grandeur indicatifs — protocole : A100 80 Go, Llama 3.1 8B FP16, contexte 2K, réponse 200 tokens, charge concurrente 1/10 req. Les chiffres varient selon le GPU (RTX 4090 ≈ 60–70% d'un A100), la quantization (Q4 ≈ 2× plus lent en TPS que FP16), le batch size réel et la longueur du prompt. Ces chiffres illustrent surtout les tendances relatives entre backends, pas une promesse universelle. Référence : Kwon et al., Efficient Memory Management for Large Language Model Serving with PagedAttention, SOSP 2023.

Règle de décision : Ollama pour le développement local et les projets sans contrainte de throughput. vLLM ou TGI (Text Generation Inference de HuggingFace) dès que vous avez >5 utilisateurs simultanés ou des SLOs de latence stricts (<200 ms TTFT). Le coût de migration est faible car les deux exposent l'API OpenAI-compatible.

12. SSE resumable — reprise après déconnexion réseau

Pattern avancé — deux cas distincts : La spec W3C EventSource définit Last-Event-ID pour la reconnexion automatique via l'objet EventSource du navigateur — le navigateur renvoie automatiquement l'ID du dernier événement reçu. En revanche, un streaming via fetch() POST (section 3.3) ne bénéficie pas de cette reconnexion native — si la connexion se coupe, le fetch échoue et le client doit implémenter lui-même la logique de reprise (stocker l'ID localement, relancer le fetch). Ce pattern est utile pour les streams longs critiques, pas pour le chat classique.

Voici l'implémentation côté serveur. Le client utilisant EventSource (GET) envoie automatiquement le header Last-Event-ID ; pour un client fetch POST, il doit l'envoyer manuellement.

# Format SSE avec ID — permet la reprise
id: 42\n
data: {"token": "Bonjour"}\n\n

id: 43\n
data: {"token": " monde"}\n\n
from fastapi import Header
from typing import Optional

# Stockage en mémoire des tokens déjà générés (Redis en prod)
stream_cache: dict[str, list[str]] = {}

async def resumable_sse_generator(
    stream_id: str,
    llm_generator,
    resume_from: Optional[int] = None
):
    tokens = stream_cache.setdefault(stream_id, [])
    token_id = len(tokens)

    # Rejouer les tokens déjà générés si reprise
    if resume_from is not None:
        for i, cached_token in enumerate(tokens[resume_from:], start=resume_from):
            yield f"id: {i}\ndata: {json.dumps({'token': cached_token})}\n\n"
        token_id = len(tokens)

    # Continuer la génération
    async for chunk in llm_generator:
        tokens.append(chunk)
        yield f"id: {token_id}\ndata: {json.dumps({'token': chunk})}\n\n"
        token_id += 1
    yield "data: [DONE]\n\n"

@app.post("/chat/resumable/{stream_id}")
async def resumable_stream(
    stream_id: str,
    body: ChatRequest,
    last_event_id: Optional[str] = Header(None, alias="Last-Event-ID"),
):
    resume_from = int(last_event_id) + 1 if last_event_id else None
    generator = openai_stream_generator(app.state.openai, body)
    return StreamingResponse(
        resumable_sse_generator(stream_id, generator, resume_from),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
    )

En production : remplacez stream_cache (dict en mémoire, perdu au redémarrage) par Redis avec TTL de 10 minutes. Chaque token est stocké sous stream:{stream_id}:{token_id}. La reprise fonctionne alors sur plusieurs instances du serveur (architecture horizontalement scalable).

13. Déploiement — NGINX, Caddy et configuration proxy

Le streaming fonctionne parfaitement en développement (uvicorn main:app --reload), mais casse silencieusement derrière un proxy si la configuration est incorrecte. NGINX bufférise les réponses par défaut — les tokens s'accumulent dans un tampon de 4 Ko ou 32 Ko avant d'être envoyés d'un bloc.

13.1 Configuration NGINX pour le streaming SSE

# /etc/nginx/sites-available/dev-ai
server {
    listen 443 ssl http2;
    server_name api.dev-ai.fr;

    location /chat/stream {
        proxy_pass http://127.0.0.1:8000;
        proxy_http_version 1.1;

        # CRITIQUE : désactive le buffering NGINX pour le streaming
        proxy_buffering off;
        proxy_cache off;

        # Timeouts adaptés aux longues générations (120s pour modèles lents)
        proxy_read_timeout 120s;
        proxy_send_timeout 120s;

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Connection "";  # keep-alive pour HTTP/1.1
    }

    # WebSocket endpoint
    location /ws/ {
        proxy_pass http://127.0.0.1:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 3600s;  # WebSocket : connexion longue durée
    }
}

13.2 Configuration Caddy (alternative recommandée)

# Caddyfile
api.dev-ai.fr {
    reverse_proxy /chat/stream localhost:8000 {
        flush_interval -1  # flush immédiat = pas de buffering
        transport http {
            read_timeout 120s
        }
    }

    reverse_proxy /ws/* localhost:8000 {
        transport http {
            read_timeout 1h
        }
    }

    reverse_proxy localhost:8000
}

Caddy vs NGINX pour le streaming : Caddy active automatiquement le flush immédiat sur les réponses text/event-stream sans configuration supplémentaire. Pour NGINX, le proxy_buffering off est obligatoire et facilement oublié. Si votre streaming "fonctionne en local mais pas en prod", c'est presque certainement ce paramètre NGINX.

13.3 Nombre de workers Uvicorn

Chaque connexion de streaming maintient une coroutine active pendant toute la durée de la génération. Avec FastAPI async, un seul worker Uvicorn est souvent suffisant pour OpenAI et Ollama tant que la charge reste principalement I/O-bound — les coroutines se suspendent pendant l'attente réseau, libérant le thread pour d'autres requêtes. Pour les modèles HuggingFace (CPU-bound), chaque génération monopolise le GIL — utilisez run_in_executor ou plusieurs workers.

# Pour OpenAI / Ollama (IO-bound) : 1 worker suffit
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1

# Pour HuggingFace local (CPU-bound) : 1 seul worker car le modèle est en mémoire
# Plusieurs workers = plusieurs copies du modèle en RAM
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1 --loop uvloop

Quelle stack choisir selon votre cas ?

Prototype / dev local

  • Ollama + AsyncClient
  • StreamingResponse SSE
  • Pas de proxy — Uvicorn direct
  • 1 worker, modèle en lifespan

API publique (< 50 users simultanés)

  • AsyncOpenAI ou Ollama distant
  • SSE + JWT OAuth2
  • Caddy (flush_interval -1)
  • Prometheus TTFT/TPS

Serving haute perf (> 50 users)

  • vLLM OpenAI-compatible server
  • FastAPI = proxy léger devant vLLM
  • NGINX + proxy_buffering off
  • Continuous batching, PagedAttention

Conversations interactives (stop/interrupt)

  • WebSocket bidirectionnel
  • asyncio.Event pour le stop signal
  • NGINX proxy_read_timeout 3600s
  • Vérifier is_disconnected() à chaque token

FAQ

Quelle est la différence entre SSE et WebSocket pour le streaming LLM ?

SSE (Server-Sent Events) est unidirectionnel — le serveur envoie des tokens au client via une connexion HTTP persistante. WebSocket est bidirectionnel. Pour le chat IA classique, SSE est plus simple et suffisant. Utilisez WebSocket uniquement si l'utilisateur doit pouvoir interrompre la génération en cours ou envoyer des messages pendant que le modèle répond.

FastAPI StreamingResponse fonctionne-t-il derrière NGINX ?

Oui, mais NGINX bufférise les réponses par défaut — les tokens s'accumulent en tampon et arrivent d'un bloc côté client. Il faut obligatoirement ajouter proxy_buffering off dans le bloc location. Ajoutez aussi le header X-Accel-Buffering: no dans votre réponse FastAPI comme double protection. Sans ça, le streaming ne fonctionne pas en production même si tout va bien en local.

Qu'est-ce que le TTFT et comment le réduire ?

Le TTFT (Time To First Token) mesure le délai entre l'envoi de la requête et la réception du premier token. Il dépend du prefill (encodage du prompt) et de la file d'attente du modèle. Pour le réduire : raccourcir le prompt système, utiliser un modèle plus petit, activer Flash Attention 2 (Ollama : OLLAMA_FLASH_ATTENTION=1), et éviter le queuing (un seul modèle par GPU). Cible : < 500 ms pour une expérience fluide.

Comment gérer les erreurs dans un StreamingResponse FastAPI ?

Les exceptions levées après l'envoi du premier chunk ne peuvent pas être transformées en réponse HTTP d'erreur (le status 200 est déjà envoyé). La bonne pratique : envelopper le générateur dans un try/except et envoyer un chunk SSE d'erreur (data: {"error": "..."}\n\n) suivi de data: [DONE]\n\n. Le client doit vérifier la présence du champ error dans chaque chunk reçu.

Peut-on utiliser StreamingResponse avec plusieurs LLM en parallèle ?

Oui. FastAPI async gère naturellement des centaines de connexions SSE simultanées (chaque connection est une coroutine légère). Pour OpenAI ou Ollama (IO-bound), un seul worker Uvicorn suffit. Pour des modèles HuggingFace locaux (CPU/GPU-bound), les requêtes se mettent en file d'attente car le GPU ne peut exécuter qu'un batch à la fois — considérez vLLM ou Text Generation Inference pour le serving haute performance.

Articles liés

Consulting DEV-AI

Vous déployez un LLM en production ?

Architecture streaming, optimisation TTFT, sécurisation JWT, monitoring Prometheus — accompagnement expert pour vos projets IA.

Contacter l'équipe →

Newsletter IA

Restez à jour sur l'IA & le Machine Learning

Actus, tutos, outils — chaque semaine en français. Sans spam.