Low Level Conceptual Guide¶
Graphs¶
At its core, LangGraph models agent workflows as graphs. You define the behavior of your agents using three key components:

- `State`: A shared data structure that represents the current snapshot of your application. It can be any Python type, but is typically a `TypedDict` or Pydantic `BaseModel`.
- `Nodes`: Python functions that encode the logic of your agents. They receive the current `State` as input, perform some computation or side effect, and return an updated `State`.
- `Edges`: Python functions that determine which `Node` to execute next based on the current `State`. They can be conditional branches or fixed transitions.

By composing `Nodes` and `Edges`, you can create complex, looping workflows that evolve the `State` over time. The real power, though, comes from how LangGraph manages that `State`. To emphasize: `Nodes` and `Edges` are nothing more than Python functions - they can contain an LLM or just good ol' Python code.

In short: nodes do the work, edges tell what to do next.
LangGraph's underlying graph algorithm uses message passing to define a general program. When a `Node` completes, it sends a message along one or more edges to other node(s). These nodes run their functions, pass the resulting messages to the next set of nodes, and on and on it goes. Inspired by Pregel, the program proceeds in discrete "super-steps" that are all executed conceptually in parallel. Whenever the graph is run, all the nodes start in an `inactive` state. Whenever an incoming edge (or "channel") receives a new message (state), the node becomes `active`, runs its function, and responds with updates. At the end of each super-step, each node votes to `halt` by marking itself as `inactive` if it has no more incoming messages. The graph terminates when all nodes are `inactive` and no messages are in transit.
StateGraph¶
The `StateGraph` class is the main graph class to use. It is parameterized by a user-defined `State` object.
MessageGraph¶
The `MessageGraph` class is a special type of graph. The `State` of a `MessageGraph` is ONLY a list of messages. This class is rarely used except for chatbots, as most applications require the `State` to be more complex than a list of messages.
Compiling your graph¶
To build your graph, you first define the state, then add nodes and edges, and then compile it. What exactly is compiling your graph, and why is it needed?

Compiling is a pretty simple step. It provides a few basic checks on the structure of your graph (no orphaned nodes, etc.). It is also where you can specify runtime args like checkpointers and breakpoints. You compile your graph by just calling the `.compile` method.
You MUST compile your graph before you can use it.
State¶
The first thing you do when you define a graph is define its `State`. The `State` consists of the schema of the graph as well as `reducer` functions, which specify how to apply updates to the state. The schema of the `State` will be the input schema to all `Nodes` and `Edges` in the graph, and can be either a `TypedDict` or a `Pydantic` model. All `Nodes` will emit updates to the `State`, which are then applied using the specified `reducer` function.
Schema¶
The main documented way to specify the schema of a graph is by using `TypedDict`. However, we also support using a Pydantic `BaseModel` as your graph state to add default values and additional data validation.
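For instance, a Pydantic state model gives you defaults and type validation that a plain `TypedDict` does not (a sketch; the field names are illustrative):

```python
from pydantic import BaseModel, ValidationError

class State(BaseModel):
    question: str
    attempts: int = 0  # default value

state = State(question="why?")  # attempts defaults to 0

try:
    State(question="why?", attempts="lots")  # wrong type is rejected
except ValidationError:
    print("validation failed")
```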
Reducers¶
Reducers are key to understanding how updates from nodes are applied to the `State`. Each key in the `State` has its own independent reducer function. If no reducer function is explicitly specified, then it is assumed that all updates to that key should override it. Let's take a look at a few examples to understand them better.
Example A:
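Assume a schema with no reducer annotations on either key - for example:

```python
from typing import TypedDict

class State(TypedDict):
    foo: int
    bar: list[str]
```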
In this example, no reducer functions are specified for any key. Let's assume the input to the graph is `{"foo": 1, "bar": ["hi"]}`. Let's then assume the first `Node` returns `{"foo": 2}`. This is treated as an update to the state. Notice that the `Node` does not need to return the whole `State` schema - just an update. After applying this update, the `State` would then be `{"foo": 2, "bar": ["hi"]}`. If the second node returns `{"bar": ["bye"]}`, the `State` would then be `{"foo": 2, "bar": ["bye"]}`.
Example B:
```python
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
```
In this example, we've used the `Annotated` type to specify a reducer function (`operator.add`) for the second key (`bar`). Note that the first key remains unchanged. Let's assume the input to the graph is `{"foo": 1, "bar": ["hi"]}`. Let's then assume the first `Node` returns `{"foo": 2}`. This is treated as an update to the state. Notice that the `Node` does not need to return the whole `State` schema - just an update. After applying this update, the `State` would then be `{"foo": 2, "bar": ["hi"]}`. If the second node returns `{"bar": ["bye"]}`, the `State` would then be `{"foo": 2, "bar": ["hi", "bye"]}`. Notice here that the `bar` key is updated by adding the two lists together.
MessagesState¶
`MessagesState` is one of the few opinionated components in LangGraph. `MessagesState` is a special state designed to make it easy to use a list of messages as a key in your state. Specifically, `MessagesState` is defined as:

```python
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated, TypedDict

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
```

What this is doing is creating a `TypedDict` with a single key: `messages`. This is a list of `Message` objects, with `add_messages` as a reducer. `add_messages` basically adds messages to the existing list (it also does some nice extra things, like convert from the OpenAI message format to the standard LangChain message format, handle updates based on message IDs, etc.).
We often see a list of messages being a key component of state, so this prebuilt state is intended to make it easy to use messages. Typically, there is more state to track than just messages, so we see people subclass this state and add more fields.
Nodes¶
In LangGraph, nodes are typically Python functions (sync or async) where the first positional argument is the state, and (optionally) the second positional argument is a "config" containing optional configurable parameters (such as a `thread_id`).

Similar to `NetworkX`, you add these nodes to a graph using the `add_node` method:
```python
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph

builder = StateGraph(dict)

def my_node(state: dict, config: RunnableConfig):
    print("In node: ", config["configurable"]["user_id"])
    return {"results": f"Hello, {state['input']}!"}

# The second argument is optional
def my_other_node(state: dict):
    return state

builder.add_node("my_node", my_node)
builder.add_node("other_node", my_other_node)
...
```
Behind the scenes, functions are converted to `RunnableLambda`s, which add batch and async support to your function, along with native tracing and debugging.

If you add a node to the graph without specifying a name, it will be given a default name equivalent to the function name.

```python
builder.add_node(my_node)
# You can then create edges to/from this node by referencing it as `"my_node"`
```
START Node¶
The `START` Node is a special node that represents the node that sends user input to the graph. The main purpose for referencing this node is to determine which nodes should be called first.
END Node¶
The `END` Node is a special node that represents a terminal node. This node is referenced when you want to denote which edges have no actions after they are done.
Edges¶
Edges define how the logic is routed and how the graph decides to stop. This is a big part of how your agents work and how different nodes communicate with each other. There are a few key types of edges:
- Normal Edges: Go directly from one node to the next.
- Conditional Edges: Call a function to determine which node(s) to go to next.
- Entry Point: Which node to call first when user input arrives.
- Conditional Entry Point: Call a function to determine which node(s) to call first when user input arrives.
A node can have MULTIPLE outgoing edges. If a node has multiple outgoing edges, all of those destination nodes will be executed in parallel as part of the next super-step.
Normal Edges¶
If you always want to go from node A to node B, you can use the `add_edge` method directly.
Conditional Edges¶
If you want to optionally route to 1 or more edges (or optionally terminate), you can use the `add_conditional_edges` method. This method accepts the name of a node and a "routing function" to call after that node is executed.

Similar to nodes, the `routing_function` accepts the current `state` of the graph and returns a value.

By default, the return value of `routing_function` is used as the name of the node (or a list of nodes) to send the state to next. All those nodes will be run in parallel as part of the next super-step.

You can optionally provide a dictionary that maps the `routing_function`'s output to the name of the next node.
Entry Point¶
The entry point is the first node(s) that are run when the graph starts. You can use the `add_edge` method to add an edge from the virtual `START` node to the first node to execute, which specifies where to enter the graph.
Conditional Entry Point¶
A conditional entry point lets you start at different nodes depending on custom logic. You can use `add_conditional_edges` from the virtual `START` node to accomplish this. You can optionally provide a dictionary that maps the `routing_function`'s output to the name of the next node.
Send¶
By default, `Nodes` and `Edges` are defined ahead of time and operate on the same shared state. However, there can be cases where the exact edges are not known ahead of time and/or you may want different versions of `State` to exist at the same time. A common example of this is with map-reduce design patterns. In this design pattern, a first node may generate a list of objects, and you may want to apply some other node to all those objects. The number of objects may be unknown ahead of time (meaning the number of edges may not be known), and the input `State` to the downstream `Node` should be different (one for each generated object).

To support this design pattern, LangGraph supports returning `Send` objects from conditional edges. `Send` takes two arguments: the first is the name of the node, and the second is the state to pass to that node.

```python
from langgraph.constants import Send

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

graph.add_conditional_edges("node_a", continue_to_jokes)
```
Checkpointer¶
One of the main benefits of LangGraph is that it comes backed by a persistence layer. This is accomplished via checkpointers.
Checkpointers can be used to save a checkpoint of the state of the graph after each step. This allows for several things.

First, it allows for human-in-the-loop workflows, as it allows humans to inspect, interrupt, and approve steps. Checkpointers are needed for these workflows, as the human has to be able to view the state of the graph at any point in time, and the graph has to be able to resume execution after the human has made any updates to the state.

Second, it allows for "memory" between interactions. You can use checkpointers to create threads and save the state of a thread after a graph executes. In the case of repeated human interactions (like conversations), any follow-up messages can be sent to that checkpoint, which will retain its memory of previous ones.
See this guide for how to add a checkpointer to your graph.
Threads¶
When using a checkpointer, you must specify a `thread_id` or `thread_ts` when running the graph.

Threads are used to checkpoint multiple different runs. This can be used to enable multi-tenant chat applications.

- `thread_id` is simply the ID of a thread. This is always required.
- `thread_ts` can optionally be passed. This identifier refers to a specific checkpoint within a thread. It can be used to kick off a run of a graph from some point halfway through a thread.

You must pass these when invoking the graph as part of the `configurable` part of the config.
See this guide for how to use threads.
Checkpointer state¶
When you use a checkpointer with a graph, you can interact with the state of that graph. This is usually done when enabling different human-in-the-loop interaction patterns. Each time you run the graph, the checkpointer creates several checkpoints - one each time a node or set of nodes finishes running. The most recent checkpoint is the current state of the thread. When interacting with the checkpointer state, you must specify a thread identifier.
Each checkpoint has two properties:
- values: This is the value of the state at this point in time.
- next: This is a tuple of the nodes to execute next in the graph.
Get state¶
You can get the state of a checkpointer by calling `graph.get_state(config)`. The config should contain `thread_id`, and the state will be fetched for that thread.
Get state history¶
You can also call `graph.get_state_history(config)` to get a list of the history of the graph. The config should contain `thread_id`, and the state history will be fetched for that thread.
Update state¶
You can also interact with the state directly and update it. This takes three different components:

- `config`
- `values`
- `as_node`

config

The config should contain `thread_id` specifying which thread to update.

values

These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node is treated. This means that these values will be passed to the reducer functions that are part of the state. So this does NOT automatically overwrite the state. Let's walk through an example.
Let's assume you have defined the state of your graph as:
```python
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]
```
Let's now assume the current state of the graph is `{"foo": 1, "bar": ["a"]}`. If you update the state with `{"foo": 2, "bar": ["b"]}`, then the new state of the graph will be `{"foo": 2, "bar": ["a", "b"]}`.
The `foo` key is completely changed (because there is no reducer specified for that key, so it overwrites it). However, there is a reducer specified for the `bar` key, and so it appends `"b"` to the state of `bar`.
as_node

The final thing you specify when calling `update_state` is `as_node`. The update will be applied as if it came from node `as_node`. If `as_node` is not provided, it will be set to the last node that updated the state, if not ambiguous.

The reason this matters is that the next steps to execute in the graph depend on the last node to have given an update, so this can be used to control which node executes next.
Configuration¶
When creating a graph, you can also mark that certain parts of the graph are configurable. This is commonly done to enable easily switching between models or system prompts. This allows you to create a single "cognitive architecture" (the graph) but have multiple different instances of it.
You can optionally specify a `config_schema` when creating a graph. You can then pass this configuration into the graph using the `configurable` config field.
You can then access and use this configuration inside a node:
```python
def node_a(state, config):
    llm_type = config.get("configurable", {}).get("llm", "openai")
    llm = get_llm(llm_type)
    ...
```
See this guide for a full breakdown on configuration
Breakpoints¶
It can often be useful to set breakpoints before or after certain nodes execute. This can be used to wait for human approval before continuing. These can be set when you "compile" a graph. You can set breakpoints either before a node executes (using `interrupt_before`) or after a node executes (using `interrupt_after`).

You MUST use a checkpointer when using breakpoints. This is because your graph needs to be able to resume execution.
In order to resume execution, you can just invoke your graph with `None` as the input.

```python
# Initial run of graph
graph.invoke(inputs, config=config)

# Let's assume it hit a breakpoint somewhere; you can then resume by passing in None
graph.invoke(None, config=config)
```
See this guide for a full walkthrough of how to add breakpoints.
Visualization¶
It's often nice to be able to visualize graphs, especially as they get more complex. LangGraph comes with several built-in ways to visualize graphs. See this how-to guide for more info.
Streaming¶
LangGraph is built with first-class support for streaming. There are several different streaming modes that LangGraph supports:

- `"values"`: This streams the full value of the state after each step of the graph.
- `"updates"`: This streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are streamed separately.
- `"debug"`: This streams as much information as possible throughout the execution of the graph.
In addition, you can use the `astream_events` method to stream back events that happen inside nodes. This is useful for streaming tokens of LLM calls.