Skip to content

TemporalGraph

The primary entry point for running LangGraph graphs on Temporal.

TemporalGraph

TemporalGraph(
    graph: Pregel,
    client: Client,
    *,
    task_queue: str = "langgraph-default",
    node_task_queues: dict[str, str] | None = None,
    node_activity_options: dict[str, ActivityOptions]
    | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
)

Wraps a compiled LangGraph graph for execution on Temporal.

This is the primary entry point for using LangGraph with Temporal. It preserves the familiar invoke() / stream() API while executing the graph as a Temporal Workflow with durable execution guarantees.

Parameters:

Name Type Description Default
graph Pregel

A compiled Pregel graph (output of StateGraph.compile()).

required
client Client

A Temporal client instance.

required
task_queue str

Default task queue for Activities.

'langgraph-default'
node_task_queues dict[str, str] | None

Per-node task queue overrides.

None
node_activity_options dict[str, ActivityOptions] | None

Per-node Activity configuration.

None
workflow_execution_timeout timedelta | None

Maximum time for the entire workflow execution including retries and continue-as-new.

None
workflow_run_timeout timedelta | None

Maximum time for a single workflow run.

None
stream_backend StreamBackend | None

Backend for streaming events.

None
Source code in langgraph/temporal/graph.py
def __init__(
    self,
    graph: Pregel,
    client: TemporalClient,
    *,
    task_queue: str = "langgraph-default",
    node_task_queues: dict[str, str] | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
) -> None:
    """Wrap a compiled LangGraph graph for execution on Temporal.

    Args:
        graph: A compiled Pregel graph (output of ``StateGraph.compile()``).
        client: A Temporal client instance.
        task_queue: Default task queue for Activities.
        node_task_queues: Per-node task queue overrides.
        node_activity_options: Per-node Activity configuration.
        workflow_execution_timeout: Maximum time for the entire workflow
            execution including retries and continue-as-new.
        workflow_run_timeout: Maximum time for a single workflow run.
        stream_backend: Backend for streaming events; defaults to polling.
    """
    self.graph = graph
    self.client = client
    self.task_queue = task_queue
    # Normalize optional mappings to empty dicts so later lookups
    # never have to guard against None.
    self.node_task_queues = node_task_queues or {}
    self.node_activity_options = node_activity_options or {}
    self.workflow_execution_timeout = workflow_execution_timeout
    self.workflow_run_timeout = workflow_run_timeout
    # Polling is the default streaming strategy when none is supplied.
    self.stream_backend = stream_backend or PollingStreamBackend()
    # Make the graph discoverable by Activities via the process-wide
    # registry; the returned reference identifies it later.
    self._graph_ref = GraphRegistry.get_instance().register(graph)

ainvoke async

ainvoke(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]

Execute the graph as a Temporal Workflow and return the result.

Parameters:

Name Type Description Default
input Any

Input to the graph.

required
config dict[str, Any] | None

RunnableConfig with thread_id and other settings.

None
interrupt_before list[str] | None

Node names to pause before executing.

None
interrupt_after list[str] | None

Node names to pause after executing.

None

Returns:

Type Description
dict[str, Any]

The final channel state values.

