Skip to content

Metrics

Execution metrics for monitoring LangGraph workflows on Temporal.

MetricsReporter

MetricsReporter

MetricsReporter(
    *, workflow_id: str = "", enable_otel: bool = False
)

Collects and reports LangGraph-specific execution metrics.

Metrics are always emitted via structured logging. When `enable_otel=True` and an OpenTelemetry meter has been set up, each metric is additionally recorded as an OTel measurement.

Attributes:

Name Type Description
node_durations dict[str, list[float]]

Per-node execution duration history (ms).

step_count int

Total steps executed.

interrupt_count int

Total interrupts encountered.

Source code in langgraph/temporal/metrics.py
def __init__(
    self,
    *,
    workflow_id: str = "",
    enable_otel: bool = False,
) -> None:
    """Create a reporter with empty in-memory metrics.

    Args:
        workflow_id: Identifier stored on the reporter; the ``record_*``
            methods include it in their log records and OTel attributes.
        enable_otel: When true, ``_setup_otel()`` is called once all
            attributes have been initialised.
    """
    # Identity and configuration.
    self.workflow_id = workflow_id
    self._enable_otel = enable_otel
    # Starts as None; presumably replaced by ``_setup_otel()`` -- that
    # method is not shown here, so confirm against its implementation.
    self._meter: Any = None

    # In-memory aggregates, all starting empty/zero.
    self.interrupt_count: int = 0
    self.step_count: int = 0
    self.node_durations: dict[str, list[float]] = defaultdict(list)

    if not enable_otel:
        return
    self._setup_otel()

record_node_execution

record_node_execution(
    node_name: str,
    duration_ms: float,
    *,
    success: bool = True,
) -> None

Record a node execution duration.

Parameters:

Name Type Description Default
node_name str

Name of the executed node.

required
duration_ms float

Execution duration in milliseconds.

required
success bool

Whether the execution succeeded.

True
Source code in langgraph/temporal/metrics.py
def record_node_execution(
    self,
    node_name: str,
    duration_ms: float,
    *,
    success: bool = True,
) -> None:
    """Store one node timing sample and publish it.

    The sample is appended to ``node_durations[node_name]`` and logged as
    a ``node_execution`` record. If OTel is enabled and a meter is set,
    it is also recorded on the node-duration histogram.

    Args:
        node_name: Name of the executed node.
        duration_ms: Execution duration in milliseconds.
        success: Whether the execution succeeded.
    """
    self.node_durations[node_name].append(duration_ms)

    log_fields = {
        "workflow_id": self.workflow_id,
        "node_name": node_name,
        "duration_ms": duration_ms,
        "success": success,
    }
    logger.info("node_execution", extra=log_fields)

    # Nothing more to do unless OTel is switched on and a meter exists.
    if not (self._enable_otel and self._meter):
        return

    otel_attributes = {
        "node_name": node_name,
        "workflow_id": self.workflow_id,
        # The success flag is exported in its string form.
        "success": str(success),
    }
    self._node_duration_histogram.record(
        duration_ms,
        attributes=otel_attributes,
    )

record_step_completion

record_step_completion(
    step: int, channel_count: int = 0
) -> None

Record a workflow step completion.

Parameters:

Name Type Description Default
step int

The step number that completed.

required
channel_count int

Number of channels with values.

0
Source code in langgraph/temporal/metrics.py
def record_step_completion(
    self,
    step: int,
    channel_count: int = 0,
) -> None:
    """Note that a workflow step has finished.

    ``step_count`` is overwritten with ``step`` (it is not incremented),
    a ``step_completion`` record is logged, and, if OTel is enabled and a
    meter is set, the step counter is increased by one.

    Args:
        step: The step number that completed.
        channel_count: Number of channels with values.
    """
    self.step_count = step

    log_fields = {
        "workflow_id": self.workflow_id,
        "step": step,
        "channel_count": channel_count,
    }
    logger.info("step_completion", extra=log_fields)

    # Nothing more to do unless OTel is switched on and a meter exists.
    if not (self._enable_otel and self._meter):
        return

    otel_attributes = {"workflow_id": self.workflow_id}
    self._step_counter.add(1, attributes=otel_attributes)

