Streaming

Stream backends for receiving execution events from Temporal Workflows.

StreamBackend

Bases: ABC

Abstract base class for stream backends.

Stream backends provide the mechanism for transmitting execution events from the Temporal Workflow to the client across process boundaries.
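As a rough illustration of the contract, the sketch below implements the two abstract methods documented in this section (`publish` and `subscribe`) with one `asyncio.Queue` per workflow ID. `InMemoryStreamBackend`, its `close` method, and the `_END` sentinel are hypothetical names that are not part of the library. The `StreamBackend` class here is a local stand-in that mirrors the documented signatures so the snippet runs on its own. A queue only works inside a single process, so this is suitable for tests at most, not for crossing process boundaries.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, AsyncIterator


class StreamBackend(ABC):
    """Local stand-in mirroring the documented interface (not the library class)."""

    @abstractmethod
    async def publish(self, workflow_id: str, event: Any) -> None: ...

    @abstractmethod
    def subscribe(self, workflow_id: str) -> AsyncIterator[Any]: ...


_END = object()  # hypothetical sentinel that ends a subscription


class InMemoryStreamBackend(StreamBackend):
    """Hypothetical single-process backend: one asyncio.Queue per workflow ID."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def publish(self, workflow_id: str, event: Any) -> None:
        await self._queues[workflow_id].put(event)

    async def close(self, workflow_id: str) -> None:
        # Not part of the documented interface; added so the demo can finish.
        await self._queues[workflow_id].put(_END)

    async def subscribe(self, workflow_id: str) -> AsyncIterator[Any]:
        # An async generator: calling it returns the iterator without awaiting,
        # which matches the non-async `subscribe` signature.
        queue = self._queues[workflow_id]
        while True:
            event = await queue.get()
            if event is _END:
                return
            yield event


async def demo() -> list[Any]:
    backend = InMemoryStreamBackend()
    await backend.publish("wf-1", {"step": 1})
    await backend.publish("wf-1", {"step": 2})
    await backend.close("wf-1")
    return [event async for event in backend.subscribe("wf-1")]
```

Running `asyncio.run(demo())` collects the two published events in order. Note that `subscribe` is consumed with `async for` directly, with no `await` on the call itself.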

publish abstractmethod async

publish(workflow_id: str, event: Any) -> None

Publish a stream event.

Parameters:

Name         Type  Description                   Default
workflow_id  str   The Temporal Workflow ID.     required
event        Any   The stream event to publish.  required

Source code in langgraph/temporal/streaming.py
@abstractmethod
async def publish(self, workflow_id: str, event: Any) -> None:
    """Publish a stream event.

    Args:
        workflow_id: The Temporal Workflow ID.
        event: The stream event to publish.
    """

subscribe abstractmethod

subscribe(workflow_id: str) -> AsyncIterator[Any]

Subscribe to stream events for a workflow.

Parameters:

Name         Type  Description                Default
workflow_id  str   The Temporal Workflow ID.  required

Yields:

Type                Description
AsyncIterator[Any]  Stream events as they arrive.

Source code in langgraph/temporal/streaming.py
@abstractmethod
def subscribe(self, workflow_id: str) -> AsyncIterator[Any]:
    """Subscribe to stream events for a workflow.

    Args:
        workflow_id: The Temporal Workflow ID.

    Yields:
        Stream events as they arrive.
    """

PollingStreamBackend

PollingStreamBackend(poll_interval: float = 0.1)

Bases: StreamBackend

Stream backend using Temporal Query with cursor-based pagination.

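This is the default streaming implementation. It polls the Workflow's get_stream_buffer query every poll_interval seconds, passing the cursor from the previous result so that each poll picks up where the last one stopped. Polling ends when the timeout elapses or when a query raises, which the implementation treats as the Workflow having completed.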

Parameters:

Name           Type   Description                          Default
poll_interval  float  Seconds between polls. Default 0.1.  0.1

Source code in langgraph/temporal/streaming.py
def __init__(self, poll_interval: float = 0.1) -> None:
    self.poll_interval = poll_interval

poll_stream async

poll_stream(
    handle: Any,
    query_method: Any,
    *,
    timeout: float | None = None,
) -> AsyncIterator[Any]

Poll a workflow's stream buffer using Temporal Query.

Parameters:

Name          Type          Description                                                                        Default
handle        Any           The Temporal WorkflowHandle.                                                       required
query_method  Any           The query method reference (e.g., LangGraphWorkflow.get_stream_buffer).            required
timeout       float | None  Maximum time to poll, in seconds, before stopping. None or 0 disables the limit.   None

Yields:

Type                Description
AsyncIterator[Any]  Stream events as they arrive.

Source code in langgraph/temporal/streaming.py
async def poll_stream(
    self,
    handle: Any,
    query_method: Any,
    *,
    timeout: float | None = None,
) -> AsyncIterator[Any]:
    """Poll a workflow's stream buffer using Temporal Query.

    Args:
        handle: The Temporal WorkflowHandle.
        query_method: The query method reference (e.g.,
            `LangGraphWorkflow.get_stream_buffer`).
        timeout: Maximum time to poll before stopping.

    Yields:
        Stream events as they arrive.
    """
    cursor = 0
    start_time = time.monotonic()

    while True:
        if timeout and (time.monotonic() - start_time) > timeout:
            break

        try:
            result: StreamQueryResult = await handle.query(query_method, cursor)
            for event in result.events:
                yield event
            cursor = result.next_cursor
        except Exception:
            # Workflow may have completed
            break

        await asyncio.sleep(self.poll_interval)
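
To make the cursor handling concrete, here is a small simulation of the loop above that needs no Temporal server. Everything in it is illustrative: `FakeHandle` is a hypothetical stand-in for a WorkflowHandle, `poll` is a simplified copy of the loop without the timeout check, and the `StreamQueryResult` dataclass assumes only the two fields the loop reads (`events` and `next_cursor`). The fake handle raises after a fixed number of queries to imitate a query failing once the Workflow has completed.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator


@dataclass
class StreamQueryResult:
    """Assumed shape: only the two fields read by the polling loop."""
    events: list[Any] = field(default_factory=list)
    next_cursor: int = 0


class FakeHandle:
    """Hypothetical stand-in for a WorkflowHandle, backed by a plain list."""

    def __init__(self, buffer: list[Any], max_queries: int) -> None:
        self._buffer = buffer
        self._remaining = max_queries

    async def query(self, query_method: Any, cursor: int) -> StreamQueryResult:
        if self._remaining == 0:
            # Imitates the query failing once the workflow is gone.
            raise RuntimeError("workflow completed")
        self._remaining -= 1
        events = self._buffer[cursor:]  # only events at or after the cursor
        return StreamQueryResult(events=events, next_cursor=cursor + len(events))


async def poll(handle: FakeHandle, poll_interval: float = 0.01) -> AsyncIterator[Any]:
    """Simplified copy of the cursor loop, without timeout handling."""
    cursor = 0
    while True:
        try:
            result = await handle.query("get_stream_buffer", cursor)
        except Exception:
            break  # treated as "workflow finished", as in the loop above
        for event in result.events:
            yield event
        cursor = result.next_cursor
        await asyncio.sleep(poll_interval)


async def demo() -> list[Any]:
    buffer = ["a", "b"]
    handle = FakeHandle(buffer, max_queries=3)
    seen = []
    async for event in poll(handle):
        seen.append(event)
        if event == "b":
            buffer.append("c")  # a new event arrives between polls
    return seen
```

In this simulation the first poll returns "a" and "b", the second returns only "c" because the cursor has advanced to 2, the third returns nothing, and the fourth raises and ends the stream. No event is delivered twice.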