Skip to content

Low Level Conceptual Guide

Graphs

At its core, LangGraph models agent workflows as graphs. You define the behavior of your agents using three key components:

  1. State: A shared data structure that represents the current snapshot of your application. It can be any Python type, but is typically a TypedDict or Pydantic BaseModel.

  2. Nodes: Python functions that encode the logic of your agents. They receive the current State as input, perform some computation or side-effect, and return an updated State.

  3. Edges: Python functions that determine which Node to execute next based on the current State. They can be conditional branches or fixed transitions.

By composing Nodes and Edges, you can create complex, looping workflows that evolve the State over time. The real power, though, comes from how LangGraph manages that State. To emphasize: Nodes and Edges are nothing more than Python functions - they can contain an LLM or just good ol' Python code.

In short: nodes do the work. edges tell what to do next.

LangGraph's underlying graph algorithm uses message passing to define a general program. When a Node completes, it sends a message along one or more edges to other node(s). These nodes run their functions, pass the resulting messages to the next set of nodes, and on and on it goes. Inspired by Pregel, the program proceeds in discrete "super-steps" that are all executed conceptually in parallel. Whenever the graph is run, all the nodes start in an inactive state. Whenever an incoming edge (or "channel") receives a new message (state), the node becomes active, runs the function, and responds with updates. At the end of each superstep, each node votes to halt by marking itself as inactive if it has no more incoming messages. The graph terminates when all nodes are inactive and when no messages are in transit.

StateGraph

The StateGraph class is the main graph class to uses. This is parameterized by a user defined State object.

MessageGraph

The MessageGraph class is a special type of graph. The State of a MessageGraph is ONLY a list of messages. This class is rarely used except for chatbots, as most applications require the State to be more complex than a list of messages.

Compiling your graph

To build your graph, you first define the state, you then add nodes and edges, and then you compile it. What exactly is compiling your graph and why is it needed?

Compiling is a pretty simple step. It provides a few basic checks on the structure of your graph (no orphaned nodes, etc). It is also where you can specify runtime args like checkpointers and breakpoints. You compile your graph by just calling the .compile method:

graph = graph_builder.compile(...)

You MUST compile your graph before you can use it.

State

The first thing you do when you define a graph is define the State of the graph. The State consists of the schema of the graph as well as reducer functions which specify how to apply updates to the state. The schema of the State will be the input schema to all Nodes and Edges in the graph, and can be either a TypedDict or a Pydantic model. All Nodes will emit updates to the State which are then applied using the specified reducer function.

Schema

The main documented way to specify the schema of a graph is by using TypedDict. However, we also support using a Pydantic BaseModel as your graph state to add default values and additional data validation.

Reducers

Reducers are key to understanding how updates from nodes are applied to the State. Each key in the State has its own independent reducer function. If no reducer function is explicitly specified then it is assumed that all updates to that key should override it. Let's take a look at a few examples to understand them better.

Example A:

from typing import TypedDict

class State(TypedDict):
    foo: int
    bar: list[str]

In this example, no reducer functions are specified for any key. Let's assume the input to the graph is {"foo": 1, "bar": ["hi"]}. Let's then assume the first Node returns {"foo": 2}. This is treated as an update to the state. Notice that the Node does not need to return the whole State schema - just an update. After applying this update, the State would then be {"foo": 2, "bar": ["hi"]}. If the second node returns {"bar": ["bye"]} then the State would then be {"foo": 2, "bar": ["bye"]}

Example B:

from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

In this example, we've used the Annotated type to specify a reducer function (operator.add) for the second key (bar). Note that the first key remains unchanged. Let's assume the input to the graph is {"foo": 1, "bar": ["hi"]}. Let's then assume the first Node returns {"foo": 2}. This is treated as an update to the state. Notice that the Node does not need to return the whole State schema - just an update. After applying this update, the State would then be {"foo": 2, "bar": ["hi"]}. If the second node returns {"bar": ["bye"]} then the State would then be {"foo": 2, "bar": ["hi", "bye"]}. Notice here that the bar key is updated by adding the two lists together.

MessageState

MessageState is one of the few opinionated components in LangGraph. MessageState is a special state designed to make it easy to use a list of messages as a key in your state. Specifically, MessageState is defined as:

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated, TypedDict

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

What this is doing is creating a TypedDict with a single key: messages. This is a list of Message objects, with add_messages as a reducer. add_messages basically adds messages to the existing list (it also does some nice extra things, like convert from OpenAI message format to the standard LangChain message format, handle updates based on message IDs, etc).

