Large Payloads¶
Temporal has a default payload size limit of 2MB. AI agent workflows can exceed this — for example, when channel state includes large documents, images, or conversation histories.
LargePayloadCodec automatically offloads oversized payloads to an external blob store.
Setup¶
from temporalio.client import Client
from temporalio.converter import DataConverter
from langgraph.temporal._codec import LargePayloadCodec, InMemoryBlobStore
# For production, implement BlobStore with S3, GCS, etc.
store = InMemoryBlobStore() # testing only
codec = LargePayloadCodec(store=store)
client = await Client.connect(
"localhost:7233",
data_converter=DataConverter(payload_codec=codec),
)
How it works¶
- Before a payload enters Temporal's event history, the codec checks its size
- If the payload exceeds the threshold (default: 2MB), it's stored in the blob store
- A small reference payload replaces the original in the event history
- On decode, the codec fetches the original data from the blob store
Custom blob store¶
Implement BlobStore for your storage backend:
from langgraph.temporal._codec import BlobStore
class S3BlobStore(BlobStore):
def __init__(self, bucket: str):
self.bucket = bucket
async def put(self, key: str, data: bytes) -> str:
await s3.put_object(Bucket=self.bucket, Key=key, Body=data)
return key
async def get(self, key: str) -> bytes:
response = await s3.get_object(Bucket=self.bucket, Key=key)
return await response["Body"].read()
async def delete(self, key: str) -> None:
await s3.delete_object(Bucket=self.bucket, Key=key)
Configuring the threshold¶
Combining with encryption¶
Chain codecs by applying encryption first, then large payload handling:
from langgraph.temporal.encryption import EncryptionCodec
# Apply both: encrypt first, then offload if large
encryption = EncryptionCodec(key=my_key)
large_payload = LargePayloadCodec(store=store)
# Use Temporal's codec chain
client = await Client.connect(
"localhost:7233",
data_converter=DataConverter(
payload_codec=encryption, # applied first
),
)