
FastAPI WebSockets: Async Connections, Scaling, The Multi-Worker Nightmare (2026)

BACKEND SERIES

Day 23: FastAPI WebSockets & The Multi-Worker Mirage

Series: Logic & Legacy
Day 23 / 50
Level: Senior / Architect

Context: I remember the exact moment the illusion shattered. I had just built a beautiful real-time dashboard using FastAPI's built-in WebSockets. It was flawless on my machine. It was flawless in staging (running a single Uvicorn worker). Then, we deployed to production, scaled to 4 workers, and suddenly, users in the same chat room couldn't see each other's messages. Half the broadcasts vanished into the void. That's the day I learned the brutal difference between handling a single async connection and managing stateful, long-lived sockets across a distributed backend. Today, we're tearing down the "Hello World" WebSocket tutorials and building for reality.

[Image: Steampunk infographic detailing FastAPI WebSocket challenges: stateful connections as switchboard operators, first-message authentication as a "speakeasy", and Redis Pub/Sub synchronizing state across multiple worker nodes to prevent data loss.]

1. The "Hello World" Trap

FastAPI’s WebSocket support is inherited directly from Starlette. It’s elegant. It uses standard async/await syntax. It feels just like writing a normal HTTP endpoint. But this simplicity is incredibly dangerous because it masks the fundamental difference between HTTP and WebSockets. HTTP is stateless: request in, response out, forget the client. WebSockets are stateful: you are holding an open TCP connection in memory. If you don't actively manage the lifecycle of that connection, it will leak memory, block your event loop, and crash your server silently.

Most tutorials show you a basic echo server: accept connection, receive text, send text. They almost always omit the crucial exception handling required to detect when a client ungracefully drops the connection (like closing a laptop lid or walking into a subway tunnel).

The Minimum Viable (Production) Connection
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import logging

logger = logging.getLogger(__name__)
app = FastAPI()

# NEVER skip the try/except block. A dropped connection WILL crash the route.
@app.websocket("/ws/echo")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    client_id = f"{websocket.client.host}:{websocket.client.port}"
    logger.info(f"Client {client_id} connected.")
    
    try:
        while True:
            # This awaits indefinitely until a message arrives
            data = await websocket.receive_text()
            await websocket.send_text(f"Server Echo: {data}")
            
    except WebSocketDisconnect as e:
        # This is expected behavior when a client leaves. Handle it cleanly.
        logger.info(f"Client {client_id} disconnected gracefully. Code: {e.code}")
    except Exception as e:
        # Catch everything else so an unexpected error doesn't kill the task
        logger.error(f"Unexpected error with client {client_id}: {e}")
    finally:
        # Ensure cleanup happens even if the loop breaks unexpectedly
        logger.debug(f"Cleanup complete for {client_id}.")

2. The Authentication Conundrum

This is where seasoned backend developers usually hit their first major roadblock with WebSockets. You are accustomed to securing HTTP endpoints with an `Authorization: Bearer <token>` header, and you intuitively try to apply the same pattern to your new WebSocket endpoint.

It will fail. The browser's native WebSocket API explicitly prohibits setting custom headers during the initial connection handshake, so you cannot pass a Bearer token in the header of a browser-initiated WebSocket request. You are forced into one of three compromises: tokens in the query string (which leak into access logs and proxy caches), cookies, or authentication via the first message. Choosing the wrong one compromises your entire security model.

The safest approach for single-page applications is utilizing HttpOnly cookies. Since the WebSocket handshake begins as a standard HTTP Upgrade request, the browser will automatically include cookies scoped to the domain. However, if you are building a public API or a mobile client where cookies aren't feasible, the "First-Message Auth" pattern is the most robust architecture.

First-Message Authentication Pattern
import asyncio
from fastapi import status

async def verify_token(token: str) -> bool:
    # Implementation details...
    return token == "valid-secret-token"

