Skip to content

TemporalDeepAgent

The primary entry point for running Deep Agents on Temporal.

TemporalDeepAgent

TemporalDeepAgent

TemporalDeepAgent(
    agent: Pregel,
    client: Client,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions]
    | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig]
    | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
)

Wraps a Deep Agent for durable execution on Temporal.

Composes TemporalGraph for standard LangGraph-to-Temporal mapping, adding Deep Agent-specific behavior: worker affinity via worker-specific task queues; sub-agent dispatch via Child Workflows; and tool-level human-in-the-loop via interrupt detection.

Parameters:

Name Type Description Default
agent Pregel

A compiled Pregel graph (output of create_deep_agent()).

required
client Client

A Temporal client instance.

required
task_queue str

Default task queue for the workflow and activities.

'deep-agents'
use_worker_affinity bool

When True, the Workflow discovers a worker-specific task queue at startup and pins all Activities to that worker. Follows the Temporal worker-specific task queues pattern.

False
worker_queue_file str | None

Path to persist the worker-specific queue name. On restart, the worker re-registers on the same queue so that in-flight Activities resume on this worker. If None, a new queue name is generated each time.

None
subagent_task_queue str | None

Task queue for sub-agent Child Workflows. Defaults to task_queue if not specified.

None
subagent_execution_timeout timedelta | None

Maximum execution time for sub-agent Child Workflows.

None
node_activity_options dict[str, ActivityOptions] | None

Per-node Activity configuration overrides.

None
node_retry_policies dict[str, RetryPolicyConfig] | None

Per-node retry policy configuration. Keys are node names (e.g., call_model, tools), values are RetryPolicyConfig instances. Set on the compiled graph so langgraph-temporal includes them in the WorkflowInput. Use recommended_retry_policies() for sensible defaults.

None
enable_token_streaming bool

When True, wraps graph nodes with StreamingNodeWrapper at worker creation time, enabling token-level capture via LangChain callback injection. Tokens are delivered through stream_mode="tokens" in astream().

False
redis_stream_backend RedisStreamBackend | None

Optional RedisStreamBackend for real-time token delivery. When set, tokens are published to Redis Streams as they arrive from the LLM (Phase 2). Without Redis, tokens are buffered and delivered after Activity completion (Phase 1).

None
workflow_execution_timeout timedelta | None

Maximum time for entire workflow execution including retries.

None
workflow_run_timeout timedelta | None

Maximum time for a single workflow run.

None
stream_backend StreamBackend | None

Backend for streaming events.

None
Source code in deepagent_temporal/agent.py
def __init__(
    self,
    agent: Pregel,
    client: TemporalClient,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig] | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
) -> None:
    """Wrap a compiled Deep Agent graph for durable Temporal execution."""
    # Retry policies must be attached to the graph *before* TemporalGraph
    # is constructed: _build_workflow_input discovers them through
    # hasattr(graph, "retry_policies").
    if node_retry_policies is not None:
        agent.retry_policies = node_retry_policies  # type: ignore[attr-defined]

    self._enable_token_streaming = enable_token_streaming
    self._redis_stream_backend = redis_stream_backend

    # Delegate the standard LangGraph-to-Temporal mapping to TemporalGraph.
    self._temporal_graph = TemporalGraph(
        agent,
        client,
        task_queue=task_queue,
        node_activity_options=node_activity_options,
        workflow_execution_timeout=workflow_execution_timeout,
        workflow_run_timeout=workflow_run_timeout,
        stream_backend=stream_backend,
    )

    self._use_worker_affinity = use_worker_affinity
    self._worker_queue_file = worker_queue_file
    self._task_queue = task_queue
    # Sub-agents fall back to the main task queue, and receive a
    # 30-minute execution cap when no explicit timeout is supplied.
    self._subagent_task_queue = subagent_task_queue or task_queue
    if subagent_execution_timeout:
        self._subagent_execution_timeout = subagent_execution_timeout
    else:
        self._subagent_execution_timeout = timedelta(minutes=30)

ainvoke async

ainvoke(
    input: Any,
    config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]

Execute the Deep Agent as a Temporal Workflow.

