Configuration¶
Dataclasses for configuring the LangGraph-Temporal integration.
ActivityOptions¶
ActivityOptions
dataclass
¶
ActivityOptions(
start_to_close_timeout: timedelta = (
lambda: timedelta(seconds=300)
)(),
heartbeat_timeout: timedelta | None = None,
schedule_to_close_timeout: timedelta | None = None,
)
Per-node Activity configuration options.
Attributes:
| Name | Type | Description |
|---|---|---|
start_to_close_timeout |
timedelta
|
Maximum time for a single Activity execution. |
heartbeat_timeout |
timedelta | None
|
Maximum time between heartbeats before Activity is considered failed. |
schedule_to_close_timeout |
timedelta | None
|
Maximum time from scheduling to completion, including retries. |
SubAgentConfig¶
SubAgentConfig
dataclass
¶
SubAgentConfig(
task_queue: str | None = None,
sticky_task_queue: str | None = None,
execution_timeout_seconds: float = 1800.0,
)
Configuration for sub-agent Child Workflow dispatch.
Attributes:
| Name | Type | Description |
|---|---|---|
task_queue |
str | None
|
Task queue for sub-agent Child Workflows. |
sticky_task_queue |
str | None
|
Sticky task queue for sub-agent worker affinity. |
execution_timeout_seconds |
float
|
Maximum execution time for sub-agent Child Workflows in seconds. |
RetryPolicyConfig¶
RetryPolicyConfig
dataclass
¶
RetryPolicyConfig(
initial_interval_seconds: float = 1.0,
backoff_coefficient: float = 2.0,
max_interval_seconds: float = 100.0,
max_attempts: int = 0,
non_retryable_error_types: list[str] | None = None,
)
Serializable retry policy configuration.
Maps LangGraph's RetryPolicy to Temporal's RetryPolicy parameters.
Attributes:
| Name | Type | Description |
|---|---|---|
initial_interval_seconds |
float
|
Initial retry interval in seconds. |
backoff_coefficient |
float
|
Multiplier for retry interval. |
max_interval_seconds |
float
|
Maximum retry interval in seconds. |
max_attempts |
int
|
Maximum number of retry attempts (0 = unlimited). |
non_retryable_error_types |
list[str] | None
|
Exception type names that should not be retried. |
WorkflowInput¶
WorkflowInput
dataclass
¶
WorkflowInput(
graph_definition_ref: str,
input_data: dict[str, Any] | None = None,
recursion_limit: int = 25,
interrupt_before: list[str] | None = None,
interrupt_after: list[str] | None = None,
restored_state: RestoredState | None = None,
node_task_queues: dict[str, str] | None = None,
node_activity_options: dict[str, ActivityOptions]
| None = None,
node_retry_policies: dict[str, RetryPolicyConfig]
| None = None,
sticky_task_queue: str | None = None,
use_worker_affinity: bool = False,
subagent_config: SubAgentConfig | None = None,
)
Input to the LangGraphWorkflow Temporal Workflow.
Attributes:
| Name | Type | Description |
|---|---|---|
graph_definition_ref |
str
|
Reference to the registered graph in GraphRegistry. |
input_data |
dict[str, Any] | None
|
The user-provided input to the graph. None on continue-as-new. |
recursion_limit |
int
|
Maximum number of steps before stopping. |
interrupt_before |
list[str] | None
|
Node names to pause before executing. |
interrupt_after |
list[str] | None
|
Node names to pause after executing. |
restored_state |
RestoredState | None
|
State from a previous workflow run (continue-as-new). |
node_task_queues |
dict[str, str] | None
|
Per-node task queue overrides. |
node_activity_options |
dict[str, ActivityOptions] | None
|
Per-node Activity configuration (serialized). |
node_retry_policies |
dict[str, RetryPolicyConfig] | None
|
Per-node retry policy configuration. |
sticky_task_queue |
str | None
|
Sticky task queue for worker affinity. When set,
overrides per-node task queue routing for all Activities.
Can be set explicitly or discovered at runtime via
|
use_worker_affinity |
bool
|
When True, the Workflow calls the
|
subagent_config |
SubAgentConfig | None
|
Configuration for sub-agent Child Workflow dispatch. |
WorkflowOutput¶
WorkflowOutput
dataclass
¶
Output from the LangGraphWorkflow Temporal Workflow.
Attributes:
| Name | Type | Description |
|---|---|---|
channel_values |
dict[str, Any]
|
The final channel state values. |
step |
int
|
The final step counter. |
NodeActivityInput¶
NodeActivityInput
dataclass
¶
NodeActivityInput(
node_name: str,
input_state: dict[str, Any],
graph_definition_ref: str,
resume_values: list[Any] | None = None,
task_path: tuple[str | int, ...] = (),
send_input: Any | None = None,
triggers: list[str] = list(),
)
Input to the execute_node Activity.
Attributes:
| Name | Type | Description |
|---|---|---|
node_name |
str
|
Name of the node to execute. |
input_state |
dict[str, Any]
|
Serialized channel values for the node's input. |
graph_definition_ref |
str
|
Reference to retrieve the graph from GraphRegistry. |
resume_values |
list[Any] | None
|
Resume values for interrupt() support. |
task_path |
tuple[str | int, ...]
|
Deterministic path for ordering writes. |
send_input |
Any | None
|
Input override for dynamic Send (PUSH) tasks. |
triggers |
list[str]
|
Channel names that triggered this node. |
NodeActivityOutput¶
NodeActivityOutput
dataclass
¶
NodeActivityOutput(
node_name: str,
writes: list[tuple[str, Any]],
triggers: list[str] = list(),
task_path: tuple[str | int, ...] = (),
interrupts: list[dict[str, Any]] | None = None,
push_sends: list[dict[str, Any]] | None = None,
command: dict[str, Any] | None = None,
custom_data: list[Any] | None = None,
child_workflow_requests: list[dict[str, Any]]
| None = None,
)
Output from the execute_node Activity.
Provides enough information for the Workflow to construct a
WritesProtocol-compatible object for apply_writes.
Attributes:
| Name | Type | Description |
|---|---|---|
node_name |
str
|
Name of the node that executed. |
writes |
list[tuple[str, Any]]
|
Channel writes produced by the node. |
triggers |
list[str]
|
Channels that triggered this node. |
task_path |
tuple[str | int, ...]
|
Deterministic path for ordering writes. |
interrupts |
list[dict[str, Any]] | None
|
Interrupt payloads if node called interrupt(). |
push_sends |
list[dict[str, Any]] | None
|
Dynamic Send objects emitted during execution. |
command |
dict[str, Any] | None
|
Command metadata if node returned a Command. |
child_workflow_requests |
list[dict[str, Any]] | None
|
Requests for Child Workflow dispatch (e.g., sub-agent invocations collected via context variable). |
StateQueryResult¶
StateQueryResult
dataclass
¶
StateQueryResult(
channel_values: dict[str, Any],
channel_versions: dict[str, Any],
versions_seen: dict[str, dict[str, Any]],
step: int,
status: str,
interrupts: list[dict[str, Any]],
)
Result of querying workflow state.
Attributes:
| Name | Type | Description |
|---|---|---|
channel_values |
dict[str, Any]
|
Current channel state values. |
channel_versions |
dict[str, Any]
|
Channel version tracking dict. |
versions_seen |
dict[str, dict[str, Any]]
|
Per-node channel versions seen. |
step |
int
|
Current step counter. |
status |
str
|
Workflow status string. |
interrupts |
list[dict[str, Any]]
|
Any pending interrupts. |
StreamQueryResult¶
StreamQueryResult
dataclass
¶
Result of polling the stream buffer.
Attributes:
| Name | Type | Description |
|---|---|---|
events |
list[Any]
|
Stream events since the cursor. |
next_cursor |
int
|
Cursor position for the next poll. |
StateUpdatePayload¶
StateUpdatePayload
dataclass
¶
Payload for update_state Signal.
Attributes:
| Name | Type | Description |
|---|---|---|
writes |
list[tuple[str, Any]]
|
Channel writes to apply. |
StreamEvent¶
StreamEvent
dataclass
¶
A single stream event.
Attributes:
| Name | Type | Description |
|---|---|---|
mode |
str
|
Stream mode (values, updates, custom, messages). |
data |
Any
|
Event data payload. |
node_name |
str | None
|
Node that produced the event, if applicable. |
step |
int
|
Step at which the event was produced. |
RestoredState¶
RestoredState
dataclass
¶
State carried across continue-as-new boundaries.
Attributes:
| Name | Type | Description |
|---|---|---|
checkpoint |
dict[str, Any]
|
The serialized Checkpoint TypedDict. |
step |
int
|
The current step counter. |
sticky_task_queue |
str | None
|
Sticky task queue name for worker affinity, preserved across continue-as-new. |