@app.websocket("/ws/secure")
async def secure_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        # CRITICAL: Do not wait forever. If they don't auth fast, kill it.
        auth_msg = await asyncio.wait_for(
            websocket.receive_json(), 
            timeout=5.0
        )
        
        token = auth_msg.get("token")
        if not token or not await verify_token(token):
            # Custom 4000+ close codes signify application-level errors
            await websocket.close(code=4001, reason="Unauthorized: Invalid Token")
            return
            
    except asyncio.TimeoutError:
        # They connected but didn't send the password fast enough
        await websocket.close(code=4002, reason="Auth Timeout")
        return
    except WebSocketDisconnect:
        # Client dropped mid-handshake; the socket is already gone, nothing to close
        return
    except Exception:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # If we reach here, the connection is authenticated.
    # We can now enter the main message loop.
    await websocket.send_json({"status": "authenticated"})
    
    try:
        while True:
            data = await websocket.receive_text()
            # Process secure messages...
    except WebSocketDisconnect:
        pass

3. The Multi-Worker State Nightmare

If you take away one thing from this post, make it this: In-memory connection managers do not scale. Every tutorial online shows you a `class ConnectionManager:` holding a list of active WebSockets. This works flawlessly when you run `uvicorn main:app`.

But in production, you aren't running one worker. You are running Gunicorn managing four Uvicorn worker processes (`gunicorn -k uvicorn.workers.UvicornWorker -w 4`). Or you're running multiple pods in Kubernetes behind an Ingress controller. Processes do not share memory. If User A connects to Worker 1, and User B connects to Worker 3, Worker 1's ConnectionManager has absolutely no idea User B exists. When User A tries to send a message to User B, it fails silently.

Scaling with Redis Pub/Sub
import redis.asyncio as redis
import json
import asyncio
from typing import Dict
from fastapi import WebSocket

class RedisPubSubManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        # Local state for THIS specific worker process only
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def publish_message(self, message: dict):
        # PUSH message to Redis. We don't send to local clients directly here.
        await self.redis.publish("global_chat", json.dumps(message))

    async def listen_to_redis(self):
        # Background task: subscribe once, then relay Redis messages to LOCAL clients.
        # Subscribe HERE, not in connect(): pubsub.listen() exits immediately
        # if no channel is subscribed when the task starts.
        await self.pubsub.subscribe("global_chat")
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                payload = json.loads(message["data"].decode())
                
                # Broadcast to all connections managed by THIS worker
                dead_connections = []
                for uid, conn in self.active_connections.items():
                    try:
                        await conn.send_json(payload)
                    except Exception:
                        # Catch dead sockets during broadcast to prevent loop crashing
                        dead_connections.append(uid)
                
                # Cleanup dead connections
                for uid in dead_connections:
                    self.disconnect(uid)

manager = RedisPubSubManager()

# You MUST start the Redis listener task when the app starts.
# (on_event is deprecated in newer FastAPI; prefer the lifespan context manager.)
@app.on_event("startup")
async def startup_event():
    # Keep a reference so the task isn't garbage-collected mid-flight
    app.state.redis_listener = asyncio.create_task(manager.listen_to_redis())

By implementing this pattern, every worker publishes messages to a central bus (Redis), and every worker simultaneously listens to that bus, delivering each message to whichever clients happen to be connected to it. No worker needs global knowledge of connections, which is what makes horizontal scaling across pods and nodes possible.

🛠️ Day 23 Project: The Distributed Dashboard

Prove you can handle state across process boundaries. Build a real-time system monitoring dashboard that works across multiple Uvicorn workers.

  • Spin up a FastAPI app and configure Uvicorn to run with `--workers 3`.
  • Create a background task (using `asyncio.create_task`) that generates a random "CPU Usage" metric every 2 seconds.
  • Connect multiple browser tabs to the WebSocket endpoint. Ensure every tab receives the exact same metrics at the exact same time, regardless of which worker process they connected to.
  • Implement Redis Pub/Sub to synchronize the metrics across the 3 workers. (If you don't use Redis, you'll see different tabs showing different data streams).
🔥 PRO UPGRADE / TEASER

Next time, we tackle the silent killer of async applications: blocking the event loop. We'll look at how a single poorly placed `requests.get()` inside your WebSocket loop can freeze every connection on the worker.

Architectural Consulting

If you are building a data-intensive AI application and require a Senior Engineer to architect your secure, high-concurrency backend, I am available for direct contracting.

Explore Enterprise Engagements →
