Deep Dive - SSE (Server-Sent Events) in Strakly AI Chat
Hey everyone! Today we are going to take a deep dive into SSE (Server-Sent Events) - this is how our AI chat shows text appearing word by word, just like ChatGPT!
If you ever wondered "how does that typing effect work?" - this is the episode for you!
What we will cover:
- What is SSE?
- SSE vs Normal API vs WebSockets
- SSE Data Format (the wire protocol)
- Backend: How Python sends SSE (FastAPI + AsyncGenerator)
- Frontend: How React reads SSE (fetch + ReadableStream)
- The Buffer Problem and how we solve it
- How tokens become a chat message on screen
- Complete Data Flow with real code
- Abort / Cancel a stream
- Fallback: What if SSE is not supported?
Q: What is SSE (Server-Sent Events)?
A: SSE is a way for the server to send data to the client continuously over a single HTTP connection. The server keeps the connection open and pushes data as it becomes available.
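What this looks like on the wire (a simplified sketch, not the exact headers from our app):
============================================================================================
Request:
POST /chat HTTP/1.1
Accept: text/event-stream

Response:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"token": "Hello"}      ← pushed as soon as it's ready

data: {"token": " world"}     ← pushed a moment later

data: [DONE]                  ← then the connection closes

One request, one open connection, many pieces of data coming back.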
Normal API Call:
================
Client: "Give me the AI response"
Server: [processing... 5 seconds...]
Server: "Here's the COMPLETE response in one shot"
Client: Shows everything at once
Timeline:
─────────────────────────────────────────
0s 5s
|───waiting─────|── full response shown
User sees NOTHING for 5 seconds!
SSE (Server-Sent Events):
=========================
Client: "Give me the AI response"
Server: "Here's" ← 200ms
Server: " the" ← 400ms
Server: " first" ← 600ms
Server: " few" ← 800ms
Server: " words..." ← 1000ms
Server: [DONE] ← 1200ms
Timeline:
─────────────────────────────────────────
0s 0.2s 0.4s 0.6s 0.8s 1.2s
|─────|──────|──────|──────|──────|
Here's → Here's the → Here's the first → … → Here's the first few words...
User sees text appearing in REAL TIME!
Think of it like this:
Analogy - Letter vs Phone Call:
================================
Normal API = Writing a LETTER
- You write the entire letter
- Put it in an envelope
- Send it
- Receiver gets EVERYTHING at once

SSE = PHONE CALL
- You start talking
- Receiver hears word by word
- As you speak, they understand
- Real-time communication!
SSE vs Normal API vs WebSockets
There are three main ways a server can send data to a client. Let's compare them!
1. NORMAL API (Request-Response):
==================================
Client ──Request──→ Server
Client ←─Response── Server
Connection CLOSES.
- Client asks, server answers, done
- One request = one response
- Good for: fetching data, submitting forms

2. SSE (Server-Sent Events):
==============================
Client ──Request──→ Server
Client ←─Event 1─── Server
Client ←─Event 2─── Server
Client ←─Event 3─── Server
Client ←─Event N─── Server
Connection CLOSES.
- Client asks ONCE, server sends MANY events
- ONE direction: server → client only
- Client cannot send more data on same connection
- Good for: streaming AI responses, live feeds, notifications

3. WEBSOCKETS:
===============
Client ←──────────→ Server
Client ←──────────→ Server
Client ←──────────→ Server
Connection stays OPEN.
- BOTH directions: client ↔ server
- Either side can send anytime
- Persistent connection
- Good for: chat apps, multiplayer games, real-time collaboration
| Feature | Normal API | SSE | WebSocket |
|---|---|---|---|
| Direction | Client → Server → Client (one time) | Server → Client (many times) | Both ways (any time) |
| Connection | Opens and closes per request | Stays open until server is done | Stays open permanently |
| Protocol | HTTP | HTTP (with text/event-stream) | ws:// (separate protocol) |
| Complexity | Simple | Medium | Complex |
| Use Case | CRUD operations | AI streaming, live feeds | Chat, gaming, real-time |
| Strakly uses | All normal API calls | AI Chat responses | Socket.io for live updates |
Q: Why SSE for AI chat and not WebSockets?
A: Because the AI response only flows one direction - server to client. The user sends ONE message, and the AI streams back a response. We don't need two-way communication during streaming. SSE is simpler and works over regular HTTP!
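To make the comparison concrete, here is a minimal client-side sketch of both approaches. This is illustrative only - the URLs are placeholders, and our chat does not actually use EventSource (more on that below):

// SSE - plain HTTP, one-way; the browser even auto-reconnects for you
const events = new EventSource("/live-feed")               // placeholder URL
events.onmessage = (e) => console.log("server:", e.data)

// WebSocket - separate ws:// protocol, two-way
const socket = new WebSocket("wss://example.com/socket")   // placeholder URL
socket.onmessage = (e) => console.log("server:", e.data)
socket.onopen = () => socket.send("client can talk back any time")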
SSE Data Format - What goes over the wire
SSE has a very simple format. Each event is just plain text!
SSE Format Rules:
==================
1. Each event starts with "data: " (note the space after colon)
2. Each event ends with TWO newlines (\n\n)
3. Empty lines or lines starting with ":" are ignored (comments)
4. The payload after "data: " can be anything (we use JSON)
Example - Raw SSE stream:
==========================
data: {"conversation_id": "abc-123"}\n\n
data: {"token": "Hello"}\n\n
data: {"token": " there"}\n\n
data: {"token": "!"}\n\n
data: {"tools_used": ["get_clients_list"]}\n\n
data: {"action": {"type": "change_theme", "value": "dark"}}\n\n
data: {"suggested_questions": ["Show details", "List all"]}\n\n
data: [DONE]\n\n
In our app, each SSE event carries a JSON object with a specific key that tells the frontend what type of data it is:
Event Types in Our App:
========================
┌─────────────────────────────────────────────────────────┐
│ EVENT │ PURPOSE │
├──────────────────────────┼───────────────────────────────┤
│ {"conversation_id": "x"} │ Track which conversation │
│ │ Sent FIRST, before anything │
├──────────────────────────┼───────────────────────────────┤
│ {"tools_used": ["x"]} │ Bot is calling a tool │
│ │ Frontend shows "Fetching..." │
├──────────────────────────┼───────────────────────────────┤
│ {"token": "Hello"} │ One word/piece of the response │
│ │ Frontend appends to message │
├──────────────────────────┼───────────────────────────────┤
│ {"action": {...}} │ Frontend action (theme change) │
│ │ Frontend executes it │
├──────────────────────────┼───────────────────────────────┤
│ {"suggested_questions": │ Follow-up question buttons │
│ ["q1", "q2", "q3"]} │ Shown after response │
├──────────────────────────┼───────────────────────────────┤
│ [DONE] │ Stream is finished! │
│ │ Frontend calls onDone() │
└──────────────────────────┴───────────────────────────────┘
Real Example - User asks "how many clients?":
==============================================
data: {"conversation_id": "550e8400-e29b-41d4-a716-446655440000"}
data: {"tools_used": ["get_clients_stats"]}
data: {"token": "You"}
data: {"token": " have"}
data: {"token": " **"}
data: {"token": "42"}
data: {"token": "**"}
data: {"token": " active"}
data: {"token": " clients"}
data: {"token": "."}
data: {"suggested_questions": ["List all clients", "New clients this month", "Expired memberships"]}
data: [DONE]
What user sees:
"You have **42** active clients."
(appearing word by word like typing)
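If it helps to think in types, here is one way the frontend could model these payloads in TypeScript. This is just a sketch - the field names match the table above, but the type names are made up for illustration:

// Hypothetical TypeScript shapes for the SSE payloads listed above
interface ConversationIdEvent { conversation_id: string }
interface ToolsUsedEvent      { tools_used: string[] }
interface TokenEvent          { token: string }
interface ActionEvent         { action: { type: string; value?: string } }
interface SuggestionsEvent    { suggested_questions: string[] }

type ChatStreamEvent =
  | ConversationIdEvent
  | ToolsUsedEvent
  | TokenEvent
  | ActionEvent
  | SuggestionsEvent

// Note: [DONE] is not JSON at all - it's a plain sentinel string,
// so the frontend checks for it BEFORE calling JSON.parse.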
Backend: How Python Sends SSE
Let's see how the bot actually sends these events. There are two parts:
Part 1: FastAPI endpoint decides SSE vs JSON
File: strakly-bot/main.py
@app.post("/chat")
async def chat(request_body, raw_request, authorization):
token = authorization.removeprefix("Bearer ")
tenant = decode_token(token)
# Check what the frontend wants
accept = raw_request.headers.get("accept", "")
if "text/event-stream" in accept:
# Frontend sent: Accept: text/event-stream
# → Return SSE streaming response!
return StreamingResponse(
process_chat_stream(message, token, tenant, ...),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache", # Don't cache!
"X-Accel-Buffering": "no", # Don't buffer (for Nginx)
},
)
else:
# Frontend wants normal JSON
# → Return full response at once
result = await process_chat(message, token, tenant, ...)
return ChatResponse(...)
How the decision works:
========================
Frontend sends: Accept: text/event-stream
│
▼
FastAPI checks: "text/event-stream" in accept header?
│
├── YES → StreamingResponse(process_chat_stream())
│ Returns SSE stream (word by word)
│
└── NO → process_chat() → ChatResponse()
Returns full JSON (all at once)
Part 2: process_chat_stream - The AsyncGenerator
File: strakly-bot/agent.py
Q: What is an AsyncGenerator?
A: A function that can produce values one at a time using the yield keyword, instead of returning everything at once with return.
Normal function vs AsyncGenerator:
===================================
# Normal function - returns EVERYTHING at once
async def get_response():
    result = await ai.generate("Hello")
    return result          # one shot, full response

# AsyncGenerator - yields ONE piece at a time
async def stream_response():
    yield "Hello"          # sends first piece
    yield " world"         # sends second piece
    yield "!"              # sends third piece
    # Each yield sends data to client IMMEDIATELY!
Now here's the actual code for our streaming function:
async def process_chat_stream(message, token, tenant, ...):
    """This is an AsyncGenerator - it yields SSE events one by one"""

    # Setup: prepare conversation, create LLM client
    conversation_id, conversation, messages, llm = await _setup_chat(...)

    # ┌──────────────────────────────────────────────────────┐
    # │ EVENT 1: Send conversation_id immediately            │
    # │ Frontend needs this to track the conversation        │
    # └──────────────────────────────────────────────────────┘
    yield f"data: {json.dumps({'conversation_id': conversation_id})}\n\n"

    # Loop: AI might need to call tools, then respond
    for _ in range(max_iterations):

        # Stream the AI's response token by token
        async for chunk in llm.astream(messages):
            if chunk.content:
                # ┌──────────────────────────────────────────┐
                # │ EVENT 2: Send each text token            │
                # │ This is what creates the typing effect!  │
                # └──────────────────────────────────────────┘
                yield f"data: {json.dumps({'token': chunk.content})}\n\n"
            if chunk.tool_call_chunks:
                # AI wants to call a tool (not text)
                has_tool_calls = True

        # If AI called tools, execute them
        if full_response.tool_calls:
            names, actions = await _execute_tool_calls(...)
            # ┌──────────────────────────────────────────────┐
            # │ EVENT 3: Tell frontend which tools were used │
            # │ Frontend shows "Fetching data..." message    │
            # └──────────────────────────────────────────────┘
            yield f"data: {json.dumps({'tools_used': names})}\n\n"
            # Loop continues... AI will generate response using tool results

    # ┌──────────────────────────────────────────────────────────┐
    # │ EVENT 4: Send any frontend actions (like theme changes)  │
    # └──────────────────────────────────────────────────────────┘
    for action in all_actions:
        yield f"data: {json.dumps({'action': action})}\n\n"

    # ┌──────────────────────────────────────────────────────────┐
    # │ EVENT 5: Send suggested follow-up questions              │
    # └──────────────────────────────────────────────────────────┘
    suggested_questions = await _generate_suggestions(message, final_text)
    if suggested_questions:
        yield f"data: {json.dumps({'suggested_questions': suggested_questions})}\n\n"

    # ┌──────────────────────────────────────────────────────────┐
    # │ EVENT 6: Signal that the stream is done                  │
    # └──────────────────────────────────────────────────────────┘
    yield "data: [DONE]\n\n"
The yield keyword is the key!
==============================
yield "data: {...}\n\n"
│
▼
FastAPI's StreamingResponse picks it up
│
▼
Sends it over HTTP to the frontend
│
▼
Frontend receives it IMMEDIATELY
│
▼
Next yield sends the next piece
│
▼
...and so on until the generator is done
Think of yield like a CONVEYOR BELT:
=====================================
┌──────────┐ yield yield yield yield
│ Agent │ ──→ 📦 ──→ 📦 ──→ 📦 ──→ 📦 ──→ Frontend
│ (Python) │ token token token [DONE]
└──────────┘
Each package (📦) goes to the frontend as soon as it's ready.
No waiting for ALL packages to be done!
Q: What is StreamingResponse?
A: It's a special response type from FastAPI that takes an AsyncGenerator and sends each yielded value to the client as it comes.
Normal Response vs StreamingResponse:
======================================
# Normal Response
@app.post("/chat")
async def chat():
    result = await process_chat(...)
    return result    # Sends ENTIRE response at once

# StreamingResponse
@app.post("/chat")
async def chat():
    return StreamingResponse(
        process_chat_stream(...),          # AsyncGenerator
        media_type="text/event-stream",    # Tell client: "this is SSE"
        headers={
            "Cache-Control": "no-cache",   # Don't cache events
            "X-Accel-Buffering": "no",     # Don't buffer (important for Nginx!)
        },
    )
StreamingResponse does:
1. Gets next yield from the generator
2. Sends it to client immediately
3. Gets next yield...
4. Sends it...
5. Repeat until generator is done
6. Closes connection
Frontend: How React Reads SSE
Now the most important part - how does the frontend READ this stream?
File: strakly_frontend/src/services/api/chat.service.ts
We use the fetch API plus ReadableStream - not EventSource - because we need to send a POST request, not a GET.
Q: Why not use EventSource?
A: The browser has a built-in EventSource API for SSE:
const source = new EventSource("/chat")
BUT EventSource only supports GET requests!
We need POST (to send the message in the body).
So we use fetch() + manually read the stream.
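For comparison, this is roughly what it would look like if we could use EventSource. This is a hypothetical sketch - our /chat endpoint requires POST and an Authorization header, and EventSource supports neither custom headers nor request bodies:

// Hypothetical - only works if the endpoint accepted GET with no auth header
const source = new EventSource("/chat?message=how%20many%20clients")
source.onmessage = (event) => {
  if (event.data === "[DONE]") { source.close(); return }
  const parsed = JSON.parse(event.data)       // EventSource strips "data: " for us
  if (parsed.token) console.log(parsed.token)
}
source.onerror = () => source.close()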
Here's how it works step by step:
STEP 1: Send the request with SSE headers
==========================================
const response = await fetch(BOT_API_URL + "/chat", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": "Bearer " + token,
"Accept": "text/event-stream", // ← Tell server we want SSE!
},
body: JSON.stringify({
message: "how many clients?",
conversation_id: "abc-123",
}),
signal, // AbortController signal for cancellation
})
STEP 2: Get a reader from the response body
=============================================
const reader = response.body?.getReader()
//                            ^^^^^^^^^^^
//             This gives us a ReadableStream reader
//             We can read chunks of data as they arrive

const decoder = new TextDecoder()
// Converts raw bytes to readable text (UTF-8)

let buffer = ""
// Holds incomplete data between reads (explained below)
STEP 3: Read chunks in a loop
===============================
while (true) {
const { done, value } = await reader.read()
// ^^^^ ^^^^^
// | Raw bytes (Uint8Array)
// |
// true when stream is finished
if (done) break // Stream ended, exit loop
// Convert bytes to text and add to buffer
buffer += decoder.decode(value, { stream: true })
// ^^^^^^^^^^^^
// Tells decoder: "more data coming"
// Process the buffer...
}
The Buffer Problem
This is an important concept! Data arrives in random-sized chunks, not one clean event per chunk.
The Problem:
=============
What we EXPECT (one event per chunk):
Chunk 1: data: {"token": "Hello"}\n\n
Chunk 2: data: {"token": " world"}\n\n
What ACTUALLY arrives (split randomly):
Chunk 1: data: {"token": "Hel
Chunk 2: lo"}\n\ndata: {"tok
Chunk 3: en": " world"}\n\n
The chunks don't align with event boundaries!
An event can be SPLIT across multiple chunks!
Or multiple events can arrive in ONE chunk!
Solution: Use a buffer!
How the buffer works:
======================
buffer = ""
─── Chunk 1 arrives: 'data: {"token": "Hel' ───
buffer = 'data: {"token": "Hel'
Split by \n:
lines = ['data: {"token": "Hel']
Pop the last element (it might be incomplete):
lines = []   ← nothing complete to process yet
There was no \n at the end, so the partial event stays put:
buffer = 'data: {"token": "Hel'
─── Chunk 2 arrives: 'lo"}\n\ndata: {"tok' ───
buffer = 'data: {"token": "Hello"}\n\ndata: {"tok'
Split by \n:
lines = ['data: {"token": "Hello"}', '', 'data: {"tok']
Pop the last element (incomplete):
lines = ['data: {"token": "Hello"}', '']
buffer = 'data: {"tok'
Process lines:
'data: {"token": "Hello"}' → COMPLETE! Parse it! ✅
'' → empty, skip
('data: {"tok' stays in the buffer, waiting for more data)
─── Chunk 3 arrives: 'en": " world"}\n\n' ───
buffer = 'data: {"token": " world"}\n\n'
Split by \n:
lines = ['data: {"token": " world"}', '', '']
Pop the last element (it's empty):
lines = ['data: {"token": " world"}', '']
buffer = ''
Process lines:
'data: {"token": " world"}' → COMPLETE! Parse it! ✅
Here's the actual code:
// Process SSE events from the buffer
const lines = buffer.split("\n")
buffer = lines.pop() || "" // Last element might be incomplete → keep in buffer
// ^^^^^^^^^^
// IMPORTANT! The last line might be cut off.
// So we pop it and keep it for next chunk.
for (const line of lines) {
const trimmed = line.trim()
// Skip empty lines and SSE comments
if (!trimmed || trimmed.startsWith(":")) continue
if (trimmed.startsWith("data: ")) {
const payload = trimmed.slice(6) // Remove "data: " prefix
// ^^^^^
// "data: " is 6 characters
if (payload === "[DONE]") continue // Stream finished marker
try {
const parsed = JSON.parse(payload)
// Handle each event type
if (parsed.token) onChunk(parsed.token)
if (parsed.conversation_id) conversationId = parsed.conversation_id
if (parsed.suggested_questions) suggestedQuestions = parsed.suggested_questions
if (parsed.action && onAction) onAction(parsed.action)
} catch {
// Not valid JSON - treat as plain text
onChunk(payload)
}
}
}
Visual Summary of the Reading Loop:
=====================================
Raw bytes arrive from network
│
▼
TextDecoder converts to string
│
▼
Append to buffer
│
▼
Split buffer by "\n"
│
┌─────┴─────┐
│ │
Complete lines Last element
(process them) (keep in buffer)
│
▼
For each line:
│
├── Empty? → skip
├── Starts with ":"? → skip (SSE comment)
├── Starts with "data:"?
│ │
│ ▼
│ Remove "data: " prefix
│ │
│ ▼
│ Is it "[DONE]"? → skip
│ │
│ ▼
│ JSON.parse()
│ │
│ ├── has .token? → onChunk()
│ ├── has .conversation_id? → save it
│ ├── has .suggested_questions? → save them
│ └── has .action? → onAction()
│
└── Else? → skip
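Putting the whole reading loop together in one place, here is a compact, self-contained sketch. The onChunk / onDone names mirror our callbacks; the rest is illustrative, not our exact service code:

// Sketch: read an SSE body with fetch + ReadableStream, handling the buffer problem
async function readSSE(
  response: Response,
  onChunk: (text: string) => void,
  onDone: () => void,
): Promise<void> {
  const reader = response.body?.getReader()
  if (!reader) { onDone(); return }

  const decoder = new TextDecoder()
  let buffer = ""

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    buffer += decoder.decode(value, { stream: true })    // bytes → text, keep decoder state
    const lines = buffer.split("\n")
    buffer = lines.pop() || ""                            // last line might be incomplete

    for (const line of lines) {
      const trimmed = line.trim()
      if (!trimmed || trimmed.startsWith(":")) continue   // blank line or SSE comment
      if (!trimmed.startsWith("data: ")) continue
      const payload = trimmed.slice(6)                    // drop the "data: " prefix
      if (payload === "[DONE]") continue                  // sentinel, not JSON
      try {
        const parsed = JSON.parse(payload)
        if (parsed.token) onChunk(parsed.token)
      } catch {
        onChunk(payload)                                  // not JSON - pass through as text
      }
    }
  }
  onDone()
}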
How Tokens Become a Chat Message on Screen
Now let's see how the onChunk callback turns individual tokens into a visible message!
File: strakly_frontend/src/pages/AIChat/useAIChatPage.ts
Step 1: Before sending, create an EMPTY assistant message
==========================================================
const assistantMsgId = crypto.randomUUID() // "msg-abc-123"
const placeholderMessage = {
id: assistantMsgId,
role: "assistant",
content: "", // ← EMPTY! Will be filled by streaming
timestamp: new Date(),
}
// Track which message is currently streaming
streamingMessageIdRef.current = assistantMsgId
// Add empty message to the list
setMessages(prev => [...prev, placeholderMessage])
// What the UI shows:
// ┌──────────────────────────┐
// │ 🔵 🔵 🔵 (pulsing dots) │ ← Empty message shows loading dots
// └──────────────────────────┘
Step 2: onChunk callback APPENDS each token
=============================================
// This runs every time a token arrives from SSE
onChunk = (text) => {
setMessages(prev =>
prev.map(msg =>
msg.id === assistantMsgId
? { ...msg, content: msg.content + text }
// ^^^^^^^^^^^^^^^^^
// APPEND new token to existing content!
: msg
)
)
}
Timeline of what happens:
==========================
Token arrives: "You"
→ content = "" + "You" = "You"
→ UI shows: "You"
Token arrives: " have"
→ content = "You" + " have" = "You have"
→ UI shows: "You have"
Token arrives: " **42**"
→ content = "You have" + " **42**" = "You have **42**"
→ UI shows: "You have **42**"
Token arrives: " active"
→ content = "You have **42**" + " active" = "You have **42** active"
→ UI shows: "You have **42** active"
Token arrives: " clients."
→ content = "You have **42** active" + " clients."
→ UI shows: "You have **42** active clients."
[DONE] arrives
→ onDone() called
→ streamingMessageIdRef.current = null
→ Loading dots disappear
→ Message is final!
Visual - What the user sees over time:
=======================================
0.0s ┌──────────────────────────┐
│ 🔵 🔵 🔵 │ (loading dots)
└──────────────────────────┘
0.2s ┌──────────────────────────┐
│ You │
└──────────────────────────┘
0.4s ┌──────────────────────────┐
│ You have │
└──────────────────────────┘
0.6s ┌──────────────────────────┐
│ You have 42 │
└──────────────────────────┘
0.8s ┌──────────────────────────┐
│ You have 42 active │
└──────────────────────────┘
1.0s ┌──────────────────────────┐
│ You have 42 active clients│
└──────────────────────────┘
1.2s Suggested questions appear:
[List all clients] [New this month] [Expired]
This is the "typing effect"!
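And to close the loop, calling the streaming service with these callbacks looks roughly like this. Treat it as a sketch - the real signature of chatService.sendMessageStream may differ slightly:

// Illustrative call site - the parameter layout is an assumption, not the exact API
await chatService.sendMessageStream(
  { message: inputText, conversation_id: conversationId },   // inputText is hypothetical
  {
    onChunk: (token) =>
      setMessages(prev =>
        prev.map(msg =>
          msg.id === assistantMsgId
            ? { ...msg, content: msg.content + token }        // append the new token
            : msg
        )
      ),
    onDone: () => {
      streamingMessageIdRef.current = null                    // loading dots disappear
    },
    onError: (message) => console.error(message),
  },
  abortControllerRef.current?.signal,                         // so the stream can be cancelled
)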
Abort / Cancel a Stream
Q: What if the user switches to a different conversation while the AI is still streaming?
A: We use AbortController to cancel the stream!
How AbortController works:
===========================
// 1. Create an AbortController before starting the stream
abortControllerRef.current = new AbortController()
// 2. Pass its signal to fetch()
await fetch(url, {
...
signal: abortControllerRef.current.signal,
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
// fetch watches this signal
// If we abort, fetch STOPS reading
})
// 3. When user clicks "New Chat" or switches conversation:
abortControllerRef.current.abort()
// ^^^^^^^
// This fires the signal!
// fetch throws AbortError
// Stream stops immediately!
// 4. In the error handler:
catch (error) {
if (error.name === "AbortError") return // Expected! Not an error.
onError(error.message) // Real error
}
Visual:
=======
User sends: "show all clients"
│
▼
Stream starts: "You" → "have" → "42" → ...
│
│ User clicks "New Chat" button
│
▼
abortControllerRef.current.abort()
│
▼
fetch catches AbortError → stops reading
│
▼
Stream cancelled! New conversation starts clean.
Fallback: What if SSE is not supported?
The code handles the case where the server responds with regular JSON instead of SSE:
const contentType = response.headers.get("content-type") || ""
if (contentType.includes("text/event-stream") || contentType.includes("text/plain")) {
// SSE streaming - read with reader loop (explained above)
const reader = response.body?.getReader()
...
} else {
// FALLBACK: Regular JSON response
const json = await response.json()
if (json.response) onChunk(json.response) // Send full response as one chunk
onDone(json.conversation_id, json.suggested_questions)
}
This means:
- If server sends SSE → token by token streaming (typing effect)
- If server sends JSON → full response at once (no typing effect, but still works!)
Complete Data Flow - End to End
COMPLETE SSE FLOW IN STRAKLY:
==============================
┌────────────────────────────────────────────────────────────┐
│ 1. USER types "how many clients?" and hits Send │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ 2. useAIChatPage.ts: handleSendMessage() │
│ │
│ - Creates user message → adds to UI │
│ - Creates empty assistant message (placeholder) │
│ - Sets streamingMessageIdRef (for loading dots) │
│ - Creates AbortController (for cancellation) │
│ - Calls chatService.sendMessageStream() │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ 3. chat.service.ts: sendMessageStream() │
│ │
│ fetch(BOT_API_URL + "/chat", { │
│ method: "POST", │
│ headers: { Accept: "text/event-stream" }, │
│ body: { message, conversation_id, branch_id }, │
│ signal: abortController.signal │
│ }) │
└──────────────────────────┬─────────────────────────────────┘
│
INTERNET / NETWORK
│
▼
┌────────────────────────────────────────────────────────────┐
│ 4. FastAPI: main.py /chat endpoint │
│ │
│ Checks Accept header: │
│ "text/event-stream" found! │
│ → Returns StreamingResponse(process_chat_stream()) │
│ → media_type = "text/event-stream" │
│ → Cache-Control: no-cache │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ 5. agent.py: process_chat_stream() │
│ │
│ yield conversation_id event │
│ │ │
│ ▼ │
│ AI starts generating... calls tools if needed │
│ │ │
│ ▼ │
│ yield token event (for each word) │
│ yield token event │
│ yield token event │
│ ... │
│ │ │
│ ▼ │
│ yield action events (if any) │
│ yield suggested_questions event │
│ yield [DONE] │
└──────────────────────────┬─────────────────────────────────┘
│
NETWORK (streaming)
│
▼
┌────────────────────────────────────────────────────────────┐
│ 6. chat.service.ts: ReadableStream reader loop │
│ │
│ while (true) { │
│ { done, value } = await reader.read() │
│ buffer += decoder.decode(value) │
│ lines = buffer.split("\n") │
│ for each line: │
│ parsed = JSON.parse(payload) │
│ │ │
│ ├── token? → onChunk(parsed.token) │
│ ├── conv_id? → save conversationId │
│ ├── questions? → save suggestedQuestions │
│ ├── action? → onAction(parsed.action) │
│ └── [DONE] → break │
│ } │
│ onDone(conversationId, suggestedQuestions) │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ 7. useAIChatPage.ts: Callbacks execute │
│ │
│ onChunk("You") → content = "You" │
│ onChunk(" have") → content = "You have" │
│ onChunk(" 42") → content = "You have 42" │
│ ... │
│ onDone() → streamingRef = null, save to DB │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ 8. React re-renders on each setMessages() │
│ │
│ ChatArea component sees updated content │
│ dangerouslySetInnerHTML renders the markdown→HTML │
│ User sees text appearing word by word! │
└────────────────────────────────────────────────────────────┘
Quick Recap
| Concept | Description |
|---|---|
| SSE | Server sends events one by one over a single HTTP connection |
| AsyncGenerator | Python function that yields values one at a time (not all at once) |
| yield | Sends one SSE event to the client immediately |
| StreamingResponse | FastAPI response that streams AsyncGenerator output to client |
| Accept: text/event-stream | HTTP header that tells server "I want SSE, not JSON" |
| ReadableStream | Browser API to read streaming data chunk by chunk |
| TextDecoder | Converts raw bytes (Uint8Array) to readable text string |
| Buffer | Holds incomplete data between chunks (SSE events can split across chunks) |
| data: prefix | SSE format: every event line starts with "data: " |
| \n\n | Double newline marks end of one SSE event |
| [DONE] | Custom marker that means "stream is finished" |
| AbortController | Cancels an ongoing fetch/stream when user navigates away |
| onChunk | Callback that appends each token to the message content |
| onDone | Callback that finalizes the message and saves to database |
Interview Questions
Q: What is SSE and how is it different from WebSockets?
"SSE (Server-Sent Events) is a protocol where the server sends events to the client over a single HTTP connection. Unlike WebSockets which are bi-directional, SSE is one-way - only server to client. SSE works over regular HTTP, is simpler to implement, and is ideal when you only need server-to-client streaming, like AI chat responses."
Q: Why use fetch + ReadableStream instead of EventSource for SSE?
"The browser's built-in EventSource API only supports GET requests. Our AI chat needs POST requests to send the user's message in the body. So we use fetch() with Accept: text/event-stream header, then manually read the response body using ReadableStream's getReader(). We decode chunks with TextDecoder and parse the SSE format ourselves."
Q: What is the buffer problem in SSE?
"Data arrives in random-sized chunks from the network, not aligned with SSE event boundaries. One event might be split across two chunks, or one chunk might contain multiple events. We solve this by maintaining a buffer string. We split by newlines, process complete lines, and keep the last incomplete line in the buffer for the next chunk."
Q: How does the typing effect work in the AI chat?
"We create an empty assistant message as a placeholder. Each time a token arrives from the SSE stream, the onChunk callback appends it to the message content using setMessages. React re-renders on each state update, so the user sees the text growing character by character - creating the typing effect."
Q: What is an AsyncGenerator in Python?
"An AsyncGenerator is an async function that uses yield instead of return. It produces values one at a time. When used with FastAPI's StreamingResponse, each yield sends data to the client immediately instead of waiting for the entire response to be ready. This is what enables real-time streaming."
Q: How do you cancel an SSE stream?
"We use the AbortController API. Before starting the fetch, we create an AbortController and pass its signal to fetch. When the user navigates away or starts a new chat, we call abort() on the controller. This causes the fetch to throw an AbortError, which we catch and handle gracefully."
Q: What is StreamingResponse in FastAPI?
"StreamingResponse is a FastAPI response class that takes an AsyncGenerator and streams its yielded values to the client. We set media_type to text/event-stream for SSE, Cache-Control to no-cache to prevent caching, and X-Accel-Buffering to no so Nginx doesn't buffer the stream."
Q: What happens if the browser doesn't support SSE?
"Our implementation has a fallback. The frontend checks the response content-type. If it's text/event-stream, it reads the stream. Otherwise, it falls back to parsing the response as regular JSON and delivers the full response at once through onChunk. The user loses the typing effect but still gets the answer."
Key Points to Remember
- SSE = Server sends data piece by piece over one HTTP connection
- SSE format: each line starts with data: and ends with \n\n
- Backend uses AsyncGenerator + yield to send events one at a time
- FastAPI StreamingResponse sends yielded values to the client
- Accept: text/event-stream header tells server to use SSE
- Frontend uses fetch + ReadableStream (not EventSource, because we need POST)
- TextDecoder converts raw bytes to text
- Buffer is needed because chunks don't align with event boundaries
- lines.pop() keeps incomplete data in buffer for next chunk
- onChunk callback appends each token to the message → typing effect
- AbortController cancels the stream when user navigates away
- Fallback: if server returns JSON instead of SSE, full response delivered at once
- Our SSE carries 6 event types: conversation_id, tools_used, token, action, suggested_questions, [DONE]
What's Next?
Now you understand SSE from the ground up! This same pattern can be used for:
- Real-time notifications
- Live dashboard updates
- Progress bars for long operations
- Any scenario where server needs to push data to client
Keep coding, keep learning! See you in the next one!