
LangGraph v3 Event Streaming: Typed Projections Over a Content-Block Protocol

· 13 min read
Vadim Nicolai
Senior Software Engineer

Streaming an LLM to a user is easy. Consuming the stream on the server — token deltas, reasoning deltas, tool-call chunks, per-node state, subgraph events, usage metadata — is the part that turns into a pile of if chunk["type"] == ... branches. I shipped a streaming endpoint last week on LangGraph version="v2", because that is what's installed (1.1.8 locally, 1.2.4 on the server). The hand-rolled consumer was about twenty lines of fragile branching, a keepalive hack to stop a proxy from dropping the connection during DeepSeek's silent reasoning phase, and a manual accumulator that reset whenever langgraph_node changed.

LangGraph's version="v3" event-streaming API is what I'd reach for next, and the diff is the interesting part: it deletes most of that parsing. Instead of one undifferentiated event firehose you branch on, v3 gives you typed, per-channel projections you iterate independently, built on a content-block protocol that makes text, reasoning, tool-call, and multimodal boundaries explicit. v1 and v2 are unchanged. This is a walk through what v3 actually is, what it removes from your code, and where it still leaves work for you.

The problem v3 is aimed at

Every application that streams a graph back to a client confronts the same mess. The raw output is a sequence of events: messages, state snapshots, tool outputs, lifecycle transitions. Without a protocol, your code branches on the shape of each event, unpacks nested dictionaries, correlates tool calls by id, and accumulates tokens across node boundaries by hand. It works for a two-node graph and falls apart the moment you add a subgraph or a second LLM call.

The earlier versions exposed that raw stream directly. As described in the LangGraph streaming docs, version="v1" yields (mode, data) tuples; version="v2" yields StreamPart dicts you branch on by chunk["type"]. Both leave the bookkeeping to the consumer. v3 introspects the graph's execution channels and exposes typed projections (run.messages, run.values, run.lifecycle, run.subgraphs) over a content-block protocol that models LLM output as discrete blocks with explicit start, delta, and finish boundaries. The framework tracks the LLM calls, correlates the tool chunks, and separates reasoning from text, so you don't.

A walk through the v2 streaming code

Here is the shape of the v2 consumer I actually run, in a FastAPI endpoint behind a Next.js front end serving an email-compose graph:

# Runs inside the endpoint's async generator; sse() is the endpoint's frame helper.
current_phase = None
accumulated = ""
last_values = None
async for part in graph.astream(payload, config, stream_mode=["messages", "values"], version="v2"):
    if part["type"] == "messages":
        msg, meta = part["data"]  # (message_chunk, metadata)
        node = (meta or {}).get("langgraph_node")
        if node != current_phase:  # reset accumulator on draft -> refine
            current_phase = node
            accumulated = ""
        delta = getattr(msg, "content", "") or ""
        if delta:
            accumulated += delta
            yield sse({"type": "chunk", "accumulated": accumulated, "phase": node})
    elif part["type"] == "values":
        last_values = part["data"]  # keep latest; the last one is final

Every line is overhead the framework could own. Branch on part["type"]. Unpack the tuple. Track the current node to know when to reset the accumulator (the graph drafts in one node and refines in another, and you do not want the two concatenated). On top of this loop sits a second mechanism I'll come back to: the run is driven through an asyncio.Queue consumed with a timeout so the endpoint can emit a : keepalive comment during the model's silent reasoning phase — reasoning tokens never land on .content, so without it the stream looks idle and an intermediate proxy drops it. The first byte is a : open SSE comment, flushed before the graph produces anything, so the browser sees HTTP 200 immediately instead of a bodyless 504 on a cold backend.
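The queue-with-timeout mechanism is independent of LangGraph, so it can be sketched with nothing but asyncio. This is a minimal version under stated assumptions, not the endpoint's actual code: with_keepalive, the 15-second default, and the _DONE sentinel are names invented here, and error propagation from the upstream iterator is left out.

```python
import asyncio
import json
from typing import AsyncIterator

KEEPALIVE_SECONDS = 15.0  # assumed default; keep it below your proxy's idle timeout
_DONE = object()          # sentinel the pump pushes when upstream is exhausted


def sse(payload: dict) -> str:
    """Format one JSON payload as an SSE data frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def with_keepalive(
    source: AsyncIterator[dict], interval: float = KEEPALIVE_SECONDS
) -> AsyncIterator[str]:
    """Yield SSE frames from `source`, emitting comment frames while it is silent."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        # Drain upstream into the queue; always finish with the sentinel.
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_DONE)

    task = asyncio.create_task(pump())
    yield ": open\n\n"  # sent before upstream has produced anything
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"  # SSE comment line; clients ignore it
                continue
            if item is _DONE:
                break
            yield sse(item)
    finally:
        task.cancel()  # stop the pump if the consumer goes away early
```

The consumer only ever waits on the queue, so a silent upstream costs one comment frame per interval rather than a dropped connection.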

The v3 equivalent collapses the message handling to this:

stream = await graph.astream_events(payload, version="v3")
async for message in stream.messages:  # one ChatModelStream per LLM call
    async for token in message.text:
        await sse({"type": "chunk", "text": token, "node": message.node})
final = stream.output

No part["type"] branch, no (msg, meta) unpack, no accumulator, no phase reset. Because run.messages yields one ChatModelStream per LLM call, the draft and refine passes are already separate streams — the bookkeeping that the phase-reset hack did by hand is structural now. Reasoning deltas move to message.reasoning, and usage moves to message.output.usage_metadata.

Two layers and a content-block protocol

Under the hood, v3 is two layers. The streaming layer is the Pregel engine emitting raw graph-execution events on named channels: values, updates, messages, tools, lifecycle, checkpoints, input, tasks, custom. The event-streaming layer normalizes those into protocol events, routes each through a stack of transformers, and exposes the typed projections your code consumes. The router is the bridge: it passes each normalized event through the registered transformers, the built-in ones producing run.messages, run.values, run.lifecycle, and run.subgraphs. Multiple consumers can read different projections concurrently — reading run.messages does not consume the events run.values needs.

If you drop to the raw layer, every event is a ProtocolEvent envelope — the same shape defined in the Agent Protocol:

class ProtocolEvent(TypedDict):
    seq: int                     # strictly increasing within a run; use this for ordering
    method: str                  # channel: "messages" | "values" | "updates" | "tools" | ...
    params: ProtocolEventParams  # namespace, timestamp, channel-specific data

params.namespace is the path from the root graph to the emitting scope. The root is []; a nested tool call inside a subgraph looks like ["researcher:6f4d", "tools:91ac"], where the name before the colon is the stable node/graph name and the suffix is a per-invocation id. The run.subgraphs projection does that namespace filtering for you, which is the whole point — you rarely want to parse those strings by hand.
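For the rare case where you do need the pieces, the name:id convention splits cleanly. A small sketch, assuming every segment follows the "name:invocation-id" form described above; Scope, parse_namespace, and stable_path are hypothetical helpers, not part of LangGraph.

```python
from typing import List, NamedTuple, Tuple


class Scope(NamedTuple):
    name: str           # stable node/graph name
    invocation_id: str  # per-invocation suffix; "" if the segment has no colon


def parse_namespace(namespace: List[str]) -> List[Scope]:
    """Split each 'name:id' segment of a namespace path into its two parts."""
    scopes = []
    for segment in namespace:
        name, _, invocation_id = segment.partition(":")
        scopes.append(Scope(name, invocation_id))
    return scopes


def stable_path(namespace: List[str]) -> Tuple[str, ...]:
    """Return only the stable names, e.g. as a grouping key for metrics."""
    return tuple(scope.name for scope in parse_namespace(namespace))
```

Grouping by stable_path rather than the raw namespace keeps repeated invocations of the same subgraph in one bucket.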

The messages channel is the content-block protocol. Each message arrives as a sequence of blocks, and data.event is one of message-start, content-block-start, content-block-delta, content-block-finish, message-finish. A block starts, emits zero or more deltas, and finishes before the next block in the same message begins. That explicit boundary is what makes text, reasoning, tool-call arguments, and multimodal content distinguishable without provider-specific parsing. If you want the raw deltas instead of the projection, you filter by block type:

for event in stream:
    if event["method"] != "messages":
        continue
    data = event["params"]["data"][0]
    if not isinstance(data, dict) or data.get("event") != "content-block-delta":
        continue
    block = data.get("delta") or {}
    if block.get("type") == "text-delta":
        print(block.get("text", ""), end="", flush=True)
    elif block.get("type") == "reasoning-delta":
        print(f"[thinking]{block.get('reasoning', '')}", end="", flush=True)

message-finish may carry token usage; an unrecoverable model-call failure arrives as a message error event rather than a thrown exception mid-stream.
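To make the block lifecycle concrete, here is a sketch of a fold over those events for a single message. It works on plain dicts shaped like the data payloads in the filter above (event, delta.type, delta.text, delta.reasoning); MessageAccumulator is a name invented for illustration, and the real payloads may carry more fields than this reads.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class MessageAccumulator:
    """Folds content-block events for one message into text and reasoning."""

    text: List[str] = field(default_factory=list)
    reasoning: List[str] = field(default_factory=list)
    blocks_finished: int = 0
    done: bool = False

    def feed(self, data: dict) -> None:
        event = data.get("event")
        if event == "content-block-delta":
            delta = data.get("delta") or {}
            if delta.get("type") == "text-delta":
                self.text.append(delta.get("text", ""))
            elif delta.get("type") == "reasoning-delta":
                self.reasoning.append(delta.get("reasoning", ""))
        elif event == "content-block-finish":
            self.blocks_finished += 1  # a block closes before the next one opens
        elif event == "message-finish":
            self.done = True
        # message-start and content-block-start carry no payload this sketch needs
```

Because blocks never interleave within a message, the fold needs no per-block index: reasoning and text land in separate buffers purely from the delta type.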

Typed projections in practice

The projections are where v3 earns its keep. run.messages yields one ChatModelStream per LLM call, exposing .text, .reasoning, .tool_calls, and .output.usage_metadata. message.text is iterable for token-by-token output, or you call str(message.text) for the whole thing:

for message in stream.messages:
    for token in message.text:
        print(token, end="", flush=True)
    usage = message.output.usage_metadata
final_state = stream.output

run.values streams full state snapshots after each step, and stream.output resolves to the final value. run.lifecycle emits started / running / completed / failed / interrupted per run, subgraph, and subagent, with an optional graph_name, error, and cause. run.subgraphs surfaces nested executions as their own objects (.graph_name, .path, .messages). For concurrent consumption in async code you await graph.astream_events(...) and asyncio.gather over the projections; for ordered consumption in sync code you use stream.interleave("values", "messages", "subgraphs"), which yields items in strict arrival order.
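The asyncio.gather pattern is easier to see with stand-ins than with a live graph. In this sketch fake_messages and fake_values are hypothetical async generators playing the role of two projections; only the gather-over-independent-iterators shape is the point, and nothing here calls LangGraph.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple


async def fake_messages() -> AsyncIterator[str]:
    # Stand-in for a token projection.
    for token in ["Hel", "lo"]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield token


async def fake_values() -> AsyncIterator[dict]:
    # Stand-in for a state-snapshot projection.
    for snapshot in [{"step": 1}, {"step": 2}]:
        await asyncio.sleep(0)
        yield snapshot


async def collect(projection: AsyncIterator[Any]) -> List[Any]:
    """Drain one projection to a list."""
    return [item async for item in projection]


async def consume_both() -> Tuple[str, dict]:
    # Each projection gets its own consumer; neither starves the other.
    tokens, snapshots = await asyncio.gather(
        collect(fake_messages()), collect(fake_values())
    )
    return "".join(tokens), snapshots[-1]
```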

The evolution across versions, side by side:

| Scenario | v1 (default) | v2 | v3 |
| --- | --- | --- | --- |
| Single stream mode | raw data (dict) | StreamPart dict {type, ns, data} | typed projection iterators |
| Multiple modes | (mode, data) tuples | same StreamPart, branch on chunk["type"] | iterate run.messages / run.values / … |
| LLM output | (message_chunk, metadata) | same, inside StreamPart.data | ChatModelStream with .text / .reasoning / .tool_calls / .output.usage_metadata |
| Consumer code | branch on shape | branch on type | iterate the projection you want |

And the channels, mapped to what you'd actually use them for:

| Channel | Projection | Use |
| --- | --- | --- |
| messages | run.messages | chat-model tokens, reasoning, tool-call args, usage |
| values | run.values | full state snapshots; stream.output is the final value |
| lifecycle | run.lifecycle | run / subgraph / subagent status transitions |
| (nested) | run.subgraphs | nested graph executions without namespace parsing |
| updates / custom / checkpoints / tasks / debug | opt-in transformers | per-node deltas, custom events, time-travel, task/debug detail |

Custom projections via transformers

When the projection you want does not exist, you write a StreamTransformer: init(), process(event), finalize(), and fail(err). process sees every ProtocolEvent and returns True to pass it through or False to suppress it. A transformer declares required_stream_modes so the runtime knows which raw channels to emit — the runtime takes the union across all registered transformers, and a mode no transformer requests is never produced.
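The two rules in that contract, the mode union and pass/suppress, fit in a toy model. Everything below is a stand-in written for illustration: ToyTransformer is not the LangGraph base class, and stopping at the first transformer that returns False is an assumption about ordering that the contract as stated here does not spell out.

```python
from typing import Iterable, Optional, Set, Tuple


class ToyTransformer:
    """Stand-in for the contract described above; not the LangGraph base class."""

    required_stream_modes: Tuple[str, ...] = ()

    def process(self, event: dict) -> bool:
        return True  # pass everything through by default


class MessagesOnly(ToyTransformer):
    required_stream_modes = ("messages",)

    def process(self, event: dict) -> bool:
        return event["method"] == "messages"  # suppress every other channel


class LifecycleWatcher(ToyTransformer):
    required_stream_modes = ("lifecycle", "messages")


def modes_to_emit(transformers: Iterable[ToyTransformer]) -> Set[str]:
    """Union of the raw channels the registered transformers asked for."""
    modes: Set[str] = set()
    for transformer in transformers:
        modes.update(transformer.required_stream_modes)
    return modes


def route(event: dict, transformers: Iterable[ToyTransformer]) -> Optional[dict]:
    """Return the event if every transformer passes it, otherwise None."""
    for transformer in transformers:
        if not transformer.process(event):
            return None  # assumed: suppression stops the event here
    return event
```

With both toy transformers registered, only messages and lifecycle would be produced; a values event has no consumer and is never emitted in the first place.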

A token-usage aggregator is the obvious first one, and it doubles as an observability hook:

class StatsTransformer(StreamTransformer):
    required_stream_modes = ("messages",)

    def __init__(self, scope=()):
        super().__init__(scope)
        self.total = 0
        self.log = StreamChannel[int]()

    def init(self):
        return {"total_tokens": self.log}

    def process(self, event):
        d = event["params"]["data"]
        if isinstance(d, dict):
            self.total += (d.get("usage") or {}).get("output_tokens") or 0
        return True

    def finalize(self):
        self.log.push(self.total)
        self.log.close()

Register it per call — stream_events(inp, version="v3", transformers=[StatsTransformer]) — or at compile time so every run produces the projection. StreamChannel is the projection primitive. A named channel (StreamChannel("total_tokens")) is exposed under stream.extensions and each push() also flows into the main stream as a custom:total_tokens event, so its payload must be serializable. An unnamed channel is a side projection only, which is where you keep in-process handles that can't be serialized. The built-in ToolCallTransformer uses exactly this contract to expose stream.tool_calls.

What this does for tracing and observability

The reason I care about projections is not ergonomics for its own sake — it's that the data I want for telemetry stops being something I scrape out of logs. The system I run already carries two observability planes: LangSmith tracing natively, and OpenTelemetry over OTLP, with the TypeScript client injecting W3C traceparent and langsmith-trace headers so a single trace spans the browser, the Vercel route, the FastAPI process, and the graph. The missing piece, on v2, is structured per-call signal inside the stream. v3's projections are that signal:

  • Per-call token cost and the reasoning split. message.output.usage_metadata gives input/output tokens per LLM call. Thinking-mode cost is otherwise close to invisible, because .content stays empty while the model reasons; message.reasoning makes those tokens a first-class projection you can meter.
  • Time-to-first-token. Timestamp the first item out of message.text against the run's start. On v2 you reconstruct this from a noisy delta stream; here it's one event.
  • Per-node and per-subgraph latency. run.lifecycle emits started and completed transitions, and seq on each ProtocolEvent gives you a reliable order without trusting wall-clock timestamps that can drift.
  • Spans that match the graph. run.subgraphs mirrors the execution topology, so an OpenTelemetry span tree can follow the actual DAG instead of a flattened log.

A small StreamTransformer that pushes usage and lifecycle events into your metrics exporter is the natural bridge: the transformer observes the raw events, and a named channel forwards the derived numbers into the same trace the rest of your request already belongs to.
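Time-to-first-token is the simplest of these to sketch, because it only needs a wrapper around whatever token iterator you already consume. FirstTokenTimer is a hypothetical helper, not a LangGraph class; the injectable clock is there so the measurement can be tested without sleeping.

```python
import time
from typing import Callable, Iterable, Iterator, Optional


class FirstTokenTimer:
    """Records when the first item comes out of a token iterator."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self.started_at = clock()  # taken at construction, i.e. at run start
        self.first_token_at: Optional[float] = None

    def wrap(self, tokens: Iterable[str]) -> Iterator[str]:
        """Pass tokens through unchanged, timestamping only the first one."""
        for token in tokens:
            if self.first_token_at is None:
                self.first_token_at = self._clock()
            yield token

    @property
    def ttft(self) -> Optional[float]:
        """Seconds from construction to first token, or None if none arrived."""
        if self.first_token_at is None:
            return None
        return self.first_token_at - self.started_at
```

Construct the timer just before starting the run, iterate timer.wrap(...) in place of the bare token iterator, and export timer.ttft once the message finishes.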

The parts v3 does not do for you

Streaming over SSE still has sharp edges, and the framework owning the parsing doesn't make them disappear. The keepalive problem is the clearest example: a model that reasons silently produces no message.text tokens for tens of seconds, and an idle SSE connection gets dropped by whatever proxy sits between you and the browser. You still emit a periodic comment frame to keep the connection warm — v3 just makes the reason for the silence legible, because the reasoning deltas are a projection you can surface as a "thinking…" status instead of dead air. Error frames (message-finish carrying an error, or a failed lifecycle event) need handling so a mid-stream model failure becomes a clean event to your client rather than a truncated response. And client disconnects should abort the upstream run, or you pay for tokens nobody reads.
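The disconnect case reduces to racing the run against a "client is gone" signal and cancelling the loser. A minimal asyncio sketch: run_or_abort is a hypothetical helper, and how the disconnected event gets set (a request-disconnect poll, a closed socket callback) depends on your web framework and is assumed here.

```python
import asyncio
from typing import Any, Coroutine, Optional


async def run_or_abort(
    upstream: Coroutine[Any, Any, Any], disconnected: asyncio.Event
) -> Optional[Any]:
    """Return the upstream result, or None if the client left first."""
    run = asyncio.create_task(upstream)
    gone = asyncio.create_task(disconnected.wait())
    done, _ = await asyncio.wait({run, gone}, return_when=asyncio.FIRST_COMPLETED)
    if run in done:
        gone.cancel()  # the run finished; stop watching for a disconnect
        return run.result()
    run.cancel()  # nobody is reading: stop paying for tokens
    try:
        await run  # let the cancelled task unwind its cleanup
    except asyncio.CancelledError:
        pass
    return None
```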

Honest limits

v3 requires a recent langgraph release; older versions expose only v1/v2 and the async astream_events, so plenty of running systems (mine included) still stream on v2. Before migrating, the edges worth knowing:

  • Structured output streams as characters, not prose. If a node uses JSON mode and the model emits {"subject": "...", "body": "..."}, the token stream is braces and quotes. The projection gives you tokens; the authoritative parsed result still comes from the final state, not from reassembling the stream. This is true on every version — it's a property of structured generation, not of v3.
  • Reasoning is separate from text. Read only message.text and you miss message.reasoning. If you want the full output you consume both.
  • Sync and async iterate differently: for versus async for over message.text. On Python < 3.11, async consumption needs explicit RunnableConfig propagation and a writer= argument instead of get_stream_writer, because that runtime can't carry the context automatically.
  • Named StreamChannel payloads must be serializable, since they become custom:<name> events on the main stream. Promises, async iterables, and class instances belong in unnamed channels.
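The structured-output point is easy to verify with the standard library alone. In this sketch the token boundaries are made up for illustration; the takeaway is that every prefix of the stream fails to parse until the last token lands, which is why the final state stays the authority.

```python
import json
from typing import Any, List, Optional


def try_parse(buffer: str) -> Optional[Any]:
    """Return the parsed JSON value, or None while the buffer is incomplete."""
    try:
        return json.loads(buffer)
    except json.JSONDecodeError:
        return None


def parse_progress(tokens: List[str]) -> List[Optional[Any]]:
    """Attempt a parse after each token, returning one result per token."""
    buffer = ""
    results = []
    for token in tokens:
        buffer += token
        results.append(try_parse(buffer))
    return results


# Hypothetical token boundaries for a JSON-mode response.
TOKENS = ['{"sub', 'ject": "Hi', '", "body": ', '"Thanks"}']
```

parse_progress(TOKENS) yields None for the first three tokens and the parsed dict only on the fourth, so anything you render mid-stream is raw characters rather than fields.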

When to reach for it

| Situation | What I'd do |
| --- | --- |
| New LangGraph project, no streaming code yet | Build on v3. The projections are simpler than the v2 branching, and the simplicity compounds as the graph grows subgraphs and parallel nodes. |
| Existing v1/v2 system in production | Stay on v2 until the v3 surface you depend on has settled. The ergonomic win is real, but it does not by itself justify reworking a path you already ship. |
| You need reasoning deltas, tool-call argument streaming, or per-call usage | v3 is the clean path; v2 makes you reconstruct all three by hand. |
| Python < 3.11 with async | Budget time for the explicit RunnableConfig / writer= pattern before you commit. |
| Observability is a first-order requirement | v3's run.lifecycle and run.subgraphs turn per-node latency and failure data into projections instead of log archaeology. |

The shape of the change

The redesign reflects a move from "stream the tokens" to "treat a graph execution as a structured protocol." Branching on chunk["type"] is fine for a two-node graph; it grows badly the moment you add subgraphs, parallel nodes, and multiple coordinated LLM calls. v3 pulls that complexity into the framework, and the consumer code ends up doing what it says — iterate the messages, read the usage, await the output. I'm still on v2 in production, but I've already rewritten the consumer in a branch to see the diff, and the diff is the argument. The next time you write a streaming endpoint, you'll want to know which version you're reaching for.


References

  1. LangGraph (OSS) — Streaming: stream modes and version="v2". https://docs.langchain.com/oss/python/langgraph/streaming
  2. LangGraph (OSS) — Event streaming: version="v3", typed projections, transformers, the ProtocolEvent envelope and channel list. https://docs.langchain.com/oss/python/langgraph/event-streaming
  3. Agent Protocol — wire-level event and command formats: langchain-protocol (PyPI), @langchain/protocol (npm), and langchain-ai/agent-protocol on GitHub.
  4. LangSmith — tracing quickstart (native LangGraph tracing).
  5. OpenTelemetry — OTLP exporter and span model (vendor-neutral tracing referenced in the observability section).

Observable AI Memory: mem0, LangGraph, and Qdrant with Enterprise-Grade Telemetry

· 13 min read
Vadim Nicolai
Senior Software Engineer

Most "AI memory" demos stop at memory.add() and memory.search(). That works on a laptop. It does not survive contact with production. The real questions are: When this recall is slow, which store is to blame? When a graph's spend triples overnight, which feature caused it? When a customer asks "what did your agent remember about me, and when?", can you answer from an audit log instead of a shrug?

TL;DR — This field report shows how to build an agent memory layer where every operation honors a contract: fail-open, PII-safe, and fully instrumented. Three stores (mem0, Qdrant, LangGraph) are funneled through single chokepoints, and each chokepoint fans out to five telemetry sinks. The result is a stack that answers the hard production questions without guesswork.

Multi-Probe Bayesian Spam Gating: Filtering Junk Before Spending Compute

· 44 min read
Vadim Nicolai
Senior Software Engineer

In a B2B lead generation pipeline, every email that arrives costs compute. Scoring it for buyer intent, extracting entities, predicting reply probability, matching it against your ideal customer profile — each module is a DeBERTa forward pass. If 40% of inbound email is template spam, AI-generated slop, or mass-sent campaigns, you are burning 40% of your GPU budget on garbage.

The solution is a gating module: a spam classifier that sits at stage 2 of the pipeline and filters junk before anything else runs. But a binary spam/not-spam classifier is too blunt. You need to know why something is spam (template? AI-generated? role account?), how confident you are (is it ambiguous, or have you never seen this pattern before?), and which provider will block it (Gmail is stricter than Yahoo on link density).

This article documents a hierarchical Bayesian spam gating system with 4 aspect-specific attention probes, information-theoretic AI detection features, uncertainty decomposition, and a full Rust distillation path. The Python model trains on DeBERTa-v3-base. The Rust classifier runs at batch speed with 24 features and zero ML dependencies.

Building a ZoomInfo Alternative with Qwen and MLX: Local Buyer Intent Detection

· 11 min read
Vadim Nicolai
Senior Software Engineer

ZoomInfo charges $300+ per user per month for intent data — buying signals that tell sales teams which companies are actively in-market. It is the platform's number one feature and the reason enterprises pay six figures annually for access. But the underlying technology — classifying company content into intent categories — is a text classification problem. One that a 3-billion-parameter open-source model can solve on a single laptop.

Fine-Tune Qwen3 with LoRA for AI Cold Email Outreach

· 27 min read
Vadim Nicolai
Senior Software Engineer

An AI cold email engine does one thing: it reads what you know about a company and writes a personalized outreach email — automatically, at scale. If you've ever spent an afternoon manually tweaking 50 nearly-identical emails, you understand the problem. If you've paid for Instantly, Smartlead, or Apollo, you've already solved it — just not on your own terms.

Those SaaS tools charge $30-200/month, send your prospect list to their servers, and give you a black-box model you can't touch. You can't train it on your best-performing emails. You can't add custom quality gates. You can't run it offline. For engineers and technical founders, that's a bad deal.

This system is the alternative: a locally-run pipeline where you own every layer — model weights, scoring logic, and approval gates. The core is Qwen3-1.7B, fine-tuned with LoRA adapters on MLX (Apple's framework for M1/M2 Metal acceleration). A Rust orchestration layer drives the full batch loop: pulling company records, invoking the model, running quality filters, and surfacing emails for human review before anything sends.

The result is not a toy. On a single M1 MacBook Pro, the pipeline generates 200+ personalized emails per batch in under 10 seconds — no GPU cloud, no API latency, no per-email cost. Fine-tuning converges in under 30 minutes on the same machine.

TurboQuant: 3-Bit KV Caches with Zero Accuracy Loss

· 16 min read
Vadim Nicolai
Senior Software Engineer

Every token your LLM generates forces it to reread its entire conversational history. That history -- the Key-Value cache -- is the single largest memory bottleneck during inference. A Llama-3.1-70B serving a 128K-token context in FP16 burns through ~40 GB of VRAM on KV cache alone, leaving almost nothing for weights on a single 80 GB H100. The standard remedies -- eviction (SnapKV, PyramidKV) and sparse attention -- trade accuracy for memory. They throw tokens away.

TurboQuant, published at ICLR 2026 by Zandieh, Daliri, Hadian, and Mirrokni from Google Research, takes the opposite approach: keep every token, compress every value. At 3 bits per coordinate it delivers 6x memory reduction. At 4 bits it delivers up to 8x speedup in computing attention logits on H100 GPUs. The headline result: on LongBench with Llama-3.1-8B-Instruct, the 3.5-bit configuration scores 50.06 -- identical to the 16-bit baseline. No retraining. No fine-tuning. No calibration data.

ScrapeGraphAI Qwen3-1.7B: Fine-Tuned Web Extraction Model and 100k Dataset

· 59 min read
Vadim Nicolai
Senior Software Engineer

Leading cloud extraction APIs are orders of magnitude larger than the model that just beat them at structured web extraction. This isn't a marginal win — it's a 3.4 percentage point lead on the de facto standard SWDE benchmark. The secret isn't a novel architecture; it's domain-specific fine-tuning on a 100,000-example dataset of real scraping trajectories. The ScrapeGraphAI team's release of a fine-tuned Qwen3-1.7B model flips the conventional scaling law on its head and delivers a complete open-source stack (model and dataset under Apache 2.0, library under MIT) for production. This is a blueprint for how narrow, expert models will outperform generalist giants — if you have the right data.

How Novelty Drives an RL Web Crawler

· 14 min read
Vadim Nicolai
Senior Software Engineer

The most dangerous assumption in applied Reinforcement Learning (RL) is that useful exploration requires massive scale—cloud GPU clusters, terabytes of experience, and billion-parameter models. I built a system that proves the opposite. The core innovation of a production-grade, B2B lead generation web crawler isn't its performance, but its location: it runs entirely on an Apple M1 MacBook, with zero cloud dependencies. Its ability to navigate the sparse-reward desert of the web emerges not from brute force, but from a meticulously orchestrated multi-timescale novelty engine. This architecture, where intrinsic curiosity, predictive uncertainty, and a self-adjusting curriculum interlock, provides a general blueprint for building autonomous agents that must find needles in the world's largest haystacks.