record_interrupt

record_interrupt(
    node_name: str, interrupt_type: str = "node"
) -> None

Record an interrupt event.

Parameters:

Name Type Description Default
node_name str

Name of the node that triggered the interrupt.

required
interrupt_type str

Type of interrupt (node, before, after).

'node'
Source code in langgraph/temporal/metrics.py
def record_interrupt(
    self,
    node_name: str,
    interrupt_type: str = "node",
) -> None:
    """Count one interrupt and publish it.

    Increments ``interrupt_count`` and logs an ``interrupt`` record. If
    OTel is enabled and a meter is set, it also adds one to the
    interrupt counter.

    Args:
        node_name: Name of the node that triggered the interrupt.
        interrupt_type: Type of interrupt (node, before, after).
    """
    self.interrupt_count += 1

    # The same three fields go to both the log record and the OTel counter.
    log_fields = {
        "workflow_id": self.workflow_id,
        "node_name": node_name,
        "interrupt_type": interrupt_type,
    }
    logger.info("interrupt", extra=log_fields)

    # Nothing more to do unless OTel is switched on and a meter exists.
    if not (self._enable_otel and self._meter):
        return

    otel_attributes = {
        "workflow_id": self.workflow_id,
        "node_name": node_name,
        "interrupt_type": interrupt_type,
    }
    self._interrupt_counter.add(1, attributes=otel_attributes)

measure_node

measure_node(node_name: str) -> Any

Context manager to measure node execution duration.

Usage
with reporter.measure_node("agent"):
    result = await execute_node(...)
Source code in langgraph/temporal/metrics.py
@contextmanager
def measure_node(self, node_name: str) -> Any:
    """Context manager to measure node execution duration.

    The elapsed wall time of the ``with`` body is measured with a
    monotonic clock and passed to ``record_node_execution``. The
    measurement is recorded whether or not the body raises; the
    ``success`` flag is ``True`` only when the body finishes without an
    exception. Exceptions from the body propagate unchanged.

    Args:
        node_name: Name of the node being measured.

    Usage:
        ```python
        with reporter.measure_node("agent"):
            result = await execute_node(...)
        ```
    """
    start = time.monotonic()
    # Flipped only once the body has run to completion, so any exception
    # raised inside the ``with`` block leaves it False.
    succeeded = False
    try:
        yield
        succeeded = True
    finally:
        duration_ms = (time.monotonic() - start) * 1000
        self.record_node_execution(node_name, duration_ms, success=succeeded)

get_summary

get_summary() -> dict[str, Any]

Get a summary of collected metrics.

Returns:

Type Description
dict[str, Any]

Dictionary with metrics summary.

Source code in langgraph/temporal/metrics.py
def get_summary(self) -> dict[str, Any]:
    """Build a snapshot of everything recorded so far.

    Returns:
        A dictionary with ``workflow_id``, ``step_count``,
        ``interrupt_count`` and a ``nodes`` mapping. Each entry in
        ``nodes`` holds the sample ``count`` plus the average, maximum
        and minimum duration in milliseconds; a node with no samples
        reports 0 for all three.
    """
    per_node: dict[str, Any] = {}
    for name, samples in self.node_durations.items():
        sample_count = len(samples)
        if samples:
            average = sum(samples) / sample_count
            longest = max(samples)
            shortest = min(samples)
        else:
            # No samples: avoid dividing by zero and calling max()/min()
            # on an empty sequence.
            average = longest = shortest = 0
        per_node[name] = {
            "count": sample_count,
            "avg_duration_ms": average,
            "max_duration_ms": longest,
            "min_duration_ms": shortest,
        }

    return {
        "workflow_id": self.workflow_id,
        "step_count": self.step_count,
        "interrupt_count": self.interrupt_count,
        "nodes": per_node,
    }