Skip to content

Graph Definitions

Graphs are the core abstraction of LangGraph. Each StateGraph implementation is used to create graph workflows. Once compiled, you can run the CompiledGraph to run the application.

StateGraph

from langgraph.graph import StateGraph
from typing_extensions import TypedDict
class MyState(TypedDict)
    ...
graph = StateGraph(MyState)

Bases: Graph

A graph whose nodes communicate by reading and writing to a shared state. The signature of each node is State -> Partial.

Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.

Parameters:

  • state_schema (Type[Any], default: None ) –

    The schema class that defines the state.

  • config_schema (Optional[Type[Any]], default: None ) –

    The schema class that defines the configuration. Use this to expose configurable parameters in your API.

Examples:

>>> from langchain_core.runnables import RunnableConfig
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.checkpoint.memory import MemorySaver
>>> from langgraph.graph import StateGraph
>>>
>>> def reducer(a: list, b: int | None) -> list:
...     if b is not None:
...         return a + [b]
...     return a
>>>
>>> class State(TypedDict):
...     x: Annotated[list, reducer]
>>>
>>> class ConfigSchema(TypedDict):
...     r: float
>>>
>>> graph = StateGraph(State, config_schema=ConfigSchema)
>>>
>>> def node(state: State, config: RunnableConfig) -> dict:
...     r = config["configurable"].get("r", 1.0)
...     x = state["x"][-1]
...     next_value = x * r * (1 - x)
...     return {"x": next_value}
>>>
>>> graph.add_node("A", node)
>>> graph.set_entry_point("A")
>>> graph.set_finish_point("A")
>>> compiled = graph.compile()
>>>
>>> print(compiled.config_specs)
[ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
>>>
>>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
>>> print(step1)
{'x': [0.5, 0.75]}
Source code in libs/langgraph/langgraph/graph/state.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
class StateGraph(Graph):
    """A graph whose nodes communicate by reading and writing to a shared state.
    The signature of each node is State -> Partial<State>.

    Each state key can optionally be annotated with a reducer function that
    will be used to aggregate the values of that key received from multiple nodes.
    The signature of a reducer function is (Value, Value) -> Value.

    Args:
        state_schema (Type[Any]): The schema class that defines the state.
        config_schema (Optional[Type[Any]]): The schema class that defines the configuration.
            Use this to expose configurable parameters in your API.


    Examples:
        >>> from langchain_core.runnables import RunnableConfig
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.checkpoint.memory import MemorySaver
        >>> from langgraph.graph import StateGraph
        >>>
        >>> def reducer(a: list, b: int | None) -> list:
        ...     if b is not None:
        ...         return a + [b]
        ...     return a
        >>>
        >>> class State(TypedDict):
        ...     x: Annotated[list, reducer]
        >>>
        >>> class ConfigSchema(TypedDict):
        ...     r: float
        >>>
        >>> graph = StateGraph(State, config_schema=ConfigSchema)
        >>>
        >>> def node(state: State, config: RunnableConfig) -> dict:
        ...     r = config["configurable"].get("r", 1.0)
        ...     x = state["x"][-1]
        ...     next_value = x * r * (1 - x)
        ...     return {"x": next_value}
        >>>
        >>> graph.add_node("A", node)
        >>> graph.set_entry_point("A")
        >>> graph.set_finish_point("A")
        >>> compiled = graph.compile()
        >>>
        >>> print(compiled.config_specs)
        [ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
        >>>
        >>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
        >>> print(step1)
        {'x': [0.5, 0.75]}"""

    nodes: dict[str, StateNodeSpec]
    channels: dict[str, BaseChannel]
    managed: dict[str, ManagedValueSpec]
    schemas: dict[Type[Any], dict[str, Union[BaseChannel, ManagedValueSpec]]]

    def __init__(
        self,
        state_schema: Optional[Type[Any]] = None,
        config_schema: Optional[Type[Any]] = None,
        *,
        input: Optional[Type[Any]] = None,
        output: Optional[Type[Any]] = None,
    ) -> None:
        super().__init__()
        if state_schema is None:
            if input is None or output is None:
                raise ValueError("Must provide state_schema or input and output")
            state_schema = input
            warnings.warn(
                "Initializing StateGraph without state_schema is deprecated. "
                "Please pass in an explicit state_schema instead of just an input and output schema.",
                LangGraphDeprecationWarning,
                stacklevel=2,
            )
        else:
            if input is None:
                input = state_schema
            if output is None:
                output = state_schema
        self.schemas = {}
        self.channels = {}
        self.managed = {}
        self.schema = state_schema
        self.input = input
        self.output = output
        self._add_schema(state_schema)
        self._add_schema(input, allow_managed=False)
        self._add_schema(output, allow_managed=False)
        self.config_schema = config_schema
        self.waiting_edges: set[tuple[tuple[str, ...], str]] = set()

    @property
    def _all_edges(self) -> set[tuple[str, str]]:
        return self.edges | {
            (start, end) for starts, end in self.waiting_edges for start in starts
        }

    def _add_schema(self, schema: Type[Any], /, allow_managed: bool = True) -> None:
        if schema not in self.schemas:
            _warn_invalid_state_schema(schema)
            channels, managed = _get_channels(schema)
            if managed and not allow_managed:
                names = ", ".join(managed)
                schema_name = getattr(schema, "__name__", "")
                raise ValueError(
                    f"Invalid managed channels detected in {schema_name}: {names}."
                    " Managed channels are not permitted in Input/Output schema."
                )
            self.schemas[schema] = {**channels, **managed}
            for key, channel in channels.items():
                if key in self.channels:
                    if self.channels[key] != channel:
                        if isinstance(channel, LastValue):
                            pass
                        else:
                            raise ValueError(
                                f"Channel '{key}' already exists with a different type"
                            )
                else:
                    self.channels[key] = channel
            for key, managed in managed.items():
                if key in self.managed:
                    if self.managed[key] != managed:
                        raise ValueError(
                            f"Managed value '{key}' already exists with a different type"
                        )
                else:
                    self.managed[key] = managed

    @overload
    def add_node(
        self,
        node: RunnableLike,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.
        Will take the name of the function/runnable as the node name.

        Args:
            node (RunnableLike): The function or runnable this node will run.

        Raises:
            ValueError: If the key is already being used as a state key.

        Returns:
            None
        """
        ...

    @overload
    def add_node(
        self,
        node: str,
        action: RunnableLike,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.

        Args:
            node (str): The key of the node.
            action (RunnableLike): The action associated with the node.

        Raises:
            ValueError: If the key is already being used as a state key.

        Returns:
            None
        """
        ...

    def add_node(
        self,
        node: Union[str, RunnableLike],
        action: Optional[RunnableLike] = None,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.

        Will take the name of the function/runnable as the node name.

        Args:
            node (Union[str, RunnableLike)]: The function or runnable this node will run.
            action (Optional[RunnableLike]): The action associated with the node. (default: None)
            metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
            input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
            retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
        Raises:
            ValueError: If the key is already being used as a state key.


        Examples:
            ```pycon
            >>> from langgraph.graph import START, StateGraph
            ...
            >>> def my_node(state, config):
            ...    return {"x": state["x"] + 1}
            ...
            >>> builder = StateGraph(dict)
            >>> builder.add_node(my_node)  # node name will be 'my_node'
            >>> builder.add_edge(START, "my_node")
            >>> graph = builder.compile()
            >>> graph.invoke({"x": 1})
            {'x': 2}
            ```
            Customize the name:

            ```pycon
            >>> builder = StateGraph(dict)
            >>> builder.add_node("my_fair_node", my_node)
            >>> builder.add_edge(START, "my_fair_node")
            >>> graph = builder.compile()
            >>> graph.invoke({"x": 1})
            {'x': 2}
            ```

        Returns:
            None
        """
        if not isinstance(node, str):
            action = node
            if isinstance(action, Runnable):
                node = action.name
            else:
                node = getattr(action, "__name__", action.__class__.__name__)
            if node is None:
                raise ValueError(
                    "Node name must be provided if action is not a function"
                )
        if node in self.channels:
            raise ValueError(f"'{node}' is already being used as a state key")
        if self.compiled:
            logger.warning(
                "Adding a node to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        if not isinstance(node, str):
            action = node
            node = getattr(action, "name", action.__name__)
        if node in self.nodes:
            raise ValueError(f"Node `{node}` already present.")
        if node == END or node == START:
            raise ValueError(f"Node `{node}` is reserved.")

        for character in (NS_SEP, NS_END):
            if character in node:
                raise ValueError(
                    f"'{character}' is a reserved character and is not allowed in the node names."
                )

        try:
            if isfunction(action) and (
                hints := get_type_hints(action.__call__) or get_type_hints(action)
            ):
                if input is None:
                    first_parameter_name = next(
                        iter(inspect.signature(action).parameters.keys())
                    )
                    if input_hint := hints.get(first_parameter_name):
                        if isinstance(input_hint, type) and get_type_hints(input_hint):
                            input = input_hint
        except (TypeError, StopIteration):
            pass
        if input is not None:
            self._add_schema(input)
        self.nodes[node] = StateNodeSpec(
            coerce_to_runnable(action, name=node, trace=False),
            metadata,
            input=input or self.schema,
            retry_policy=retry,
        )

    def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
        """Adds a directed edge from the start node to the end node.

        If the graph transitions to the start_key node, it will always transition to the end_key node next.

        Args:
            start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
            end_key (str): The key of the end node of the edge.

        Raises:
            ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

        Returns:
            None
        """
        if isinstance(start_key, str):
            return super().add_edge(start_key, end_key)

        if self.compiled:
            logger.warning(
                "Adding an edge to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        for start in start_key:
            if start == END:
                raise ValueError("END cannot be a start node")
            if start not in self.nodes:
                raise ValueError(f"Need to add_node `{start}` first")
        if end_key == START:
            raise ValueError("START cannot be an end node")
        if end_key != END and end_key not in self.nodes:
            raise ValueError(f"Need to add_node `{end_key}` first")

        self.waiting_edges.add((tuple(start_key), end_key))

    def compile(
        self,
        checkpointer: Optional[BaseCheckpointSaver] = None,
        *,
        store: Optional[BaseStore] = None,
        interrupt_before: Optional[Union[All, Sequence[str]]] = None,
        interrupt_after: Optional[Union[All, Sequence[str]]] = None,
        debug: bool = False,
    ) -> "CompiledStateGraph":
        """Compiles the state graph into a `CompiledGraph` object.

        The compiled graph implements the `Runnable` interface and can be invoked,
        streamed, batched, and run asynchronously.

        Args:
            checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
                This serves as a fully versioned "memory" for the graph, allowing
                the graph to be paused and resumed, and replayed from any point.
            interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
            interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
            debug (bool): A flag indicating whether to enable debug mode.

        Returns:
            CompiledStateGraph: The compiled state graph.
        """
        # assign default values
        interrupt_before = interrupt_before or []
        interrupt_after = interrupt_after or []

        # validate the graph
        self.validate(
            interrupt=(
                (interrupt_before if interrupt_before != "*" else []) + interrupt_after
                if interrupt_after != "*"
                else []
            )
        )

        # prepare output channels
        output_channels = (
            "__root__"
            if len(self.schemas[self.output]) == 1
            and "__root__" in self.schemas[self.output]
            else [
                key
                for key, val in self.schemas[self.output].items()
                if not is_managed_value(val)
            ]
        )
        stream_channels = (
            "__root__"
            if len(self.channels) == 1 and "__root__" in self.channels
            else [
                key for key, val in self.channels.items() if not is_managed_value(val)
            ]
        )

        compiled = CompiledStateGraph(
            builder=self,
            config_type=self.config_schema,
            nodes={},
            channels={
                **self.channels,
                **self.managed,
                START: EphemeralValue(self.input),
            },
            input_channels=START,
            stream_mode="updates",
            output_channels=output_channels,
            stream_channels=stream_channels,
            checkpointer=checkpointer,
            interrupt_before_nodes=interrupt_before,
            interrupt_after_nodes=interrupt_after,
            auto_validate=False,
            debug=debug,
            store=store,
        )

        compiled.attach_node(START, None)
        for key, node in self.nodes.items():
            compiled.attach_node(key, node)

        for start, end in self.edges:
            compiled.attach_edge(start, end)

        for starts, end in self.waiting_edges:
            compiled.attach_edge(starts, end)

        for start, branches in self.branches.items():
            for name, branch in branches.items():
                compiled.attach_branch(start, name, branch)

        return compiled.validate()

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Without typehints on the path function's return value (e.g., -> Literal["foo", "__end__"]:)

or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

Source code in libs/langgraph/langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[Hashable, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None

    Note: Without typehints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`)
        or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    try:
        if isinstance(path_map, dict):
            path_map = path_map.copy()
        elif isinstance(path_map, list):
            path_map = {name: name for name in path_map}
        elif rtn_type := get_type_hints(path.__call__).get(
            "return"
        ) or get_type_hints(path).get("return"):
            if get_origin(rtn_type) is Literal:
                path_map = {name: name for name in get_args(rtn_type)}
    except Exception:
        pass
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # validate the condition
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node " f"`{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Equivalent to calling add_edge(START, key).

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Specifies the first node to be called in the graph.

    Equivalent to calling `add_edge(START, key)`.

    Parameters:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Marks a node as a finish point of the graph.

    If the graph reaches this node, it will cease execution.

    Parameters:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

add_node(node, action=None, *, metadata=None, input=None, retry=None)

Adds a new node to the state graph.

Will take the name of the function/runnable as the node name.

Parameters:

  • node (Union[str, RunnableLike)]) –

    The function or runnable this node will run.

  • action (Optional[RunnableLike], default: None ) –

    The action associated with the node. (default: None)

  • metadata (Optional[dict[str, Any]], default: None ) –

    The metadata associated with the node. (default: None)

  • input (Optional[Type[Any]], default: None ) –

    The input schema for the node. (default: the graph's input schema)

  • retry (Optional[RetryPolicy], default: None ) –

    The policy for retrying the node. (default: None)

Raises: ValueError: If the key is already being used as a state key.

Examples:

>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
...    return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node)  # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
Customize the name:

>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/state.py
def add_node(
    self,
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[Type[Any]] = None,
    retry: Optional[RetryPolicy] = None,
) -> None:
    """Adds a new node to the state graph.

    Will take the name of the function/runnable as the node name.

    Args:
        node (Union[str, RunnableLike)]: The function or runnable this node will run.
        action (Optional[RunnableLike]): The action associated with the node. (default: None)
        metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
        input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
        retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
    Raises:
        ValueError: If the key is already being used as a state key.


    Examples:
        ```pycon
        >>> from langgraph.graph import START, StateGraph
        ...
        >>> def my_node(state, config):
        ...    return {"x": state["x"] + 1}
        ...
        >>> builder = StateGraph(dict)
        >>> builder.add_node(my_node)  # node name will be 'my_node'
        >>> builder.add_edge(START, "my_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```
        Customize the name:

        ```pycon
        >>> builder = StateGraph(dict)
        >>> builder.add_node("my_fair_node", my_node)
        >>> builder.add_edge(START, "my_fair_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```

    Returns:
        None
    """
    if not isinstance(node, str):
        action = node
        if isinstance(action, Runnable):
            node = action.name
        else:
            node = getattr(action, "__name__", action.__class__.__name__)
        if node is None:
            raise ValueError(
                "Node name must be provided if action is not a function"
            )
    if node in self.channels:
        raise ValueError(f"'{node}' is already being used as a state key")
    if self.compiled:
        logger.warning(
            "Adding a node to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    if not isinstance(node, str):
        action = node
        node = getattr(action, "name", action.__name__)
    if node in self.nodes:
        raise ValueError(f"Node `{node}` already present.")
    if node == END or node == START:
        raise ValueError(f"Node `{node}` is reserved.")

    for character in (NS_SEP, NS_END):
        if character in node:
            raise ValueError(
                f"'{character}' is a reserved character and is not allowed in the node names."
            )

    try:
        if isfunction(action) and (
            hints := get_type_hints(action.__call__) or get_type_hints(action)
        ):
            if input is None:
                first_parameter_name = next(
                    iter(inspect.signature(action).parameters.keys())
                )
                if input_hint := hints.get(first_parameter_name):
                    if isinstance(input_hint, type) and get_type_hints(input_hint):
                        input = input_hint
    except (TypeError, StopIteration):
        pass
    if input is not None:
        self._add_schema(input)
    self.nodes[node] = StateNodeSpec(
        coerce_to_runnable(action, name=node, trace=False),
        metadata,
        input=input or self.schema,
        retry_policy=retry,
    )

add_edge(start_key, end_key)

Adds a directed edge from the start node to the end node.

If the graph transitions to the start_key node, it will always transition to the end_key node next.

Parameters:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.

  • end_key (str) –

    The key of the end node of the edge.

Raises:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/state.py
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Adds a directed edge from the start node to the end node.

    If the graph transitions to the start_key node, it will always transition to the end_key node next.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    for start in start_key:
        if start == END:
            raise ValueError("END cannot be a start node")
        if start not in self.nodes:
            raise ValueError(f"Need to add_node `{start}` first")
    if end_key == START:
        raise ValueError("START cannot be an end node")
    if end_key != END and end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    self.waiting_edges.add((tuple(start_key), end_key))

compile(checkpointer=None, *, store=None, interrupt_before=None, interrupt_after=None, debug=False)

Compiles the state graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.

Returns:

  • CompiledStateGraph ( CompiledStateGraph ) –

    The compiled state graph.

Source code in libs/langgraph/langgraph/graph/state.py
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    *,
    store: Optional[BaseStore] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> "CompiledStateGraph":
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
        interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledStateGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    self.validate(
        interrupt=(
            (interrupt_before if interrupt_before != "*" else []) + interrupt_after
            if interrupt_after != "*"
            else []
        )
    )

    # prepare output channels
    output_channels = (
        "__root__"
        if len(self.schemas[self.output]) == 1
        and "__root__" in self.schemas[self.output]
        else [
            key
            for key, val in self.schemas[self.output].items()
            if not is_managed_value(val)
        ]
    )
    stream_channels = (
        "__root__"
        if len(self.channels) == 1 and "__root__" in self.channels
        else [
            key for key, val in self.channels.items() if not is_managed_value(val)
        ]
    )

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        channels={
            **self.channels,
            **self.managed,
            START: EphemeralValue(self.input),
        },
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=stream_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
        store=store,
    )

    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    for start, end in self.edges:
        compiled.attach_edge(start, end)

    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

handler: python

MessageGraph

Bases: StateGraph

A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages. Each node in a MessageGraph takes a list of messages as input and returns zero or more messages as output. The add_messages function is used to merge the output messages from each node into the existing list of messages in the graph's state.

Examples:

>>> from langgraph.graph.message import MessageGraph
...
>>> builder = MessageGraph()
>>> builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> builder.compile().invoke([("user", "Hi there.")])
[HumanMessage(content="Hi there.", id='...'), AIMessage(content="Hello!", id='...')]
>>> from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
>>> from langgraph.graph.message import MessageGraph
...
>>> builder = MessageGraph()
>>> builder.add_node(
...     "chatbot",
...     lambda state: [
...         AIMessage(
...             content="Hello!",
...             tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
...         )
...     ],
... )
>>> builder.add_node(
...     "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
... )
>>> builder.set_entry_point("chatbot")
>>> builder.add_edge("chatbot", "search")
>>> builder.set_finish_point("search")
>>> builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
{'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
             AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
             ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
Source code in libs/langgraph/langgraph/graph/message.py
class MessageGraph(StateGraph):
    """A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

    MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages.
    Each node in a MessageGraph takes a list of messages as input and returns zero or more
    messages as output. The `add_messages` function is used to merge the output messages from each node
    into the existing list of messages in the graph's state.

    Examples:
        ```pycon
        >>> from langgraph.graph.message import MessageGraph
        ...
        >>> builder = MessageGraph()
        >>> builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
        >>> builder.set_entry_point("chatbot")
        >>> builder.set_finish_point("chatbot")
        >>> builder.compile().invoke([("user", "Hi there.")])
        [HumanMessage(content="Hi there.", id='...'), AIMessage(content="Hello!", id='...')]
        ```

        ```pycon
        >>> from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
        >>> from langgraph.graph.message import MessageGraph
        ...
        >>> builder = MessageGraph()
        >>> builder.add_node(
        ...     "chatbot",
        ...     lambda state: [
        ...         AIMessage(
        ...             content="Hello!",
        ...             tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
        ...         )
        ...     ],
        ... )
        >>> builder.add_node(
        ...     "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
        ... )
        >>> builder.set_entry_point("chatbot")
        >>> builder.add_edge("chatbot", "search")
        >>> builder.set_finish_point("search")
        >>> builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
        {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
                     AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
                     ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
        ```
    """

    def __init__(self) -> None:
        super().__init__(Annotated[list[AnyMessage], add_messages])

add_node(node, action=None, *, metadata=None, input=None, retry=None)

Adds a new node to the state graph.

Will take the name of the function/runnable as the node name.

Parameters:

  • node (Union[str, RunnableLike)]) –

    The function or runnable this node will run.

  • action (Optional[RunnableLike], default: None ) –

    The action associated with the node. (default: None)

  • metadata (Optional[dict[str, Any]], default: None ) –

    The metadata associated with the node. (default: None)

  • input (Optional[Type[Any]], default: None ) –

    The input schema for the node. (default: the graph's input schema)

  • retry (Optional[RetryPolicy], default: None ) –

    The policy for retrying the node. (default: None)

Raises: ValueError: If the key is already being used as a state key.

Examples:

>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
...    return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node)  # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
Customize the name:

>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/state.py
def add_node(
    self,
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[Type[Any]] = None,
    retry: Optional[RetryPolicy] = None,
) -> None:
    """Adds a new node to the state graph.

    Will take the name of the function/runnable as the node name.

    Args:
        node (Union[str, RunnableLike)]: The function or runnable this node will run.
        action (Optional[RunnableLike]): The action associated with the node. (default: None)
        metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
        input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
        retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
    Raises:
        ValueError: If the key is already being used as a state key.


    Examples:
        ```pycon
        >>> from langgraph.graph import START, StateGraph
        ...
        >>> def my_node(state, config):
        ...    return {"x": state["x"] + 1}
        ...
        >>> builder = StateGraph(dict)
        >>> builder.add_node(my_node)  # node name will be 'my_node'
        >>> builder.add_edge(START, "my_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```
        Customize the name:

        ```pycon
        >>> builder = StateGraph(dict)
        >>> builder.add_node("my_fair_node", my_node)
        >>> builder.add_edge(START, "my_fair_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```

    Returns:
        None
    """
    if not isinstance(node, str):
        action = node
        if isinstance(action, Runnable):
            node = action.name
        else:
            node = getattr(action, "__name__", action.__class__.__name__)
        if node is None:
            raise ValueError(
                "Node name must be provided if action is not a function"
            )
    if node in self.channels:
        raise ValueError(f"'{node}' is already being used as a state key")
    if self.compiled:
        logger.warning(
            "Adding a node to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    if not isinstance(node, str):
        action = node
        node = getattr(action, "name", action.__name__)
    if node in self.nodes:
        raise ValueError(f"Node `{node}` already present.")
    if node == END or node == START:
        raise ValueError(f"Node `{node}` is reserved.")

    for character in (NS_SEP, NS_END):
        if character in node:
            raise ValueError(
                f"'{character}' is a reserved character and is not allowed in the node names."
            )

    try:
        if isfunction(action) and (
            hints := get_type_hints(action.__call__) or get_type_hints(action)
        ):
            if input is None:
                first_parameter_name = next(
                    iter(inspect.signature(action).parameters.keys())
                )
                if input_hint := hints.get(first_parameter_name):
                    if isinstance(input_hint, type) and get_type_hints(input_hint):
                        input = input_hint
    except (TypeError, StopIteration):
        pass
    if input is not None:
        self._add_schema(input)
    self.nodes[node] = StateNodeSpec(
        coerce_to_runnable(action, name=node, trace=False),
        metadata,
        input=input or self.schema,
        retry_policy=retry,
    )

add_edge(start_key, end_key)

Adds a directed edge from the start node to the end node.

If the graph transitions to the start_key node, it will always transition to the end_key node next.

Parameters:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.

  • end_key (str) –

    The key of the end node of the edge.

Raises:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/state.py
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Adds a directed edge from the start node to the end node.

    If the graph transitions to the start_key node, it will always transition to the end_key node next.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    for start in start_key:
        if start == END:
            raise ValueError("END cannot be a start node")
        if start not in self.nodes:
            raise ValueError(f"Need to add_node `{start}` first")
    if end_key == START:
        raise ValueError("START cannot be an end node")
    if end_key != END and end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    self.waiting_edges.add((tuple(start_key), end_key))

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Without typehints on the path function's return value (e.g., -> Literal["foo", "__end__"]:)

or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

Source code in libs/langgraph/langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[Hashable, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None

    Note: Without typehints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`)
        or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    try:
        if isinstance(path_map, dict):
            path_map = path_map.copy()
        elif isinstance(path_map, list):
            path_map = {name: name for name in path_map}
        elif rtn_type := get_type_hints(path.__call__).get(
            "return"
        ) or get_type_hints(path).get("return"):
            if get_origin(rtn_type) is Literal:
                path_map = {name: name for name in get_args(rtn_type)}
    except Exception:
        pass
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # validate the condition
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node " f"`{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Equivalent to calling add_edge(START, key).

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Specifies the first node to be called in the graph.

    Equivalent to calling `add_edge(START, key)`.

    Parameters:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in libs/langgraph/langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Marks a node as a finish point of the graph.

    If the graph reaches this node, it will cease execution.

    Parameters:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

compile(checkpointer=None, *, store=None, interrupt_before=None, interrupt_after=None, debug=False)

Compiles the state graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.

Returns:

  • CompiledStateGraph ( CompiledStateGraph ) –

    The compiled state graph.

Source code in libs/langgraph/langgraph/graph/state.py
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    *,
    store: Optional[BaseStore] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> "CompiledStateGraph":
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
        interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledStateGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    self.validate(
        interrupt=(
            (interrupt_before if interrupt_before != "*" else []) + interrupt_after
            if interrupt_after != "*"
            else []
        )
    )

    # prepare output channels
    output_channels = (
        "__root__"
        if len(self.schemas[self.output]) == 1
        and "__root__" in self.schemas[self.output]
        else [
            key
            for key, val in self.schemas[self.output].items()
            if not is_managed_value(val)
        ]
    )
    stream_channels = (
        "__root__"
        if len(self.channels) == 1 and "__root__" in self.channels
        else [
            key for key, val in self.channels.items() if not is_managed_value(val)
        ]
    )

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        channels={
            **self.channels,
            **self.managed,
            START: EphemeralValue(self.input),
        },
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=stream_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
        store=store,
    )

    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    for start, end in self.edges:
        compiled.attach_edge(start, end)

    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

add_messages

Merges two lists of messages, updating existing messages by ID.

By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.

Parameters:

  • left (Messages) –

    The base list of messages.

  • right (Messages) –

    The list of messages (or single message) to merge into the base list.

Returns:

  • Messages

    A new list of messages with the messages from right merged into left.

  • Messages

    If a message in right has the same ID as a message in left, the

  • Messages

    message from right will replace the message from left.

Examples:

>>> from langchain_core.messages import AIMessage, HumanMessage
>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [AIMessage(content="Hi there!", id="2")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]

>>> msgs1 = [HumanMessage(content="Hello", id="1")]
>>> msgs2 = [HumanMessage(content="Hello again", id="1")]
>>> add_messages(msgs1, msgs2)
[HumanMessage(content='Hello again', id='1')]

>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph
>>>
>>> class State(TypedDict):
...     messages: Annotated[list, add_messages]
...
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({})
{'messages': [AIMessage(content='Hello', id=...)]}
Source code in libs/langgraph/langgraph/graph/message.py
def add_messages(left: Messages, right: Messages) -> Messages:
    """Merges two lists of messages, updating existing messages by ID.

    By default, this ensures the state is "append-only", unless the
    new message has the same ID as an existing message.

    Args:
        left: The base list of messages.
        right: The list of messages (or single message) to merge
            into the base list.

    Returns:
        A new list of messages with the messages from `right` merged into `left`.
        If a message in `right` has the same ID as a message in `left`, the
        message from `right` will replace the message from `left`.

    Examples:
        ```pycon
        >>> from langchain_core.messages import AIMessage, HumanMessage
        >>> msgs1 = [HumanMessage(content="Hello", id="1")]
        >>> msgs2 = [AIMessage(content="Hi there!", id="2")]
        >>> add_messages(msgs1, msgs2)
        [HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]

        >>> msgs1 = [HumanMessage(content="Hello", id="1")]
        >>> msgs2 = [HumanMessage(content="Hello again", id="1")]
        >>> add_messages(msgs1, msgs2)
        [HumanMessage(content='Hello again', id='1')]

        >>> from typing import Annotated
        >>> from typing_extensions import TypedDict
        >>> from langgraph.graph import StateGraph
        >>>
        >>> class State(TypedDict):
        ...     messages: Annotated[list, add_messages]
        ...
        >>> builder = StateGraph(State)
        >>> builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
        >>> builder.set_entry_point("chatbot")
        >>> builder.set_finish_point("chatbot")
        >>> graph = builder.compile()
        >>> graph.invoke({})
        {'messages': [AIMessage(content='Hello', id=...)]}
        ```

    """
    # coerce to list
    if not isinstance(left, list):
        left = [left]
    if not isinstance(right, list):
        right = [right]
    # coerce to message
    left = [message_chunk_to_message(m) for m in convert_to_messages(left)]
    right = [message_chunk_to_message(m) for m in convert_to_messages(right)]
    # assign missing ids
    for m in left:
        if m.id is None:
            m.id = str(uuid.uuid4())
    for m in right:
        if m.id is None:
            m.id = str(uuid.uuid4())
    # merge
    left_idx_by_id = {m.id: i for i, m in enumerate(left)}
    merged = left.copy()
    ids_to_remove = set()
    for m in right:
        if (existing_idx := left_idx_by_id.get(m.id)) is not None:
            if isinstance(m, RemoveMessage):
                ids_to_remove.add(m.id)
            else:
                merged[existing_idx] = m
        else:
            if isinstance(m, RemoveMessage):
                raise ValueError(
                    f"Attempting to delete a message with an ID that doesn't exist ('{m.id}')"
                )

            merged.append(m)
    merged = [m for m in merged if m.id not in ids_to_remove]
    return merged

CompiledGraph

Bases: Pregel

Source code in libs/langgraph/langgraph/graph/graph.py
class CompiledGraph(Pregel):
    builder: Graph

    def __init__(self, *, builder: Graph, **kwargs):
        super().__init__(**kwargs)
        self.builder = builder

    def attach_node(self, key: str, node: NodeSpec) -> None:
        self.channels[key] = EphemeralValue(Any)
        self.nodes[key] = (
            PregelNode(channels=[], triggers=[], metadata=node.metadata)
            | node.runnable
            | ChannelWrite([ChannelWriteEntry(key)], tags=[TAG_HIDDEN])
        )
        cast(list[str], self.stream_channels).append(key)

    def attach_edge(self, start: str, end: str) -> None:
        if end == END:
            # publish to end channel
            self.nodes[start].writers.append(
                ChannelWrite([ChannelWriteEntry(END)], tags=[TAG_HIDDEN])
            )
        else:
            # subscribe to start channel
            self.nodes[end].triggers.append(start)
            self.nodes[end].channels.append(start)

    def attach_branch(self, start: str, name: str, branch: Branch) -> None:
        def branch_writer(
            packets: list[Union[str, Send]], config: RunnableConfig
        ) -> Optional[ChannelWrite]:
            writes = [
                (
                    ChannelWriteEntry(f"branch:{start}:{name}:{p}" if p != END else END)
                    if not isinstance(p, Send)
                    else p
                )
                for p in packets
            ]
            return ChannelWrite(writes, tags=[TAG_HIDDEN])

        # add hidden start node
        if start == START and start not in self.nodes:
            self.nodes[start] = Channel.subscribe_to(START, tags=[TAG_HIDDEN])

        # attach branch writer
        self.nodes[start] |= branch.run(branch_writer)

        # attach branch readers
        ends = branch.ends.values() if branch.ends else [node for node in self.nodes]
        for end in ends:
            if end != END:
                channel_name = f"branch:{start}:{name}:{end}"
                self.channels[channel_name] = EphemeralValue(Any)
                self.nodes[end].triggers.append(channel_name)
                self.nodes[end].channels.append(channel_name)

    def get_graph(
        self,
        config: Optional[RunnableConfig] = None,
        *,
        xray: Union[int, bool] = False,
    ) -> DrawableGraph:
        """Returns a drawable representation of the computation graph."""
        graph = DrawableGraph()
        start_nodes: dict[str, DrawableNode] = {
            START: graph.add_node(self.get_input_schema(config), START)
        }
        end_nodes: dict[str, DrawableNode] = {}
        if xray:
            subgraphs = dict(self.get_subgraphs())
        else:
            subgraphs = {}

        def add_edge(
            start: str, end: str, label: Optional[str] = None, conditional: bool = False
        ) -> None:
            if end == END and END not in end_nodes:
                end_nodes[END] = graph.add_node(self.get_output_schema(config), END)
            return graph.add_edge(
                start_nodes[start], end_nodes[end], label, conditional
            )

        for key, n in self.builder.nodes.items():
            node = n.runnable
            metadata = n.metadata or {}
            if key in self.interrupt_before_nodes and key in self.interrupt_after_nodes:
                metadata["__interrupt"] = "before,after"
            elif key in self.interrupt_before_nodes:
                metadata["__interrupt"] = "before"
            elif key in self.interrupt_after_nodes:
                metadata["__interrupt"] = "after"
            if xray:
                subgraph = (
                    subgraphs[key].get_graph(
                        config=config,
                        xray=xray - 1 if isinstance(xray, int) and xray > 0 else xray,
                    )
                    if key in subgraphs
                    else node.get_graph(config=config)
                )
                subgraph.trim_first_node()
                subgraph.trim_last_node()
                if len(subgraph.nodes) > 1:
                    end_nodes[key], start_nodes[key] = graph.extend(
                        subgraph, prefix=key
                    )
                else:
                    n = graph.add_node(node, key, metadata=metadata or None)
                    start_nodes[key] = n
                    end_nodes[key] = n
            else:
                n = graph.add_node(node, key, metadata=metadata or None)
                start_nodes[key] = n
                end_nodes[key] = n
        for start, end in sorted(self.builder._all_edges):
            add_edge(start, end)
        for start, branches in self.builder.branches.items():
            default_ends = {
                **{k: k for k in self.builder.nodes if k != start},
                END: END,
            }
            for _, branch in branches.items():
                if branch.ends is not None:
                    ends = branch.ends
                elif branch.then is not None:
                    ends = {k: k for k in default_ends if k not in (END, branch.then)}
                else:
                    ends = default_ends
                for label, end in ends.items():
                    add_edge(
                        start,
                        end,
                        label if label != end else None,
                        conditional=True,
                    )
                    if branch.then is not None:
                        add_edge(end, branch.then)

        return graph

stream_mode: StreamMode = stream_mode class-attribute instance-attribute

Mode to stream output, defaults to 'values'.

stream_channels: Optional[Union[str, Sequence[str]]] = stream_channels class-attribute instance-attribute

Channels to stream, defaults to all channels not in reserved channels

step_timeout: Optional[float] = step_timeout class-attribute instance-attribute

Maximum time to wait for a step to complete, in seconds. Defaults to None.

debug: bool = debug if debug is not None else get_debug() instance-attribute

Whether to print debug information during execution. Defaults to False.

checkpointer: Optional[BaseCheckpointSaver] = checkpointer class-attribute instance-attribute

Checkpointer used to save and load graph state. Defaults to None.

store: Optional[BaseStore] = store class-attribute instance-attribute

Memory store to use for SharedValues. Defaults to None.

retry_policy: Optional[RetryPolicy] = retry_policy class-attribute instance-attribute

Retry policy to use when running tasks. Set to None to disable.

get_state(config, *, subgraphs=False)

Get the current state of the graph.

Source code in libs/langgraph/langgraph/pregel/__init__.py
def get_state(
    self, config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot:
    """Get the current state of the graph."""
    checkpointer: Optional[BaseCheckpointSaver] = config["configurable"].get(
        CONFIG_KEY_CHECKPOINTER, self.checkpointer
    )
    if not checkpointer:
        raise ValueError("No checkpointer set")

    if (
        checkpoint_ns := config["configurable"].get("checkpoint_ns", "")
    ) and CONFIG_KEY_CHECKPOINTER not in config["configurable"]:
        # remove task_ids from checkpoint_ns
        recast_checkpoint_ns = NS_SEP.join(
            part.split(NS_END)[0] for part in checkpoint_ns.split(NS_SEP)
        )
        # find the subgraph with the matching name
        for name, pregel in self.get_subgraphs(recurse=True):
            if name == recast_checkpoint_ns:
                return pregel.get_state(
                    patch_configurable(
                        config, {CONFIG_KEY_CHECKPOINTER: checkpointer}
                    ),
                    subgraphs=subgraphs,
                )
        else:
            raise ValueError(f"Subgraph {recast_checkpoint_ns} not found")

    config = merge_configs(self.config, config) if self.config else config
    saved = checkpointer.get_tuple(config)
    return self._prepare_state_snapshot(
        config, saved, recurse=checkpointer if subgraphs else None
    )

aget_state(config, *, subgraphs=False) async

Get the current state of the graph.

Source code in libs/langgraph/langgraph/pregel/__init__.py
async def aget_state(
    self, config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot:
    """Get the current state of the graph."""
    checkpointer: Optional[BaseCheckpointSaver] = config["configurable"].get(
        CONFIG_KEY_CHECKPOINTER, self.checkpointer
    )
    if not checkpointer:
        raise ValueError("No checkpointer set")

    if (
        checkpoint_ns := config["configurable"].get("checkpoint_ns", "")
    ) and CONFIG_KEY_CHECKPOINTER not in config["configurable"]:
        # remove task_ids from checkpoint_ns
        recast_checkpoint_ns = NS_SEP.join(
            part.split(NS_END)[0] for part in checkpoint_ns.split(NS_SEP)
        )
        # find the subgraph with the matching name
        async for name, pregel in self.aget_subgraphs(recurse=True):
            if name == recast_checkpoint_ns:
                return await pregel.aget_state(
                    patch_configurable(
                        config, {CONFIG_KEY_CHECKPOINTER: checkpointer}
                    ),
                    subgraphs=subgraphs,
                )
        else:
            raise ValueError(f"Subgraph {recast_checkpoint_ns} not found")

    config = merge_configs(self.config, config) if self.config else config
    saved = await checkpointer.aget_tuple(config)
    return await self._aprepare_state_snapshot(
        config, saved, recurse=checkpointer if subgraphs else None
    )

get_state_history(config, *, filter=None, before=None, limit=None)

Get the history of the state of the graph.

Source code in libs/langgraph/langgraph/pregel/__init__.py
def get_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> Iterator[StateSnapshot]:
    """Get the history of the state of the graph."""
    checkpointer: Optional[BaseCheckpointSaver] = config["configurable"].get(
        CONFIG_KEY_CHECKPOINTER, self.checkpointer
    )
    if not checkpointer:
        raise ValueError("No checkpointer set")

    if (
        checkpoint_ns := config["configurable"].get("checkpoint_ns", "")
    ) and CONFIG_KEY_CHECKPOINTER not in config["configurable"]:
        # remove task_ids from checkpoint_ns
        recast_checkpoint_ns = NS_SEP.join(
            part.split(NS_END)[0] for part in checkpoint_ns.split(NS_SEP)
        )
        # find the subgraph with the matching name
        for name, pregel in self.get_subgraphs(recurse=True):
            if name == recast_checkpoint_ns:
                yield from pregel.get_state_history(
                    patch_configurable(
                        config, {CONFIG_KEY_CHECKPOINTER: checkpointer}
                    ),
                    filter=filter,
                    before=before,
                    limit=limit,
                )
                return
        else:
            raise ValueError(f"Subgraph {recast_checkpoint_ns} not found")

    config = merge_configs(
        self.config, config, {"configurable": {"checkpoint_ns": checkpoint_ns}}
    )
    # eagerly consume list() to avoid holding up the db cursor
    for checkpoint_tuple in list(
        checkpointer.list(config, before=before, limit=limit, filter=filter)
    ):
        yield self._prepare_state_snapshot(
            checkpoint_tuple.config, checkpoint_tuple
        )

aget_state_history(config, *, filter=None, before=None, limit=None) async

Get the history of the state of the graph.

Source code in libs/langgraph/langgraph/pregel/__init__.py
async def aget_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[StateSnapshot]:
    """Get the history of the state of the graph."""
    checkpointer: Optional[BaseCheckpointSaver] = config["configurable"].get(
        CONFIG_KEY_CHECKPOINTER, self.checkpointer
    )
    if not checkpointer:
        raise ValueError("No checkpointer set")

    if (
        checkpoint_ns := config["configurable"].get("checkpoint_ns", "")
    ) and CONFIG_KEY_CHECKPOINTER not in config["configurable"]:
        # remove task_ids from checkpoint_ns
        recast_checkpoint_ns = NS_SEP.join(
            part.split(NS_END)[0] for part in checkpoint_ns.split(NS_SEP)
        )
        # find the subgraph with the matching name
        async for name, pregel in self.aget_subgraphs(recurse=True):
            if name == recast_checkpoint_ns:
                async for state in pregel.aget_state_history(
                    patch_configurable(
                        config, {CONFIG_KEY_CHECKPOINTER: checkpointer}
                    ),
                    filter=filter,
                    before=before,
                    limit=limit,
                ):
                    yield state
                return
        else:
            raise ValueError(f"Subgraph {recast_checkpoint_ns} not found")

    config = merge_configs(
        self.config, config, {"configurable": {"checkpoint_ns": checkpoint_ns}}
    )
    # eagerly consume list() to avoid holding up the db cursor
    for checkpoint_tuple in [
        c
        async for c in checkpointer.alist(
            config, before=before, limit=limit, filter=filter
        )
    ]:
        yield await self._aprepare_state_snapshot(
            checkpoint_tuple.config, checkpoint_tuple
        )

update_state(config, values, as_node=None)

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

Source code in libs/langgraph/langgraph/pregel/__init__.py
def update_state(
    self,
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig:
    """Update the state of the graph with the given values, as if they came from
    node `as_node`. If `as_node` is not provided, it will be set to the last node
    that updated the state, if not ambiguous.
    """
    checkpointer: Optional[BaseCheckpointSaver] = config["configurable"].get(
        CONFIG_KEY_CHECKPOINTER, self.checkpointer
    )
    if not checkpointer:
        raise ValueError("No checkpointer set")

    # delegate to subgraph
    if (
        checkpoint_ns := config["configurable"].get("checkpoint_ns", "")
    ) and CONFIG_KEY_CHECKPOINTER not in config["configurable"]:
        # remove task_ids from checkpoint_ns
        recast_checkpoint_ns = NS_SEP.join(
            part.split(NS_END)[0] for part in checkpoint_ns.split(NS_SEP)
        )
        # find the subgraph with the matching name
        for name, pregel in self.get_subgraphs(recurse=True):
            if name == recast_checkpoint_ns:
                return pregel.update_state(
                    patch_configurable(
                        config, {CONFIG_KEY_CHECKPOINTER: checkpointer}
                    ),
                    values,
                    as_node,
                )
        else:
            raise ValueError(f"Subgraph {recast_checkpoint_ns} not found")

    # get last checkpoint
    config = merge_configs(self.config, config) if self.config else config
    saved = checkpointer.get_tuple(config)
    checkpoint = copy_checkpoint(saved.checkpoint) if saved else empty_checkpoint()
    checkpoint_previous_versions = (
        saved.checkpoint["channel_versions"].copy() if saved else {}
    )
    step = saved.metadata.get("step", -1) if saved else -1
    # merge configurable fields with previous checkpoint config
    checkpoint_config = patch_configurable(
        config,
        {"checkpoint_ns": config["configurable"].get("checkpoint_ns", "")},
    )
    if saved:
        checkpoint_config = patch_configurable(config, saved.config["configurable"])
    # find last node that updated the state, if not provided
    if values is None and as_node is None:
        next_config = checkpointer.put(
            checkpoint_config,
            create_checkpoint(checkpoint, None, step),
            {
                "source": "update",
                "step": step + 1,
                "writes": {},
                "parents": saved.metadata.get("parents", {}) if saved else {},
            },
            {},
        )
        return patch_checkpoint_map(next_config, saved.metadata if saved else None)
    elif as_node is None and not any(
        v for vv in checkpoint["versions_seen"].values() for v in vv.values()
    ):
        if (
            isinstance(self.input_channels, str)
            and self.input_channels in self.nodes
        ):
            as_node = self.input_channels
    elif as_node is None:
        last_seen_by_node = sorted(
            (v, n)
            for n, seen in checkpoint["versions_seen"].items()
            if n in self.nodes
            for v in seen.values()
        )
        # if two nodes updated the state at the same time, it's ambiguous
        if last_seen_by_node:
            if len(last_seen_by_node) == 1:
                as_node = last_seen_by_node[0][1]
            elif last_seen_by_node[-1][0] != last_seen_by_node[-2][0]:
                as_node = last_seen_by_node[-1][1]
    if as_node is None:
        raise InvalidUpdateError("Ambiguous update, specify as_node")
    if as_node not in self.nodes:
        raise InvalidUpdateError(f"Node {as_node} does not exist")
    # update channels
    with ChannelsManager(self.channels, checkpoint, config) as (
        channels,
        managed,
    ):
        # create task to run all writers of the chosen node
        writers = self.nodes[as_node].flat_writers
        if not writers:
            raise InvalidUpdateError(f"Node {as_node} has no writers")
        writes = deque()
        task = PregelTaskWrites(as_node, writes, [INTERRUPT])
        task_id = str(uuid5(UUID(checkpoint["id"]), INTERRUPT))
        run = RunnableSequence(*writers) if len(writers) > 1 else writers[0]
        # execute task
        run.invoke(
            values,
            patch_config(
                config,
                run_name=self.name + "UpdateState",
                configurable={
                    # deque.extend is thread-safe
                    CONFIG_KEY_SEND: partial(
                        local_write,
                        step + 1,
                        writes.extend,
                        self.nodes,
                        channels,
                        managed,
                    ),
                    CONFIG_KEY_READ: partial(
                        local_read,
                        step + 1,
                        checkpoint,
                        channels,
                        managed,
                        task,
                        config,
                    ),
                },
            ),
        )
        # save task writes
        if saved:
            checkpointer.put_writes(checkpoint_config, task.writes, task_id)
        # apply to checkpoint and save
        assert not apply_writes(
            checkpoint, channels, [task], checkpointer.get_next_version
        ), "Can't write to SharedValues from update_state"
        checkpoint = create_checkpoint(checkpoint, channels, step + 1)
        next_config = checkpointer.put(
            checkpoint_config,
            checkpoint,
            {
                "source": "update",
                "step": step + 1,
                "writes": {as_node: values},
                "parents": saved.metadata.get("parents", {}) if saved else {},
            },
            get_new_channel_versions(
                checkpoint_previous_versions, checkpoint["channel_versions"]
            ),
        )
        return patch_checkpoint_map(next_config, saved.metadata if saved else None)

stream(input, config=None, *, stream_mode=None, output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, subgraphs=False)

Stream graph steps for a single input.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input to the graph.

  • config (Optional[RunnableConfig], default: None ) –

    The configuration to use for the run.

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    The keys to stream, defaults to all non-context channels.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt before, defaults to all nodes in the graph.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt after, defaults to all nodes in the graph.

  • debug (Optional[bool], default: None ) –

    Whether to print debug information during execution, defaults to False.

  • subgraphs (bool, default: False ) –

    Whether to stream subgraphs, defaults to False.

Yields:

  • Union[dict[str, Any], Any]

    The output of each step in the graph. The output shape depends on the stream_mode.

Examples:

Using different stream modes with a graph:

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
With stream_mode="values":

>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
With stream_mode="updates":

>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
With stream_mode="debug":

>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Source code in libs/langgraph/langgraph/pregel/__init__.py
def stream(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    subgraphs: bool = False,
) -> Iterator[Union[dict[str, Any], Any]]:
    """Stream graph steps for a single input.

    Args:
        input: The input to the graph.
        config: The configuration to use for the run.
        stream_mode: The mode to stream output, defaults to self.stream_mode.
            Options are 'values', 'updates', and 'debug'.
            values: Emit the current values of the state for each step.
            updates: Emit only the updates to the state for each step.
                Output is a dict with the node name as key and the updated values as value.
            debug: Emit debug events for each step.
        output_keys: The keys to stream, defaults to all non-context channels.
        interrupt_before: Nodes to interrupt before, defaults to all nodes in the graph.
        interrupt_after: Nodes to interrupt after, defaults to all nodes in the graph.
        debug: Whether to print debug information during execution, defaults to False.
        subgraphs: Whether to stream subgraphs, defaults to False.

    Yields:
        The output of each step in the graph. The output shape depends on the stream_mode.

    Examples:
        Using different stream modes with a graph:
        ```pycon
        >>> import operator
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.graph import StateGraph
        >>> from langgraph.constants import START
        ...
        >>> class State(TypedDict):
        ...     alist: Annotated[list, operator.add]
        ...     another_list: Annotated[list, operator.add]
        ...
        >>> builder = StateGraph(State)
        >>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
        >>> builder.add_node("b", lambda _state: {"alist": ["there"]})
        >>> builder.add_edge("a", "b")
        >>> builder.add_edge(START, "a")
        >>> graph = builder.compile()
        ```
        With stream_mode="values":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
        ...     print(event)
        {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
        {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
        {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
        ```
        With stream_mode="updates":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
        ...     print(event)
        {'a': {'another_list': ['hi']}}
        {'b': {'alist': ['there']}}
        ```
        With stream_mode="debug":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
        ...     print(event)
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
        ```
    """

    stream = SyncQueue()

    def output() -> Iterator:
        while True:
            try:
                ns, mode, payload = stream.get(block=False)
            except queue.Empty:
                break
            if subgraphs and isinstance(stream_mode, list):
                yield (ns, mode, payload)
            elif isinstance(stream_mode, list):
                yield (mode, payload)
            elif subgraphs:
                yield (ns, payload)
            else:
                yield payload

    config = ensure_config(self.config, config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        None,
        input,
        name=config.get("run_name", self.get_name()),
        run_id=config.get("run_id"),
    )
    try:
        if config["recursion_limit"] < 1:
            raise ValueError("recursion_limit must be at least 1")
        if self.checkpointer and not config.get("configurable"):
            raise ValueError(
                f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}"
            )
        # assign defaults
        (
            debug,
            stream_modes,
            output_keys,
            interrupt_before,
            interrupt_after,
            checkpointer,
        ) = self._defaults(
            config,
            stream_mode=stream_mode,
            output_keys=output_keys,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            debug=debug,
        )
        # set up messages stream mode
        if "messages" in stream_modes:
            run_manager.inheritable_handlers.append(
                StreamMessagesHandler(stream.put)
            )
        with SyncPregelLoop(
            input,
            stream=StreamProtocol(stream.put, stream_modes),
            config=config,
            store=self.store,
            checkpointer=checkpointer,
            nodes=self.nodes,
            specs=self.channels,
            output_keys=output_keys,
            stream_keys=self.stream_channels_asis,
            debug=debug,
        ) as loop:
            # create runner
            runner = PregelRunner(
                submit=loop.submit,
                put_writes=loop.put_writes,
            )
            # enable subgraph streaming
            if subgraphs:
                loop.config["configurable"][CONFIG_KEY_STREAM] = loop.stream
            # enable concurrent streaming
            if subgraphs or "messages" in stream_modes:
                # we are careful to have a single waiter live at any one time
                # because on exit we increment semaphore count by exactly 1
                waiter: Optional[concurrent.futures.Future] = None
                # because sync futures cannot be cancelled, we instead
                # release the stream semaphore on exit, which will cause
                # a pending waiter to return immediately
                loop.stack.callback(stream._count.release)

                def get_waiter() -> asyncio.Task[None]:
                    nonlocal waiter
                    if waiter is None or waiter.done():
                        waiter = loop.submit(stream.wait)
                        return waiter
                    else:
                        return waiter
            else:
                get_waiter = None
            # Similarly to Bulk Synchronous Parallel / Pregel model
            # computation proceeds in steps, while there are channel updates
            # channel updates from step N are only visible in step N+1
            # channels are guaranteed to be immutable for the duration of the step,
            # with channel updates applied only at the transition between steps
            while loop.tick(
                input_keys=self.input_channels,
                interrupt_before=interrupt_before,
                interrupt_after=interrupt_after,
                manager=run_manager,
            ):
                for _ in runner.tick(
                    loop.tasks.values(),
                    timeout=self.step_timeout,
                    retry_policy=self.retry_policy,
                    get_waiter=get_waiter,
                ):
                    # emit output
                    yield from output()
        # emit output
        yield from output()
        # handle exit
        if loop.status == "out_of_steps":
            raise GraphRecursionError(
                f"Recursion limit of {config['recursion_limit']} reached "
                "without hitting a stop condition. You can increase the "
                "limit by setting the `recursion_limit` config key."
            )
        # set final channel values as run output
        run_manager.on_chain_end(loop.output)
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise

astream(input, config=None, *, stream_mode=None, output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, subgraphs=False) async

Stream graph steps for a single input.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input to the graph.

  • config (Optional[RunnableConfig], default: None ) –

    The configuration to use for the run.

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    The keys to stream, defaults to all non-context channels.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt before, defaults to all nodes in the graph.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt after, defaults to all nodes in the graph.

  • debug (Optional[bool], default: None ) –

    Whether to print debug information during execution, defaults to False.

  • subgraphs (bool, default: False ) –

    Whether to stream subgraphs, defaults to False.

Yields:

Examples:

Using different stream modes with a graph:

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
With stream_mode="values":

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
With stream_mode="updates":

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
With stream_mode="debug":

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Source code in libs/langgraph/langgraph/pregel/__init__.py
async def astream(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    subgraphs: bool = False,
) -> AsyncIterator[Union[dict[str, Any], Any]]:
    """Stream graph steps for a single input.

    Args:
        input: The input to the graph.
        config: The configuration to use for the run.
        stream_mode: The mode to stream output, defaults to self.stream_mode.
            Options are 'values', 'updates', and 'debug'.
            values: Emit the current values of the state for each step.
            updates: Emit only the updates to the state for each step.
                Output is a dict with the node name as key and the updated values as value.
            debug: Emit debug events for each step.
        output_keys: The keys to stream, defaults to all non-context channels.
        interrupt_before: Nodes to interrupt before, defaults to all nodes in the graph.
        interrupt_after: Nodes to interrupt after, defaults to all nodes in the graph.
        debug: Whether to print debug information during execution, defaults to False.
        subgraphs: Whether to stream subgraphs, defaults to False.

    Yields:
        The output of each step in the graph. The output shape depends on the stream_mode.

    Examples:
        Using different stream modes with a graph:
        ```pycon
        >>> import operator
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.graph import StateGraph
        >>> from langgraph.constants import START
        ...
        >>> class State(TypedDict):
        ...     alist: Annotated[list, operator.add]
        ...     another_list: Annotated[list, operator.add]
        ...
        >>> builder = StateGraph(State)
        >>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
        >>> builder.add_node("b", lambda _state: {"alist": ["there"]})
        >>> builder.add_edge("a", "b")
        >>> builder.add_edge(START, "a")
        >>> graph = builder.compile()
        ```
        With stream_mode="values":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
        ...     print(event)
        {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
        {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
        {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
        ```
        With stream_mode="updates":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
        ...     print(event)
        {'a': {'another_list': ['hi']}}
        {'b': {'alist': ['there']}}
        ```
        With stream_mode="debug":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
        ...     print(event)
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
        ```
    """

    stream = AsyncQueue()
    aioloop = asyncio.get_running_loop()

    def output() -> Iterator:
        while True:
            try:
                ns, mode, payload = stream.get_nowait()
            except asyncio.QueueEmpty:
                break
            if subgraphs and isinstance(stream_mode, list):
                yield (ns, mode, payload)
            elif isinstance(stream_mode, list):
                yield (mode, payload)
            elif subgraphs:
                yield (ns, payload)
            else:
                yield payload

    if subgraphs:

        def get_waiter() -> asyncio.Task[None]:
            return aioloop.create_task(stream.wait())
    else:
        get_waiter = None

    config = ensure_config(self.config, config)
    callback_manager = get_async_callback_manager_for_config(config)
    run_manager = await callback_manager.on_chain_start(
        None,
        input,
        name=config.get("run_name", self.get_name()),
        run_id=config.get("run_id"),
    )
    # if running from astream_log() run each proc with streaming
    do_stream = next(
        (
            h
            for h in run_manager.handlers
            if isinstance(h, _StreamingCallbackHandler)
        ),
        None,
    )
    try:
        if config["recursion_limit"] < 1:
            raise ValueError("recursion_limit must be at least 1")
        if self.checkpointer and not config.get("configurable"):
            raise ValueError(
                f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}"
            )
        # assign defaults
        (
            debug,
            stream_modes,
            output_keys,
            interrupt_before,
            interrupt_after,
            checkpointer,
        ) = self._defaults(
            config,
            stream_mode=stream_mode,
            output_keys=output_keys,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            debug=debug,
        )
        async with AsyncPregelLoop(
            input,
            stream=StreamProtocol(stream.put_nowait, stream_modes),
            config=config,
            store=self.store,
            checkpointer=checkpointer,
            nodes=self.nodes,
            specs=self.channels,
            output_keys=output_keys,
            stream_keys=self.stream_channels_asis,
        ) as loop:
            # create runner
            runner = PregelRunner(
                submit=loop.submit,
                put_writes=loop.put_writes,
                use_astream=do_stream is not None,
            )
            # enable subgraph streaming
            if subgraphs:
                loop.config["configurable"][CONFIG_KEY_STREAM] = loop.stream
            # Similarly to Bulk Synchronous Parallel / Pregel model
            # computation proceeds in steps, while there are channel updates
            # channel updates from step N are only visible in step N+1
            # channels are guaranteed to be immutable for the duration of the step,
            # with channel updates applied only at the transition between steps
            while loop.tick(
                input_keys=self.input_channels,
                interrupt_before=interrupt_before,
                interrupt_after=interrupt_after,
                manager=run_manager,
            ):
                async for _ in runner.atick(
                    loop.tasks.values(),
                    timeout=self.step_timeout,
                    retry_policy=self.retry_policy,
                    get_waiter=get_waiter,
                ):
                    # emit output
                    for o in output():
                        yield o
        # emit output
        for o in output():
            yield o
        # handle exit
        if loop.status == "out_of_steps":
            raise GraphRecursionError(
                f"Recursion limit of {config['recursion_limit']} reached "
                "without hitting a stop condition. You can increase the "
                "limit by setting the `recursion_limit` config key."
            )
        # set final channel values as run output
        await run_manager.on_chain_end(loop.output)
    except BaseException as e:
        await asyncio.shield(run_manager.on_chain_error(e))
        raise

invoke(input, config=None, *, stream_mode='values', output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs)

Run the graph with a single input and config.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input data for the graph. It can be a dictionary or any other type.

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the graph run.

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]. The stream mode for the graph run. Default is "values".

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to retrieve from the graph run.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run before.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run after.

  • debug (Optional[bool], default: None ) –

    Optional. Enable debug mode for the graph run.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to the graph run.

Returns:

  • Union[dict[str, Any], Any]

    The output of the graph run. If stream_mode is "values", it returns the latest output.

  • Union[dict[str, Any], Any]

    If stream_mode is not "values", it returns a list of output chunks.

Source code in libs/langgraph/langgraph/pregel/__init__.py
def invoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Run the graph with a single input and config.

    Args:
        input: The input data for the graph. It can be a dictionary or any other type.
        config: Optional. The configuration for the graph run.
        stream_mode: Optional[str]. The stream mode for the graph run. Default is "values".
        output_keys: Optional. The output keys to retrieve from the graph run.
        interrupt_before: Optional. The nodes to interrupt the graph run before.
        interrupt_after: Optional. The nodes to interrupt the graph run after.
        debug: Optional. Enable debug mode for the graph run.
        **kwargs: Additional keyword arguments to pass to the graph run.

    Returns:
        The output of the graph run. If stream_mode is "values", it returns the latest output.
        If stream_mode is not "values", it returns a list of output chunks.
    """
    output_keys = output_keys if output_keys is not None else self.output_channels
    if stream_mode == "values":
        latest: Union[dict[str, Any], Any] = None
    else:
        chunks = []
    for chunk in self.stream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if stream_mode == "values":
            latest = chunk
        else:
            chunks.append(chunk)
    if stream_mode == "values":
        return latest
    else:
        return chunks

ainvoke(input, config=None, *, stream_mode='values', output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs) async

Asynchronously invoke the graph on a single input.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input data for the computation. It can be a dictionary or any other type.

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the computation.

  • stream_mode (StreamMode, default: 'values' ) –

    Optional. The stream mode for the computation. Default is "values".

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to include in the result. Default is None.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt before. Default is None.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt after. Default is None.

  • debug (Optional[bool], default: None ) –

    Optional. Whether to enable debug mode. Default is None.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments.

Returns:

  • Union[dict[str, Any], Any]

    The result of the computation. If stream_mode is "values", it returns the latest value.

  • Union[dict[str, Any], Any]

    If stream_mode is "chunks", it returns a list of chunks.

Source code in libs/langgraph/langgraph/pregel/__init__.py
async def ainvoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Asynchronously invoke the graph on a single input.

    Args:
        input: The input data for the computation. It can be a dictionary or any other type.
        config: Optional. The configuration for the computation.
        stream_mode: Optional. The stream mode for the computation. Default is "values".
        output_keys: Optional. The output keys to include in the result. Default is None.
        interrupt_before: Optional. The nodes to interrupt before. Default is None.
        interrupt_after: Optional. The nodes to interrupt after. Default is None.
        debug: Optional. Whether to enable debug mode. Default is None.
        **kwargs: Additional keyword arguments.

    Returns:
        The result of the computation. If stream_mode is "values", it returns the latest value.
        If stream_mode is "chunks", it returns a list of chunks.
    """

    output_keys = output_keys if output_keys is not None else self.output_channels
    if stream_mode == "values":
        latest: Union[dict[str, Any], Any] = None
    else:
        chunks = []
    async for chunk in self.astream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if stream_mode == "values":
            latest = chunk
        else:
            chunks.append(chunk)
    if stream_mode == "values":
        return latest
    else:
        return chunks

get_graph(config=None, *, xray=False)

Returns a drawable representation of the computation graph.

Source code in libs/langgraph/langgraph/graph/graph.py
def get_graph(
    self,
    config: Optional[RunnableConfig] = None,
    *,
    xray: Union[int, bool] = False,
) -> DrawableGraph:
    """Returns a drawable representation of the computation graph."""
    graph = DrawableGraph()
    start_nodes: dict[str, DrawableNode] = {
        START: graph.add_node(self.get_input_schema(config), START)
    }
    end_nodes: dict[str, DrawableNode] = {}
    if xray:
        subgraphs = dict(self.get_subgraphs())
    else:
        subgraphs = {}

    def add_edge(
        start: str, end: str, label: Optional[str] = None, conditional: bool = False
    ) -> None:
        if end == END and END not in end_nodes:
            end_nodes[END] = graph.add_node(self.get_output_schema(config), END)
        return graph.add_edge(
            start_nodes[start], end_nodes[end], label, conditional
        )

    for key, n in self.builder.nodes.items():
        node = n.runnable
        metadata = n.metadata or {}
        if key in self.interrupt_before_nodes and key in self.interrupt_after_nodes:
            metadata["__interrupt"] = "before,after"
        elif key in self.interrupt_before_nodes:
            metadata["__interrupt"] = "before"
        elif key in self.interrupt_after_nodes:
            metadata["__interrupt"] = "after"
        if xray:
            subgraph = (
                subgraphs[key].get_graph(
                    config=config,
                    xray=xray - 1 if isinstance(xray, int) and xray > 0 else xray,
                )
                if key in subgraphs
                else node.get_graph(config=config)
            )
            subgraph.trim_first_node()
            subgraph.trim_last_node()
            if len(subgraph.nodes) > 1:
                end_nodes[key], start_nodes[key] = graph.extend(
                    subgraph, prefix=key
                )
            else:
                n = graph.add_node(node, key, metadata=metadata or None)
                start_nodes[key] = n
                end_nodes[key] = n
        else:
            n = graph.add_node(node, key, metadata=metadata or None)
            start_nodes[key] = n
            end_nodes[key] = n
    for start, end in sorted(self.builder._all_edges):
        add_edge(start, end)
    for start, branches in self.builder.branches.items():
        default_ends = {
            **{k: k for k in self.builder.nodes if k != start},
            END: END,
        }
        for _, branch in branches.items():
            if branch.ends is not None:
                ends = branch.ends
            elif branch.then is not None:
                ends = {k: k for k in default_ends if k not in (END, branch.then)}
            else:
                ends = default_ends
            for label, end in ends.items():
                add_edge(
                    start,
                    end,
                    label if label != end else None,
                    conditional=True,
                )
                if branch.then is not None:
                    add_edge(end, branch.then)

    return graph

StreamMode

How the stream method should emit outputs.

  • 'values': Emit all values of the state for each step.
  • 'updates': Emit only the node name(s) and updates that were returned by the node(s) after each step.
  • 'debug': Emit debug events for each step.
  • 'messages': Emit LLM messages token-by-token.

Constants

The following constants and classes are used to help control graph execution.

START

START is a string constant ("__start__") that serves as a "virtual" node in the graph. Adding an edge (or conditional edges) from START to node one or more nodes in your graph will direct the graph to begin execution there.

from langgraph.graph import START
...
builder.add_edge(START, "my_node")
# Or to add a conditional starting point
builder.add_conditional_edges(START, my_condition)

END

END is a string constant ("__end__") that serves as a "virtual" node in the graph. Adding an edge (or conditional edges) from one or more nodes in your graph to the END "node" will direct the graph to cease execution as soon as it reaches this point.

from langgraph.graph import END
...
builder.add_edge("my_node", END) # Stop any time my_node completes
# Or to conditionally terminate
def my_condition(state):
    if state["should_stop"]:
        return END
    return "my_node"
builder.add_conditional_edges("my_node", my_condition)

Send

A message or packet to send to a specific node in the graph.

The Send class is used within a StateGraph's conditional edges to dynamically invoke a node with a custom state at the next step.

Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.

One such example is a "map-reduce" workflow where your graph invokes the same node multiple times in parallel with different states, before aggregating the results back into the main graph's state.

Attributes:

  • node (str) –

    The name of the target node to send the message to.

  • arg (Any) –

    The state or message to send to the target node.

Examples:

>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
...     subjects: list[str]
...     jokes: Annotated[list[str], operator.add]
...
>>> from langgraph.constants import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
...
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>>
>>> # Invoking with two subjects results in a generated joke for each
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
Source code in libs/langgraph/langgraph/constants.py
class Send:
    """A message or packet to send to a specific node in the graph.

    The `Send` class is used within a `StateGraph`'s conditional edges to
    dynamically invoke a node with a custom state at the next step.

    Importantly, the sent state can differ from the core graph's state,
    allowing for flexible and dynamic workflow management.

    One such example is a "map-reduce" workflow where your graph invokes
    the same node multiple times in parallel with different states,
    before aggregating the results back into the main graph's state.

    Attributes:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.

    Examples:
        >>> from typing import Annotated
        >>> import operator
        >>> class OverallState(TypedDict):
        ...     subjects: list[str]
        ...     jokes: Annotated[list[str], operator.add]
        ...
        >>> from langgraph.constants import Send
        >>> from langgraph.graph import END, START
        >>> def continue_to_jokes(state: OverallState):
        ...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
        ...
        >>> from langgraph.graph import StateGraph
        >>> builder = StateGraph(OverallState)
        >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
        >>> builder.add_conditional_edges(START, continue_to_jokes)
        >>> builder.add_edge("generate_joke", END)
        >>> graph = builder.compile()
        >>>
        >>> # Invoking with two subjects results in a generated joke for each
        >>> graph.invoke({"subjects": ["cats", "dogs"]})
        {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
    """

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        """
        Initialize a new instance of the Send class.

        Args:
            node (str): The name of the target node to send the message to.
            arg (Any): The state or message to send to the target node.
        """
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

__init__(node, arg)

Initialize a new instance of the Send class.

Parameters:

  • node (str) –

    The name of the target node to send the message to.

  • arg (Any) –

    The state or message to send to the target node.

Source code in libs/langgraph/langgraph/constants.py
def __init__(self, /, node: str, arg: Any) -> None:
    """
    Initialize a new instance of the Send class.

    Args:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.
    """
    self.node = node
    self.arg = arg

RetryPolicy

Bases: NamedTuple

Configuration for retrying nodes.

Source code in libs/langgraph/langgraph/pregel/types.py
class RetryPolicy(NamedTuple):
    """Configuration for retrying nodes."""

    initial_interval: float = 0.5
    """Amount of time that must elapse before the first retry occurs. In seconds."""
    backoff_factor: float = 2.0
    """Multiplier by which the interval increases after each retry."""
    max_interval: float = 128.0
    """Maximum amount of time that may elapse between retries. In seconds."""
    max_attempts: int = 3
    """Maximum number of attempts to make before giving up, including the first."""
    jitter: bool = True
    """Whether to add random jitter to the interval between retries."""
    retry_on: Union[
        Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]
    ] = default_retry_on
    """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry."""

initial_interval: float = 0.5 class-attribute instance-attribute

Amount of time that must elapse before the first retry occurs. In seconds.

backoff_factor: float = 2.0 class-attribute instance-attribute

Multiplier by which the interval increases after each retry.

max_interval: float = 128.0 class-attribute instance-attribute

Maximum amount of time that may elapse between retries. In seconds.

max_attempts: int = 3 class-attribute instance-attribute

Maximum number of attempts to make before giving up, including the first.

jitter: bool = True class-attribute instance-attribute

Whether to add random jitter to the interval between retries.

retry_on: Union[Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool]] = default_retry_on class-attribute instance-attribute

List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.

Comments