Skip to content


All = Literal['*'] module-attribute

Special value to indicate that graph should interrupt on all nodes.

StreamMode = Literal['values', 'updates', 'debug', 'messages', 'custom'] module-attribute

How the stream method should emit outputs.

  • 'values': Emit all values of the state for each step.
  • 'updates': Emit only the node name(s) and updates that were returned by the node(s) after each step.
  • 'debug': Emit debug events for each step.
  • 'messages': Emit LLM messages token-by-token.
  • 'custom': Emit custom output write: StreamWriter kwarg of each node.

StreamWriter = Callable[[Any], None] module-attribute

Callable that accepts a single argument and writes it to the output stream. Always injected into nodes if requested as a keyword argument, but it's a no-op when not using stream_mode="custom".


Bases: NamedTuple

Configuration for retrying nodes.

Source code in libs/langgraph/langgraph/
class RetryPolicy(NamedTuple):
    """Configuration for retrying nodes."""

    initial_interval: float = 0.5
    """Amount of time that must elapse before the first retry occurs. In seconds."""
    backoff_factor: float = 2.0
    """Multiplier by which the interval increases after each retry."""
    max_interval: float = 128.0
    """Maximum amount of time that may elapse between retries. In seconds."""
    max_attempts: int = 3
    """Maximum number of attempts to make before giving up, including the first."""
    jitter: bool = True
    """Whether to add random jitter to the interval between retries."""
    retry_on: Union[
        Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]
    ] = default_retry_on
    """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry."""

initial_interval: float = 0.5 class-attribute instance-attribute

Amount of time that must elapse before the first retry occurs. In seconds.

backoff_factor: float = 2.0 class-attribute instance-attribute

Multiplier by which the interval increases after each retry.

max_interval: float = 128.0 class-attribute instance-attribute

Maximum amount of time that may elapse between retries. In seconds.

max_attempts: int = 3 class-attribute instance-attribute

Maximum number of attempts to make before giving up, including the first.

jitter: bool = True class-attribute instance-attribute

Whether to add random jitter to the interval between retries.

retry_on: Union[Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]] = default_retry_on class-attribute instance-attribute

List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.


Bases: NamedTuple

Configuration for caching nodes.

Source code in libs/langgraph/langgraph/
class CachePolicy(NamedTuple):
    """Configuration for caching nodes."""


Interrupt dataclass

Source code in libs/langgraph/langgraph/
class Interrupt:
    value: Any
    when: Literal["during"] = "during"


Bases: NamedTuple

Source code in libs/langgraph/langgraph/
class PregelTask(NamedTuple):
    id: str
    name: str
    path: tuple[Union[str, int], ...]
    error: Optional[Exception] = None
    interrupts: tuple[Interrupt, ...] = ()
    state: Union[None, RunnableConfig, "StateSnapshot"] = None
    result: Optional[dict[str, Any]] = None


Bases: NamedTuple

Source code in libs/langgraph/langgraph/
class PregelExecutableTask(NamedTuple):
    name: str
    input: Any
    proc: Runnable
    writes: deque[tuple[str, Any]]
    config: RunnableConfig
    triggers: list[str]
    retry_policy: Optional[RetryPolicy]
    cache_policy: Optional[CachePolicy]
    id: str
    path: tuple[Union[str, int], ...]
    scheduled: bool = False


Bases: NamedTuple

Snapshot of the state of the graph at the beginning of a step.

Source code in libs/langgraph/langgraph/
class StateSnapshot(NamedTuple):
    """Snapshot of the state of the graph at the beginning of a step."""

    values: Union[dict[str, Any], Any]
    """Current values of channels"""
    next: tuple[str, ...]
    """The name of the node to execute in each task for this step."""
    config: RunnableConfig
    """Config used to fetch this snapshot"""
    metadata: Optional[CheckpointMetadata]
    """Metadata associated with this snapshot"""
    created_at: Optional[str]
    """Timestamp of snapshot creation"""
    parent_config: Optional[RunnableConfig]
    """Config used to fetch the parent snapshot, if any"""
    tasks: tuple[PregelTask, ...]
    """Tasks to execute in this step. If already attempted, may contain an error."""

values: Union[dict[str, Any], Any] instance-attribute

Current values of channels

next: tuple[str, ...] instance-attribute

The name of the node to execute in each task for this step.

config: RunnableConfig instance-attribute

Config used to fetch this snapshot

metadata: Optional[CheckpointMetadata] instance-attribute

Metadata associated with this snapshot

created_at: Optional[str] instance-attribute

Timestamp of snapshot creation

parent_config: Optional[RunnableConfig] instance-attribute

Config used to fetch the parent snapshot, if any

tasks: tuple[PregelTask, ...] instance-attribute

Tasks to execute in this step. If already attempted, may contain an error.


A message or packet to send to a specific node in the graph.

The Send class is used within a StateGraph's conditional edges to dynamically invoke a node with a custom state at the next step.

Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.

One such example is a "map-reduce" workflow where your graph invokes the same node multiple times in parallel with different states, before aggregating the results back into the main graph's state.


  • node (str) –

    The name of the target node to send the message to.

  • arg (Any) –

    The state or message to send to the target node.


>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
...     subjects: list[str]
...     jokes: Annotated[list[str], operator.add]
>>> from langgraph.types import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>> # Invoking with two subjects results in a generated joke for each
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
Source code in libs/langgraph/langgraph/
class Send:
    """A message or packet to send to a specific node in the graph.

    The `Send` class is used within a `StateGraph`'s conditional edges to
    dynamically invoke a node with a custom state at the next step.

    Importantly, the sent state can differ from the core graph's state,
    allowing for flexible and dynamic workflow management.

    One such example is a "map-reduce" workflow where your graph invokes
    the same node multiple times in parallel with different states,
    before aggregating the results back into the main graph's state.

        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.

        >>> from typing import Annotated
        >>> import operator
        >>> class OverallState(TypedDict):
        ...     subjects: list[str]
        ...     jokes: Annotated[list[str], operator.add]
        >>> from langgraph.types import Send
        >>> from langgraph.graph import END, START
        >>> def continue_to_jokes(state: OverallState):
        ...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
        >>> from langgraph.graph import StateGraph
        >>> builder = StateGraph(OverallState)
        >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
        >>> builder.add_conditional_edges(START, continue_to_jokes)
        >>> builder.add_edge("generate_joke", END)
        >>> graph = builder.compile()
        >>> # Invoking with two subjects results in a generated joke for each
        >>> graph.invoke({"subjects": ["cats", "dogs"]})
        {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}

    __slots__ = ("node", "arg")

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        Initialize a new instance of the Send class.

            node (str): The name of the target node to send the message to.
            arg (Any): The state or message to send to the target node.
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg

__init__(node: str, arg: Any) -> None

Initialize a new instance of the Send class.


  • node (str) –

    The name of the target node to send the message to.

  • arg (Any) –

    The state or message to send to the target node.

Source code in libs/langgraph/langgraph/
def __init__(self, /, node: str, arg: Any) -> None:
    Initialize a new instance of the Send class.

        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.
    self.node = node
    self.arg = arg
