Skip to content

Streaming

Token-level streaming components.

TokenEvent

TokenEvent dataclass

TokenEvent(
    token: str,
    node_name: str,
    index: int,
    is_final: bool = False,
    attempt: int = 1,
)

A single token or token batch from LLM streaming.

Attributes:

Name Type Description
token str

The token text.

node_name str

The graph node that produced this token.

index int

Token position within this LLM call.

is_final bool

Whether this is the last token in the LLM call.

attempt int

Activity attempt number (for deduplication on retry).

to_dict

to_dict() -> dict[str, str | int]

Serialize to a dict safe for Redis Streams.

Redis Streams only accept str, int, float, or bytes values — not bool. Booleans are converted to "1"/"0" strings.

Source code in deepagent_temporal/streaming.py
def to_dict(self) -> dict[str, str | int]:
    """Serialize to a dict safe for Redis Streams.

    Redis Streams only accept ``str``, ``int``, ``float``, or
    ``bytes`` values — not ``bool``. Booleans are converted to
    ``"1"``/``"0"`` strings.
    """
    return {
        "type": "token",
        "token": self.token,
        "node_name": self.node_name,
        "index": self.index,
        "is_final": "1" if self.is_final else "0",
        "attempt": self.attempt,
    }

TokenCapturingHandler

TokenCapturingHandler

TokenCapturingHandler(
    node_name: str,
    publish: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
)

Bases: BaseCallbackHandler

LangChain callback handler that captures LLM tokens.

Intercepts on_llm_new_token events from chat models running inside a LangGraph node's ainvoke(). Each token is wrapped as a TokenEvent and forwarded to the configured publish callback.

Modeled on LangGraph's StreamMessagesHandler from langgraph.pregel._messages.

Parameters:

Name Type Description Default
node_name str

Name of the graph node being executed.

required
publish Callable[[TokenEvent], None]

Callback invoked for each token event.

required
heartbeat_fn Callable[..., None] | None

Optional callable for Activity heartbeats. Called every heartbeat_interval tokens.

None
heartbeat_interval int

Number of tokens between heartbeats.

50
attempt int

Activity attempt number for deduplication.

1
Source code in deepagent_temporal/streaming.py
def __init__(
    self,
    node_name: str,
    publish: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
) -> None:
    """Set up token capture for one graph node's LLM calls.

    Args:
        node_name: Name of the graph node being executed.
        publish: Callback invoked for each ``TokenEvent``.
        heartbeat_fn: Optional Activity-heartbeat callable, invoked
            every ``heartbeat_interval`` tokens.
        heartbeat_interval: Number of tokens between heartbeats.
        attempt: Activity attempt number, used downstream for
            deduplicating tokens replayed on retry.
    """
    super().__init__()
    # Per-call counters: token position within the current LLM call and
    # how many LLM calls this node execution has triggered so far.
    self._index = 0
    self._llm_call_count = 0
    # Where captured tokens go.
    self._publish = publish
    self._node_name = node_name
    # Retry bookkeeping for downstream deduplication.
    self._attempt = attempt
    # Optional Temporal Activity heartbeat wiring.
    self._heartbeat_fn = heartbeat_fn
    self._heartbeat_interval = heartbeat_interval

token_count property

token_count: int

Number of tokens captured so far.

on_llm_new_token

on_llm_new_token(
    token: str,
    *,
    chunk: ChatGenerationChunk | None = None,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    tags: list[str] | None = None,
    **kwargs: Any,
) -> None

Capture a token from the LLM and publish it.

Source code in deepagent_temporal/streaming.py
def on_llm_new_token(  # type: ignore[override]
    self,
    token: str,
    *,
    chunk: ChatGenerationChunk | None = None,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    tags: list[str] | None = None,
    **kwargs: Any,
) -> None:
    """Wrap an incoming LLM token in a ``TokenEvent`` and publish it."""
    # Ignore callbacks that don't carry a real chat-generation chunk.
    if not isinstance(chunk, ChatGenerationChunk):
        return

    # Record the token's position, then advance the cursor.
    position = self._index
    self._index = position + 1
    self._publish(
        TokenEvent(
            token=token,
            node_name=self._node_name,
            index=position,
            attempt=self._attempt,
        )
    )

    # Heartbeat every N tokens so a long-streaming Activity isn't timed out.
    if self._heartbeat_fn is not None and self._index % self._heartbeat_interval == 0:
        try:
            self._heartbeat_fn(f"node={self._node_name} tokens={self._index}")
        except RuntimeError:
            # Raised when not running inside an Activity (e.g. unit tests).
            pass

