Token Streaming¶
This guide explains how to enable token-level LLM streaming with deepagent-temporal.
Background¶
Temporal Activities are request-response: the entire LLM call runs to completion inside an Activity before the result is returned. By default, `astream` emits Activity-level events (node started, node completed) — not individual LLM tokens.
deepagent-temporal solves this by injecting a LangChain callback handler (`TokenCapturingHandler`) that intercepts `on_llm_new_token` events from the chat model running inside `ainvoke()`. This captures tokens without changing the Activity execution model.
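The capture pattern can be sketched without the library. The `on_llm_new_token` hook name matches LangChain's callback API; the class below is a simplified stand-in, not the library's actual `TokenCapturingHandler`:

```python
class BufferingTokenHandler:
    """Simplified stand-in for the token-capture pattern. The hook name
    mirrors LangChain's callback API; the real handler also publishes."""

    def __init__(self, node_name: str):
        self.node_name = node_name
        self.events: list[dict] = []  # buffered token events

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # Called once per generated token; record it with its position.
        self.events.append({
            "type": "token",
            "token": token,
            "node_name": self.node_name,
            "index": len(self.events),
        })

handler = BufferingTokenHandler(node_name="call_model")
for tok in ["Hel", "lo", "!"]:  # simulate the model emitting tokens
    handler.on_llm_new_token(tok)

print([e["token"] for e in handler.events])  # → ['Hel', 'lo', '!']
```

Because the handler only observes the callback stream, the model's return value (and hence the Activity result) is unaffected.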
Phase 1: Buffered Token Streaming (No Extra Infrastructure)¶
Enable `enable_token_streaming=True` to capture individual tokens. Without Redis, tokens are buffered in the Activity result and delivered after the Activity completes:

```python
from deepagent_temporal import TemporalDeepAgent

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="my-agents",
    enable_token_streaming=True,
)
```
On the worker side, `create_worker()` wraps graph nodes automatically:

```python
worker = temporal_agent.create_worker(
    workflow_runner=UnsandboxedWorkflowRunner(),
)
```
On the client side, use `stream_mode="tokens"`:

```python
from langchain_core.messages import HumanMessage

async for event in temporal_agent.astream(
    {"messages": [HumanMessage(content="Hello")]},
    config={"configurable": {"thread_id": "t1"}},
    stream_mode="tokens",
):
    print(event["token"], end="", flush=True)
```
Each event is a dict with:

| Field | Type | Description |
|---|---|---|
| `type` | `str` | Always `"token"` |
| `token` | `str` | The token text |
| `node_name` | `str` | Graph node that produced this token |
| `index` | `int` | Token position within this LLM call |
| `is_final` | `str` | `"1"` if this is the last event, `"0"` otherwise |
| `attempt` | `int` | Activity attempt number (for deduplication) |
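Because an Activity may retry, the same token indices can appear under more than one attempt. One way to consume the buffered events safely (a sketch against the event fields described above, not library code) is to keep only the highest attempt seen:

```python
def dedupe_tokens(events: list[dict]) -> str:
    # Keep only tokens from the highest Activity attempt: earlier attempts
    # were retried, so their partial output should be discarded.
    if not events:
        return ""
    latest = max(e["attempt"] for e in events)
    kept = sorted(
        (e for e in events if e["attempt"] == latest),
        key=lambda e: e["index"],
    )
    return "".join(e["token"] for e in kept)

events = [
    {"type": "token", "token": "Hel", "node_name": "call_model",
     "index": 0, "is_final": "0", "attempt": 1},
    # Attempt 1 failed mid-stream; attempt 2 replayed from the start.
    {"type": "token", "token": "Hel", "node_name": "call_model",
     "index": 0, "is_final": "0", "attempt": 2},
    {"type": "token", "token": "lo", "node_name": "call_model",
     "index": 1, "is_final": "1", "attempt": 2},
]
print(dedupe_tokens(events))  # → Hello
```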
Latency: tokens arrive only after the full LLM call completes, so Phase 1 provides token-level granularity but not real-time delivery.
Phase 2: Real-Time Streaming via Redis¶
For real-time token delivery (~10-50ms per token), add a `RedisStreamBackend`:

```python
from deepagent_temporal import TemporalDeepAgent, RedisStreamBackend

redis_backend = RedisStreamBackend(redis_url="redis://localhost:6379")

temporal_agent = TemporalDeepAgent(
    agent, client,
    task_queue="my-agents",
    enable_token_streaming=True,
    redis_stream_backend=redis_backend,
)
```
With Redis configured:
- The `TokenCapturingHandler` publishes each token to a Redis Stream via `XADD` as it arrives from the LLM
- The client subscribes via `XREAD` and receives tokens in real time
- Temporal still handles durable state (the Activity result contains a summary, not individual tokens)
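The producer/consumer mechanics can be illustrated with an in-memory stand-in for the two Redis commands (no Redis required; the real backend issues `XADD`/`XREAD` through a Redis client):

```python
class FakeStream:
    """In-memory stand-in illustrating the Redis Stream pattern the
    backend relies on. Not the library's RedisStreamBackend."""

    def __init__(self, maxlen: int = 5000):
        self.maxlen = maxlen
        self.entries: list[tuple[int, dict]] = []  # (entry id, fields)
        self._next_id = 0

    def xadd(self, fields: dict) -> int:
        # Producer side: append a token event, trimming old entries
        # (Redis does this with XADD ... MAXLEN ~ 5000).
        entry_id = self._next_id
        self._next_id += 1
        self.entries.append((entry_id, fields))
        if len(self.entries) > self.maxlen:
            self.entries = self.entries[-self.maxlen:]
        return entry_id

    def xread(self, last_id: int) -> list[tuple[int, dict]]:
        # Consumer side: return only entries newer than last_id
        # (Redis does this with XREAD ... <last-seen-id>).
        return [(i, f) for i, f in self.entries if i > last_id]

stream = FakeStream()
for tok in ["Hi", " there"]:
    stream.xadd({"token": tok})

last_id = -1
for entry_id, fields in stream.xread(last_id):
    print(fields["token"], end="")  # prints tokens as they are read
    last_id = entry_id
```

Tracking the last-seen entry id on the consumer side is what lets a client resume mid-stream without re-reading tokens it has already printed.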
Redis Configuration¶
```python
redis_backend = RedisStreamBackend(
    redis_url="redis://localhost:6379",
    channel_prefix="deepagent:stream:",  # Redis key prefix
    stream_maxlen=5000,                  # approximate max entries per stream
    stream_ttl_seconds=300,              # TTL after the stream completes
)
```
Standalone Streaming Worker¶
For advanced setups, use `create_streaming_worker` directly:

```python
from deepagent_temporal import create_streaming_worker

worker = create_streaming_worker(
    graph,
    client,
    task_queue="my-agents",
    redis_url="redis://localhost:6379",
    use_worker_affinity=True,
    node_names=["call_model"],  # only wrap LLM-calling nodes
)
```
Graceful Degradation¶
If Redis is unavailable:
- `RedisStreamBackend.publish` catches connection errors and logs a warning
- Tokens are still captured by the callback handler
- The client falls back to receiving tokens from the Temporal stream buffer (Phase 1 behavior)
No crash, no data loss on the durable path.
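The degradation pattern looks roughly like this (a sketch, not the library's implementation; the real `RedisStreamBackend.publish` behaves analogously):

```python
import logging

logger = logging.getLogger("token_stream")

class SafePublisher:
    """Sketch of graceful degradation: always record the token on the
    durable path, then attempt the real-time Redis publish."""

    def __init__(self, backend=None):
        self.backend = backend       # e.g. a Redis client; None = not configured
        self.buffer: list[str] = []  # durable fallback (Phase 1 path)

    def publish(self, token: str) -> None:
        self.buffer.append(token)    # capture first, so nothing is lost
        if self.backend is None:
            return
        try:
            self.backend.xadd("stream", {"token": token})
        except ConnectionError:
            # Real-time delivery is best-effort; the durable path already
            # has the token, so just warn and continue.
            logger.warning("Redis unavailable; falling back to buffer")

class BrokenBackend:
    def xadd(self, *args, **kwargs):
        raise ConnectionError("redis down")

pub = SafePublisher(backend=BrokenBackend())
pub.publish("Hi")     # no crash despite the failing backend
print(pub.buffer)     # → ['Hi']
```

Capturing to the durable buffer before attempting the publish is what makes "no data loss on the durable path" hold even when Redis dies mid-stream.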
Middleware Compatibility¶
The callback handler works with all Deep Agent middleware:
- `SummarizationMiddleware` — May trigger two LLM calls (summarization + response). The handler tracks `_llm_call_count` and resets the token index on each new call, so clients receive clean token sequences.
- `PatchToolCallsMiddleware` — Patches tool call IDs after LLM completion. Streamed tool call chunks may have different IDs than the patched final result. Use the final `AIMessage` from the Activity result for authoritative tool call IDs.
- `AnthropicPromptCachingMiddleware` — Modifies the request only. No interaction with streaming.
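The index-reset behavior described for `SummarizationMiddleware` can be sketched as follows (field and hook names are illustrative; only `_llm_call_count` is named by this guide):

```python
class CallAwareHandler:
    """Sketch of per-call token indexing: a second LLM call within the
    same node restarts the index so each sequence reads cleanly."""

    def __init__(self):
        self._llm_call_count = 0
        self._index = 0
        self.events: list[dict] = []

    def on_llm_start(self, **kwargs):
        # A new LLM call (e.g. the summarization pass) restarts the index.
        self._llm_call_count += 1
        self._index = 0

    def on_llm_new_token(self, token, **kwargs):
        self.events.append({"call": self._llm_call_count,
                            "index": self._index, "token": token})
        self._index += 1

h = CallAwareHandler()
h.on_llm_start()
h.on_llm_new_token("summary")  # call 1, index 0
h.on_llm_start()               # second LLM call resets the index
h.on_llm_new_token("Hel")      # call 2, index 0
h.on_llm_new_token("lo")       # call 2, index 1
print([(e["call"], e["index"]) for e in h.events])  # → [(1, 0), (2, 0), (2, 1)]
```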
Event History Considerations¶
When using Phase 1 (no Redis), individual token events are stored in `NodeActivityOutput.custom_data`, which is part of Temporal's Event History. A 2000-token response adds ~100KB per Activity result. Over hundreds of steps, this can approach Temporal's 50MB Event History limit.
Mitigations:
- Use `SummarizationMiddleware` to keep conversations shorter
- Use `validate_payload_size()` to monitor state size
- Upgrade to Phase 2 (Redis) — only a small summary is stored in Event History
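A back-of-the-envelope check can make the growth concrete. The numbers below are illustrative (average token length and JSON encoding are assumptions; the actual payload converter may store events more compactly, which is consistent with the ~100KB figure above):

```python
import json

def estimate_token_event_bytes(n_tokens: int, avg_token_chars: int = 4) -> int:
    # Rough size of the buffered token events for one LLM call, serialized
    # as JSON the way they might land in custom_data. Illustrative only.
    sample = {"type": "token", "token": "x" * avg_token_chars,
              "node_name": "call_model", "index": 0,
              "is_final": "0", "attempt": 1}
    return n_tokens * len(json.dumps(sample))

per_response = estimate_token_event_bytes(2000)
limit = 50 * 1024 * 1024  # Temporal's Event History cap

print(per_response // 1024, "KiB per 2000-token response (upper bound)")
print(limit // per_response, "such responses before hitting the limit")
```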
How It Works Internally¶
- `TemporalDeepAgent.create_worker()` calls `wrap_graph_for_streaming()`, which wraps each node's `bound` attribute with a `StreamingNodeWrapper`
- When upstream `_execute_node_impl` calls `node.bound.ainvoke()`, the wrapper intercepts the call
- The wrapper injects a `TokenCapturingHandler` into `config["callbacks"]`
- The LLM fires `on_llm_new_token` callbacks as it generates tokens
- The handler captures each token and publishes it (to Redis or to the Activity's `custom_data`)
- The original `ainvoke()` result is returned unchanged — the Activity completes normally
- The client receives tokens via Redis subscription or Temporal Query polling
This design avoids forking upstream langgraph-temporal code. The `StreamingNodeWrapper` is transparent to `_execute_node_impl`.
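The wrapping-and-injection step can be sketched in isolation. This is a simplified illustration of the pattern, not the library's actual `StreamingNodeWrapper`:

```python
import asyncio

class StreamingNodeWrapper:
    """Sketch of the wrapping pattern: inject a callback into the config
    before delegating to the wrapped node's ainvoke()."""

    def __init__(self, bound, handler):
        self.bound = bound      # the node's original bound runnable
        self.handler = handler  # token-capturing callback to inject

    async def ainvoke(self, state, config=None):
        config = dict(config or {})
        # Add the handler without clobbering any existing callbacks.
        config["callbacks"] = list(config.get("callbacks", [])) + [self.handler]
        # Delegate; the result passes through unchanged, so the Activity
        # completes exactly as it would without streaming.
        return await self.bound.ainvoke(state, config)

class FakeNode:
    async def ainvoke(self, state, config):
        # A real node would run the LLM; here we just prove the handler
        # reached the call and that the result is untouched.
        for cb in config["callbacks"]:
            cb.seen = True
        return state

class Handler:
    seen = False

handler = Handler()
wrapper = StreamingNodeWrapper(FakeNode(), handler)
result = asyncio.run(wrapper.ainvoke({"messages": []}, config={}))
print(handler.seen, result)  # → True {'messages': []}
```

Because the wrapper only mutates a copy of `config` and returns the delegate's result verbatim, the upstream executor cannot tell it is there, which is what makes the design fork-free.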