Source code in deepagent_temporal/agent.py
async def ainvoke(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the Deep Agent as a Temporal Workflow.

    Injects Temporal routing settings into ``config`` and delegates to
    ``TemporalGraph.ainvoke``, returning its result.
    """
    enriched = self._inject_temporal_config(config)
    result = await self._temporal_graph.ainvoke(input, enriched, **kwargs)
    return result

astream async

astream(
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> AsyncIterator[Any]

Stream Deep Agent execution events.

Parameters:

Name Type Description Default
input Any

Input to the agent.

required
config dict[str, Any] | None

RunnableConfig with thread_id and other settings.

None
stream_mode str

One of "values", "updates", "custom", or "tokens". The "tokens" mode yields TokenEvent dicts from LLM streaming (requires enable_token_streaming).

'values'
**kwargs Any

Additional arguments passed to TemporalGraph.astream.

{}
Source code in deepagent_temporal/agent.py
async def astream(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    *,
    stream_mode: str = "values",
    **kwargs: Any,
) -> AsyncIterator[Any]:
    """Stream Deep Agent execution events.

    Args:
        input: Input to the agent.
        config: RunnableConfig with thread_id and other settings.
        stream_mode: One of ``"values"``, ``"updates"``, ``"custom"``,
            or ``"tokens"``. The ``"tokens"`` mode yields ``TokenEvent``
            dicts from LLM streaming (requires ``enable_token_streaming``).
        **kwargs: Additional arguments passed to TemporalGraph.astream.
    """
    cfg = self._inject_temporal_config(config)

    if stream_mode != "tokens":
        # Standard LangGraph stream modes pass straight through.
        async for item in self._temporal_graph.astream(
            input, cfg, stream_mode=stream_mode, **kwargs
        ):
            yield item
        return

    if self._redis_stream_backend is not None:
        # Phase 2: real-time token delivery. Start the workflow without
        # blocking, then relay events from the Redis Stream keyed by the
        # workflow id.
        handle = await self._temporal_graph.astart(input, cfg, **kwargs)
        async for item in self._redis_stream_backend.subscribe(handle.id):
            yield item
    else:
        # Phase 1: tokens surface as custom_data after each Activity
        # completes; read the "custom" stream and keep only token events.
        async for item in self._temporal_graph.astream(
            input, cfg, stream_mode="custom", **kwargs
        ):
            if isinstance(item, dict) and item.get("type") == "token":
                yield item

astart async

astart(
    input: Any,
    config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> WorkflowHandle

Start a Deep Agent Workflow (non-blocking).

Source code in deepagent_temporal/agent.py
async def astart(
    self,
    input: Any,
    config: dict[str, Any] | None = None,
    **kwargs: Any,
) -> WorkflowHandle:
    """Start a Deep Agent Workflow (non-blocking).

    Returns the workflow handle without waiting for completion.
    """
    enriched = self._inject_temporal_config(config)
    handle = await self._temporal_graph.astart(input, enriched, **kwargs)
    return handle

get_state async

get_state(config: dict[str, Any]) -> dict[str, Any]

Query current agent state.

Source code in deepagent_temporal/agent.py
async def get_state(self, config: dict[str, Any]) -> dict[str, Any]:
    """Query current agent state.

    Delegates to ``TemporalGraph.get_state`` for the thread identified
    by ``config``.
    """
    state = await self._temporal_graph.get_state(config)
    return state

resume async

resume(config: dict[str, Any], value: Any) -> None

Send a resume Signal for HITL approval.

Source code in deepagent_temporal/agent.py
async def resume(self, config: dict[str, Any], value: Any) -> None:
    """Send a resume Signal for HITL approval.

    ``value`` is forwarded to the interrupted workflow identified by
    ``config``; nothing is returned.
    """
    await self._temporal_graph.resume(config, value)
    return None

create_worker

create_worker(**kwargs: Any) -> Any

Create a Temporal Worker configured for Deep Agent execution.

When use_worker_affinity is True, returns a WorkerGroup with two workers: one on the shared queue (Workflows + discovery) and one on a worker-specific queue (node Activities).

If worker_queue_file was set, the worker-specific queue name is persisted to disk so a restarted worker re-registers on the same queue (preserving affinity for in-flight Activities).

When enable_token_streaming is True, wraps graph nodes with StreamingNodeWrapper before registering the worker.

Source code in deepagent_temporal/agent.py
def create_worker(self, **kwargs: Any) -> Any:
    """Create a Temporal Worker configured for Deep Agent execution.

    When `use_worker_affinity` is True, returns a `WorkerGroup` with
    two workers: one on the shared queue (Workflows + discovery) and
    one on a worker-specific queue (node Activities).

    If `worker_queue_file` was set, the worker-specific queue name is
    persisted to disk so a restarted worker re-registers on the same
    queue (preserving affinity for in-flight Activities).

    When `enable_token_streaming` is True, wraps graph nodes with
    ``StreamingNodeWrapper`` before registering the worker.
    """
    from langgraph.temporal.worker import create_worker

    compiled = self._temporal_graph.graph

    if self._enable_token_streaming:
        # Imported lazily so streaming machinery is only loaded when used.
        from deepagent_temporal.activity import wrap_graph_for_streaming

        # Return value is discarded — wrapping presumably mutates the
        # graph in place (TODO confirm against wrap_graph_for_streaming).
        wrap_graph_for_streaming(compiled, redis_backend=self._redis_stream_backend)

    return create_worker(
        compiled,
        self._temporal_graph.client,
        self._task_queue,
        use_worker_affinity=self._use_worker_affinity,
        worker_queue_file=self._worker_queue_file,
        **kwargs,
    )

local async classmethod

local(
    agent: Pregel,
    *,
    task_queue: str = "deep-agents",
    **kwargs: Any,
) -> TemporalDeepAgent

Factory for local development with Temporal test server.

Parameters:

Name Type Description Default
agent Pregel

A compiled Pregel graph instance.

required
task_queue str

Default task queue name.

'deep-agents'
**kwargs Any

Additional arguments passed to TemporalDeepAgent.

{}

Returns:

Type Description
TemporalDeepAgent

A TemporalDeepAgent configured with the local test server client.

Source code in deepagent_temporal/agent.py
@classmethod
async def local(
    cls,
    agent: Pregel,
    *,
    task_queue: str = "deep-agents",
    **kwargs: Any,
) -> TemporalDeepAgent:
    """Factory for local development with Temporal test server.

    Args:
        agent: A compiled Pregel graph instance.
        task_queue: Default task queue name.
        **kwargs: Additional arguments passed to TemporalDeepAgent.

    Returns:
        A TemporalDeepAgent configured with the local test server client.
    """
    from temporalio.testing import WorkflowEnvironment

    # NOTE(review): the environment is never explicitly shut down here;
    # its client stays bound to the returned agent, so its lifetime is
    # effectively the process's — acceptable for local development.
    environment = await WorkflowEnvironment.start_local()
    return cls(agent, environment.client, task_queue=task_queue, **kwargs)

create_temporal_deep_agent

create_temporal_deep_agent

create_temporal_deep_agent(
    agent: Pregel,
    client: Client,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions]
    | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig]
    | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
) -> TemporalDeepAgent

Factory function to create a TemporalDeepAgent.

Convenience wrapper around TemporalDeepAgent constructor.

Parameters:

Name Type Description Default
agent Pregel

A compiled Pregel graph (output of create_deep_agent()).

required
client Client

A Temporal client instance.

required
task_queue str

Default task queue.

'deep-agents'
use_worker_affinity bool

Enable worker-specific task queue affinity.

False
worker_queue_file str | None

Path to persist the worker-specific queue name.

None
subagent_task_queue str | None

Task queue for sub-agent Child Workflows.

None
subagent_execution_timeout timedelta | None

Max execution time for sub-agents.

None
node_activity_options dict[str, ActivityOptions] | None

Per-node Activity configuration.

None
node_retry_policies dict[str, RetryPolicyConfig] | None

Per-node retry policy configuration.

None
enable_token_streaming bool

Enable token-level LLM streaming.

False
redis_stream_backend RedisStreamBackend | None

Redis backend for real-time token delivery.

None
workflow_execution_timeout timedelta | None

Max time for entire workflow.

None
workflow_run_timeout timedelta | None

Max time for a single workflow run.

None
stream_backend StreamBackend | None

Backend for streaming events.

None

Returns:

Type Description
TemporalDeepAgent

A configured TemporalDeepAgent instance.

Source code in deepagent_temporal/agent.py
def create_temporal_deep_agent(
    agent: Pregel,
    client: TemporalClient,
    *,
    task_queue: str = "deep-agents",
    use_worker_affinity: bool = False,
    worker_queue_file: str | None = None,
    subagent_task_queue: str | None = None,
    subagent_execution_timeout: timedelta | None = None,
    node_activity_options: dict[str, ActivityOptions] | None = None,
    node_retry_policies: dict[str, RetryPolicyConfig] | None = None,
    enable_token_streaming: bool = False,
    redis_stream_backend: RedisStreamBackend | None = None,
    workflow_execution_timeout: timedelta | None = None,
    workflow_run_timeout: timedelta | None = None,
    stream_backend: StreamBackend | None = None,
) -> TemporalDeepAgent:
    """Factory function to create a TemporalDeepAgent.

    Convenience wrapper around TemporalDeepAgent constructor.

    Args:
        agent: A compiled Pregel graph (output of `create_deep_agent()`).
        client: A Temporal client instance.
        task_queue: Default task queue.
        use_worker_affinity: Enable worker-specific task queue affinity.
        worker_queue_file: Path to persist the worker-specific queue name.
        subagent_task_queue: Task queue for sub-agent Child Workflows.
        subagent_execution_timeout: Max execution time for sub-agents.
        node_activity_options: Per-node Activity configuration.
        node_retry_policies: Per-node retry policy configuration.
        enable_token_streaming: Enable token-level LLM streaming.
        redis_stream_backend: Redis backend for real-time token delivery.
        workflow_execution_timeout: Max time for entire workflow.
        workflow_run_timeout: Max time for a single workflow run.
        stream_backend: Backend for streaming events.

    Returns:
        A configured TemporalDeepAgent instance.
    """
    # Collect keyword options once, then hand everything to the constructor.
    options = dict(
        task_queue=task_queue,
        use_worker_affinity=use_worker_affinity,
        worker_queue_file=worker_queue_file,
        subagent_task_queue=subagent_task_queue,
        subagent_execution_timeout=subagent_execution_timeout,
        node_activity_options=node_activity_options,
        node_retry_policies=node_retry_policies,
        enable_token_streaming=enable_token_streaming,
        redis_stream_backend=redis_stream_backend,
        workflow_execution_timeout=workflow_execution_timeout,
        workflow_run_timeout=workflow_run_timeout,
        stream_backend=stream_backend,
    )
    return TemporalDeepAgent(agent, client, **options)