Graph Definitions
StateGraph
Bases: Graph
A graph whose nodes communicate by reading and writing to a shared state.
The signature of each node is State -> Partial<State>: a node receives the full state and returns only the keys it wants to update.
Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.
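As a rough illustration of the `(Value, Value) -> Value` contract, the sketch below folds several node outputs for one key into a single value. It is plain Python rather than LangGraph itself, and `apply_updates` is a hypothetical helper introduced only for this example.

```python
import operator
from functools import reduce


def append_if_present(current: list, new) -> list:
    """Reducer with the shape (Value, Value) -> Value: skip None, else append."""
    if new is not None:
        return current + [new]
    return current


def apply_updates(current, updates, reducer):
    """Hypothetical helper: fold every update for one key into its current value."""
    return reduce(reducer, updates, current)


# Three nodes wrote to the same key; one of them wrote None.
merged = apply_updates([0.5], [0.75, None, 0.1], append_if_present)
print(merged)  # [0.5, 0.75, 0.1]

# operator.add works as a ready-made reducer for list-valued keys.
joined = apply_updates(["a"], [["b"], ["c"]], operator.add)
print(joined)  # ['a', 'b', 'c']
```

The reducer always receives the accumulated value first and the incoming value second, which is why `append_if_present` can treat its two arguments differently.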
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`state_schema` | `Optional[type[Any]]` | The schema class that defines the state. | `None` |
`config_schema` | `Optional[type[Any]]` | The schema class that defines the configuration. Use this to expose configurable parameters in your API. | `None` |
Example
```python
from langchain_core.runnables import RunnableConfig
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph


def reducer(a: list, b: float | None) -> list:
    # Append the new value to the history unless the node returned None.
    if b is not None:
        return a + [b]
    return a


class State(TypedDict):
    x: Annotated[list, reducer]


class ConfigSchema(TypedDict):
    r: float


graph = StateGraph(State, config_schema=ConfigSchema)


def node(state: State, config: RunnableConfig) -> dict:
    # Read the configurable parameter, falling back to 1.0.
    r = config["configurable"].get("r", 1.0)
    x = state["x"][-1]
    next_value = x * r * (1 - x)
    return {"x": next_value}


graph.add_node("A", node)
graph.set_entry_point("A")
graph.set_finish_point("A")
compiled = graph.compile()

print(compiled.config_specs)
# [ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]

step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
# {'x': [0.5, 0.75]}
```
Methods:
Name | Description |
---|---|
`add_node` | Add a new node to the state graph. |
`add_edge` | Add a directed edge from the start node (or list of start nodes) to the end node. |
`add_conditional_edges` | Add a conditional edge from the starting node to any number of destination nodes. |
`add_sequence` | Add a sequence of nodes that will be executed in the provided order. |
`compile` | Compiles the state graph into a `CompiledStateGraph` object. |
add_node
```python
add_node(
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[type[Any]] = None,
    retry: Optional[
        Union[RetryPolicy, Sequence[RetryPolicy]]
    ] = None,
    destinations: Optional[
        Union[dict[str, str], tuple[str, ...]]
    ] = None
) -> Self
```
Add a new node to the state graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`node` | `Union[str, RunnableLike]` | The function or runnable this node will run. If a string is provided, it is used as the node name and `action` is used as the function or runnable. | required |
`action` | `Optional[RunnableLike]` | The action associated with the node. Used as the node function or runnable if `node` is a string name. | `None` |
`metadata` | `Optional[dict[str, Any]]` | The metadata associated with the node. | `None` |
`input` | `Optional[type[Any]]` | The input schema for the node. Defaults to the graph's input schema. | `None` |
`retry` | `Optional[Union[RetryPolicy, Sequence[RetryPolicy]]]` | The policy for retrying the node. If a sequence is provided, the first matching policy is applied. | `None` |
`destinations` | `Optional[Union[dict[str, str], tuple[str, ...]]]` | Destinations that indicate where a node can route to. This is useful for edgeless graphs with nodes that return `Command` objects. | `None` |
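The "first matching policy" rule for a sequence passed to `retry` can be pictured as below. This is a minimal sketch under stated assumptions: `SketchPolicy` and `first_matching` are hypothetical stand-ins written for this example, not LangGraph's `RetryPolicy` or its internal selection code.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class SketchPolicy:
    """Hypothetical stand-in for a retry policy (not LangGraph's RetryPolicy)."""

    retry_on: tuple  # exception types this policy handles
    max_attempts: int


def first_matching(policies: Sequence[SketchPolicy], exc: Exception) -> Optional[SketchPolicy]:
    """Return the first policy whose retry_on covers exc, or None if none does."""
    for policy in policies:
        if isinstance(exc, policy.retry_on):
            return policy
    return None


policies = [
    SketchPolicy(retry_on=(TimeoutError,), max_attempts=5),
    SketchPolicy(retry_on=(OSError,), max_attempts=2),
]

# TimeoutError is a subclass of OSError, so both policies match; order decides.
print(first_matching(policies, TimeoutError()).max_attempts)  # 5
print(first_matching(policies, ConnectionError()).max_attempts)  # 2
print(first_matching(policies, ValueError()))  # None
```

Because the first match wins, more specific policies belong earlier in the sequence than broad ones.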
Raises: ValueError: If the key is already being used as a state key.
Example
Customize the name:
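Passing a string as the first argument overrides the name that would otherwise be taken from the function itself. The rule can be modelled in plain Python; `resolve_node` is a hypothetical helper for illustration and not part of LangGraph.

```python
from typing import Callable, Optional, Tuple, Union


def resolve_node(
    node: Union[str, Callable], action: Optional[Callable] = None
) -> Tuple[str, Optional[Callable]]:
    """Hypothetical helper: work out the (name, callable) pair for a node."""
    if isinstance(node, str):
        # An explicit name was given, so `action` supplies the callable.
        return node, action
    # No explicit name: fall back to the callable's own name.
    return node.__name__, node


def my_node(state: dict) -> dict:
    return {"x": state["x"] + 1}


print(resolve_node(my_node)[0])  # my_node
print(resolve_node("my_fair_node", my_node)[0])  # my_fair_node
```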
Returns:
Name | Type | Description |
---|---|---|
`Self` | `Self` | The instance of the state graph, allowing for method chaining. |
add_edge
Add a directed edge from the start node (or list of start nodes) to the end node.
When a single start node is provided, the graph will wait for that node to complete before executing the end node. When multiple start nodes are provided, the graph will wait for ALL of the start nodes to complete before executing the end node.
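The wait-for-all behaviour can be sketched as a readiness check over a set of completed nodes. The edge table `WAITS_FOR` and the `ready` helper are hypothetical names for this plain-Python model; they do not reflect how LangGraph schedules nodes internally.

```python
from typing import Dict, List, Set

# End node -> start nodes it waits for (hypothetical edge table).
WAITS_FOR: Dict[str, List[str]] = {
    "b": ["a"],       # models add_edge("a", "b")
    "d": ["b", "c"],  # models add_edge(["b", "c"], "d")
}


def ready(node: str, completed: Set[str]) -> bool:
    """A node may run once every one of its start nodes has completed."""
    return all(start in completed for start in WAITS_FOR[node])


print(ready("b", {"a"}))       # True
print(ready("d", {"b"}))       # False: "c" has not finished yet
print(ready("d", {"b", "c"}))  # True
```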
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`start_key` | `Union[str, list[str]]` | The key(s) of the start node(s) of the edge. | required |
`end_key` | `str` | The key of the end node of the edge. | required |
Raises:
Type | Description |
---|---|
`ValueError` | If the start key is 'END' or if the start key or end key is not present in the graph. |
Returns:
Name | Type | Description |
---|---|---|
`Self` | `Self` | The instance of the state graph, allowing for method chaining. |
add_conditional_edges
```python
add_conditional_edges(
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[
            ..., Awaitable[Union[Hashable, list[Hashable]]]
        ],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[
        Union[dict[Hashable, str], list[str]]
    ] = None,
    then: Optional[str] = None,
) -> Self
```
Add a conditional edge from the starting node to any number of destination nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`source` | `str` | The starting node. This conditional edge will run when exiting this node. | required |
`path` | `Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]]` | The callable that determines the next node or nodes. If `path_map` is not specified, it should return one or more node names. | required |
`path_map` | `Optional[Union[dict[Hashable, str], list[str]]]` | Optional mapping of paths to node names. If omitted, the paths returned by `path` should be node names. | `None` |
`then` | `Optional[str]` | The name of a node to execute after the nodes selected by `path`. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`Self` | `Self` | The instance of the graph, allowing for method chaining. |
Without type hints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`) or a `path_map`, the graph visualization assumes the edge could transition to any node in the graph.
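To see why the return annotation is enough to narrow the possible destinations, the sketch below reads them back with the standard `typing` helpers. How LangGraph itself inspects the hint is not shown here; the `route` function and the `count` state key are hypothetical.

```python
from typing import Literal, get_args, get_type_hints


def route(state: dict) -> Literal["foo", "__end__"]:
    """Path function: go to "foo" until the counter reaches 3, then stop."""
    return "foo" if state.get("count", 0) < 3 else "__end__"


# Every possible destination can be recovered from the annotation alone.
destinations = get_args(get_type_hints(route)["return"])
print(destinations)  # ('foo', '__end__')

# The same information supplied as a path_map (path value -> node name).
path_map = {"foo": "foo", "__end__": "__end__"}
print(path_map[route({"count": 5})])  # __end__
```

Either form gives a reader, or a drawing tool, a closed list of targets instead of "any node".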
add_sequence
Add a sequence of nodes that will be executed in the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`nodes` | `Sequence[Union[RunnableLike, tuple[str, RunnableLike]]]` | A sequence of RunnableLike objects (e.g. a LangChain Runnable or a callable) or (name, RunnableLike) tuples. If no names are provided, the name will be inferred from the node object (e.g. a runnable or a callable name). Each node will be executed in the order provided. | required |
Raises:
Type | Description |
---|---|
`ValueError` | If the sequence is empty. |
`ValueError` | If the sequence contains duplicate node names. |
Returns:
Name | Type | Description |
---|---|---|
`Self` | `Self` | The instance of the state graph, allowing for method chaining. |
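The naming and validation rules for a sequence can be modelled in a few lines. `name_sequence` is a hypothetical helper written for this sketch; it only mirrors the behaviour described above (inferred names, the two `ValueError` cases, in-order chaining) and is not LangGraph's implementation.

```python
from typing import Callable, List, Sequence, Tuple, Union

NodeSpec = Union[Callable, Tuple[str, Callable]]


def name_sequence(nodes: Sequence[NodeSpec]) -> List[Tuple[str, str]]:
    """Hypothetical helper: infer names, validate, and return the chained edges."""
    if not nodes:
        raise ValueError("sequence is empty")
    # Use the explicit name from a (name, callable) tuple, else the callable's name.
    names = [n[0] if isinstance(n, tuple) else n.__name__ for n in nodes]
    if len(set(names)) != len(names):
        raise ValueError("duplicate node names in sequence")
    # Each node is linked to the one that follows it.
    return list(zip(names, names[1:]))


def load(state):
    return state


def clean(state):
    return state


edges = name_sequence([load, ("tidy", clean), clean])
print(edges)  # [('load', 'tidy'), ('tidy', 'clean')]
```

Giving the second `clean` entry an explicit name ("tidy") is what keeps the sequence free of duplicates here.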
compile
```python
compile(
    checkpointer: Checkpointer = None,
    *,
    store: Optional[BaseStore] = None,
    interrupt_before: Optional[
        Union[All, list[str]]
    ] = None,
    interrupt_after: Optional[Union[All, list[str]]] = None,
    debug: bool = False,
    name: Optional[str] = None
) -> CompiledStateGraph
```
Compiles the state graph into a `CompiledStateGraph` object.
The compiled graph implements the `Runnable` interface and can be invoked, streamed, batched, and run asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`checkpointer` | `Checkpointer` | A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If `None`, it may inherit the parent graph's checkpointer when used as a subgraph. If `False`, it will not use or inherit any checkpointer. | `None` |
`interrupt_before` | `Optional[Union[All, list[str]]]` | An optional list of node names to interrupt before. | `None` |
`interrupt_after` | `Optional[Union[All, list[str]]]` | An optional list of node names to interrupt after. | `None` |
`debug` | `bool` | A flag indicating whether to enable debug mode. | `False` |
`name` | `Optional[str]` | The name to use for the compiled graph. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`CompiledStateGraph` | `CompiledStateGraph` | The compiled state graph. |
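The three cases described for `checkpointer` (a saver object, `None`, `False`) can be summarised as a small decision function. This is only a model of the documented behaviour; `effective_checkpointer` and its `parent` argument are hypothetical names, not part of the library.

```python
from typing import Any, Optional


def effective_checkpointer(own: Any, parent: Optional[Any]) -> Optional[Any]:
    """Hypothetical model of which checkpointer a compiled graph ends up using."""
    if own is False:
        return None  # opt out: neither use nor inherit a checkpointer
    if own is None:
        return parent  # may inherit the parent's when run as a subgraph
    return own  # an explicit saver is used as given


parent_saver = object()
own_saver = object()

print(effective_checkpointer(None, parent_saver) is parent_saver)  # True
print(effective_checkpointer(False, parent_saver))  # None
print(effective_checkpointer(own_saver, parent_saver) is own_saver)  # True
```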
CompiledStateGraph
Bases: CompiledGraph
Methods:
Name | Description |
---|---|
`stream` | Stream graph steps for a single input. |
`astream` | Asynchronously stream graph steps for a single input. |
`invoke` | Run the graph with a single input and config. |
`ainvoke` | Asynchronously invoke the graph on a single input. |
`get_state` | Get the current state of the graph. |
`aget_state` | Get the current state of the graph. |
`get_state_history` | Get the history of the state of the graph. |
`aget_state_history` | Asynchronously get the history of the state of the graph. |
`update_state` | Update the state of the graph with the given values, as if they came from node `as_node`. |
`aupdate_state` | Asynchronously update the state of the graph with the given values, as if they came from node `as_node`. |
`bulk_update_state` | Apply updates to the graph state in bulk. Requires a checkpointer to be set. |
`abulk_update_state` | Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set. |
`get_graph` | Return a drawable representation of the computation graph. |
`aget_graph` | Return a drawable representation of the computation graph. |
`get_subgraphs` | Get the subgraphs of the graph. |
`aget_subgraphs` | Get the subgraphs of the graph. |
`with_config` | Create a copy of the Pregel object with an updated config. |
stream
```python
stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]
```
Stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`input` | `dict[str, Any] \| Any` | The input to the graph. | required |
`config` | `RunnableConfig \| None` | The configuration to use for the run. | `None` |
`stream_mode` | `StreamMode \| list[StreamMode] \| None` | The mode to stream output, defaults to `self.stream_mode`. Options include `"values"`, `"updates"`, `"debug"`, `"custom"`, and `"messages"`. | `None` |
`output_keys` | `str \| Sequence[str] \| None` | The keys to stream, defaults to all non-context channels. | `None` |
`interrupt_before` | `All \| Sequence[str] \| None` | Nodes to interrupt before, defaults to all nodes in the graph. | `None` |
`interrupt_after` | `All \| Sequence[str] \| None` | Nodes to interrupt after, defaults to all nodes in the graph. | `None` |
`checkpoint_during` | `bool \| None` | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | `None` |
`debug` | `bool \| None` | Whether to print debug information during execution, defaults to False. | `None` |
`subgraphs` | `bool` | Whether to stream subgraphs, defaults to False. | `False` |
Yields:
Type | Description |
---|---|
`dict[str, Any] \| Any` | The output of each step in the graph. The output shape depends on the `stream_mode`. |
Using stream_mode="values":
```python
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]


builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
```
Using stream_mode="updates":
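In this mode each event carries only what the nodes of that step returned, keyed by node name, rather than the whole state. A plain-Python model of the same two-node graph contrasts the two shapes. It does not import LangGraph; `REDUCERS`, `NODES`, and `run` are hypothetical names for this sketch.

```python
import operator

# Both keys are aggregated with operator.add, as in the State class above.
REDUCERS = {"alist": operator.add, "another_list": operator.add}
NODES = [
    ("a", lambda _state: {"another_list": ["hi"]}),
    ("b", lambda _state: {"alist": ["there"]}),
]


def run(initial: dict, mode: str) -> list:
    """Hypothetical model: run node "a" then "b", collecting events for `mode`."""
    state = {key: [] for key in REDUCERS}
    for key, value in initial.items():
        state[key] = REDUCERS[key](state[key], value)
    events = []
    if mode == "values":
        events.append(dict(state))  # state right after the input is applied
    for name, fn in NODES:
        update = fn(state)
        for key, value in update.items():
            state[key] = REDUCERS[key](state[key], value)
        # "values" emits the full state; "updates" emits {node_name: update}.
        events.append(dict(state) if mode == "values" else {name: update})
    return events


for event in run({"alist": ["x"]}, "updates"):
    print(event)
# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
```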
Using stream_mode="debug":
```python
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
```
Using stream_mode="custom":
```python
from langgraph.types import StreamWriter


def node_a(state: State, writer: StreamWriter):
    # Anything passed to the writer is emitted as a "custom" stream event.
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
```
Using stream_mode="messages":
```python
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
    question: str
    answer: str


def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
```
astream
async
```python
astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]
```
Asynchronously stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`input` | `dict[str, Any] \| Any` | The input to the graph. | required |
`config` | `RunnableConfig \| None` | The configuration to use for the run. | `None` |
`stream_mode` | `StreamMode \| list[StreamMode] \| None` | The mode to stream output, defaults to `self.stream_mode`. Options include `"values"`, `"updates"`, `"debug"`, `"custom"`, and `"messages"`. | `None` |
`output_keys` | `str \| Sequence[str] \| None` | The keys to stream, defaults to all non-context channels. | `None` |
`interrupt_before` | `All \| Sequence[str] \| None` | Nodes to interrupt before, defaults to all nodes in the graph. | `None` |
`interrupt_after` | `All \| Sequence[str] \| None` | Nodes to interrupt after, defaults to all nodes in the graph. | `None` |
`checkpoint_during` | `bool \| None` | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | `None` |
`debug` | `bool \| None` | Whether to print debug information during execution, defaults to False. | `None` |
`subgraphs` | `bool` | Whether to stream subgraphs, defaults to False. | `False` |
Yields:
Type | Description |
---|---|
`AsyncIterator[dict[str, Any] \| Any]` | The output of each step in the graph. The output shape depends on the `stream_mode`. |
Using stream_mode="values":
```python
import asyncio
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]


builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()


async def main():
    # `async for` must run inside a coroutine when used in a script.
    async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
        print(event)


asyncio.run(main())

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
```
Using stream_mode="updates":
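`async for` is only valid inside a coroutine (or an async-aware REPL such as a notebook), so a script has to wrap the loop and drive it with `asyncio.run`. The sketch below shows that pattern with a hypothetical stand-in async generator, `fake_astream`, in place of `graph.astream(...)`; the events it yields mimic the per-node shape of this mode and are not produced by LangGraph.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_astream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for graph.astream(..., stream_mode="updates")."""
    for event in ({"a": {"another_list": ["hi"]}}, {"b": {"alist": ["there"]}}):
        await asyncio.sleep(0)  # hand control back to the event loop between events
        yield event


async def main() -> List[Dict[str, Any]]:
    events = []
    async for event in fake_astream():
        print(event)
        events.append(event)
    return events


collected = asyncio.run(main())
```

Swapping `fake_astream()` for a real `graph.astream(...)` call leaves the rest of the pattern unchanged.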
Using stream_mode="debug":
```python
import asyncio


async def main():
    async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
        print(event)


asyncio.run(main())

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
```
Using stream_mode="custom":
```python
import asyncio
from langgraph.types import StreamWriter


async def node_a(state: State, writer: StreamWriter):
    # Anything passed to the writer is emitted as a "custom" stream event.
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()


async def main():
    async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
        print(event)


asyncio.run(main())

# {'custom_data': 'foo'}
```
Using stream_mode="messages":
```python
import asyncio
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
    question: str
    answer: str


async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()


async def main():
    async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
        print(event)


asyncio.run(main())

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
```
invoke
```python
invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any
```
Run the graph with a single input and config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`input` | `dict[str, Any] \| Any` | The input data for the graph. It can be a dictionary or any other type. | required |
`config` | `RunnableConfig \| None` | The configuration for the graph run. | `None` |
`stream_mode` | `StreamMode` | The stream mode for the graph run. | `'values'` |
`output_keys` | `str \| Sequence[str] \| None` | The output keys to retrieve from the graph run. | `None` |
`interrupt_before` | `All \| Sequence[str] \| None` | The nodes to interrupt the graph run before. | `None` |
`interrupt_after` | `All \| Sequence[str] \| None` | The nodes to interrupt the graph run after. | `None` |
`debug` | `bool \| None` | Enable debug mode for the graph run. | `None` |
`**kwargs` | `Any` | Additional keyword arguments to pass to the graph run. | `{}` |
Returns:
Type | Description |
---|---|
`dict[str, Any] \| Any` | The output of the graph run. If `stream_mode` is `"values"`, it returns the latest output; if `stream_mode` is not `"values"`, it returns a list of output chunks. |
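The two return shapes can be pictured as a thin wrapper that drains a stream of events. `collect` is a hypothetical helper for this plain-Python model; it mirrors only the rule stated above, not the library's implementation.

```python
from typing import Any, Iterable, List, Union


def collect(events: Iterable[Any], stream_mode: str = "values") -> Union[Any, List[Any]]:
    """Hypothetical model of how a stream is condensed into one return value."""
    chunks = list(events)
    if stream_mode == "values":
        return chunks[-1] if chunks else None  # only the latest full state
    return chunks  # every chunk, in emission order


value_events = [{"x": [0.5]}, {"x": [0.5, 0.75]}]
update_events = [{"A": {"x": 0.75}}]

print(collect(value_events, "values"))  # {'x': [0.5, 0.75]}
print(collect(update_events, "updates"))  # [{'A': {'x': 0.75}}]
```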
ainvoke
async
```python
ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any
```
Asynchronously invoke the graph on a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`input` | `dict[str, Any] \| Any` | The input data for the computation. It can be a dictionary or any other type. | required |
`config` | `RunnableConfig \| None` | The configuration for the computation. | `None` |
`stream_mode` | `StreamMode` | The stream mode for the computation. | `'values'` |
`output_keys` | `str \| Sequence[str] \| None` | The output keys to include in the result. | `None` |
`interrupt_before` | `All \| Sequence[str] \| None` | The nodes to interrupt before. | `None` |
`interrupt_after` | `All \| Sequence[str] \| None` | The nodes to interrupt after. | `None` |
`debug` | `bool \| None` | Whether to enable debug mode. | `None` |
`**kwargs` | `Any` | Additional keyword arguments. | `{}` |
Returns:
Type | Description |
---|---|
`dict[str, Any] \| Any` | The result of the computation. If `stream_mode` is `"values"`, it returns the latest value; if `stream_mode` is not `"values"`, it returns a list of chunks. |
get_state
```python
get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
```
Get the current state of the graph.
aget_state
async
```python
aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
```
Get the current state of the graph.
get_state_history
```python
get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]
```
Get the history of the state of the graph.
aget_state_history
async
```python
aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]
```
Asynchronously get the history of the state of the graph.
update_state
```python
update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
) -> RunnableConfig
```
Update the state of the graph with the given values, as if they came from node `as_node`. If `as_node` is not provided, it will be set to the last node that updated the state, if not ambiguous.
aupdate_state
async
```python
aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
) -> RunnableConfig
```
Asynchronously update the state of the graph with the given values, as if they came from node `as_node`. If `as_node` is not provided, it will be set to the last node that updated the state, if not ambiguous.
bulk_update_state
```python
bulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
```
Apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`config` | `RunnableConfig` | The config to apply the updates to. | required |
`supersteps` | `Sequence[Sequence[StateUpdate]]` | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a `StateUpdate` tuple. | required |
Raises:
Type | Description |
---|---|
`ValueError` | If no checkpointer is set or no updates are provided. |
`InvalidUpdateError` | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
`RunnableConfig` | `RunnableConfig` | The updated config. |
abulk_update_state
async
```python
abulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
```
Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`config` | `RunnableConfig` | The config to apply the updates to. | required |
`supersteps` | `Sequence[Sequence[StateUpdate]]` | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a `StateUpdate` tuple. | required |
Raises:
Type | Description |
---|---|
`ValueError` | If no checkpointer is set or no updates are provided. |
`InvalidUpdateError` | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
`RunnableConfig` | `RunnableConfig` | The updated config. |
get_graph
```python
get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph
```
Return a drawable representation of the computation graph.
aget_graph
async
```python
aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph
```
Return a drawable representation of the computation graph.
get_subgraphs
```python
get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]
```
Get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`namespace` | `str \| None` | The namespace to filter the subgraphs by. | `None` |
`recurse` | `bool` | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | `False` |
Returns:
Type | Description |
---|---|
`Iterator[tuple[str, PregelProtocol]]` | An iterator of the (namespace, subgraph) pairs. |
aget_subgraphs
async
```python
aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]
```
Get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`namespace` | `str \| None` | The namespace to filter the subgraphs by. | `None` |
`recurse` | `bool` | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | `False` |
Returns:
Type | Description |
---|---|
`AsyncIterator[tuple[str, PregelProtocol]]` | An async iterator of the (namespace, subgraph) pairs. |
with_config
```python
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Self
```
Create a copy of the Pregel object with an updated config.
Graph
Methods:
Name | Description |
---|---|
`add_node` | Add a new node to the graph. |
`add_edge` | Add a directed edge from the start node to the end node. |
`add_conditional_edges` | Add a conditional edge from the starting node to any number of destination nodes. |
`compile` | Compiles the graph into a `CompiledGraph` object. |
add_node
```python
add_node(
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None
) -> Self
```
Add a new node to the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`node` | `Union[str, RunnableLike]` | The function or runnable this node will run. If a string is provided, it is used as the node name and `action` is used as the function or runnable. | required |
`action` | `Optional[RunnableLike]` | The action associated with the node. Used as the node function or runnable if `node` is a string name. | `None` |
`metadata` | `Optional[dict[str, Any]]` | The metadata associated with the node. | `None` |
add_edge
add_conditional_edges
```python
add_conditional_edges(
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[
            ..., Awaitable[Union[Hashable, list[Hashable]]]
        ],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[
        Union[dict[Hashable, str], list[str]]
    ] = None,
    then: Optional[str] = None,
) -> Self
```
Add a conditional edge from the starting node to any number of destination nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`source` | `str` | The starting node. This conditional edge will run when exiting this node. | required |
`path` | `Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]]` | The callable that determines the next node or nodes. If `path_map` is not specified, it should return one or more node names. | required |
`path_map` | `Optional[Union[dict[Hashable, str], list[str]]]` | Optional mapping of paths to node names. If omitted, the paths returned by `path` should be node names. | `None` |
`then` | `Optional[str]` | The name of a node to execute after the nodes selected by `path`. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`Self` | `Self` | The instance of the graph, allowing for method chaining. |
Without type hints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`) or a `path_map`, the graph visualization assumes the edge could transition to any node in the graph.
compile
```python
compile(
    checkpointer: Checkpointer = None,
    interrupt_before: Optional[
        Union[All, list[str]]
    ] = None,
    interrupt_after: Optional[Union[All, list[str]]] = None,
    debug: bool = False,
    name: Optional[str] = None,
) -> CompiledGraph
```
Compiles the graph into a `CompiledGraph` object.
The compiled graph implements the `Runnable` interface and can be invoked, streamed, batched, and run asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`checkpointer` | `Checkpointer` | A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If `None`, it may inherit the parent graph's checkpointer when used as a subgraph. If `False`, it will not use or inherit any checkpointer. | `None` |
`interrupt_before` | `Optional[Union[All, list[str]]]` | An optional list of node names to interrupt before. | `None` |
`interrupt_after` | `Optional[Union[All, list[str]]]` | An optional list of node names to interrupt after. | `None` |
`debug` | `bool` | A flag indicating whether to enable debug mode. | `False` |
`name` | `Optional[str]` | The name to use for the compiled graph. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`CompiledGraph` | `CompiledGraph` | The compiled graph. |
CompiledGraph
Bases: Pregel
Methods:
Name | Description |
---|---|
`stream` | Stream graph steps for a single input. |
`astream` | Asynchronously stream graph steps for a single input. |
`invoke` | Run the graph with a single input and config. |
`ainvoke` | Asynchronously invoke the graph on a single input. |
`get_state` | Get the current state of the graph. |
`aget_state` | Get the current state of the graph. |
`get_state_history` | Get the history of the state of the graph. |
`aget_state_history` | Asynchronously get the history of the state of the graph. |
`update_state` | Update the state of the graph with the given values, as if they came from node `as_node`. |
`aupdate_state` | Asynchronously update the state of the graph with the given values, as if they came from node `as_node`. |
`bulk_update_state` | Apply updates to the graph state in bulk. Requires a checkpointer to be set. |
`abulk_update_state` | Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set. |
`get_graph` | Return a drawable representation of the computation graph. |
`aget_graph` | Return a drawable representation of the computation graph. |
`get_subgraphs` | Get the subgraphs of the graph. |
`aget_subgraphs` | Get the subgraphs of the graph. |
`with_config` | Create a copy of the Pregel object with an updated config. |
stream
stream(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: (
StreamMode | list[StreamMode] | None
) = None,
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
checkpoint_during: bool | None = None,
debug: bool | None = None,
subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]
Stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | dict[str, Any] \| Any | The input to the graph. | required |
config | RunnableConfig \| None | The configuration to use for the run. | None |
stream_mode | StreamMode \| list[StreamMode] \| None | The mode to stream output, defaults to self.stream_mode. Options include "values", "updates", "debug", "custom", and "messages". | None |
output_keys | str \| Sequence[str] \| None | The keys to stream, defaults to all non-context channels. | None |
interrupt_before | All \| Sequence[str] \| None | Nodes to interrupt before, defaults to all nodes in the graph. | None |
interrupt_after | All \| Sequence[str] \| None | Nodes to interrupt after, defaults to all nodes in the graph. | None |
checkpoint_during | bool \| None | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | None |
debug | bool \| None | Whether to print debug information during execution, defaults to False. | None |
subgraphs | bool | Whether to stream subgraphs, defaults to False. | False |
Yields:
Type | Description |
---|---|
dict[str, Any] \| Any | The output of each step in the graph. The output shape depends on the stream_mode. |
Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)
# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
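As a rough illustration of how the "updates" shape differs from "values", the following plain-Python sketch models the same two-node graph without importing LangGraph. The names REDUCERS, NODES, and run_steps are hypothetical and exist only for this sketch; it is a model of the behavior, not the library's implementation. In "values" mode each event is the full state after a step, while in "updates" mode each event contains only what a node wrote, keyed by the node's name.

```python
import operator

# Reducers for each state key, mirroring Annotated[list, operator.add].
REDUCERS = {"alist": operator.add, "another_list": operator.add}

# Nodes in execution order; each returns a partial state update.
NODES = [
    ("a", lambda _state: {"another_list": ["hi"]}),
    ("b", lambda _state: {"alist": ["there"]}),
]


def run_steps(initial: dict, stream_mode: str):
    """Hypothetical model of streaming: yield one event per step."""
    state = {"alist": [], "another_list": [], **initial}
    if stream_mode == "values":
        yield dict(state)  # the state right after the input is applied
    for name, node in NODES:
        update = node(state)
        for key, value in update.items():
            # Merge the node's write into the state with the key's reducer.
            state[key] = REDUCERS[key](state[key], value)
        if stream_mode == "values":
            yield dict(state)  # full state after this step
        else:
            yield {name: update}  # only what this node wrote


values_events = list(run_steps({"alist": ["x"]}, "values"))
updates_events = list(run_steps({"alist": ["x"]}, "updates"))
print(values_events[-1])  # {'alist': ['x', 'there'], 'another_list': ['hi']}
print(updates_events)     # [{'a': {'another_list': ['hi']}}, {'b': {'alist': ['there']}}]
```

With the real graph built in the "values" example, passing stream_mode="updates" to graph.stream should yield events of the same per-node shape.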
Using stream_mode="debug":
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)
# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)
# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
astream
async
astream(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: (
StreamMode | list[StreamMode] | None
) = None,
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
checkpoint_during: bool | None = None,
debug: bool | None = None,
subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]
Asynchronously stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | dict[str, Any] \| Any | The input to the graph. | required |
config | RunnableConfig \| None | The configuration to use for the run. | None |
stream_mode | StreamMode \| list[StreamMode] \| None | The mode to stream output, defaults to self.stream_mode. Options include "values", "updates", "debug", "custom", and "messages". | None |
output_keys | str \| Sequence[str] \| None | The keys to stream, defaults to all non-context channels. | None |
interrupt_before | All \| Sequence[str] \| None | Nodes to interrupt before, defaults to all nodes in the graph. | None |
interrupt_after | All \| Sequence[str] \| None | Nodes to interrupt after, defaults to all nodes in the graph. | None |
checkpoint_during | bool \| None | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | None |
debug | bool \| None | Whether to print debug information during execution, defaults to False. | None |
subgraphs | bool | Whether to stream subgraphs, defaults to False. | False |
Yields:
Type | Description |
---|---|
AsyncIterator[dict[str, Any] \| Any] | The output of each step in the graph. The output shape depends on the stream_mode. |
Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

# Top-level `async for` works in a notebook or async REPL; in a script, wrap it in an async function.
async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)
# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
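The async examples in this section use a bare `async for`, which only runs where an event loop is already active. As a minimal sketch of consuming such a stream from a regular script, the code below drives an async iterator with asyncio.run. The fake_astream generator is a hypothetical stand-in for graph.astream(..., stream_mode="updates") on the two-node graph above; it hard-codes per-node update events and does not call LangGraph.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_astream(stream_mode: str) -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in for graph.astream(...) in "updates" mode."""
    if stream_mode != "updates":
        raise ValueError("this sketch only models stream_mode='updates'")
    for event in ({"a": {"another_list": ["hi"]}}, {"b": {"alist": ["there"]}}):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield event


async def collect() -> list[dict[str, Any]]:
    """Gather every event from the async stream into a list."""
    events = []
    async for event in fake_astream("updates"):
        events.append(event)
    return events


events = asyncio.run(collect())
print(events)  # [{'a': {'another_list': ['hi']}}, {'b': {'alist': ['there']}}]
```

Replacing fake_astream with a call to a compiled graph's astream inside collect should follow the same pattern.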
Using stream_mode="debug":
async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

async def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)
# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)
# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
invoke
invoke(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode = "values",
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
checkpoint_during: bool | None = None,
debug: bool | None = None,
**kwargs: Any
) -> dict[str, Any] | Any
Run the graph with a single input and config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | dict[str, Any] \| Any | The input data for the graph. It can be a dictionary or any other type. | required |
config | RunnableConfig \| None | Optional. The configuration for the graph run. | None |
stream_mode | StreamMode | Optional. The stream mode for the graph run. Default is "values". | 'values' |
output_keys | str \| Sequence[str] \| None | Optional. The output keys to retrieve from the graph run. | None |
interrupt_before | All \| Sequence[str] \| None | Optional. The nodes to interrupt the graph run before. | None |
interrupt_after | All \| Sequence[str] \| None | Optional. The nodes to interrupt the graph run after. | None |
checkpoint_during | bool \| None | Optional. Whether to checkpoint intermediate steps. If False, only the final checkpoint is saved. | None |
debug | bool \| None | Optional. Enable debug mode for the graph run. | None |
**kwargs | Any | Additional keyword arguments to pass to the graph run. | {} |
Returns:
Type | Description |
---|---|
dict[str, Any] \| Any | The output of the graph run. If stream_mode is "values", it returns the latest output; otherwise, it returns a list of output chunks. |
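The return rule can be pictured as a reduction over the event stream. The plain-Python sketch below uses a hypothetical helper, invoke_like, and hand-written event lists; it models the rule stated in the Returns table and is not LangGraph's own code.

```python
from typing import Any, Iterable


def invoke_like(events: Iterable[Any], stream_mode: str = "values") -> Any:
    """Hypothetical helper: collapse streamed events into a single return value."""
    latest = None
    chunks = []
    for event in events:
        if stream_mode == "values":
            latest = event  # keep only the most recent full state
        else:
            chunks.append(event)  # keep every chunk in order
    return latest if stream_mode == "values" else chunks


# Hand-written events standing in for what a stream might produce.
values_stream = [{"x": [0.5]}, {"x": [0.5, 0.75]}]
updates_stream = [{"A": {"x": 0.75}}]

final_state = invoke_like(values_stream, "values")
all_chunks = invoke_like(updates_stream, "updates")
print(final_state)  # {'x': [0.5, 0.75]}
print(all_chunks)   # [{'A': {'x': 0.75}}]
```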
ainvoke
async
ainvoke(
input: dict[str, Any] | Any,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode = "values",
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
checkpoint_during: bool | None = None,
debug: bool | None = None,
**kwargs: Any
) -> dict[str, Any] | Any
Asynchronously invoke the graph on a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | dict[str, Any] \| Any | The input data for the computation. It can be a dictionary or any other type. | required |
config | RunnableConfig \| None | Optional. The configuration for the computation. | None |
stream_mode | StreamMode | Optional. The stream mode for the computation. Default is "values". | 'values' |
output_keys | str \| Sequence[str] \| None | Optional. The output keys to include in the result. Default is None. | None |
interrupt_before | All \| Sequence[str] \| None | Optional. The nodes to interrupt before. Default is None. | None |
interrupt_after | All \| Sequence[str] \| None | Optional. The nodes to interrupt after. Default is None. | None |
checkpoint_during | bool \| None | Optional. Whether to checkpoint intermediate steps. If False, only the final checkpoint is saved. | None |
debug | bool \| None | Optional. Whether to enable debug mode. Default is None. | None |
**kwargs | Any | Additional keyword arguments. | {} |
Returns:
Type | Description |
---|---|
dict[str, Any] \| Any | The result of the computation. If stream_mode is "values", it returns the latest value; otherwise, it returns a list of chunks. |
get_state
get_state(
config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
Get the current state of the graph.
aget_state
async
aget_state(
config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
Asynchronously get the current state of the graph.
get_state_history
get_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> Iterator[StateSnapshot]
Get the history of the state of the graph.
aget_state_history
async
aget_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> AsyncIterator[StateSnapshot]
Asynchronously get the history of the state of the graph.
update_state
update_state(
config: RunnableConfig,
values: dict[str, Any] | Any | None,
as_node: str | None = None,
) -> RunnableConfig
Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
aupdate_state
async
aupdate_state(
config: RunnableConfig,
values: dict[str, Any] | Any,
as_node: str | None = None,
) -> RunnableConfig
Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
bulk_update_state
bulk_update_state(
config: RunnableConfig,
supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
Apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | RunnableConfig | The config to apply the updates to. | required |
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a StateUpdate tuple. | required |
Raises:
Type | Description |
---|---|
ValueError | If no checkpointer is set or no updates are provided. |
InvalidUpdateError | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
RunnableConfig | RunnableConfig | The updated config. |
abulk_update_state
async
abulk_update_state(
config: RunnableConfig,
supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | RunnableConfig | The config to apply the updates to. | required |
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a StateUpdate tuple. | required |
Raises:
Type | Description |
---|---|
ValueError | If no checkpointer is set or no updates are provided. |
InvalidUpdateError | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
RunnableConfig | RunnableConfig | The updated config. |
get_graph
get_graph(
config: RunnableConfig | None = None,
*,
xray: int | bool = False
) -> Graph
Return a drawable representation of the computation graph.
aget_graph
async
aget_graph(
config: RunnableConfig | None = None,
*,
xray: int | bool = False
) -> Graph
Asynchronously return a drawable representation of the computation graph.
get_subgraphs
get_subgraphs(
*, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]
Get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace | str \| None | The namespace to filter the subgraphs by. | None |
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False |
Returns:
Type | Description |
---|---|
Iterator[tuple[str, PregelProtocol]] | An iterator of (namespace, subgraph) pairs. |
aget_subgraphs
async
aget_subgraphs(
*, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]
Asynchronously get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace | str \| None | The namespace to filter the subgraphs by. | None |
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False |
Returns:
Type | Description |
---|---|
AsyncIterator[tuple[str, PregelProtocol]] | An async iterator of (namespace, subgraph) pairs. |
with_config
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Self
Create a copy of the Pregel object with an updated config.
Functions:
Name | Description |
---|---|
add_messages |
Merges two lists of messages, updating existing messages by ID. |
add_messages
add_messages(
left: Messages,
right: Messages,
*,
format: Optional[Literal["langchain-openai"]] = None
) -> Messages
Merges two lists of messages, updating existing messages by ID.
By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
left | Messages | The base list of messages. | required |
right | Messages | The list of messages (or single message) to merge into the base list. | required |
format | Optional[Literal['langchain-openai']] | The format to return messages in. If None then messages will be returned as is. If 'langchain-openai' then messages will be returned as BaseMessage objects with their contents formatted to match OpenAI message format, meaning contents can be string, 'text' blocks, or 'image_url' blocks and tool responses are returned as their own ToolMessages. Requirement Must have | None |
Returns:
Type | Description |
---|---|
Messages | A new list of messages with the messages from right merged into left. If a message in right has the same ID as a message in left, the message from right will replace the message from left. |
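To make the merge-by-ID rule concrete, here is a plain-Python model of it. The Msg dataclass and merge_by_id function are hypothetical names used only for this sketch; it deliberately ignores what the real add_messages also handles (for example, coercing tuples or dicts into message objects and the format option), so treat it as an illustration of the rule rather than the library's implementation.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for a message that carries an ID."""
    id: str
    content: str


def merge_by_id(left: list[Msg], right: list[Msg]) -> list[Msg]:
    """Append messages from right, replacing any left message with the same ID."""
    merged = list(left)  # copy so the inputs are never mutated
    index_by_id = {m.id: i for i, m in enumerate(merged)}
    for msg in right:
        if msg.id in index_by_id:
            merged[index_by_id[msg.id]] = msg  # same ID: replace in place
        else:
            index_by_id[msg.id] = len(merged)
            merged.append(msg)  # new ID: append at the end
    return merged


appended = merge_by_id([Msg("1", "Hello")], [Msg("2", "Hi there!")])
replaced = merge_by_id([Msg("1", "Hello")], [Msg("1", "Hello again")])
print([m.content for m in appended])  # ['Hello', 'Hi there!']
print([m.content for m in replaced])  # ['Hello again']
```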
Example
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import add_messages

# Different IDs: the new message is appended.
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [AIMessage(content="Hi there!", id="2")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]

# Same ID: the new message replaces the existing one.
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [HumanMessage(content="Hello again", id="1")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello again', id='1')]

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

builder = StateGraph(State)
builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({})
# {'messages': [AIMessage(content='Hello', id=...)]}

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages(format='langchain-openai')]

def chatbot_node(state: State) -> dict:
    return {"messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Here's an image:",
                    "cache_control": {"type": "ephemeral"},
                },
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": "1234",
                    },
                },
            ]
        },
    ]}

builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({"messages": []})
# {
#     'messages': [
#         HumanMessage(
#             content=[
#                 {"type": "text", "text": "Here's an image:"},
#                 {
#                     "type": "image_url",
#                     "image_url": {"url": "data:image/jpeg;base64,1234"},
#                 },
#             ],
#         ),
#     ]
# }