Large Payloads

Codec for handling payloads that exceed Temporal's size limit.

LargePayloadCodec

LargePayloadCodec(
    store: BlobStore,
    *,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    prefix: str = "langgraph-temporal",
)

Bases: PayloadCodec

Temporal PayloadCodec that offloads large payloads to a blob store.

Payloads whose serialized size exceeds size_threshold bytes are written to the blob store, and a small reference payload takes their place in Temporal's event history.

Parameters:

Name | Type | Description | Default
store | BlobStore | A BlobStore implementation for storing large payloads. | required
size_threshold | int | Payloads larger than this (in bytes) are offloaded. Defaults to 2 MB. | DEFAULT_SIZE_THRESHOLD
prefix | str | Key prefix for stored blobs. | 'langgraph-temporal'

Source code in langgraph/temporal/_codec.py
def __init__(
    self,
    store: BlobStore,
    *,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    prefix: str = "langgraph-temporal",
) -> None:
    self._store = store
    self._size_threshold = size_threshold
    self._prefix = prefix

encode async

encode(payloads: Sequence[Payload]) -> list[Payload]

Encode payloads, offloading large ones to blob store.

Source code in langgraph/temporal/_codec.py
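async def encode(self, payloads: Sequence[Payload]) -> list[Payload]:
    """Encode payloads, offloading large ones to blob store."""
    result: list[Payload] = []
    for payload in payloads:
        # Size is measured on the full protobuf message, metadata included.
        serialized = payload.SerializeToString()
        if len(serialized) > self._size_threshold:
            # Swap the payload for a reference to the stored blob.
            result.append(await self._offload_payload(payload, serialized))
        else:
            # At or below the threshold: pass through unchanged.
            result.append(payload)
    return result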

decode async

decode(payloads: Sequence[Payload]) -> list[Payload]

Decode payloads, fetching offloaded ones from blob store.

Source code in langgraph/temporal/_codec.py
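async def decode(self, payloads: Sequence[Payload]) -> list[Payload]:
    """Decode payloads, fetching offloaded ones from blob store."""
    result: list[Payload] = []
    for payload in payloads:
        # Metadata values are bytes; a missing "encoding" entry decodes to "".
        encoding = payload.metadata.get("encoding", b"").decode()
        if encoding == self.ENCODING:
            # Reference payload written by encode(): fetch the original.
            result.append(await self._fetch_payload(payload))
        else:
            # Not produced by this codec: pass through unchanged.
            result.append(payload)
    return result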
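
The encode/decode pair above can be modeled without Temporal installed. The following is a minimal sketch under stated assumptions, not the library's implementation: ToyPayload, ToyCodec, REF_ENCODING, the JSON-based serialization, and the key format are all hypothetical stand-ins. The real codec works on protobuf Payload objects, and the value of ENCODING is not shown on this page.

```python
import asyncio
import json
import uuid

# Hypothetical marker; the real LargePayloadCodec.ENCODING value is not shown here.
REF_ENCODING = b"binary/blob-ref"


class ToyPayload:
    """Stand-in for Temporal's Payload: a metadata map plus raw data."""

    def __init__(self, metadata: dict[str, bytes], data: bytes) -> None:
        self.metadata = metadata
        self.data = data


def serialize(payload: ToyPayload) -> bytes:
    """Stand-in for Payload.SerializeToString(): keeps metadata and data together."""
    doc = {
        "metadata": {k: v.hex() for k, v in payload.metadata.items()},
        "data": payload.data.hex(),
    }
    return json.dumps(doc).encode()


def deserialize(raw: bytes) -> ToyPayload:
    """Inverse of serialize()."""
    doc = json.loads(raw)
    metadata = {k: bytes.fromhex(v) for k, v in doc["metadata"].items()}
    return ToyPayload(metadata, bytes.fromhex(doc["data"]))


class ToyCodec:
    """Simplified model of the threshold-based offloading described above."""

    def __init__(self, store: dict[str, bytes], size_threshold: int, prefix: str = "demo") -> None:
        self._store = store  # a plain dict plays the role of the blob store
        self._size_threshold = size_threshold
        self._prefix = prefix

    async def encode(self, payloads):
        result = []
        for payload in payloads:
            serialized = serialize(payload)
            if len(serialized) > self._size_threshold:
                # Hypothetical key format: "<prefix>/<random hex>".
                key = f"{self._prefix}/{uuid.uuid4().hex}"
                self._store[key] = serialized
                # The reference payload carries only the marker and the key.
                result.append(ToyPayload({"encoding": REF_ENCODING}, key.encode()))
            else:
                result.append(payload)
        return result

    async def decode(self, payloads):
        result = []
        for payload in payloads:
            if payload.metadata.get("encoding", b"") == REF_ENCODING:
                # Restore the original payload, metadata included.
                result.append(deserialize(self._store[payload.data.decode()]))
            else:
                result.append(payload)
        return result


async def round_trip():
    store: dict[str, bytes] = {}
    codec = ToyCodec(store, size_threshold=256)
    small = ToyPayload({"encoding": b"json/plain"}, b'"hi"')
    large = ToyPayload({"encoding": b"json/plain"}, b"x" * 500)
    encoded = await codec.encode([small, large])
    decoded = await codec.decode(encoded)
    return store, encoded, decoded
```

In this model only the large payload lands in the store; the small one passes through untouched, mirroring the two branches of encode and decode.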

BlobStore

Bases: ABC

Abstract blob store for large payload offloading.

put abstractmethod async

put(key: str, data: bytes) -> str

Store data and return a reference key.

Parameters:

Name | Type | Description | Default
key | str | Unique key for the data. | required
data | bytes | Raw bytes to store. | required

Returns:

Type | Description
str | The key under which the data was stored.

Source code in langgraph/temporal/_codec.py
@abstractmethod
async def put(self, key: str, data: bytes) -> str:
    """Store data and return a reference key.

    Args:
        key: Unique key for the data.
        data: Raw bytes to store.

    Returns:
        The key under which the data was stored.
    """
    ...

get abstractmethod async

get(key: str) -> bytes

Retrieve data by reference key.

Parameters:

Name | Type | Description | Default
key | str | The key returned by put(). | required

Returns:

Type | Description
bytes | The raw bytes.

Raises:

Type | Description
KeyError | If the key is not found.

Source code in langgraph/temporal/_codec.py
@abstractmethod
async def get(self, key: str) -> bytes:
    """Retrieve data by reference key.

    Args:
        key: The key returned by `put()`.

    Returns:
        The raw bytes.

    Raises:
        KeyError: If the key is not found.
    """
    ...

delete abstractmethod async

delete(key: str) -> None

Delete data by reference key.

Parameters:

Name | Type | Description | Default
key | str | The key to delete. | required

Source code in langgraph/temporal/_codec.py
@abstractmethod
async def delete(self, key: str) -> None:
    """Delete data by reference key.

    Args:
        key: The key to delete.
    """
    ...
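
The three abstract methods above are the whole contract a custom store has to satisfy. As an illustration, here is a hedged sketch of a store backed by a local directory. LocalDirBlobStore and the hashed file naming are hypothetical, the BlobStore class in the block is a local stand-in that mirrors the documented interface so the example runs on its own, and treating delete of a missing key as a no-op is an assumption (the documentation does not specify that case).

```python
import asyncio
import hashlib
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class BlobStore(ABC):
    """Local stand-in mirroring the documented interface (not the library class)."""

    @abstractmethod
    async def put(self, key: str, data: bytes) -> str: ...

    @abstractmethod
    async def get(self, key: str) -> bytes: ...

    @abstractmethod
    async def delete(self, key: str) -> None: ...


class LocalDirBlobStore(BlobStore):
    """Hypothetical store that keeps each blob as one file in a directory."""

    def __init__(self, root: Path) -> None:
        self._root = root
        self._root.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        # Hash the key so "/" or ".." in a key can never escape the root directory.
        return self._root / hashlib.sha256(key.encode()).hexdigest()

    async def put(self, key: str, data: bytes) -> str:
        # File I/O blocks, so run it in a worker thread.
        await asyncio.to_thread(self._path(key).write_bytes, data)
        return key

    async def get(self, key: str) -> bytes:
        try:
            return await asyncio.to_thread(self._path(key).read_bytes)
        except FileNotFoundError:
            # Translate to the exception the interface documents.
            raise KeyError(key) from None

    async def delete(self, key: str) -> None:
        # Assumption: deleting a missing key is a no-op.
        await asyncio.to_thread(self._path(key).unlink, missing_ok=True)


async def demo(root: Path) -> bool:
    store = LocalDirBlobStore(root)
    key = await store.put("langgraph-temporal/example", b"large payload bytes")
    assert await store.get(key) == b"large payload bytes"
    await store.delete(key)
    try:
        await store.get(key)
    except KeyError:
        return True  # the blob is gone after delete
    return False


def run_demo() -> bool:
    with tempfile.TemporaryDirectory() as tmp:
        return asyncio.run(demo(Path(tmp)))
```

Returning the caller's key unchanged from put keeps the reference stored in Temporal's history independent of where the bytes physically live.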

InMemoryBlobStore

InMemoryBlobStore()

Bases: BlobStore

In-memory blob store for testing.

Not suitable for production use: blobs live only in this instance's memory and are lost when the process exits.

Source code in langgraph/temporal/_codec.py
def __init__(self) -> None:
    self._store: dict[str, bytes] = {}
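
Only the constructor is shown here. As a rough illustration of what a dict-backed store implies, the sketch below uses a hypothetical DictBlobStore class; it is not the source of InMemoryBlobStore, and the no-op behavior of delete on a missing key is an assumption. It also shows why such a store suits tests only: a second instance, like a restarted process, sees none of the data.

```python
import asyncio


class DictBlobStore:
    """Hypothetical dict-backed store following the put/get/delete contract."""

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}

    async def put(self, key: str, data: bytes) -> str:
        # Copy into immutable bytes so a caller's bytearray cannot change later.
        self._store[key] = bytes(data)
        return key

    async def get(self, key: str) -> bytes:
        # A plain dict lookup raises KeyError for unknown keys.
        return self._store[key]

    async def delete(self, key: str) -> None:
        # Assumption: deleting a missing key is a no-op.
        self._store.pop(key, None)


async def demo() -> tuple[bytes, bool]:
    first, second = DictBlobStore(), DictBlobStore()
    key = await first.put("langgraph-temporal/example", b"large payload bytes")
    data = await first.get(key)
    try:
        await second.get(key)
        visible_elsewhere = True
    except KeyError:
        # Each instance owns its own dict, so nothing is shared.
        visible_elsewhere = False
    return data, visible_elsewhere
```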