We often see a list of messages being a key component of state, so this prebuilt state is intended to make it easy to use messages. Typically, there is more state to track than just messages, so we see people subclass this state and add more fields, like:

from langgraph.graph import MessagesState

class State(MessagesState):
    documents: list[str]

Nodes

In LangGraph, nodes are typically python functions (sync or async) where the first positional argument is the state, and (optionally), the second positional argument is a "config", containing optional configurable parameters (such as a thread_id).

Similar to NetworkX, you add these nodes to a graph using the add_node method:

from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph

builder = StateGraph(dict)


def my_node(state: dict, config: RunnableConfig):
    print("In node: ", config["configurable"]["user_id"])
    return {"results": f"Hello, {state['input']}!"}


# The second argument is optional
def my_other_node(state: dict):
    return state


builder.add_node("my_node", my_node)
builder.add_node("other_node", my_other_node)
...

Behind the scenes, functions are converted to RunnableLambda's, which add batch and async support to your function, along with native tracing and debugging.

If you add a node to graph without specifying a name, it will be given a default name equivalent to the function name.

builder.add_node(my_node)
# You can then create edges to/from this node by referencing it as `"my_node"`

START Node

The START Node is a special node that represents the node sends user input to the graph. The main purpose for referencing this node is to determine which nodes should be called first.

from langgraph.graph import START

graph.add_edge(START, "node_a")

END Node

The END Node is a special node that represents a terminal node. This node is referenced when you want to denote which edges have no actions after they are done.

from langgraph.graph import END

graph.add_edge("node_a", END)

Edges

Edges define how the logic is routed and how the graph decides to stop. This is a big part of how your agents work and how different nodes communicate with each other. There are a few key types of edges:

  • Normal Edges: Go directly from one node to the next.
  • Conditional Edges: Call a function to determine which node(s) to go to next.
  • Entry Point: Which node to call first when user input arrives.
  • Conditional Entry Point: Call a function to determine which node(s) to call first when user input arrives.

A node can have MULTIPLE outgoing edges. If a node has multiple out-going edges, all of those destination nodes will be executed in parallel as a part of the next superstep.

Normal Edges

If you always want to go from node A to node B, you can use the add_edge method directly.

graph.add_edge("node_a", "node_b")

Conditional Edges

If you want to optionally route to 1 or more edges (or optionally terminate), you can use the add_conditional_edges method. This method accepts the name of a node and a "routing function" to call after that node is executed:

graph.add_edge("node_a", routing_function)

Similar to nodes, the routing_function accept the current state of the graph and return a value.

By default, the return value routing_function is used as the name of the node (or a list of nodes) to send the state to next. All those nodes will be run in parallel as a part of the next superstep.

You can optionally provide a dictionary that maps the routing_function's output to the name of the next node.

graph.add_edge("node_a", routing_function, {True: "node_b", False: "node_c"})

Entry Point

The entry point is the first node(s) that are run when the graph starts. You can use the add_edge method from the virtual START node to the first node to execute to specify where to enter the graph.

from langgraph.graph import START

graph.add_edge(START, "node_a")

Conditional Entry Point

A conditional entry point lets you start at different nodes depending on custom logic. You can use add_conditional_edges from the virtual START node to accomplish this.

from langgraph.graph import START

graph.add_conditional_edges(START, routing_function)

You can optionally provide a dictionary that maps the routing_function's output to the name of the next node.

graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

Send

By default, Nodes and Edges are defined ahead of time and operate on the same shared state. However, there can be cases where the exact edges are not known ahead of time and/or you may want different versions of State to exist at the same time. A common of example of this is with map-reduce design patterns. In this design pattern, a first node may generate a list of objects, and you may want to apply some other node to all those objects. The number of objects may be unknown ahead of time (meaning the number of edges may not be known) and the input State to the downstream Node should be different (one for each generated object).

To support this design pattern, LangGraph supports returning Send objects from conditional edges. Send takes two arguments: first is the name of the node, and second is the state to pass to that node.

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

graph.add_conditional_edges("node_a", continue_to_jokes)

Checkpointer

One of the main benefits of LangGraph is that it comes backed by a persistence layer. This is accomplished via checkpointers.

Checkpointers can be used to save a checkpoint of the state of a graph after all steps of the graph. This allows for several things.

First, it allows for human-in-the-loop workflows, as it allows humans to inspect, interrupt, and approve steps. Checkpointers are needed for these workflows as the human has to be able to view the state of a graph at any point in time, and the graph has to be to resume execution after the human has made any updates to the state.