on_llm_end

on_llm_end(
    response: Any,
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None

Emit a final token event marking the end of the LLM call.

Source code in deepagent_temporal/streaming.py
def on_llm_end(
    self,
    response: Any,
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None:
    """Publish an empty, final ``TokenEvent`` marking the end of the call."""
    # An empty token with is_final=True is the per-call end-of-stream
    # marker; index is the position just past the last real token.
    final_event = TokenEvent(
        token="",
        node_name=self._node_name,
        index=self._index,
        is_final=True,
        attempt=self._attempt,
    )
    self._publish(final_event)

on_chat_model_start

on_chat_model_start(
    serialized: dict[str, Any],
    messages: list[list[Any]],
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None

Track LLM call count for SummarizationMiddleware handling.

SummarizationMiddleware may trigger two LLM calls (summarization + actual response). We reset the token index on each new call so the client receives clean token sequences.

Source code in deepagent_temporal/streaming.py
def on_chat_model_start(
    self,
    serialized: dict[str, Any],
    messages: list[list[Any]],
    *,
    run_id: UUID,
    parent_run_id: UUID | None = None,
    **kwargs: Any,
) -> None:
    """Track LLM call count for SummarizationMiddleware handling.

    SummarizationMiddleware may trigger two LLM calls (summarization
    + actual response). We reset the token index on each new call so
    the client receives clean token sequences.
    """
    self._llm_call_count += 1
    self._index = 0

RedisStreamBackend

RedisStreamBackend

RedisStreamBackend(
    redis_url: str = "redis://localhost:6379",
    channel_prefix: str = "deepagent:stream:",
    stream_maxlen: int = 5000,
    stream_ttl_seconds: int = 300,
)

Real-time token streaming via Redis Streams.

Publishes token events to a Redis Stream keyed by workflow ID. Clients subscribe via XREAD for real-time delivery. Temporal handles durable state; Redis handles low-latency token delivery.

Requires redis[hiredis]>=5.0.0 (optional dependency).

Parameters:

Name Type Description Default
redis_url str

Redis connection URL.

'redis://localhost:6379'
channel_prefix str

Prefix for Redis Stream keys.

'deepagent:stream:'
stream_maxlen int

Approximate max entries per stream (MAXLEN ~).

5000
stream_ttl_seconds int

TTL for stream keys after completion.

300
Source code in deepagent_temporal/streaming.py
def __init__(
    self,
    redis_url: str = "redis://localhost:6379",
    channel_prefix: str = "deepagent:stream:",
    stream_maxlen: int = 5000,
    stream_ttl_seconds: int = 300,
) -> None:
    """Store connection settings; the Redis client itself is created lazily.

    Args:
        redis_url: Redis connection URL.
        channel_prefix: Prefix for Redis Stream keys.
        stream_maxlen: Approximate max entries per stream (``MAXLEN ~``).
        stream_ttl_seconds: TTL applied to stream keys after completion.
    """
    # Retention policy for published streams.
    self._stream_maxlen = stream_maxlen
    self._stream_ttl_seconds = stream_ttl_seconds
    # Connection and key-naming configuration.
    self._redis_url = redis_url
    self._channel_prefix = channel_prefix
    # Lazily-created async client; populated on first use.
    self._redis: Any = None

publish async

publish(
    workflow_id: str, event: TokenEvent | dict[str, Any]
) -> None

Publish a token event to the Redis Stream.

Silently logs errors on connection failure (graceful degradation).

Source code in deepagent_temporal/streaming.py
async def publish(
    self, workflow_id: str, event: TokenEvent | dict[str, Any]
) -> None:
    """Append a token event to the workflow's Redis Stream.

    Connection failures are logged and swallowed (graceful degradation)
    so streaming problems never break the durable execution path.
    """
    try:
        client = await self._get_redis()
        # Accept either a TokenEvent or an already-serialized dict.
        payload = event.to_dict() if isinstance(event, TokenEvent) else event
        await client.xadd(
            self._stream_key(workflow_id),
            payload,
            maxlen=self._stream_maxlen,
            approximate=True,  # MAXLEN ~ : cheap approximate trimming
        )
    except Exception:
        logger.warning(
            "Failed to publish token event to Redis for workflow %s",
            workflow_id,
            exc_info=True,
        )

publish_complete async

publish_complete(workflow_id: str) -> None

Publish a stream-complete sentinel and set TTL.

Source code in deepagent_temporal/streaming.py
async def publish_complete(self, workflow_id: str) -> None:
    """Append the stream-complete sentinel and start the key's TTL."""
    try:
        client = await self._get_redis()
        stream_key = self._stream_key(workflow_id)
        # Sentinel entry tells subscribers to stop reading.
        await client.xadd(stream_key, {"type": "stream_complete"})
        # Expire the finished stream so keys don't accumulate in Redis.
        await client.expire(stream_key, self._stream_ttl_seconds)
    except Exception:
        logger.warning(
            "Failed to publish stream_complete for workflow %s",
            workflow_id,
            exc_info=True,
        )

subscribe async

subscribe(
    workflow_id: str,
    *,
    last_id: str = "0-0",
    block_ms: int = 1000,
) -> AsyncIterator[dict[str, Any]]

Subscribe to token events from a Redis Stream.

Yields token event dicts. Stops on stream_complete sentinel.

Parameters:

Name Type Description Default
workflow_id str

The workflow to subscribe to.

required
last_id str

Redis Stream ID to start reading from.

'0-0'
block_ms int

Milliseconds to block on XREAD.

1000
Source code in deepagent_temporal/streaming.py
async def subscribe(
    self,
    workflow_id: str,
    *,
    last_id: str = "0-0",
    block_ms: int = 1000,
) -> AsyncIterator[dict[str, Any]]:
    """Subscribe to token events from a Redis Stream.

    Yields token event dicts. Stops on the ``stream_complete`` sentinel
    written by ``publish_complete``.

    Args:
        workflow_id: The workflow to subscribe to.
        last_id: Redis Stream ID to start reading from. The default
            ``"0-0"`` replays the stream from the beginning.
        block_ms: Milliseconds to block on XREAD per poll.
    """
    r = await self._get_redis()
    key = self._stream_key(workflow_id)
    # Cursor for XREAD: always the ID of the last entry we consumed.
    current_id = last_id

    while True:
        # Blocks up to block_ms; a falsy result means the wait timed out
        # with no new entries, so poll again.
        entries = await r.xread({key: current_id}, block=block_ms, count=100)
        if not entries:
            continue
        for _stream_name, messages in entries:
            for msg_id, data in messages:
                # Advance the cursor before yielding so a later resume
                # (or the sentinel return) never re-reads this entry.
                current_id = msg_id
                # NOTE(review): assumes the client decodes responses to
                # str (the "type" field) — confirm _get_redis configures
                # decode_responses.
                if data.get("type") == "stream_complete":
                    return
                yield data

cleanup async

cleanup(workflow_id: str) -> None

Delete the Redis Stream for a workflow.

Source code in deepagent_temporal/streaming.py
async def cleanup(self, workflow_id: str) -> None:
    """Remove the workflow's Redis Stream key entirely."""
    try:
        client = await self._get_redis()
        stream_key = self._stream_key(workflow_id)
        await client.delete(stream_key)
    except Exception:
        # Best-effort: cleanup failure is logged, never raised.
        logger.warning(
            "Failed to cleanup Redis stream for workflow %s",
            workflow_id,
            exc_info=True,
        )

close async

close() -> None

Close the Redis connection.

Source code in deepagent_temporal/streaming.py
async def close(self) -> None:
    """Close and forget the lazily-created Redis connection, if any."""
    # Nothing to do when no connection was ever established.
    if self._redis is None:
        return
    await self._redis.aclose()
    self._redis = None

StreamingNodeWrapper

StreamingNodeWrapper

StreamingNodeWrapper(
    original_bound: Any,
    node_name: str,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
)

Wraps a LangGraph node's bound runnable to inject token capture.

When ainvoke() is called (by upstream _execute_node_impl), the wrapper injects a TokenCapturingHandler into the config's callback list, then delegates to the original runnable. The handler intercepts on_llm_new_token events from the chat model.

Parameters:

Name Type Description Default
original_bound Any

The original node runnable (node.bound).

required
node_name str

Name of the graph node.

required
token_sink Callable[[TokenEvent], None] | None

Callback invoked for each TokenEvent.

None
redis_backend RedisStreamBackend | None

Optional Redis backend for real-time publishing.

None
Source code in deepagent_temporal/activity.py
def __init__(
    self,
    original_bound: Any,
    node_name: str,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
) -> None:
    """Remember the wrapped runnable and how to deliver its tokens.

    Args:
        original_bound: The original node runnable (``node.bound``).
        node_name: Name of the graph node.
        token_sink: Callback invoked for each ``TokenEvent``.
        redis_backend: Optional Redis backend for real-time publishing.
    """
    # The node's original runnable; every call ultimately delegates here.
    self._original = original_bound
    self._node_name = node_name
    # Token delivery: explicit sink (Phase 1) or Redis backend (Phase 2).
    # When both are None, ainvoke falls back to CONFIG_KEY_STREAM.
    self._redis_backend = redis_backend
    self._token_sink = token_sink

ainvoke async

ainvoke(
    input: Any, config: Any = None, **kwargs: Any
) -> Any

Invoke the node with token capture callbacks injected.

Adds a TokenCapturingHandler to config["callbacks"] before delegating to the original runnable's ainvoke.

Source code in deepagent_temporal/activity.py
async def ainvoke(self, input: Any, config: Any = None, **kwargs: Any) -> Any:
    """Run the wrapped node with a token-capturing callback attached.

    A ``TokenCapturingHandler`` is appended to ``config["callbacks"]``
    and the call is then delegated to the original runnable's
    ``ainvoke``.
    """
    # Shallow-copy so the caller's config dict is never mutated.
    config = dict(config) if config else {}

    # Choose the token destination, most specific first: Redis backend,
    # then an explicit sink, then the CONFIG_KEY_STREAM passthrough.
    if self._redis_backend is not None:
        sink = self._make_redis_sink()
    elif self._token_sink is not None:
        sink = self._token_sink
    else:
        sink = self._make_stream_handler_sink(config)

    handler = TokenCapturingHandler(
        self._node_name,
        sink,
        # Resolves to a real heartbeat only inside an Activity context.
        heartbeat_fn=self._get_heartbeat_fn(),
        # Attempt number lets consumers dedupe tokens across retries.
        attempt=self._get_attempt_number(),
    )

    # Append our handler without clobbering any caller-provided callbacks.
    callbacks = list(config.get("callbacks") or [])
    callbacks.append(handler)
    config["callbacks"] = callbacks

    return await self._original.ainvoke(input, config, **kwargs)

Helper Functions

create_token_handler

create_token_handler(
    node_name: str,
    sink: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
) -> TokenCapturingHandler

Factory for creating a configured TokenCapturingHandler.

Parameters:

Name Type Description Default
node_name str

Name of the graph node being executed.

required
sink Callable[[TokenEvent], None]

Callback that receives each TokenEvent.

required
heartbeat_fn Callable[..., None] | None

Optional Activity heartbeat callable.

None
heartbeat_interval int

Tokens between heartbeats.

50
attempt int

Activity attempt number.

1
Source code in deepagent_temporal/streaming.py
def create_token_handler(
    node_name: str,
    sink: Callable[[TokenEvent], None],
    *,
    heartbeat_fn: Callable[..., None] | None = None,
    heartbeat_interval: int = 50,
    attempt: int = 1,
) -> TokenCapturingHandler:
    """Build a ``TokenCapturingHandler`` wired to *sink*.

    Args:
        node_name: Name of the graph node being executed.
        sink: Callback that receives each ``TokenEvent``.
        heartbeat_fn: Optional Activity heartbeat callable.
        heartbeat_interval: Tokens between heartbeats.
        attempt: Activity attempt number.

    Returns:
        The configured handler, ready to be added to a callback list.
    """
    handler = TokenCapturingHandler(
        node_name,
        sink,
        heartbeat_fn=heartbeat_fn,
        heartbeat_interval=heartbeat_interval,
        attempt=attempt,
    )
    return handler

wrap_graph_for_streaming

wrap_graph_for_streaming(
    graph: Pregel,
    *,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
    node_names: list[str] | None = None,
) -> Pregel

Wrap graph nodes with StreamingNodeWrapper for token capture.

Modifies the graph in-place by replacing each target node's bound attribute with a StreamingNodeWrapper.

Parameters:

Name Type Description Default
graph Pregel

Compiled LangGraph Pregel graph.

required
token_sink Callable[[TokenEvent], None] | None

Callback for token events (Phase 1 / fallback).

None
redis_backend RedisStreamBackend | None

Redis backend for real-time publishing (Phase 2).

None
node_names list[str] | None

Specific nodes to wrap. Defaults to all nodes.

None

Returns:

Type Description
Pregel

The modified graph (same object, mutated in-place).

Source code in deepagent_temporal/activity.py
def wrap_graph_for_streaming(
    graph: Pregel,
    *,
    token_sink: Callable[[TokenEvent], None] | None = None,
    redis_backend: RedisStreamBackend | None = None,
    node_names: list[str] | None = None,
) -> Pregel:
    """Wrap graph nodes with ``StreamingNodeWrapper`` for token capture.

    Modifies the graph in-place by replacing each target node's ``bound``
    attribute with a ``StreamingNodeWrapper``.

    Args:
        graph: Compiled LangGraph Pregel graph.
        token_sink: Callback for token events (Phase 1 / fallback).
        redis_backend: Redis backend for real-time publishing (Phase 2).
        node_names: Specific nodes to wrap. ``None`` (the default) wraps
            all nodes; an explicit empty list wraps none.

    Returns:
        The modified graph (same object, mutated in-place).
    """
    # Fix: the previous `node_names or list(...)` treated an explicit
    # empty list the same as None and wrapped every node. Only None
    # means "default to all nodes".
    targets = list(graph.nodes) if node_names is None else node_names

    for name in targets:
        if name not in graph.nodes:
            logger.warning("Node '%s' not found in graph, skipping wrapper", name)
            continue

        node = graph.nodes[name]
        # Idempotent: never double-wrap an already-streaming node.
        if isinstance(node.bound, StreamingNodeWrapper):
            continue

        node.bound = StreamingNodeWrapper(  # type: ignore[assignment]
            node.bound,
            node_name=name,
            token_sink=token_sink,
            redis_backend=redis_backend,
        )

    return graph

create_streaming_worker

create_streaming_worker(
    graph: Pregel,
    client: Client,
    task_queue: str = "deep-agents",
    *,
    redis_url: str | None = None,
    redis_stream_backend: RedisStreamBackend | None = None,
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    node_names: list[str] | None = None,
    **kwargs: Any,
) -> Any

Create a Temporal Worker with streaming-enabled graph nodes.

Wraps target graph nodes with StreamingNodeWrapper for token-level capture, then delegates to langgraph.temporal.worker.create_worker.

Parameters:

Name Type Description Default
graph Pregel

Compiled LangGraph Pregel graph.

required
client Client

Temporal client instance.

required
task_queue str

Default task queue name.

'deep-agents'
redis_url str | None

Redis connection URL for real-time streaming. Convenience alternative to passing a redis_stream_backend.

None
redis_stream_backend RedisStreamBackend | None

Pre-configured Redis backend instance.

None
use_worker_affinity bool

Enable worker-specific task queue affinity.

False
worker_queue_file str | None

Path to persist worker queue name.

None
node_names list[str] | None

Specific nodes to enable streaming on. Defaults to all nodes.

None
**kwargs Any

Additional arguments passed to create_worker.

{}

Returns:

Type Description
Any

A Temporal Worker (or WorkerGroup if affinity enabled).

Source code in deepagent_temporal/worker.py
def create_streaming_worker(
    graph: Pregel,
    client: TemporalClient,
    task_queue: str = "deep-agents",
    *,
    redis_url: str | None = None,
    redis_stream_backend: RedisStreamBackend | None = None,
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    node_names: list[str] | None = None,
    **kwargs: Any,
) -> Any:
    """Create a Temporal Worker with streaming-enabled graph nodes.

    Wraps target graph nodes with ``StreamingNodeWrapper`` for
    token-level capture, then delegates to
    ``langgraph.temporal.worker.create_worker``.

    Args:
        graph: Compiled LangGraph Pregel graph.
        client: Temporal client instance.
        task_queue: Default task queue name.
        redis_url: Redis connection URL for real-time streaming.
            Convenience alternative to passing a ``redis_stream_backend``.
        redis_stream_backend: Pre-configured Redis backend instance.
        use_worker_affinity: Enable worker-specific task queue affinity.
        worker_queue_file: Path to persist worker queue name.
        node_names: Specific nodes to enable streaming on.
            Defaults to all nodes.
        **kwargs: Additional arguments passed to ``create_worker``.

    Returns:
        A Temporal Worker (or ``WorkerGroup`` if affinity enabled).
    """
    from langgraph.temporal.worker import create_worker

    # An explicitly supplied backend wins; otherwise build one from the
    # URL if given; otherwise stream via the fallback sink path.
    if redis_stream_backend is not None:
        backend = redis_stream_backend
    elif redis_url is not None:
        backend = RedisStreamBackend(redis_url=redis_url)
    else:
        backend = None

    # Mutates the graph in-place so its nodes emit token events.
    wrap_graph_for_streaming(graph, redis_backend=backend, node_names=node_names)

    return create_worker(
        graph,
        client,
        task_queue,
        use_worker_affinity=use_worker_affinity,
        worker_queue_file=worker_queue_file,
        **kwargs,
    )