Source code in langgraph/temporal/graph.py
async def ainvoke(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the graph as a Temporal Workflow and return the result.

    Args:
        input: Input to the graph.
        config: RunnableConfig with thread_id and other settings.
        interrupt_before: Node names to pause before executing.
        interrupt_after: Node names to pause after executing.

    Returns:
        The final channel state values.
    """
    # Derive a deterministic workflow id and the serialized input first,
    # then hand both to Temporal and block until the workflow completes.
    wf_id = self._get_workflow_id(config)
    wf_input = self._build_workflow_input(
        input,
        config,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
    )

    output: WorkflowOutput = await self.client.execute_workflow(
        LangGraphWorkflow.run,
        wf_input,
        id=wf_id,
        task_queue=self.task_queue,
        execution_timeout=self.workflow_execution_timeout,
        run_timeout=self.workflow_run_timeout,
    )
    return output.channel_values

invoke

invoke(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]

Execute the graph as a Temporal Workflow and return the result.

Synchronous wrapper around ainvoke().

Source code in langgraph/temporal/graph.py
def invoke(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the graph as a Temporal Workflow and return the result.

    Synchronous wrapper around `ainvoke()`.
    """
    return asyncio.get_event_loop().run_until_complete(
        self.ainvoke(
            input,
            config,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            **kwargs,
        )
    )

astream async

astream(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> AsyncIterator[Any]

Stream graph execution events from a Temporal Workflow.

Parameters:

Name Type Description Default
input Any

Input to the graph.

required
config dict[str, Any] | None

RunnableConfig with thread_id and other settings.

None
stream_mode str

Stream mode (values, updates, custom, messages).

'values'

Yields:

Type Description
AsyncIterator[Any]

Stream events matching the requested mode.

Source code in langgraph/temporal/graph.py
async def astream(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> AsyncIterator[Any]:
    """Stream graph execution events from a Temporal Workflow.

    Args:
        input: Input to the graph.
        config: RunnableConfig with thread_id and other settings.
        stream_mode: Stream mode (values, updates, custom, messages).

    Yields:
        Stream events matching the requested mode.

    Raises:
        NotImplementedError: If the configured stream backend is not a
            ``PollingStreamBackend``.
    """
    handle = await self.astart(input, config, **kwargs)

    if not isinstance(self.stream_backend, PollingStreamBackend):
        # Previously an unsupported backend produced an empty stream with
        # no diagnostic; fail loudly so misconfiguration is visible.
        raise NotImplementedError(
            f"Streaming is not supported for backend "
            f"{type(self.stream_backend).__name__}; use PollingStreamBackend."
        )

    async for event in self.stream_backend.poll_stream(
        handle,
        LangGraphWorkflow.get_stream_buffer,
    ):
        # Events tagged with a mode are filtered to the requested mode;
        # untagged events are passed through unchanged.
        if hasattr(event, "mode"):
            if event.mode == stream_mode:
                yield event.data
        else:
            yield event

stream

stream(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> Iterator[Any]

Stream graph execution events (synchronous).

Synchronous wrapper around astream().

Source code in langgraph/temporal/graph.py
def stream(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> Iterator[Any]:
    """Stream graph execution events (synchronous).

    Synchronous wrapper around `astream()`.
    """

    async def _collect() -> list[Any]:
        events = []
        async for event in self.astream(
            input, config, stream_mode=stream_mode, **kwargs
        ):
            events.append(event)
        return events

    events = asyncio.get_event_loop().run_until_complete(_collect())
    yield from events

astart async

astart(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
) -> WorkflowHandle

Start a Workflow without waiting for completion.

Parameters:

Name Type Description Default
input Any

Input to the graph.

required
config dict[str, Any] | None

RunnableConfig with thread_id and other settings.

None

Returns:

Type Description
WorkflowHandle

A Temporal WorkflowHandle for querying/signaling the workflow.

Source code in langgraph/temporal/graph.py
async def astart(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
) -> WorkflowHandle:
    """Start a Workflow without waiting for completion.

    Args:
        input: Input to the graph.
        config: RunnableConfig with thread_id and other settings.
        interrupt_before: Node names to pause before executing.
        interrupt_after: Node names to pause after executing.

    Returns:
        A Temporal WorkflowHandle for querying/signaling the workflow.
    """
    wf_id = self._get_workflow_id(config)
    payload = self._build_workflow_input(
        input,
        config,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
    )

    # start_workflow returns as soon as the workflow is scheduled; the
    # graph keeps executing server-side until completion or interrupt.
    return await self.client.start_workflow(
        LangGraphWorkflow.run,
        payload,
        id=wf_id,
        task_queue=self.task_queue,
        execution_timeout=self.workflow_execution_timeout,
        run_timeout=self.workflow_run_timeout,
    )

get_state async

get_state(config: dict[str, Any]) -> dict[str, Any]

Query the Workflow for current state.

Parameters:

Name Type Description Default
config dict[str, Any]

RunnableConfig with thread_id.

required

Returns:

Type Description
dict[str, Any]

The current state query result.

Source code in langgraph/temporal/graph.py
async def get_state(self, config: dict[str, Any]) -> dict[str, Any]:
    """Query the Workflow for current state.

    Args:
        config: RunnableConfig with thread_id.

    Returns:
        The current state query result.
    """
    handle = self.client.get_workflow_handle(self._get_workflow_id(config))
    snapshot = await handle.query(LangGraphWorkflow.get_current_state)
    # Flatten the query result object into the dict shape callers expect.
    return {
        key: getattr(snapshot, attr)
        for key, attr in (
            ("values", "channel_values"),
            ("step", "step"),
            ("status", "status"),
            ("interrupts", "interrupts"),
        )
    }

get_state_history async

get_state_history(
    config: dict[str, Any],
) -> list[dict[str, Any]]

Retrieve state history from the Temporal Workflow.

Currently returns the current state only. In the future, this will traverse the continue-as-new chain for full history.

Parameters:

Name Type Description Default
config dict[str, Any]

RunnableConfig with thread_id.

required

Returns:

Type Description
list[dict[str, Any]]

List of state snapshots.

Source code in langgraph/temporal/graph.py
async def get_state_history(self, config: dict[str, Any]) -> list[dict[str, Any]]:
    """Retrieve state history from the Temporal Workflow.

    Currently returns the current state only. In the future, this will
    traverse the continue-as-new chain for full history.

    Args:
        config: RunnableConfig with thread_id.

    Returns:
        List of state snapshots.
    """
    # Single-element history until continue-as-new traversal is implemented.
    return [await self.get_state(config)]

update_state async

update_state(
    config: dict[str, Any],
    values: dict[str, Any],
    *,
    as_node: str | None = None,
) -> None

Send a state update Signal to a running Workflow.

Parameters:

Name Type Description Default
config dict[str, Any]

RunnableConfig with thread_id.

required
values dict[str, Any]

Channel values to update.

required
as_node str | None

Node name to attribute the update to (for trigger tracking).

None
Source code in langgraph/temporal/graph.py
async def update_state(
    self,
    config: dict[str, Any],
    values: dict[str, Any],
    *,
    as_node: str | None = None,
) -> None:
    """Send a state update Signal to a running Workflow.

    Args:
        config: RunnableConfig with thread_id.
        values: Channel values to update.
        as_node: Node name to attribute the update to (for trigger tracking).
            NOTE(review): not currently forwarded in the signal payload —
            confirm whether StateUpdatePayload should carry it.
    """
    handle = self.client.get_workflow_handle(self._get_workflow_id(config))
    # Each (channel, value) pair becomes one write in the payload.
    payload = StateUpdatePayload(writes=list(values.items()))
    await handle.signal(LangGraphWorkflow.update_state_signal, payload)

resume async

resume(config: dict[str, Any], value: Any) -> None

Send a resume Signal to a paused Workflow.

Parameters:

Name Type Description Default
config dict[str, Any]

RunnableConfig with thread_id.

required
value Any

The resume value to send.

required
Source code in langgraph/temporal/graph.py
async def resume(self, config: dict[str, Any], value: Any) -> None:
    """Send a resume Signal to a paused Workflow.

    Args:
        config: RunnableConfig with thread_id.
        value: The resume value to send.
    """
    # Route the resume value to the workflow identified by the config's
    # thread, via Temporal's signal mechanism.
    target = self.client.get_workflow_handle(self._get_workflow_id(config))
    await target.signal(LangGraphWorkflow.resume_signal, value)

create_worker

create_worker(**kwargs: Any) -> Any

Create a Temporal Worker configured for this graph.

Returns:

Type Description
Any

A Temporal Worker instance ready to run.

Source code in langgraph/temporal/graph.py
def create_worker(self, **kwargs: Any) -> Any:
    """Create a Temporal Worker configured for this graph.

    Returns:
        A Temporal Worker instance ready to run.
    """
    # Delegates to the module-level create_worker factory, binding this
    # wrapper's graph, client, and default task queue.
    return create_worker(self.graph, self.client, self.task_queue, **kwargs)

local async classmethod

local(
    graph: Pregel,
    *,
    task_queue: str = "langgraph-default",
    **kwargs: Any,
) -> TemporalGraph

Create a TemporalGraph using Temporal's local test server.

This is a convenience factory for local development and testing. It starts an in-process Temporal test server, removing the need for a full Temporal server deployment.

Parameters:

Name Type Description Default
graph Pregel

A compiled Pregel graph instance.

required
task_queue str

Default task queue name.

'langgraph-default'
**kwargs Any

Additional arguments passed to TemporalGraph.

{}

Returns:

Type Description
TemporalGraph

A TemporalGraph configured with the local test server client.

Example
temporal_graph = await TemporalGraph.local(compiled_graph)
result = await temporal_graph.ainvoke({"messages": ["hello"]})
Source code in langgraph/temporal/graph.py
@classmethod
async def local(
    cls,
    graph: Pregel,
    *,
    task_queue: str = "langgraph-default",
    **kwargs: Any,
) -> TemporalGraph:
    """Create a TemporalGraph using Temporal's local test server.

    This is a convenience factory for local development and testing.
    It starts an in-process Temporal test server, removing the need
    for a full Temporal server deployment.

    Args:
        graph: A compiled Pregel graph instance.
        task_queue: Default task queue name.
        **kwargs: Additional arguments passed to TemporalGraph.

    Returns:
        A TemporalGraph configured with the local test server client.

    Example:
        ```python
        temporal_graph = await TemporalGraph.local(compiled_graph)
        result = await temporal_graph.ainvoke({"messages": ["hello"]})
        ```
    """
    # Imported lazily so temporalio.testing is only required when the
    # local test-server path is actually used.
    from temporalio.testing import WorkflowEnvironment

    environment = await WorkflowEnvironment.start_local()
    return cls(graph, environment.client, task_queue=task_queue, **kwargs)