Second, it allows for "memory" between interactions. You can use checkpointers to create threads and save the state of a thread after a graph executes. In the case of repeated human interactions (like conversations) any follow up messages can be sent to that checkpoint, which will retain its memory of previous ones.

See this guide for how to add a checkpointer to your graph.

Threads

When using a checkpointer, you must specify a thread_id or thread_ts when running the graph. Threads are used to checkpoint multiple different runs. This can be used to enable a multi-tenant chat applications.

thread_id is simply the ID of a thread. This is always required

thread_ts can optionally be passed. This identifier refers to a specific checkpoint within a thread. This can be used to kick of a run of a graph from some point halfway through a thread.

You must pass these when invoking the graph as part of the configurable part of the config.

config = {"configurable": {"thread_id": "a"}}
graph.invoke(inputs, config=config)

See this guide for how to use threads.

Checkpointer state

When you use a checkpointer with a graph, you can interact with the state of that graph. This usually done when enabling different human-in-the-loop interaction patterns. Each time you run the graph, the checkpointer creates several checkpoints every time a node or set of nodes finishes running. The most recent checkpoint is the current state of the thread. When interacting with the checkpointer state, you must specify a thread identifier.

Each checkpoint has two properties:

  • values: This is the value of the state at this point in time.
  • next: This is a tuple of the nodes to execute next in the graph.

Get state

You can get the state of a checkpointer by calling graph.get_state(config). The config should contain thread_id, and the state will be fetched for that thread.

Get state history

You can also call graph.get_state_history(config) to get a list of the history of the graph. The config should contain thread_id, and the state history will be fetched for that thread.

Update state

You can also interact with the state directly and update it. This takes three different components:

  • config
  • values
  • as_node

config

The config should contain thread_id specifying which thread to update.

values

These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node is treated. This means that these values will be passed to the reducer functions that are part of the state. So this does NOT automatically overwrite the state. Let's walk through an example.

Let's assume you have defined the state of your graph as:

from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

Let's now assume the current state of the graph is

{"foo": 1, "bar": ["a"]}

If you update the state as below:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

Then the new state of the graph will be:

{"foo": 2, "bar": ["a", "b"]}

The foo key is completely changed (because there is no reducer specified for that key, so it overwrites it). However, there is a reducer specified for the bar key, and so it appends "b" to the state of bar.

as_node

The final thing you specify when calling update_state is as_node. This update will be applied as if it came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

The reason this matters is that the next steps in the graph to execute depend on the last node to have given an update, so this can be used to control which node executes next.

Configuration

When creating a graph, you can also mark that certain parts of the graph are configurable. This is commonly done to enable easily switching between models or system prompts. This allows you to create a single "cognitive architecture" (the graph) but have multiple different instance of it.

You can optionally specify a config_schema when creating a graph.

class ConfigSchema(TypedDict):
    llm: str

graph = StateGraph(State, config_schema=ConfigSchema)

You can then pass this configuration into the graph using the configurable config field.

config = {"configurable": {"llm": "anthropic"}}

graph.invoke(inputs, config=config)

You can then access and use this configuration inside a node:

def node_a(state, config):
    llm_type = config.get("configurable", {}).get("llm", "openai")
    llm = get_llm(llm_type)
    ...

See this guide for a full breakdown on configuration

Breakpoints

It can often be useful to set breakpoints before or after certain nodes execute. This can be used to wait for human approval before continuing. These can be set when you "compile" a graph. You can set breakpoints either before a node executes (using interrupt_before) or after a node executes (using interrupt_after.)

You MUST use a checkpoiner when using breakpoints. This is because your graph needs to be able to resume execution.

In order to resume execution, you can just invoke your graph with None as the input.

# Initial run of graph
graph.invoke(inputs, config=config)

# Let's assume it hit a breakpoint somewhere, you can then resume by passing in None
graph.invoke(None, config=config)

See this guide for a full walkthrough of how to add breakpoints.

Visualization

It's often nice to be able to visualize graphs, especially as they get more complex. LangGraph comes with several built-in ways to visualize graphs. See this how-to guide for more info.

Streaming

LangGraph is built with first class support for streaming. There are several different streaming modes that LangGraph supports:

  • "values": This streams the full value of the state after each step of the graph.
  • "updates: This streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g. multiple nodes are run) then those updates are streamed separately.
  • "debug": This streams as much information as possible throughout the execution of the graph.

In addition, you can use the astream_events method to stream back events that happen inside nodes. This is useful for streaming tokens of LLM calls.

Comments