Configuration

Dataclasses for configuring the LangGraph-Temporal integration.

ActivityOptions

ActivityOptions dataclass

ActivityOptions(
    start_to_close_timeout: timedelta = timedelta(seconds=300),
    heartbeat_timeout: timedelta | None = None,
    schedule_to_close_timeout: timedelta | None = None,
)

Per-node Activity configuration options.

Attributes:

start_to_close_timeout (timedelta): Maximum time for a single Activity execution.

heartbeat_timeout (timedelta | None): Maximum time between heartbeats before the Activity is considered failed.

schedule_to_close_timeout (timedelta | None): Maximum time from scheduling to completion, including retries.

SubAgentConfig

SubAgentConfig dataclass

SubAgentConfig(
    task_queue: str | None = None,
    sticky_task_queue: str | None = None,
    execution_timeout_seconds: float = 1800.0,
)

Configuration for sub-agent Child Workflow dispatch.

Attributes:

task_queue (str | None): Task queue for sub-agent Child Workflows.

sticky_task_queue (str | None): Sticky task queue for sub-agent worker affinity.

execution_timeout_seconds (float): Maximum execution time for sub-agent Child Workflows in seconds.

RetryPolicyConfig

RetryPolicyConfig dataclass

RetryPolicyConfig(
    initial_interval_seconds: float = 1.0,
    backoff_coefficient: float = 2.0,
    max_interval_seconds: float = 100.0,
    max_attempts: int = 0,
    non_retryable_error_types: list[str] | None = None,
)

Serializable retry policy configuration.

Maps LangGraph's RetryPolicy to Temporal's RetryPolicy parameters.

Attributes:

initial_interval_seconds (float): Initial retry interval in seconds.

backoff_coefficient (float): Multiplier applied to the retry interval after each retry.

max_interval_seconds (float): Maximum retry interval in seconds.

max_attempts (int): Maximum number of retry attempts (0 = unlimited).

non_retryable_error_types (list[str] | None): Exception type names that should not be retried.

WorkflowInput

WorkflowInput dataclass

WorkflowInput(
    graph_definition_ref: str,
    input_data: dict[str, Any] | None = None,
    recursion_limit: int = 25,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    restored_state: RestoredState | None = None,
    node_task_queues: dict[str, str] | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig] | None = None,
    sticky_task_queue: str | None = None,
    use_worker_affinity: bool = False,
    subagent_config: SubAgentConfig | None = None,
)

Input to the LangGraphWorkflow Temporal Workflow.

Attributes:

graph_definition_ref (str): Reference to the registered graph in GraphRegistry.

input_data (dict[str, Any] | None): The user-provided input to the graph. None on continue-as-new.

recursion_limit (int): Maximum number of steps before stopping.

interrupt_before (list[str] | None): Node names to pause before executing.

interrupt_after (list[str] | None): Node names to pause after executing.

restored_state (RestoredState | None): State from a previous workflow run (continue-as-new).

node_task_queues (dict[str, str] | None): Per-node task queue overrides.

node_activity_options (dict[str, ActivityOptions] | None): Per-node Activity configuration (serialized).

node_retry_policies (dict[str, RetryPolicyConfig] | None): Per-node retry policy configuration.

sticky_task_queue (str | None): Sticky task queue for worker affinity. When set, overrides per-node task queue routing for all Activities. Can be set explicitly or discovered at runtime via get_available_task_queue when use_worker_affinity is True.

use_worker_affinity (bool): When True, the Workflow calls the get_available_task_queue activity at startup to discover a worker-specific queue. All subsequent Activities are dispatched to that queue. The discovered queue is forwarded on continue-as-new via sticky_task_queue.

subagent_config (SubAgentConfig | None): Configuration for sub-agent Child Workflow dispatch.

WorkflowOutput

WorkflowOutput dataclass

WorkflowOutput(channel_values: dict[str, Any], step: int)

Output from the LangGraphWorkflow Temporal Workflow.

Attributes:

channel_values (dict[str, Any]): The final channel state values.

step (int): The final step counter.

NodeActivityInput

NodeActivityInput dataclass

NodeActivityInput(
    node_name: str,
    input_state: dict[str, Any],
    graph_definition_ref: str,
    resume_values: list[Any] | None = None,
    task_path: tuple[str | int, ...] = (),
    send_input: Any | None = None,
    triggers: list[str] = list(),
)

Input to the execute_node Activity.

Attributes:

node_name (str): Name of the node to execute.

input_state (dict[str, Any]): Serialized channel values for the node's input.

graph_definition_ref (str): Reference to retrieve the graph from GraphRegistry.

resume_values (list[Any] | None): Resume values for interrupt() support.

task_path (tuple[str | int, ...]): Deterministic path for ordering writes.

send_input (Any | None): Input override for dynamic Send (PUSH) tasks.

triggers (list[str]): Channel names that triggered this node.

NodeActivityOutput

NodeActivityOutput dataclass

NodeActivityOutput(
    node_name: str,
    writes: list[tuple[str, Any]],
    triggers: list[str] = list(),
    task_path: tuple[str | int, ...] = (),
    interrupts: list[dict[str, Any]] | None = None,
    push_sends: list[dict[str, Any]] | None = None,
    command: dict[str, Any] | None = None,
    custom_data: list[Any] | None = None,
    child_workflow_requests: list[dict[str, Any]] | None = None,
)

Output from the execute_node Activity.

Provides enough information for the Workflow to construct a WritesProtocol-compatible object for apply_writes.

Attributes:

node_name (str): Name of the node that executed.

writes (list[tuple[str, Any]]): Channel writes produced by the node.

triggers (list[str]): Channels that triggered this node.

task_path (tuple[str | int, ...]): Deterministic path for ordering writes.

interrupts (list[dict[str, Any]] | None): Interrupt payloads if the node called interrupt().

push_sends (list[dict[str, Any]] | None): Dynamic Send objects emitted during execution.

command (dict[str, Any] | None): Command metadata if the node returned a Command.

child_workflow_requests (list[dict[str, Any]] | None): Requests for Child Workflow dispatch (e.g., sub-agent invocations collected via context variable).

StateQueryResult

StateQueryResult dataclass

StateQueryResult(
    channel_values: dict[str, Any],
    channel_versions: dict[str, Any],
    versions_seen: dict[str, dict[str, Any]],
    step: int,
    status: str,
    interrupts: list[dict[str, Any]],
)

Result of querying workflow state.

Attributes:

channel_values (dict[str, Any]): Current channel state values.

channel_versions (dict[str, Any]): Channel version tracking dict.

versions_seen (dict[str, dict[str, Any]]): Per-node channel versions seen.

step (int): Current step counter.

status (str): Workflow status string.

interrupts (list[dict[str, Any]]): Any pending interrupts.

StreamQueryResult

StreamQueryResult dataclass

StreamQueryResult(events: list[Any], next_cursor: int)

Result of polling the stream buffer.

Attributes:

events (list[Any]): Stream events since the cursor.

next_cursor (int): Cursor position for the next poll.
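Cursor-based polling can be sketched as follows. The example assumes the cursor is an index into an append-only event buffer, which is a plausible reading of the fields above but is not confirmed here. StreamQueryResult is re-declared as a local stand-in, and poll_stream is a hypothetical helper standing in for the real query.

```python
from dataclasses import dataclass
from typing import Any


# Local stand-in mirroring the documented signature (illustration only).
@dataclass
class StreamQueryResult:
    events: list[Any]
    next_cursor: int


def poll_stream(buffer: list[Any], cursor: int) -> StreamQueryResult:
    """Return events appended since `cursor` and where to resume next time."""
    # Assumption: the cursor is a plain index into an append-only buffer.
    return StreamQueryResult(events=buffer[cursor:], next_cursor=len(buffer))


buffer = ["event-0", "event-1"]
first = poll_stream(buffer, cursor=0)

# More events arrive between polls.
buffer.append("event-2")
second = poll_stream(buffer, cursor=first.next_cursor)

print(first.events, first.next_cursor)    # ['event-0', 'event-1'] 2
print(second.events, second.next_cursor)  # ['event-2'] 3
```

Passing next_cursor back on each poll means a client never sees the same event twice and never misses one that arrived between polls.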

StateUpdatePayload

StateUpdatePayload dataclass

StateUpdatePayload(writes: list[tuple[str, Any]])

Payload for the update_state Signal.

Attributes:

writes (list[tuple[str, Any]]): Channel writes to apply.

StreamEvent

StreamEvent dataclass

StreamEvent(
    mode: str,
    data: Any,
    node_name: str | None = None,
    step: int = 0,
)

A single stream event.

Attributes:

mode (str): Stream mode (values, updates, custom, messages).

data (Any): Event data payload.

node_name (str | None): Node that produced the event, if applicable.

step (int): Step at which the event was produced.

RestoredState

RestoredState dataclass

RestoredState(
    checkpoint: dict[str, Any],
    step: int,
    sticky_task_queue: str | None = None,
)

State carried across continue-as-new boundaries.

Attributes:

checkpoint (dict[str, Any]): The serialized Checkpoint TypedDict.

step (int): The current step counter.

sticky_task_queue (str | None): Sticky task queue name for worker affinity, preserved across continue-as-new.