Running nodes in parallel can substantially speed up overall graph execution. LangGraph natively supports parallel node execution through fan-out and fan-in mechanisms, using both standard edges and conditional_edges. The examples below show how to create branching dataflows that work for you.
In this example, we fan out from Node A to B and C and then fan in to D. In the state, we annotate the aggregate key with the operator.add reducer. The reducer combines or accumulates values for that key in the State, rather than simply overwriting the existing value. For lists, this means concatenating the new list with the existing list. See this guide for more detail on updating state with reducers.
    import operator
    from typing import Annotated, Any

    from typing_extensions import TypedDict

    from langgraph.graph import StateGraph, START, END


    class State(TypedDict):
        # The operator.add reducer fn makes this append-only
        aggregate: Annotated[list, operator.add]


    def a(state: State):
        print(f'Adding "A" to {state["aggregate"]}')
        return {"aggregate": ["A"]}


    def b(state: State):
        print(f'Adding "B" to {state["aggregate"]}')
        return {"aggregate": ["B"]}


    def c(state: State):
        print(f'Adding "C" to {state["aggregate"]}')
        return {"aggregate": ["C"]}


    def d(state: State):
        print(f'Adding "D" to {state["aggregate"]}')
        return {"aggregate": ["D"]}


    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    # Fan out from "a" to "b" and "c", then fan in to "d".
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "d")
    builder.add_edge("c", "d")
    builder.add_edge("d", END)
    graph = builder.compile()

    # Run the graph starting from an empty list.
    graph.invoke({"aggregate": []})

Running the graph prints:

    Adding "A" to []
    Adding "B" to ['A']
    Adding "C" to ['A']
    Adding "D" to ['A', 'B', 'C']

and returns:

    {'aggregate': ['A', 'B', 'C', 'D']}
Note
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.
Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
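One way to get the predetermined ordering described above, sketched in plain Python so that it runs without LangGraph installed. The field name `ranked`, the `(priority, value)` pair shape, and the helpers `ordered_values` and `apply_updates` are assumptions made for illustration; they are not part of the original example. Each parallel node appends a pair to a separate append-only field, and the fan-in step sorts by priority before using the values. `apply_updates` only mimics what the operator.add reducer does when it concatenates updates in arrival order.

```python
import operator
from functools import reduce
from typing import Annotated, TypedDict


class State(TypedDict):
    # Hypothetical extra field: parallel nodes append (priority, value) pairs.
    ranked: Annotated[list, operator.add]


def b(state: State):
    return {"ranked": [(1, "B")]}


def c(state: State):
    return {"ranked": [(2, "C")]}


def ordered_values(state: State) -> list:
    """Sort the collected pairs by priority and return only the values."""
    return [value for _, value in sorted(state["ranked"], key=lambda pair: pair[0])]


def apply_updates(updates: list) -> State:
    """Mimic the operator.add reducer: concatenate updates in arrival order."""
    return {"ranked": reduce(operator.add, (u["ranked"] for u in updates), [])}


empty: State = {"ranked": []}
# Simulate both possible arrival orders of the two parallel updates.
b_first = apply_updates([b(empty), c(empty)])
c_first = apply_updates([c(empty), b(empty)])
```

In a graph, a helper like `ordered_values` would be called inside the fan-in node (node "d" in the example above), so the result no longer depends on which branch's update was applied first.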
Exception handling?
LangGraph executes nodes within "supersteps", meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors).
Importantly, when using a checkpointer, results from successful nodes within a superstep are saved, and don't repeat when resumed.
If you have error-prone nodes (for example, nodes that call flaky APIs), LangGraph provides two ways to address this:
You can write regular Python code within your node to catch and handle exceptions.
You can set a retry_policy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn't worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
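A minimal sketch of the first option, catching the exception inside the node itself. `call_flaky_api` is a hypothetical stand-in for an unreliable external call, and the fallback value is an assumption chosen for illustration; the node signature and return shape follow the examples above. The sketch uses only the standard library, so the node is called directly rather than through a compiled graph.

```python
import operator
from typing import Annotated, TypedDict


class State(TypedDict):
    aggregate: Annotated[list, operator.add]


def call_flaky_api() -> str:
    """Hypothetical external call; it always fails in this sketch."""
    raise ConnectionError("service unavailable")


def b(state: State):
    # Handle the failure inside the node so the exception never reaches the
    # graph and the superstep does not error out.
    try:
        value = call_flaky_api()
    except ConnectionError:
        value = "B (fallback)"
    return {"aggregate": [value]}


result = b({"aggregate": ["A"]})
```

Because the node returns a normal update even when the call fails, the updates from the other parallel branches in the same superstep are still applied.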
Parallel node fan-out and fan-in with extra steps
The above example showed how to fan out and fan in when each path has only one step. But what if one path has more than one step? Let's add a node b_2 in the "b" branch:
    def b_2(state: State):
        print(f'Adding "B_2" to {state["aggregate"]}')
        return {"aggregate": ["B_2"]}


    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(b_2)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "b_2")
    # A list of sources makes "d" wait for both "b_2" and "c".
    builder.add_edge(["b_2", "c"], "d")
    builder.add_edge("d", END)
    graph = builder.compile()

    # Run the graph starting from an empty list.
    graph.invoke({"aggregate": []})

Running the graph prints:

    Adding "A" to []
    Adding "B" to ['A']
    Adding "C" to ['A']
    Adding "B_2" to ['A', 'B', 'C']
    Adding "D" to ['A', 'B', 'C', 'B_2']

and returns:

    {'aggregate': ['A', 'B', 'C', 'B_2', 'D']}
Note
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. What happens in the next step?
We use add_edge(["b_2", "c"], "d") here to force node "d" to run only when both nodes "b_2" and "c" have finished execution. If we added two separate edges, node "d" would run twice: once after node "b_2" finishes and once again after node "c" finishes (in whichever order those nodes finish).
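The difference between the two wirings can be illustrated with a toy model of superstep scheduling. This is a simplified simulation written for illustration, not LangGraph's actual scheduler: `run_supersteps`, `edges`, and `joins` are hypothetical names, and the model assumes that a node runs in the step after any of its ordinary incoming edges fires, while a "wait for all" edge fires only once every listed source has run.

```python
def run_supersteps(start, edges, joins=()):
    """Return the supersteps as a list of sorted lists of node names.

    edges: (source, target) pairs; target runs one step after source.
    joins: (sources, target) pairs; target runs only after all sources ran.
    """
    waiting = [set() for _ in joins]  # sources seen so far for each join
    steps = []
    active = [start]
    while active:
        steps.append(active)
        triggered = set()
        for node in active:
            # Ordinary edges trigger their target immediately.
            triggered.update(dst for src, dst in edges if src == node)
            # Joined edges trigger only once every source has been seen.
            for seen, (sources, dst) in zip(waiting, joins):
                if node in sources:
                    seen.add(node)
                    if seen == set(sources):
                        triggered.add(dst)
                        seen.clear()
        active = sorted(triggered)
    return steps


fan_out = [("a", "b"), ("a", "c"), ("b", "b_2")]

# Two separate edges into "d": it is triggered by "c" and again by "b_2".
separate = run_supersteps("a", fan_out + [("b_2", "d"), ("c", "d")])

# One joined edge, like add_edge(["b_2", "c"], "d"): "d" waits for both.
joined = run_supersteps("a", fan_out, joins=[(["b_2", "c"], "d")])
```

In this model, `separate` contains "d" in two different steps, while `joined` contains it once, in the step after "b_2" has run.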
    import operator
    from typing import Annotated, Sequence

    from typing_extensions import TypedDict

    from langgraph.graph import StateGraph, START, END


    class State(TypedDict):
        aggregate: Annotated[list, operator.add]
        # Add a key to the state. We will set this key to determine
        # how we branch.
        which: str


    def a(state: State):
        print(f'Adding "A" to {state["aggregate"]}')
        return {"aggregate": ["A"]}


    def b(state: State):
        print(f'Adding "B" to {state["aggregate"]}')
        return {"aggregate": ["B"]}


    def c(state: State):
        print(f'Adding "C" to {state["aggregate"]}')
        return {"aggregate": ["C"]}


    def d(state: State):
        print(f'Adding "D" to {state["aggregate"]}')
        return {"aggregate": ["D"]}


    def e(state: State):
        print(f'Adding "E" to {state["aggregate"]}')
        return {"aggregate": ["E"]}


    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_node(e)
    builder.add_edge(START, "a")


    def route_bc_or_cd(state: State) -> Sequence[str]:
        if state["which"] == "cd":
            return ["c", "d"]
        return ["b", "c"]


    intermediates = ["b", "c", "d"]
    builder.add_conditional_edges(
        "a",
        route_bc_or_cd,
        intermediates,
    )
    for node in intermediates:
        builder.add_edge(node, "e")

    builder.add_edge("e", END)
    graph = builder.compile()