Skip to content

TemporalCheckpointSaver

Read-oriented checkpoint saver that queries Temporal Workflows for state.

TemporalCheckpointSaver

TemporalCheckpointSaver(client: Client)

Bases: BaseCheckpointSaver

CheckpointSaver that reads state from Temporal Workflows.

This is a read-oriented adapter. Writes are handled implicitly by Temporal's event history. The primary use case is enabling get_state() and get_state_history() to work transparently.

Parameters:

Name Type Description Default
client Client

A Temporal client instance.

required
Source code in langgraph/temporal/checkpoint.py
def __init__(self, client: TemporalClient) -> None:
    super().__init__()
    self.client = client

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

Retrieve current state by querying the Temporal Workflow.

Source code in langgraph/temporal/checkpoint.py
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """Retrieve current state by querying the Temporal Workflow."""
    import asyncio

    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            raise RuntimeError("Use aget_tuple in async context")
        return loop.run_until_complete(self.aget_tuple(config))
    except RuntimeError:
        return None

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

Async version of get_tuple.

Source code in langgraph/temporal/checkpoint.py
async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """Async version of get_tuple."""
    thread_id = config["configurable"]["thread_id"]
    handle = self.client.get_workflow_handle(workflow_id=thread_id)

    try:
        state = await handle.query(LangGraphWorkflow.get_current_state)
    except RPCError as e:
        if e.status == RPCStatusCode.NOT_FOUND:
            return None
        raise

    checkpoint = Checkpoint(
        v=4,
        id=str(state.step),
        ts=datetime.now(timezone.utc).isoformat(),
        channel_values=state.channel_values,
        channel_versions=state.channel_versions,
        versions_seen=state.versions_seen,
    )
    metadata = CheckpointMetadata(
        source="loop",
        step=state.step,
        parents={},
    )

    return CheckpointTuple(
        config=config,
        checkpoint=checkpoint,
        metadata=metadata,
    )

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]

List checkpoints from Temporal event history.

Currently returns at most the current state. Full event history traversal (including continue-as-new chains) is planned for v0.3.

Source code in langgraph/temporal/checkpoint.py
def list(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]:
    """List checkpoints from Temporal event history.

    Currently returns at most the current state. Full event history
    traversal (including continue-as-new chains) is planned for v0.3.
    """
    if config is None:
        return

    try:
        result = self.get_tuple(config)
        if result:
            yield result
    except Exception:
        return

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> AsyncIterator[CheckpointTuple]

Async version of list.

Source code in langgraph/temporal/checkpoint.py
async def alist(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> AsyncIterator[CheckpointTuple]:
    """Async version of list."""
    if config is None:
        return

    try:
        result = await self.aget_tuple(config)
        if result:
            yield result
    except Exception:
        return

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

No-op. Temporal handles state persistence via event history.

Source code in langgraph/temporal/checkpoint.py
def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig:
    """No-op. Temporal handles state persistence via event history."""
    return config

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig

No-op. Temporal handles state persistence via event history.

Source code in langgraph/temporal/checkpoint.py
async def aput(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: ChannelVersions,
) -> RunnableConfig:
    """No-op. Temporal handles state persistence via event history."""
    return config

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

No-op. Temporal handles write persistence via event history.

Source code in langgraph/temporal/checkpoint.py
def put_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None:
    """No-op. Temporal handles write persistence via event history."""

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

No-op. Temporal handles write persistence via event history.

Source code in langgraph/temporal/checkpoint.py
async def aput_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None:
    """No-op. Temporal handles write persistence via event history."""