Python · FastAPI · Redis · LLM · AI/ML

LLM Inference Pipelines in Production: FastAPI, Redis Streams, and WebSocket Streaming

January 22, 2026 · 8 min read

Shipping an LLM-powered feature to production is different from running a notebook. Here's how we built a real-time inference pipeline for an automated detection system — handling job queuing, streaming responses, and failure recovery at the backend level.

The project: automate a verification workflow that was previously done manually by operators reviewing images. The client needed inference results streamed back in real time — operators couldn't stare at a loading spinner for 8 seconds while the model ran. The output needed to feel live. That meant streaming.

## The Pipeline Architecture

```
Client (WebSocket)
        ↓
FastAPI WS endpoint → Redis Stream (XADD job)
        ↓
Worker Process (model inference)
        ↓
Redis Stream (XADD result chunk)
        ↓
FastAPI consumer → WebSocket push to client
```

We used Redis Streams (not pub/sub) here intentionally. Streams give you persistent, ordered, consumer-group-based processing. If a worker crashes mid-inference, the job isn't lost — another worker picks it up via XAUTOCLAIM.

## FastAPI + Streaming Responses

FastAPI's WebSocket support is genuinely good for this pattern. Here's the core of the WS endpoint:

```python
from fastapi import FastAPI, WebSocket
import redis.asyncio as aioredis

app = FastAPI()
redis = aioredis.Redis(decode_responses=True)

@app.websocket("/ws/inference/{job_id}")
async def inference_ws(websocket: WebSocket, job_id: str):
    await websocket.accept()
    # Subscribe to the result stream for this job, from the beginning
    last_id = "0"
    while True:
        results = await redis.xread(
            {f"results:{job_id}": last_id}, block=200, count=10
        )
        if results:
            for _, messages in results:
                for msg_id, data in messages:
                    last_id = msg_id
                    await websocket.send_json(data)
                    # Stream fields arrive as strings, so any "done"
                    # value is truthy
                    if data.get("done"):
                        await websocket.close()
                        return
```

The worker runs the model inference in a separate process (we used Ray for worker management), writing result chunks to the stream as they're generated. The WS endpoint just polls and pushes.

## Handling the Hard Parts

**Backpressure.** LLM inference is slow relative to WebSocket throughput. We added a simple token bucket on the worker side to prevent Redis stream buildup when a batch of jobs arrived simultaneously.

**Job timeouts.** Every job gets a 30-second TTL in a Redis sorted set. A background task sweeps expired jobs and sends a `{done: true, error: "timeout"}` event so the client isn't left hanging.
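A minimal sketch of that sweep task, with the Redis client injected so the logic stays testable on its own. The key names (`jobs:deadlines`, `results:{job_id}`) and the one-second cadence are hypothetical stand-ins, not our exact production code:

```python
import asyncio
import time

async def sweep_expired_jobs(r, deadlines_key="jobs:deadlines"):
    """Remove jobs whose deadline has passed and emit a timeout event.

    `r` stands in for a redis.asyncio client. The sorted set holds
    member = job_id, score = unix deadline.
    """
    now = time.time()
    expired = await r.zrangebyscore(deadlines_key, "-inf", now)
    for job_id in expired:
        # Tell the waiting WebSocket consumer the job timed out
        await r.xadd(f"results:{job_id}", {"done": "true", "error": "timeout"})
        await r.zrem(deadlines_key, job_id)
    return expired

async def sweeper_loop(r, interval=1.0):
    # Background task started alongside the app
    while True:
        await sweep_expired_jobs(r)
        await asyncio.sleep(interval)
```

The injected-client shape also makes the sweep trivially unit-testable with a fake, which is how we'd verify the timeout event actually lands on the result stream.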
**Model cold starts.** Running on AWS ECS, the first inference after a scale-up could take 4–6 seconds for model loading. We pre-warmed the container by sending a dummy inference on startup. Not elegant, but it worked.

## What a 70% Accuracy Improvement Actually Means

The baseline was manual operator review — subjectively accurate but slow and inconsistent across operators. The LLM-assisted workflow surfaced the most likely classification first, with a confidence score, and operators confirmed or overrode it. The 70% improvement is measured against a benchmark the client defined from their historical error rate.

The key insight: you don't need the model to be right 100% of the time if the workflow is designed around human confirmation. The model's job is to reduce cognitive load, not replace judgment.

## FastAPI vs. Flask for This Use Case

We chose FastAPI specifically for `async` support throughout. Flask's sync model would have required a thread per WebSocket connection, which doesn't scale. FastAPI's async WebSocket handling means a single event loop manages hundreds of concurrent connections efficiently. The Pydantic validation on request/response bodies was also genuinely useful — it caught a surprising number of integration bugs early.
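That single-event-loop claim is easy to demonstrate outside of FastAPI. A toy sketch (the connection count and delay are made up): 200 simulated handlers, each "blocked" on I/O for 100ms, all serviced by one loop in roughly the time of one:

```python
import asyncio
import time

async def handle_connection(conn_id: int) -> int:
    # Stands in for awaiting Redis or the WebSocket — the loop is free
    # to service other connections while this one waits
    await asyncio.sleep(0.1)
    return conn_id

async def serve_many(n: int = 200) -> float:
    start = time.perf_counter()
    results = await asyncio.gather(*(handle_connection(i) for i in range(n)))
    assert len(results) == n
    return time.perf_counter() - start

elapsed = asyncio.run(serve_many())
```

With a thread-per-connection model you'd pay 200 stacks and the scheduler overhead for the same wall-clock result; here the cost is just 200 coroutine objects.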

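To make the crash-recovery claim from the pipeline section concrete: a worker's read loop first tries to reclaim entries left pending by a dead consumer, and only then blocks for fresh jobs. A rough sketch with the client injected; the group/consumer names are hypothetical, and the return shapes follow redis-py loosely rather than exactly:

```python
import asyncio

async def claim_or_read(r, stream, group, consumer, idle_ms=30_000):
    """Prefer jobs a dead worker left pending; otherwise read new ones.

    `r` stands in for a redis.asyncio client.
    """
    # XAUTOCLAIM transfers entries pending longer than idle_ms to us
    res = await r.xautoclaim(stream, group, consumer, idle_ms, start_id="0-0")
    claimed = res[1]
    if claimed:
        return claimed
    # Nothing to reclaim: block briefly for a fresh job
    resp = await r.xreadgroup(group, consumer, {stream: ">"}, count=1, block=1000)
    return resp[0][1] if resp else []
```

Each processed entry would then be XACKed so it leaves the pending list; anything un-acked when a worker dies becomes reclaimable by the next call.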