Skip to content

Worker

Helpers for creating Temporal Workers configured for LangGraph execution.

create_worker

create_worker

create_worker(
    graph: Pregel,
    client: Client,
    task_queue: str = "langgraph-default",
    *,
    use_worker_affinity: bool = False,
    worker_queue_file: Path | str | None = None,
    **kwargs: Any,
) -> Worker | WorkerGroup

Create a Temporal Worker configured for a LangGraph graph.

Registers the LangGraphWorkflow as a Workflow and execute_node / evaluate_conditional_edge as Activities. The graph is registered in the GraphRegistry for Activity-side lookup.

When use_worker_affinity is True, returns a WorkerGroup with two workers following the Temporal worker-specific task queues pattern: - A shared worker on task_queue (Workflows + discovery Activity) - A worker-specific worker on a unique queue (node Activities)

Parameters:

Name Type Description Default
graph Pregel

A compiled Pregel graph instance.

required
client Client

A Temporal client instance.

required
task_queue str

The task queue to listen on.

'langgraph-default'
use_worker_affinity bool

When True, create a dual-worker setup for worker-specific task queue affinity.

False
worker_queue_file Path | str | None

Path to persist the worker-specific queue name. On restart, the worker re-registers on the same queue so that in-flight Activities resume on this worker. If None, a new queue name is generated each time (no restart recovery).

None
**kwargs Any

Additional Worker configuration (e.g., max_concurrent_activities, max_concurrent_workflow_tasks).

{}

Returns:

Type Description
Worker | WorkerGroup

A configured Temporal Worker or WorkerGroup instance.

Source code in langgraph/temporal/worker.py
def create_worker(
    graph: Pregel,
    client: TemporalClient,
    task_queue: str = "langgraph-default",
    *,
    use_worker_affinity: bool = False,
    worker_queue_file: Path | str | None = None,
    **kwargs: Any,
) -> Worker | WorkerGroup:
    """Create a Temporal Worker configured for a LangGraph graph.

    Registers the `LangGraphWorkflow` as a Workflow and `execute_node` /
    `evaluate_conditional_edge` as Activities. The graph is registered in
    the `GraphRegistry` for Activity-side lookup.

    When `use_worker_affinity` is True, returns a `WorkerGroup` with two
    workers following the Temporal worker-specific task queues pattern:
    - A shared worker on `task_queue` (Workflows + discovery Activity)
    - A worker-specific worker on a unique queue (node Activities)

    Args:
        graph: A compiled Pregel graph instance.
        client: A Temporal client instance.
        task_queue: The task queue to listen on.
        use_worker_affinity: When True, create a dual-worker setup for
            worker-specific task queue affinity.
        worker_queue_file: Path to persist the worker-specific queue name.
            On restart, the worker re-registers on the same queue so that
            in-flight Activities resume on this worker. If None, a new
            queue name is generated each time (no restart recovery).
        **kwargs: Additional Worker configuration (e.g.,
            `max_concurrent_activities`, `max_concurrent_workflow_tasks`).

    Returns:
        A configured Temporal Worker or WorkerGroup instance.
    """
    # Ensure graph is registered
    GraphRegistry.get_instance().register(graph)

    if not use_worker_affinity:
        return Worker(
            client,
            task_queue=task_queue,
            workflows=[LangGraphWorkflow],
            activities=[
                execute_node,
                dynamic_execute_node,
                evaluate_conditional_edge,
            ],
            **kwargs,
        )

    # Worker-affinity mode: two workers following the Temporal
    # worker-specific task queues pattern.
    worker_specific_queue = _resolve_worker_queue(task_queue, worker_queue_file)
    set_worker_task_queue(worker_specific_queue)

    # Worker 1: shared queue — Workflows + get_available_task_queue
    shared_worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[LangGraphWorkflow],
        activities=[get_available_task_queue],
        **kwargs,
    )

    # Worker 2: worker-specific queue — node execution Activities
    specific_worker = Worker(
        client,
        task_queue=worker_specific_queue,
        activities=[
            execute_node,
            dynamic_execute_node,
            evaluate_conditional_edge,
        ],
    )

    return WorkerGroup([shared_worker, specific_worker])

WorkerGroup

WorkerGroup

WorkerGroup(workers: list[Worker])

Manages multiple Temporal Workers as a single async context manager.

Used for worker-affinity mode where two workers are needed: one on the shared queue (Workflows + discovery Activity) and one on a worker-specific queue (node execution Activities).

Source code in langgraph/temporal/worker.py
def __init__(self, workers: list[Worker]) -> None:
    self._workers = workers

__aenter__ async

__aenter__() -> WorkerGroup
Source code in langgraph/temporal/worker.py
async def __aenter__(self) -> WorkerGroup:
    for w in self._workers:
        await w.__aenter__()
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None
Source code in langgraph/temporal/worker.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    for w in reversed(self._workers):
        await w.__aexit__(exc_type, exc_val, exc_tb)

run async

run() -> None

Run all workers. Blocks until shutdown.

Source code in langgraph/temporal/worker.py
async def run(self) -> None:
    """Run all workers. Blocks until shutdown."""
    import asyncio

    await asyncio.gather(*[w.run() for w in self._workers])