LangGraph v3 Event Streaming: Typed Projections Over a Content-Block Protocol
Streaming an LLM to a user is easy. Consuming the stream on the server — token deltas, reasoning deltas, tool-call chunks, per-node state, subgraph events, usage metadata — is the part that turns into a pile of if chunk["type"] == ... branches. I shipped a streaming endpoint last week on LangGraph version="v2", because that is what's installed (1.1.8 locally, 1.2.4 on the server). The hand-rolled consumer was about twenty lines of fragile branching, a keepalive hack to stop a proxy from dropping the connection during DeepSeek's silent reasoning phase, and a manual accumulator that reset whenever langgraph_node changed.
LangGraph's version="v3" event-streaming API is what I'd reach for next, and the diff is the interesting part: it deletes most of that parsing. Instead of one undifferentiated event firehose you branch on, v3 gives you typed, per-channel projections you iterate independently, built on a content-block protocol that makes text, reasoning, tool-call, and multimodal boundaries explicit. v1 and v2 are unchanged. This is a walk through what v3 actually is, what it removes from your code, and where it still leaves work for you.
The problem v3 is aimed at
Every application that streams a graph back to a client confronts the same mess. The raw output is a sequence of events: messages, state snapshots, tool outputs, lifecycle transitions. Without a protocol, your code branches on the shape of each event, unpacks nested dictionaries, correlates tool calls by id, and accumulates tokens across node boundaries by hand. It works for a two-node graph and falls apart the moment you add a subgraph or a second LLM call.
The earlier versions exposed that raw stream directly. As described in the LangGraph streaming docs, version="v1" yields raw data for a single stream mode and (mode, data) tuples when you request several; version="v2" yields StreamPart dicts you branch on by chunk["type"]. Both leave the bookkeeping to the consumer. v3 introspects the graph's execution channels and exposes typed projections (run.messages, run.values, run.lifecycle, run.subgraphs) over a content-block protocol that models LLM output as discrete blocks with explicit start, delta, and finish boundaries. The framework tracks the LLM calls, correlates the tool-call chunks, and separates reasoning from text, so you don't have to.
A walk through the v2 streaming code
Here is the shape of the v2 consumer I actually run, in a FastAPI endpoint behind a Next.js front end serving an email-compose graph:
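```python
import json

def sse(payload: dict) -> str:
    """Hypothetical stand-in for the endpoint's helper: format one SSE data frame."""
    return f"data: {json.dumps(payload)}\n\n"

async def stream_compose(graph, payload, config):
    current_phase = None  # node whose tokens are being accumulated
    accumulated = ""      # running text for the current phase
    last_values = None    # latest full-state snapshot
    async for part in graph.astream(payload, config, stream_mode=["messages", "values"], version="v2"):
        if part["type"] == "messages":
            msg, meta = part["data"]  # (message_chunk, metadata)
            node = (meta or {}).get("langgraph_node")
            if node != current_phase:  # reset accumulator on draft -> refine
                current_phase = node
                accumulated = ""
            delta = getattr(msg, "content", "") or ""
            if delta:
                accumulated += delta
                yield sse({"type": "chunk", "accumulated": accumulated, "phase": node})
        elif part["type"] == "values":
            last_values = part["data"]  # keep latest; the last one is final
```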
Every line is overhead the framework could own. Branch on part["type"]. Unpack the tuple. Track the current node to know when to reset the accumulator (the graph drafts in one node and refines in another, and you do not want the two concatenated). On top of this loop sits a second mechanism I'll come back to: the run is driven through an asyncio.Queue consumed with a timeout, so the endpoint can emit an SSE comment line (: keepalive) during the model's silent reasoning phase. Reasoning tokens never land on .content, so without it the stream looks idle and an intermediate proxy drops it. The first frame is another SSE comment (: open), flushed before the graph produces anything, so the browser sees HTTP 200 immediately instead of a bodyless 504 on a cold backend.
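The queue-plus-timeout pattern can be sketched with the standard library alone. This is a minimal sketch, not the endpoint's actual code: produce is a hypothetical coroutine that drives the graph and puts ready-made SSE frames on the queue, and the DONE sentinel and the 15-second default interval are assumptions.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

OPEN = ": open\n\n"            # SSE comment flushed before any graph output
KEEPALIVE = ": keepalive\n\n"  # SSE comment sent while the model is silent
DONE = object()                # hypothetical end-of-stream sentinel


async def relay_with_keepalive(
    produce: Callable[[asyncio.Queue], Awaitable[None]],
    interval: float = 15.0,
) -> AsyncIterator[str]:
    """Yield SSE frames from a producer task, filling silent gaps with comment frames."""
    queue: asyncio.Queue = asyncio.Queue()

    async def run_producer() -> None:
        try:
            await produce(queue)  # puts ready-made SSE frames on the queue
        finally:
            await queue.put(DONE)  # always unblock the consumer, even on error

    task = asyncio.create_task(run_producer())
    try:
        yield OPEN  # the browser sees HTTP 200 before the graph produces anything
        while True:
            try:
                frame = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield KEEPALIVE  # nothing arrived in time: keep the proxy from idling out
                continue
            if frame is DONE:
                break
            yield frame
        await task  # re-raise any producer error
    finally:
        task.cancel()  # no-op if finished; stops the producer if the client left early
```

In FastAPI the generator would be handed to StreamingResponse(..., media_type="text/event-stream"). Putting the graph in its own task is what lets the consumer notice silence at all: a plain async for over the graph would block inside the model call with no chance to emit anything.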
The v3 equivalent collapses the message handling to this:
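```python
async def stream_compose_v3(graph, payload):
    stream = await graph.astream_events(payload, version="v3")
    async for message in stream.messages:  # one ChatModelStream per LLM call
        async for token in message.text:   # token deltas, not accumulated text
            yield sse({"type": "chunk", "text": token, "node": message.node})
    final = stream.output  # final state, what last_values tracked by hand in v2
```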
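No part["type"] branch, no (msg, meta) unpack, no accumulator, no phase reset. Each frame now carries a token delta rather than the accumulated text, so appending moves to the client. Because run.messages yields one ChatModelStream per LLM call, the draft and refine passes are already separate streams; the bookkeeping the phase-reset hack did by hand is structural now. (In the code, stream is the run object that astream_events returns, which the prose calls run.) Reasoning deltas move to message.reasoning, and usage moves to message.output.usage_metadata.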
Two layers and a content-block protocol
Under the hood, v3 is two layers. The streaming layer is the Pregel engine emitting raw graph-execution events on named channels: values, updates, messages, tools, lifecycle, checkpoints, input, tasks, custom. The event-streaming layer normalizes those into protocol events, routes each through a stack of transformers, and exposes the typed projections your code consumes. The router is the bridge: it passes each normalized event through the registered transformers, the built-in ones producing run.messages, run.values, run.lifecycle, and run.subgraphs. Multiple consumers can read different projections concurrently — reading run.messages does not consume the events run.values needs.
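To see why reading one projection doesn't starve another, here is a toy model of the fan-out idea in plain asyncio. It is not LangGraph's router: Router, subscribe, publish, and close are hypothetical names, and each subscriber simply owns a queue that receives the payloads for one channel.

```python
import asyncio
from typing import Any, AsyncIterator

_END = object()  # end-of-run marker (a payload could legitimately be None)


class Router:
    """Toy fan-out router: each subscriber owns a queue, so consumers never compete."""

    def __init__(self) -> None:
        self._subs: list = []  # (method, queue) pairs

    def subscribe(self, method: str) -> AsyncIterator[Any]:
        queue: asyncio.Queue = asyncio.Queue()
        self._subs.append((method, queue))

        async def iterate() -> AsyncIterator[Any]:
            while (item := await queue.get()) is not _END:
                yield item

        return iterate()

    def publish(self, event: dict) -> None:
        # Every subscriber to this channel gets the payload in its own queue.
        for method, queue in self._subs:
            if event["method"] == method:
                queue.put_nowait(event["params"]["data"])

    def close(self) -> None:
        for _, queue in self._subs:
            queue.put_nowait(_END)


async def demo():
    router = Router()
    messages = router.subscribe("messages")
    values = router.subscribe("values")
    for seq, method, data in [(1, "messages", "Hel"), (2, "values", {"step": 1}), (3, "messages", "lo")]:
        router.publish({"seq": seq, "method": method, "params": {"data": data}})
    router.close()

    async def drain(it):
        return [x async for x in it]

    # The two consumers run concurrently and each sees only its own channel.
    return await asyncio.gather(drain(messages), drain(values))
```

The design point is the per-subscriber queue: a single shared iterator would hand each event to whichever consumer asked first.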
If you drop to the raw layer, every event is a ProtocolEvent envelope — the same shape defined in the Agent Protocol:
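```python
from typing import Any, TypedDict

ProtocolEventParams = dict[str, Any]  # simplified stand-in: namespace, timestamp, channel data

class ProtocolEvent(TypedDict):
    seq: int                     # strictly increasing within a run; use this for ordering
    method: str                  # channel: "messages" | "values" | "updates" | "tools" | ...
    params: ProtocolEventParams  # see the Agent Protocol for the exact params fields
```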
params.namespace is the path from the root graph to the emitting scope. The root is []; a nested tool call inside a subgraph looks like ["researcher:6f4d", "tools:91ac"], where the name before the colon is the stable node/graph name and the suffix is a per-invocation id. The run.subgraphs projection does that namespace filtering for you, which is the whole point — you rarely want to parse those strings by hand.
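For a sense of what run.subgraphs spares you, here is the hand-rolled version. A minimal sketch assuming the name:id segment format described above; split_namespace, stable_path, and is_root are hypothetical helpers, and splitting on the first colon follows the "name before the colon" rule.

```python
from typing import NamedTuple


class Segment(NamedTuple):
    name: str        # stable node/graph name
    invocation: str  # per-invocation id ("" if the segment has no suffix)


def split_namespace(namespace: list[str]) -> list[Segment]:
    """Turn ["researcher:6f4d", "tools:91ac"] into (name, invocation) pairs."""
    segments = []
    for part in namespace:
        name, _, invocation = part.partition(":")  # name is everything before the first colon
        segments.append(Segment(name, invocation))
    return segments


def stable_path(namespace: list[str]) -> tuple[str, ...]:
    """Drop invocation ids so repeated runs of the same subgraph group together."""
    return tuple(seg.name for seg in split_namespace(namespace))


def is_root(namespace: list[str]) -> bool:
    return namespace == []  # the root graph's namespace is empty
```

Grouping by stable_path is what you would key metrics on; the invocation id is what distinguishes two concurrent runs of the same subgraph.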
The messages channel is the content-block protocol. Each message arrives as a sequence of blocks, and data.event is one of message-start, content-block-start, content-block-delta, content-block-finish, message-finish. A block starts, emits zero or more deltas, and finishes before the next block in the same message begins. That explicit boundary is what makes text, reasoning, tool-call arguments, and multimodal content distinguishable without provider-specific parsing. If you want the raw deltas instead of the projection, you filter by block type:
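```python
stream = graph.stream_events(inp, version="v3")  # sync run; iterate raw ProtocolEvents
for event in stream:
    if event["method"] != "messages":
        continue
    payload = event["params"]["data"]
    # Accept either a bare dict or a sequence whose first item is the event dict.
    data = payload[0] if isinstance(payload, (list, tuple)) else payload
    if not isinstance(data, dict) or data.get("event") != "content-block-delta":
        continue
    block = data.get("delta") or {}
    if block.get("type") == "text-delta":
        print(block.get("text", ""), end="", flush=True)
    elif block.get("type") == "reasoning-delta":
        print(f"[thinking]{block.get('reasoning', '')}", end="", flush=True)
```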
message-finish may carry token usage; an unrecoverable model-call failure arrives as a message error event rather than a thrown exception mid-stream.
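The explicit boundaries make a small reducer possible: collect each message's deltas into ordered blocks, and reject a delta that arrives outside an open block. A minimal sketch over already-unpacked data dicts; it relies only on the event names and the text-delta / reasoning-delta shapes used in the loop above, and BlockReducer with its list-of-blocks output is a hypothetical structure, not one of the protocol's own types.

```python
from __future__ import annotations


class BlockReducer:
    """Rebuild each message as an ordered list of {"type", "content"} blocks."""

    def __init__(self) -> None:
        self.messages: list[list[dict]] = []    # finished messages, in order
        self._blocks: list[dict] | None = None  # blocks of the message in progress
        self._open: dict | None = None          # block currently receiving deltas

    def feed(self, data: dict) -> None:
        kind = data.get("event")
        if kind == "message-start":
            self._blocks = []
        elif kind == "content-block-start":
            if self._blocks is None or self._open is not None:
                raise ValueError("block started outside a message or before the last one finished")
            self._open = {"type": None, "content": ""}
        elif kind == "content-block-delta":
            if self._open is None:
                raise ValueError("delta arrived outside an open block")
            delta = data.get("delta") or {}
            if delta.get("type") == "text-delta":
                self._open["type"] = "text"
                self._open["content"] += delta.get("text", "")
            elif delta.get("type") == "reasoning-delta":
                self._open["type"] = "reasoning"
                self._open["content"] += delta.get("reasoning", "")
            # Other delta types (tool-call arguments, multimodal) are ignored in this sketch.
        elif kind == "content-block-finish":
            if self._open is None or self._blocks is None:
                raise ValueError("block finished without starting")
            self._blocks.append(self._open)
            self._open = None
        elif kind == "message-finish":
            self.messages.append(self._blocks or [])
            self._blocks = None
```

Because a block must finish before the next one starts, a reasoning block and the text block after it never interleave, which is exactly what lets the projection split message.reasoning from message.text without provider-specific parsing.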
Typed projections in practice
The projections are where v3 earns its keep. run.messages yields one ChatModelStream per LLM call, exposing .text, .reasoning, .tool_calls, and .output.usage_metadata. message.text is iterable for token-by-token output, or you call str(message.text) for the whole thing:
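```python
for message in stream.messages:             # one ChatModelStream per LLM call
    for token in message.text:              # token by token; str(message.text) gives it all
        print(token, end="", flush=True)
    usage = message.output.usage_metadata   # per-call token usage
final_state = stream.output                 # resolves to the final state value
```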
run.values streams full state snapshots after each step, and stream.output resolves to the final value. run.lifecycle emits started / running / completed / failed / interrupted per run, subgraph, and subagent, with an optional graph_name, error, and cause. run.subgraphs surfaces nested executions as their own objects (.graph_name, .path, .messages). For concurrent consumption in async code you await graph.astream_events(...) and asyncio.gather over the projections; for ordered consumption in sync code you use stream.interleave("values", "messages", "subgraphs"), which yields items in strict arrival order.
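The concurrent pattern is ordinary asyncio: one reader coroutine per projection, run together with asyncio.gather. In this sketch fake_text and fake_values are hypothetical async generators standing in for one message's message.text and for run.values, so it runs without a graph; with a real v3 run you would pass the projections instead, assuming they are async-iterable in async mode as the astream_events example above shows.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_text() -> AsyncIterator[str]:
    """Hypothetical stand-in for one message's token stream (message.text)."""
    for token in ["Dear", " team", ","]:
        await asyncio.sleep(0)  # hand control back, as a real stream does between tokens
        yield token


async def fake_values() -> AsyncIterator[dict]:
    """Hypothetical stand-in for run.values state snapshots."""
    for step in range(2):
        await asyncio.sleep(0)
        yield {"step": step}


async def consume_concurrently(
    text: AsyncIterator[str], values: AsyncIterator[dict]
) -> tuple[str, Any]:
    async def read_text() -> str:
        return "".join([token async for token in text])

    async def read_last_state() -> Any:
        last = None
        async for snapshot in values:
            last = snapshot  # keep the latest; the final snapshot is the output
        return last

    # Both readers make progress together; neither drains the other's iterator.
    joined, final_state = await asyncio.gather(read_text(), read_last_state())
    return joined, final_state
```

If you need the original arrival order across channels instead, that is what interleave is for; gather gives you concurrency, not a merged order.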
The evolution across versions, side by side:
| Scenario | v1 (default) | v2 | v3 |
|---|---|---|---|
| Single stream mode | raw data (dict) | StreamPart dict {type, ns, data} | typed projection iterators |
| Multiple modes | (mode, data) tuples | same StreamPart, branch on chunk["type"] | iterate run.messages / run.values / … |
| LLM output | (message_chunk, metadata) | same, inside StreamPart.data | ChatModelStream with .text / .reasoning / .tool_calls / .output.usage_metadata |
| Consumer code | branch on shape | branch on type | iterate the projection you want |
And the channels, mapped to what you'd actually use them for:
| Channel | Projection | Use |
|---|---|---|
| messages | run.messages | chat-model tokens, reasoning, tool-call args, usage |
| values | run.values | full state snapshots; stream.output is the final value |
| lifecycle | run.lifecycle | run / subgraph / subagent status transitions |
| (nested) | run.subgraphs | nested graph executions without namespace parsing |
| updates / custom / checkpoints / tasks / debug | opt-in transformers | per-node deltas, custom events, time-travel, task/debug detail |
Custom projections via transformers
When the projection you want does not exist, you write a StreamTransformer: init(), process(event), finalize(), and fail(err). process sees every ProtocolEvent and returns True to pass it through or False to suppress it. A transformer declares required_stream_modes so the runtime knows which raw channels to emit — the runtime takes the union across all registered transformers, and a mode no transformer requests is never produced.
A token-usage aggregator is the obvious first one, and it doubles as an observability hook:
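```python
# StreamTransformer and StreamChannel come from LangGraph's v3 event-streaming API;
# the import path depends on your release, so it is not shown here.
class StatsTransformer(StreamTransformer):
    required_stream_modes = ("messages",)  # only ask the runtime for the messages channel

    def __init__(self, scope=()):
        super().__init__(scope)
        self.total = 0
        self.log = StreamChannel[int]()    # projection that will carry the total

    def init(self):
        return {"total_tokens": self.log}  # projection key for this channel

    def process(self, event):
        d = event["params"]["data"]
        if isinstance(d, dict):
            # message-finish events may carry usage; everything else adds 0
            self.total += (d.get("usage") or {}).get("output_tokens") or 0
        return True                        # pass every event through

    def finalize(self):
        self.log.push(self.total)          # emit the final total once...
        self.log.close()                   # ...then end the projection
```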
Register it per call — stream_events(inp, version="v3", transformers=[StatsTransformer]) — or at compile time so every run produces the projection. StreamChannel is the projection primitive. A named channel (StreamChannel("total_tokens")) is exposed under stream.extensions and each push() also flows into the main stream as a custom:total_tokens event, so its payload must be serializable. An unnamed channel is a side projection only, which is where you keep in-process handles that can't be serialized. The built-in ToolCallTransformer uses exactly this contract to expose stream.tool_calls.
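The transformer contract is easy to model in isolation, which also makes it easy to unit-test a transformer's logic before wiring it into a graph. This toy driver is not LangGraph's runtime: MiniTransformer, requested_modes, run_transformers, and UsageCounter are hypothetical, and letting every transformer see every event is this toy's choice. It follows the rules described above: emitted modes are the union of required_stream_modes, a False from process suppresses an event, and finalize runs once at the end (fail on error).

```python
from __future__ import annotations

from typing import Iterable


class MiniTransformer:
    """Hypothetical stand-in for the init/process/finalize/fail contract."""

    required_stream_modes: tuple[str, ...] = ()

    def init(self) -> dict:
        return {}  # would map projection names to channels

    def process(self, event: dict) -> bool:
        return True  # True passes the event through, False suppresses it

    def finalize(self) -> None:
        pass

    def fail(self, err: BaseException) -> None:
        pass


def requested_modes(transformers: Iterable[MiniTransformer]) -> set[str]:
    """Union of required_stream_modes: a mode nobody requests is never produced."""
    modes: set[str] = set()
    for t in transformers:
        modes.update(t.required_stream_modes)
    return modes


def run_transformers(transformers: list[MiniTransformer], events: Iterable[dict]) -> list[dict]:
    """Drive every transformer over the events; return the ones nobody suppressed."""
    modes = requested_modes(transformers)
    for t in transformers:
        t.init()
    passed = []
    try:
        for event in events:
            if event["method"] not in modes:
                continue  # channel never requested, so it would never be emitted
            verdicts = [t.process(event) for t in transformers]  # everyone sees the event
            if all(verdicts):
                passed.append(event)
    except Exception as err:
        for t in transformers:
            t.fail(err)
        raise
    for t in transformers:
        t.finalize()
    return passed


class UsageCounter(MiniTransformer):
    """Same idea as StatsTransformer: sum output tokens from usage payloads."""

    required_stream_modes = ("messages",)

    def __init__(self) -> None:
        self.total = 0

    def process(self, event: dict) -> bool:
        data = event["params"]["data"]
        if isinstance(data, dict):
            self.total += (data.get("usage") or {}).get("output_tokens") or 0
        return True
```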
What this does for tracing and observability
The reason I care about projections is not ergonomics for its own sake — it's that the data I want for telemetry stops being something I scrape out of logs. The system I run already carries two observability planes: LangSmith tracing natively, and OpenTelemetry over OTLP, with the TypeScript client injecting W3C traceparent and langsmith-trace headers so a single trace spans the browser, the Vercel route, the FastAPI process, and the graph. The missing piece, on v2, is structured per-call signal inside the stream. v3's projections are that signal:
- Per-call token cost and the reasoning split. message.output.usage_metadata gives input/output tokens per LLM call. Thinking-mode cost is otherwise close to invisible, because .content stays empty while the model reasons; message.reasoning makes those tokens a first-class projection you can meter.
- Time-to-first-token. Timestamp the first item out of message.text against the run's start. On v2 you reconstruct this from a noisy delta stream; here it's one event.
- Per-node and per-subgraph latency. run.lifecycle emits started and completed transitions, and seq on each ProtocolEvent gives you a reliable order without trusting wall-clock timestamps that can drift.
- Spans that match the graph. run.subgraphs mirrors the execution topology, so an OpenTelemetry span tree can follow the actual DAG instead of a flattened log.
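The two latency signals reduce to a few lines once you have the events. A minimal sketch under stated assumptions: times come from time.monotonic() on receipt rather than from the events themselves, lifecycle transitions are represented as hypothetical (seq, path, status, received_at) tuples, and TTFTMeter and lifecycle_durations are illustrative names.

```python
from __future__ import annotations

import time
from typing import AsyncIterator, Iterable

TERMINAL = ("completed", "failed", "interrupted")


class TTFTMeter:
    """Pass tokens through unchanged, recording time-to-first-token on the way."""

    def __init__(self, started_at: float) -> None:
        self.started_at = started_at  # time.monotonic() taken when the run began
        self.ttft: float | None = None

    async def wrap(self, tokens: AsyncIterator[str]) -> AsyncIterator[str]:
        async for token in tokens:
            if self.ttft is None:
                self.ttft = time.monotonic() - self.started_at
            yield token


def lifecycle_durations(
    transitions: Iterable[tuple[int, tuple[str, ...], str, float]],
) -> dict[tuple[str, ...], float]:
    """Pair each scope's started transition with its terminal one.

    seq fixes the order; received_at is a local monotonic reading used
    only for the subtraction, never for ordering.
    """
    open_scopes: dict[tuple[str, ...], float] = {}
    durations: dict[tuple[str, ...], float] = {}
    for _seq, path, status, received_at in sorted(transitions, key=lambda t: t[0]):
        if status == "started":
            open_scopes[path] = received_at
        elif status in TERMINAL and path in open_scopes:
            durations[path] = received_at - open_scopes.pop(path)
    return durations
```

Wrapping rather than peeking keeps the meter transparent: the SSE loop iterates meter.wrap(message.text) exactly as it would iterate the projection itself.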
A small StreamTransformer that pushes usage and lifecycle events into your metrics exporter is the natural bridge: the transformer observes the raw events, and a named channel forwards the derived numbers into the same trace the rest of your request already belongs to.
The parts v3 does not do for you
Streaming over SSE still has sharp edges, and the framework owning the parsing doesn't make them disappear. The keepalive problem is the clearest example: a model that reasons silently produces no message.text tokens for tens of seconds, and an idle SSE connection gets dropped by whatever proxy sits between you and the browser. You still emit a periodic comment frame to keep the connection warm — v3 just makes the reason for the silence legible, because the reasoning deltas are a projection you can surface as a "thinking…" status instead of dead air. Error frames (message-finish carrying an error, or a failed lifecycle event) need handling so a mid-stream model failure becomes a clean event to your client rather than a truncated response. And client disconnects should abort the upstream run, or you pay for tokens nobody reads.
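The last two points can be sketched without the framework: check for a disconnect between frames and close the upstream generator when the client is gone, and turn an upstream exception into a final error frame instead of a truncated body. Assumptions: is_disconnected is an async callable (Starlette's Request.is_disconnected, which FastAPI exposes, has this shape), the upstream generator's cleanup is where the run actually gets cancelled, and the "event: error" frame format is illustrative.

```python
import json
from typing import AsyncIterator, Awaitable, Callable


async def guarded_stream(
    frames: AsyncIterator[str],
    is_disconnected: Callable[[], Awaitable[bool]],
) -> AsyncIterator[str]:
    """Relay SSE frames; stop upstream on disconnect, convert failures to an error frame."""
    try:
        async for frame in frames:
            if await is_disconnected():
                break  # client is gone: stop reading so upstream work can be torn down
            yield frame
    except Exception as err:  # a mid-stream failure becomes a clean, final event
        payload = json.dumps({"message": str(err)})
        yield f"event: error\ndata: {payload}\n\n"
    finally:
        aclose = getattr(frames, "aclose", None)
        if aclose is not None:
            await aclose()  # runs the upstream generator's cleanup (e.g. cancelling the run)
```

Closing the upstream explicitly matters because an abandoned async generator is only finalized whenever the event loop gets around to it, and until then the model keeps generating tokens nobody reads.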
Honest limits
v3 requires a recent langgraph release; older versions expose only v1/v2 and the async astream_events, so plenty of running systems (mine included) still stream on v2. Before migrating, the edges worth knowing:
- Structured output streams as characters, not prose. If a node uses JSON mode and the model emits {"subject": "...", "body": "..."}, the token stream is braces and quotes. The projection gives you tokens; the authoritative parsed result still comes from the final state, not from reassembling the stream. This is true on every version: it's a property of structured generation, not of v3.
- Reasoning is separate from text. Read only message.text and you miss message.reasoning. If you want the full output, you consume both.
- Sync and async iterate differently: for versus async for over message.text. On Python < 3.11, async consumption needs explicit RunnableConfig propagation and a writer= argument instead of get_stream_writer, because that runtime can't carry the context automatically.
- Named StreamChannel payloads must be serializable, since they become custom:<name> events on the main stream. Futures, async iterables, and class instances belong in unnamed channels.
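To make the structured-output point concrete: a JSON-mode node's tokens only parse once the object is complete, which is why the final state, not the reassembled stream, is the source of truth. A tiny demonstration with hand-made token chunks; the chunk boundaries are invented for the example, and try_parse is a hypothetical helper.

```python
import json


def try_parse(text: str):
    """Return the parsed object, or None while the JSON is still incomplete."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


# Hypothetical token chunks from a JSON-mode model call.
chunks = ['{"sub', 'ject": "Hi', '", "body": "See', ' you"}']

partial = ""
parse_results = []
for chunk in chunks:
    partial += chunk
    parse_results.append(try_parse(partial))  # None until the closing brace arrives
```

Only the last prefix parses, so anything you render mid-stream from a structured node is raw syntax; if the UI needs live prose, stream a plain-text node and keep the JSON node's output for the final state.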
When to reach for it
| Situation | What I'd do |
|---|---|
| New LangGraph project, no streaming code yet | Build on v3. The projections are simpler than the v2 branching, and the simplicity compounds as the graph grows subgraphs and parallel nodes. |
| Existing v1/v2 system in production | Stay on v2 until the v3 surface you depend on has settled. The ergonomic win is real, but it does not by itself justify reworking a path you already ship. |
| You need reasoning deltas, tool-call argument streaming, or per-call usage | v3 is the clean path; v2 makes you reconstruct all three by hand. |
| Python < 3.11 with async | Budget time for the explicit RunnableConfig / writer= pattern before you commit. |
| Observability is a first-order requirement | v3's run.lifecycle and run.subgraphs turn per-node latency and failure data into projections instead of log archaeology. |
The shape of the change
The redesign reflects a move from "stream the tokens" to "treat a graph execution as a structured protocol." Branching on chunk["type"] is fine for a two-node graph; it grows badly the moment you add subgraphs, parallel nodes, and multiple coordinated LLM calls. v3 pulls that complexity into the framework, and the consumer code ends up doing what it says — iterate the messages, read the usage, await the output. I'm still on v2 in production, but I've already rewritten the consumer in a branch to see the diff, and the diff is the argument. The next time you write a streaming endpoint, you'll want to know which version you're reaching for.
References
- LangGraph (OSS), Streaming: stream modes and version="v2". https://docs.langchain.com/oss/python/langgraph/streaming
- LangGraph (OSS), Event streaming: version="v3", typed projections, transformers, the ProtocolEvent envelope and channel list. https://docs.langchain.com/oss/python/langgraph/event-streaming
- Agent Protocol: wire-level event and command formats; langchain-protocol (PyPI), @langchain/protocol (npm), and langchain-ai/agent-protocol on GitHub.
- LangSmith: tracing quickstart (native LangGraph tracing).
- OpenTelemetry: OTLP exporter and span model (vendor-neutral tracing referenced in the observability section).
