TemporalDeepAgent
The primary entry point for running Deep Agents on Temporal.
TemporalDeepAgent(
    agent: Pregel,
    client: Client,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig] | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
)
Wraps a Deep Agent for durable execution on Temporal.
Composes TemporalGraph for standard LangGraph-to-Temporal mapping,
adding Deep Agent-specific behavior:
- Worker affinity via worker-specific task queues
- Sub-agent dispatch via Child Workflows
- Tool-level human-in-the-loop via interrupt detection
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent | Pregel | A compiled Pregel graph (output of …). | required |
| client | Client | A Temporal client instance. | required |
| task_queue | str | Default task queue for the workflow and activities. | 'deep-agents' |
| use_worker_affinity | bool | When True, the Workflow discovers a worker-specific task queue at startup and pins all Activities to that worker. Follows the Temporal worker-specific task queues pattern. | False |
| worker_queue_file | str \| None | Path to persist the worker-specific queue name. On restart, the worker re-registers on the same queue so that in-flight Activities resume on this worker. If None, a new queue name is generated each time. | None |
| subagent_task_queue | str \| None | Task queue for sub-agent Child Workflows. Defaults to … | None |
| subagent_execution_timeout | timedelta \| None | Maximum execution time for sub-agent Child Workflows. | None |
| node_activity_options | dict[str, ActivityOptions] \| None | Per-node Activity configuration overrides. | None |
| node_retry_policies | dict[str, RetryPolicyConfig] \| None | Per-node retry policy configuration. Keys are node names (e.g., …). | None |
| enable_token_streaming | bool | When True, wraps graph nodes with … | False |
| redis_stream_backend | RedisStreamBackend \| None | Optional … | None |
| workflow_execution_timeout | timedelta \| None | Maximum time for entire workflow execution including retries. | None |
| workflow_run_timeout | timedelta \| None | Maximum time for a single workflow run. | None |
| stream_backend | StreamBackend \| None | Backend for streaming events. | None |
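The keyword-only options in the table above can be collected in a plain dictionary before they are handed to the constructor. The following is a minimal sketch that uses only the standard library; the helper name `build_agent_options`, its arguments, and the example values are assumptions made for illustration and are not part of the library.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any


def build_agent_options(
    *,
    affinity: bool = False,
    queue_file: str | None = None,
    max_workflow_minutes: int | None = None,
) -> dict[str, Any]:
    """Collect keyword arguments named after the documented parameters.

    Hypothetical helper: only the dictionary keys come from the reference.
    """
    options: dict[str, Any] = {
        "task_queue": "deep-agents",  # documented default
        "use_worker_affinity": affinity,
    }
    # A queue file is only meaningful when worker affinity is enabled.
    if affinity and queue_file is not None:
        options["worker_queue_file"] = queue_file
    if max_workflow_minutes is not None:
        options["workflow_execution_timeout"] = timedelta(
            minutes=max_workflow_minutes
        )
    return options


options = build_agent_options(
    affinity=True,
    queue_file="worker_queue.txt",
    max_workflow_minutes=30,
)
# With the library installed, these would be passed along the lines of:
#   TemporalDeepAgent(agent, client, **options)
```

Keeping the options in one dictionary makes it easy to share the same settings between the class constructor and the create_temporal_deep_agent factory, which accept the same keyword arguments.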
Source code in deepagent_temporal/agent.py
ainvoke (async)
Execute the Deep Agent as a Temporal Workflow.
astream (async)
astream(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> AsyncIterator[Any]
Stream Deep Agent execution events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | Input to the agent. | required |
| config | dict[str, Any] \| None | RunnableConfig with thread_id and other settings. | None |
| stream_mode | str | One of … | 'values' |
| **kwargs | Any | Additional arguments passed to TemporalGraph.astream. | {} |
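Because astream returns an AsyncIterator, callers consume it with `async for`. The sketch below shows that consumption pattern against a hypothetical stand-in class, `FakeAgent`, which copies the documented signature but emits canned snapshots instead of running a Workflow. The `{"configurable": {"thread_id": ...}}` layout follows the usual LangGraph RunnableConfig convention and is assumed here, as are the input shape and the emitted events.

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator


class FakeAgent:
    """Hypothetical stand-in that mirrors the documented astream signature."""

    async def astream(
        self,
        input: Any,
        config: dict[str, Any] | None = None,
        *,
        stream_mode: str = "values",
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        # Emit fake state snapshots; config and stream_mode are ignored here.
        messages = list(input["messages"])
        for reply in ("thinking", "done"):
            messages = messages + [reply]
            yield {"messages": messages}


async def collect(agent: FakeAgent) -> list[Any]:
    # Assumed RunnableConfig layout carrying the thread_id.
    config = {"configurable": {"thread_id": "demo-thread"}}
    events = []
    async for event in agent.astream(
        {"messages": ["hi"]}, config, stream_mode="values"
    ):
        events.append(event)
    return events


events = asyncio.run(collect(FakeAgent()))
```

With a real TemporalDeepAgent in place of `FakeAgent`, the `async for` loop would stay the same; only the events it receives would differ.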
astart (async)
Start a Deep Agent Workflow (non-blocking).
get_state (async)
resume (async)
create_worker
Create a Temporal Worker configured for Deep Agent execution.
When use_worker_affinity is True, returns a WorkerGroup with
two workers: one on the shared queue (Workflows + discovery) and
one on a worker-specific queue (node Activities).
If worker_queue_file was set, the worker-specific queue name is
persisted to disk so a restarted worker re-registers on the same
queue (preserving affinity for in-flight Activities).
When enable_token_streaming is True, wraps graph nodes with
StreamingNodeWrapper before registering the worker.
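The queue-name persistence described above can be pictured as a small load-or-create step. This is an illustrative sketch only, not the library's implementation: the function name `load_or_create_queue_name`, the queue-name format, and the plain-text file layout are all assumptions.

```python
from __future__ import annotations

import uuid
from pathlib import Path


def load_or_create_queue_name(
    queue_file: str | None, prefix: str = "deep-agents"
) -> str:
    """Return a worker-specific queue name, reusing a persisted one if present.

    Hypothetical helper; the naming scheme is invented for this example.
    """
    if queue_file is not None:
        path = Path(queue_file)
        if path.exists():
            saved = path.read_text(encoding="utf-8").strip()
            if saved:
                # A restarted worker re-registers on the same queue.
                return saved
    # No file configured, or nothing saved yet: generate a fresh name.
    name = f"{prefix}-worker-{uuid.uuid4().hex}"
    if queue_file is not None:
        Path(queue_file).write_text(name, encoding="utf-8")
    return name
```

When no file path is given, every call yields a new name, which matches the documented behavior of worker_queue_file being None: affinity is not preserved across restarts.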
local (async, classmethod)
local(
    agent: Pregel,
    *,
    task_queue: str = "deep-agents",
    **kwargs: Any,
) -> TemporalDeepAgent
Factory for local development with the Temporal test server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent | Pregel | A compiled Pregel graph instance. | required |
| task_queue | str | Default task queue name. | 'deep-agents' |
| **kwargs | Any | Additional arguments passed to TemporalDeepAgent. | {} |
Returns:
| Type | Description |
|---|---|
| TemporalDeepAgent | A TemporalDeepAgent configured with the local test server client. |
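Since local is both async and a classmethod, it is called on the class and must be awaited. The sketch below reproduces that shape with hypothetical stand-ins (`FakeClient`, `LocalAgentSketch`); it does not start a test server, and how the real factory obtains its client is not shown in this reference.

```python
from __future__ import annotations

import asyncio
from typing import Any


class FakeClient:
    """Hypothetical stand-in for a client bound to a local test server."""


class LocalAgentSketch:
    """Hypothetical class that mirrors the documented local() signature."""

    def __init__(
        self,
        agent: Any,
        client: Any,
        *,
        task_queue: str = "deep-agents",
        **kwargs: Any,
    ) -> None:
        self.agent = agent
        self.client = client
        self.task_queue = task_queue
        self.extra = kwargs

    @classmethod
    async def local(
        cls, agent: Any, *, task_queue: str = "deep-agents", **kwargs: Any
    ) -> "LocalAgentSketch":
        # A real factory would obtain a test-server client at this point.
        client = FakeClient()
        # Remaining keyword arguments are forwarded to the constructor.
        return cls(agent, client, task_queue=task_queue, **kwargs)


async def main() -> LocalAgentSketch:
    # The factory is a coroutine, so the call is awaited.
    return await LocalAgentSketch.local(
        "compiled-graph", use_worker_affinity=True
    )


instance = asyncio.run(main())
```

The same calling pattern would apply to the real class: `await TemporalDeepAgent.local(agent)` inside a coroutine, with any extra keyword arguments forwarded to the constructor as the parameter table states.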
create_temporal_deep_agent
create_temporal_deep_agent(
    agent: Pregel,
    client: Client,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig] | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
) -> TemporalDeepAgent
Factory function to create a TemporalDeepAgent.
Convenience wrapper around the TemporalDeepAgent constructor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent | Pregel | A compiled Pregel graph (output of …). | required |
| client | Client | A Temporal client instance. | required |
| task_queue | str | Default task queue. | 'deep-agents' |
| use_worker_affinity | bool | Enable worker-specific task queue affinity. | False |
| worker_queue_file | str \| None | Path to persist the worker-specific queue name. | None |
| subagent_task_queue | str \| None | Task queue for sub-agent Child Workflows. | None |
| subagent_execution_timeout | timedelta \| None | Max execution time for sub-agents. | None |
| node_activity_options | dict[str, ActivityOptions] \| None | Per-node Activity configuration. | None |
| node_retry_policies | dict[str, RetryPolicyConfig] \| None | Per-node retry policy configuration. | None |
| enable_token_streaming | bool | Enable token-level LLM streaming. | False |
| redis_stream_backend | RedisStreamBackend \| None | Redis backend for real-time token delivery. | None |
| workflow_execution_timeout | timedelta \| None | Max time for entire workflow. | None |
| workflow_run_timeout | timedelta \| None | Max time for a single workflow run. | None |
| stream_backend | StreamBackend \| None | Backend for streaming events. | None |
Returns:
| Type | Description |
|---|---|
| TemporalDeepAgent | A configured TemporalDeepAgent instance. |