Streaming¶
Token-level streaming components.
TokenEvent¶

TokenEvent (dataclass)

A single token or token batch from LLM streaming.
Attributes:

| Name | Type | Description |
|---|---|---|
| `token` | `str` | The token text. |
| `node_name` | `str` | The graph node that produced this token. |
| `index` | `int` | Token position within this LLM call. |
| `is_final` | `bool` | Whether this is the last token in the LLM call. |
| `attempt` | `int` | Activity attempt number (for deduplication on retry). |
to_dict¶
Serialize to a dict safe for Redis Streams.
Redis Streams only accept str, int, float, or
bytes values — not bool. Booleans are converted to
"1"/"0" strings.
Source code in deepagent_temporal/streaming.py
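The boolean conversion can be sketched with a minimal stand-in dataclass that mirrors the documented fields (this is an illustration, not the library's actual source):

```python
from dataclasses import dataclass, asdict
from typing import Any


@dataclass
class TokenEvent:
    """Mirror of the documented attributes; an illustrative sketch."""
    token: str
    node_name: str
    index: int
    is_final: bool
    attempt: int

    def to_dict(self) -> dict[str, Any]:
        # Redis Streams field values must be str/int/float/bytes,
        # so the bool is encoded as a "1"/"0" string.
        d = asdict(self)
        d["is_final"] = "1" if self.is_final else "0"
        return d


event = TokenEvent(token="Hello", node_name="agent", index=0,
                   is_final=False, attempt=1)
print(event.to_dict()["is_final"])  # "0"
```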
TokenCapturingHandler¶

TokenCapturingHandler(
    node_name: str,
    publish: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
)

Bases: BaseCallbackHandler
LangChain callback handler that captures LLM tokens.
Intercepts on_llm_new_token events from chat models running
inside a LangGraph node's ainvoke(). Each token is wrapped as a
TokenEvent and forwarded to the configured publish callback.
Modeled on LangGraph's StreamMessagesHandler from
langgraph.pregel._messages.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node_name` | `str` | Name of the graph node being executed. | *required* |
| `publish` | `Callable[[TokenEvent], None]` | Callback invoked for each token event. | *required* |
| `heartbeat_fn` | `Callable[..., None] \| None` | Optional callable for Activity heartbeats, called every `heartbeat_interval` tokens. | `None` |
| `heartbeat_interval` | `int` | Number of tokens between heartbeats. | `50` |
| `attempt` | `int` | Activity attempt number for deduplication. | `1` |
Source code in deepagent_temporal/streaming.py
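The capture-and-heartbeat loop can be illustrated with a stripped-down handler that has no LangChain dependency. `SimpleTokenHandler` and its dict events are illustrative stand-ins for the library's `TokenCapturingHandler` and `TokenEvent`:

```python
from typing import Callable, Optional


class SimpleTokenHandler:
    """Illustrative stand-in for TokenCapturingHandler's core loop."""

    def __init__(self, node_name: str, publish: Callable[[dict], None],
                 heartbeat_fn: Optional[Callable[[], None]] = None,
                 heartbeat_interval: int = 50, attempt: int = 1):
        self.node_name = node_name
        self.publish = publish
        self.heartbeat_fn = heartbeat_fn
        self.heartbeat_interval = heartbeat_interval
        self.attempt = attempt
        self.index = 0

    def on_llm_new_token(self, token: str) -> None:
        self.publish({"token": token, "node_name": self.node_name,
                      "index": self.index, "is_final": False,
                      "attempt": self.attempt})
        self.index += 1
        # Heartbeat every N tokens so long LLM calls do not hit
        # the Activity's heartbeat timeout.
        if self.heartbeat_fn and self.index % self.heartbeat_interval == 0:
            self.heartbeat_fn()

    def on_llm_end(self) -> None:
        # Empty final event marks the end of the LLM call.
        self.publish({"token": "", "node_name": self.node_name,
                      "index": self.index, "is_final": True,
                      "attempt": self.attempt})


events: list[dict] = []
beats: list[int] = []
h = SimpleTokenHandler("agent", events.append,
                       heartbeat_fn=lambda: beats.append(1),
                       heartbeat_interval=2)
for t in ["Hel", "lo", "!"]:
    h.on_llm_new_token(t)
h.on_llm_end()
print(len(events), len(beats))  # 4 events (3 tokens + final), 1 heartbeat
```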
on_llm_new_token¶

on_llm_new_token(
    token: str,
    *,
    chunk: ChatGenerationChunk | None = None,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    tags: list[str] | None = None,
    **kwargs: Any,
) -> None
Capture a token from the LLM and publish it.
Source code in deepagent_temporal/streaming.py
on_llm_end¶

on_llm_end(
    response: Any,
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None
Emit a final token event marking the end of the LLM call.
Source code in deepagent_temporal/streaming.py
on_chat_model_start¶

on_chat_model_start(
    serialized: dict[str, Any],
    messages: list[list[Any]],
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None
Track LLM call count for SummarizationMiddleware handling.
SummarizationMiddleware may trigger two LLM calls (summarization + actual response). We reset the token index on each new call so the client receives clean token sequences.
Source code in deepagent_temporal/streaming.py
RedisStreamBackend¶

RedisStreamBackend(
    redis_url: str = "redis://localhost:6379",
    channel_prefix: str = "deepagent:stream:",
    stream_maxlen: int = 5000,
    stream_ttl_seconds: int = 300,
)
Real-time token streaming via Redis Streams.
Publishes token events to a Redis Stream keyed by workflow ID.
Clients subscribe via XREAD for real-time delivery. Temporal
handles durable state; Redis handles low-latency token delivery.
Requires redis[hiredis]>=5.0.0 (optional dependency).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `redis_url` | `str` | Redis connection URL. | `'redis://localhost:6379'` |
| `channel_prefix` | `str` | Prefix for Redis Stream keys. | `'deepagent:stream:'` |
| `stream_maxlen` | `int` | Approximate max entries per stream (trimmed with `MAXLEN ~`). | `5000` |
| `stream_ttl_seconds` | `int` | TTL for stream keys after completion. | `300` |
Source code in deepagent_temporal/streaming.py
publish (async)¶

publish(
    workflow_id: str, event: TokenEvent | dict[str, Any]
) -> None

Publish a token event to the Redis Stream.

Errors on connection failure are logged rather than raised (graceful degradation).
Source code in deepagent_temporal/streaming.py
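The graceful-degradation behavior amounts to a log-and-continue guard around the send call. A minimal sketch of the pattern (the helper name and logger are illustrative, not the library's code):

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("streaming-sketch")


def publish_with_degradation(send: Callable[[dict], Any],
                             payload: dict) -> bool:
    """Attempt delivery; on failure, log and carry on rather than
    failing the whole Activity. Returns True if the payload was sent."""
    try:
        send(payload)
        return True
    except ConnectionError as exc:
        logger.warning("token publish failed, continuing: %s", exc)
        return False


def flaky_send(payload: dict) -> None:
    raise ConnectionError("redis unreachable")


ok = publish_with_degradation(flaky_send, {"token": "hi"})
print(ok)  # False: the error was logged, not raised
```

The trade-off is that a Redis outage drops tokens silently; Temporal still holds the durable state, so only real-time delivery degrades.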
publish_complete (async)¶

Publish a stream-complete sentinel and set TTL.
Source code in deepagent_temporal/streaming.py
subscribe (async)¶

subscribe(
    workflow_id: str,
    *,
    last_id: str = "0-0",
    block_ms: int = 1000,
) -> AsyncIterator[dict[str, Any]]
Subscribe to token events from a Redis Stream.
Yields token event dicts. Stops on stream_complete sentinel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `workflow_id` | `str` | The workflow to subscribe to. | *required* |
| `last_id` | `str` | Redis Stream ID to start reading from. | `'0-0'` |
| `block_ms` | `int` | Milliseconds to block on XREAD. | `1000` |
Source code in deepagent_temporal/streaming.py
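A consuming client typically loops until the sentinel and, because each token carries an attempt number, drops events from superseded retries. A self-contained sketch, with an in-memory generator standing in for `subscribe` (the `stream_complete` sentinel key comes from the description above; the drop-older-attempts policy shown is one reasonable choice, not necessarily the library's):

```python
import asyncio
from typing import AsyncIterator


async def fake_subscribe() -> AsyncIterator[dict]:
    """In-memory stand-in for RedisStreamBackend.subscribe."""
    for ev in [
        {"token": "Hel", "index": 0, "attempt": 1},
        {"token": "Hel", "index": 0, "attempt": 2},  # retry restarted the call
        {"token": "lo",  "index": 1, "attempt": 2},
        {"stream_complete": "1"},                    # completion sentinel
    ]:
        yield ev


async def collect_tokens() -> str:
    latest_attempt = 0
    tokens: list[str] = []
    async for ev in fake_subscribe():
        if "stream_complete" in ev:
            break
        attempt = int(ev["attempt"])
        if attempt > latest_attempt:
            # A newer attempt supersedes earlier partial output.
            latest_attempt = attempt
            tokens.clear()
        elif attempt < latest_attempt:
            continue  # stale event from an older attempt
        tokens.append(ev["token"])
    return "".join(tokens)


print(asyncio.run(collect_tokens()))  # Hello
```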
cleanup (async)¶

Delete the Redis Stream for a workflow.
Source code in deepagent_temporal/streaming.py
StreamingNodeWrapper¶

StreamingNodeWrapper(
    original_bound: Any,
    node_name: str,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
)
Wraps a LangGraph node's bound runnable to inject token capture.
When ainvoke() is called (by upstream _execute_node_impl), the
wrapper injects a TokenCapturingHandler into the config's callback
list, then delegates to the original runnable. The handler intercepts
on_llm_new_token events from the chat model.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `original_bound` | `Any` | The original node runnable. | *required* |
| `node_name` | `str` | Name of the graph node. | *required* |
| `token_sink` | `Callable[[TokenEvent], None] \| None` | Callback invoked for each token event. | `None` |
| `redis_backend` | `RedisStreamBackend \| None` | Optional Redis backend for real-time publishing. | `None` |
Source code in deepagent_temporal/activity.py
ainvoke (async)¶

Invoke the node with token capture callbacks injected.
Adds a TokenCapturingHandler to config["callbacks"] before
delegating to the original runnable's ainvoke.
Source code in deepagent_temporal/activity.py
Helper Functions¶

create_token_handler¶

create_token_handler(
    node_name: str,
    sink: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
) -> TokenCapturingHandler
Factory for creating a configured TokenCapturingHandler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node_name` | `str` | Name of the graph node being executed. | *required* |
| `sink` | `Callable[[TokenEvent], None]` | Callback that receives each token event. | *required* |
| `heartbeat_fn` | `Callable[..., None] \| None` | Optional Activity heartbeat callable. | `None` |
| `heartbeat_interval` | `int` | Tokens between heartbeats. | `50` |
| `attempt` | `int` | Activity attempt number. | `1` |
Source code in deepagent_temporal/streaming.py
wrap_graph_for_streaming¶

wrap_graph_for_streaming(
    graph: Pregel,
    *,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
    node_names: list[str] | None = None,
) -> Pregel
Wrap graph nodes with StreamingNodeWrapper for token capture.
Modifies the graph in-place by replacing each target node's bound
attribute with a StreamingNodeWrapper.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `Pregel` | Compiled LangGraph Pregel graph. | *required* |
| `token_sink` | `Callable[[TokenEvent], None] \| None` | Callback for token events (Phase 1 / fallback). | `None` |
| `redis_backend` | `RedisStreamBackend \| None` | Redis backend for real-time publishing (Phase 2). | `None` |
| `node_names` | `list[str] \| None` | Specific nodes to wrap. Defaults to all nodes. | `None` |
Returns:

| Type | Description |
|---|---|
| `Pregel` | The modified graph (same object, mutated in-place). |
Source code in deepagent_temporal/activity.py
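The in-place mutation pattern (replacing each target node's bound attribute on the same graph object) can be sketched with plain objects. `wrap_nodes`, `FakeGraph`-style namespaces, and the `.nodes` mapping here are illustrative, chosen to match the description above rather than LangGraph's exact internals:

```python
from types import SimpleNamespace


def wrap_nodes(graph, wrapper_factory, node_names=None):
    """Replace each target node's .bound with a wrapper, in place.
    Mirrors the documented behavior: the same graph object is
    returned, mutated."""
    targets = node_names if node_names is not None else list(graph.nodes)
    for name in targets:
        node = graph.nodes[name]
        node.bound = wrapper_factory(node.bound, name)
    return graph


# Illustrative graph with two nodes.
graph = SimpleNamespace(nodes={
    "agent": SimpleNamespace(bound="agent-runnable"),
    "tools": SimpleNamespace(bound="tools-runnable"),
})

wrapped = wrap_nodes(graph,
                     lambda bound, name: ("wrapped", name, bound),
                     node_names=["agent"])
print(wrapped is graph)  # True: mutated in place, same object returned
```

Because the mutation is in place, wrapping should happen once at worker startup, before the graph is shared with executing Activities.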
create_streaming_worker¶

create_streaming_worker(
    graph: Pregel,
    client: Client,
    task_queue: str = "deep-agents",
    *,
    redis_url: str | None = None,
    redis_stream_backend: RedisStreamBackend | None = None,
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    node_names: list[str] | None = None,
    **kwargs: Any,
) -> Any
Create a Temporal Worker with streaming-enabled graph nodes.
Wraps target graph nodes with StreamingNodeWrapper for token-level
capture, then delegates to langgraph.temporal.worker.create_worker.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `Pregel` | Compiled LangGraph Pregel graph. | *required* |
| `client` | `Client` | Temporal client instance. | *required* |
| `task_queue` | `str` | Default task queue name. | `'deep-agents'` |
| `redis_url` | `str \| None` | Redis connection URL for real-time streaming. Convenience alternative to passing a `RedisStreamBackend`. | `None` |
| `redis_stream_backend` | `RedisStreamBackend \| None` | Pre-configured Redis backend instance. | `None` |
| `use_worker_affinity` | `bool` | Enable worker-specific task queue affinity. | `False` |
| `worker_queue_file` | `str \| None` | Path to persist the worker queue name. | `None` |
| `node_names` | `list[str] \| None` | Specific nodes to enable streaming on. Defaults to all nodes. | `None` |
| `**kwargs` | `Any` | Additional arguments passed to `create_worker`. | `{}` |
Returns:

| Type | Description |
|---|---|
| `Any` | A Temporal Worker, as returned by `create_worker`. |