The Functional API allows you to add LangGraph's key features (persistence, memory, human-in-the-loop, and streaming) to your applications with minimal changes to your existing code.
It is designed to integrate these features into existing code that may use standard language primitives for branching and control flow, such as if statements, for loops, and function calls. Unlike many data orchestration frameworks that require restructuring code into an explicit pipeline or DAG, the Functional API allows you to incorporate these capabilities without enforcing a rigid execution model.
The Functional API uses two key building blocks:
entrypoint – An entrypoint is a wrapper that takes a function as the starting point of a workflow. It encapsulates workflow logic and manages execution flow, including handling long-running tasks and interrupts.
task – Represents a discrete unit of work, such as an API call or data processing step, that can be executed asynchronously within an entrypoint. Tasks return a future-like object that can be awaited or resolved synchronously.
This provides a minimal abstraction for building workflows with state management and streaming.
Tip
For users who prefer a more declarative approach, LangGraph's Graph API allows you to define workflows using a Graph paradigm. Both APIs share the same underlying runtime, so you can use them together in the same application.
Please see the Functional API vs. Graph API section for a comparison of the two paradigms.
Below we demonstrate a simple application that writes an essay and interrupts to request human review.
```typescript
import { task, entrypoint, interrupt, MemorySaver } from "@langchain/langgraph";

const writeEssay = task("write_essay", (topic: string): string => {
  // A placeholder for a long-running task.
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);
```
Detailed Explanation
This workflow will write an essay about the topic "cat" and then pause to get a review from a human. The workflow can be interrupted for an indefinite amount of time until a review is provided.
When the workflow is resumed, it executes from the very start, but because the result of the writeEssay task was already saved, the task result will be loaded from the checkpoint instead of being recomputed.
```typescript
import { task, entrypoint, interrupt, MemorySaver, Command } from "@langchain/langgraph";

const writeEssay = task("write_essay", (topic: string): string => {
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      essay, // The essay we want reviewed.
      action: "Please approve/reject the essay",
    });

    return {
      essay,
      isApproved,
    };
  }
);

const threadId = crypto.randomUUID();

const config = {
  configurable: {
    thread_id: threadId,
  },
};

for await (const item of await workflow.stream("cat", config)) {
  console.log(item);
}
```

```
{ write_essay: 'An essay about topic: cat' }
{
  __interrupt__: [
    {
      value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
      resumable: true,
      ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
      when: 'during'
    }
  ]
}
```
An essay has been written and is ready for review. Once the review is provided, we can resume the workflow:
```typescript
// Get review from a user (e.g., via a UI)
// In this case, we're using a bool, but this can be any json-serializable value.
const humanReview = true;

for await (const item of await workflow.stream(new Command({ resume: humanReview }), config)) {
  console.log(item);
}
```

```
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
```
The workflow has been completed and the review has been added to the essay.
The entrypoint function can be used to create a workflow from a function. It encapsulates workflow logic and manages execution flow, including handling long-running tasks and interrupts.
An entrypoint is defined by passing a function to the entrypoint function.
The function must accept a single positional argument, which serves as the workflow input. If you need to pass multiple pieces of data, use an object as the input type for the first argument.
You will often want to pass a checkpointer to the entrypoint function to enable persistence and use features like human-in-the-loop.
```typescript
import { entrypoint, MemorySaver } from "@langchain/langgraph";

const checkpointer = new MemorySaver();

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // some logic that may involve long-running tasks like API calls,
    // and may be interrupted for human-in-the-loop.
    return result;
  }
);
```
Serialization
The inputs and outputs of entrypoints must be JSON-serializable to support checkpointing. Please see the serialization section for more details.
When declaring an entrypoint, you can access additional parameters that will be injected automatically at run time by using the getPreviousState function and other utilities. These parameters include:
| Parameter | Description |
| --- | --- |
| config | For accessing runtime configuration. Automatically populated as the second argument to the entrypoint function (but not task, since tasks can have a variable number of arguments). See RunnableConfig for information. |
```typescript
import {
  entrypoint,
  getPreviousState,
  BaseStore,
  InMemoryStore,
} from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";

const inMemoryStore = new InMemoryStore(...); // An instance of InMemoryStore for long-term memory

const myWorkflow = entrypoint(
  {
    checkpointer, // Specify the checkpointer
    store: inMemoryStore, // Specify the store
    name: "myWorkflow",
  },
  async (someInput: Record<string, any>) => {
    const previous = getPreviousState<any>(); // For short-term memory
    // Rest of workflow logic...
  }
);
```
When an entrypoint is defined with a checkpointer, it stores information between successive invocations on the same thread id in checkpoints.
This allows accessing the state from the previous invocation using the getPreviousState function.
By default, the previous state is the return value of the previous invocation.
```typescript
const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (number: number) => {
    const previous = getPreviousState<number>();
    return number + (previous ?? 0);
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)
```
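The per-thread behaviour can be pictured with a small library-free toy model. This is only a sketch: `makeToyWorkflow` and its `Map` are hypothetical stand-ins for an entrypoint and its checkpointer, and the only assumption modelled is that the last return value is stored per thread id.

```typescript
// Toy model only: `makeToyWorkflow` is a hypothetical helper, not a LangGraph API.
// It mimics one idea: the last return value is saved per thread id.
type ToyBody<I, O> = (input: I, previous: O | undefined) => O;

function makeToyWorkflow<I, O>(body: ToyBody<I, O>) {
  const saved = new Map<string, O>(); // stands in for the checkpointer
  return {
    invoke(input: I, threadId: string): O {
      const result = body(input, saved.get(threadId));
      saved.set(threadId, result); // becomes `previous` on the next call
      return result;
    },
  };
}

const accumulate = makeToyWorkflow<number, number>(
  (n, previous) => n + (previous ?? 0)
);

console.log(accumulate.invoke(1, "thread-a")); // 1 (no previous state)
console.log(accumulate.invoke(2, "thread-a")); // 3 (previous was 1)
console.log(accumulate.invoke(5, "thread-b")); // 5 (a new thread starts fresh)
```

The third call shows why the thread id matters: state saved under one thread id is not visible to another.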
entrypoint.final is a special primitive that can be returned from an entrypoint and allows decoupling the value that is saved in the checkpoint from the return value of the entrypoint.
It takes an object with two fields: value is the return value of the entrypoint, and save is the value that will be saved in the checkpoint.
```typescript
const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (number: number) => {
    const previous = getPreviousState<number>();
    // This will return the previous value to the caller, saving
    // 2 * number to the checkpoint, which will be used in the next invocation
    // for the previous state
    return entrypoint.final({
      value: previous ?? 0,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)
```
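The split between the returned value and the saved value can also be shown without the library. The following is a toy sketch under stated assumptions: `toyFinal` and `makeSplitWorkflow` are hypothetical names, and the `Map` stands in for a checkpointer.

```typescript
// Toy model only: `toyFinal` and `makeSplitWorkflow` are hypothetical, not LangGraph APIs.
interface SplitResult<V, S> {
  value: V; // handed back to the caller
  save: S; // stored for the next invocation
}

function toyFinal<V, S>(parts: SplitResult<V, S>): SplitResult<V, S> {
  return parts;
}

function makeSplitWorkflow<I, V, S>(
  body: (input: I, previous: S | undefined) => SplitResult<V, S>
) {
  const saved = new Map<string, S>(); // stands in for the checkpointer
  return {
    invoke(input: I, threadId: string): V {
      const out = body(input, saved.get(threadId));
      saved.set(threadId, out.save); // what the next invocation will see
      return out.value; // what the caller sees now
    },
  };
}

const splitWorkflow = makeSplitWorkflow<number, number, number>((n, previous) =>
  toyFinal({ value: previous ?? 0, save: 2 * n })
);

console.log(splitWorkflow.invoke(3, "1")); // 0 (nothing saved yet)
console.log(splitWorkflow.invoke(1, "1")); // 6 (2 * 3 was saved by the previous call)
```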
A task represents a discrete unit of work, such as an API call or data processing step. It has three key characteristics:
Asynchronous Execution: Tasks are designed to be executed asynchronously, allowing multiple operations to run concurrently without blocking.
Checkpointing: Task results are saved to a checkpoint, enabling resumption of the workflow from the last saved state. (See persistence for more details).
Retries: Tasks can be configured with a retry policy to handle transient errors.
Tasks are defined using the task function, which wraps a regular function.
```typescript
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: any) => {
  // Simulate a long-running operation
  ...
  return result;
});
```
Serialization
The outputs of tasks must be JSON-serializable to support checkpointing.
You can specify a retry policy for a task by passing a retry parameter to the task function.
```typescript
const slowComputation = task(
  {
    name: "slowComputation",
    // only attempt to run this task once before giving up
    retry: { maxAttempts: 1 },
  },
  async (inputValue: any) => {
    // A long-running operation that may fail
    return result;
  }
);
```
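To illustrate what a maximum-attempts policy means in practice, here is a minimal library-free sketch. `withRetry` and `flaky` are hypothetical helpers written for this illustration; LangGraph's own retry handling may differ, for example in backoff and in which errors count as retryable.

```typescript
// Hypothetical helper for illustration; not LangGraph's retry implementation.
async function withRetry<T>(fn: () => Promise<T>, maxAttempts: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err; // remember the failure and try again if attempts remain
    }
  }
  throw lastError; // every attempt failed
}

let flakyCalls = 0;
async function flaky(): Promise<string> {
  flakyCalls += 1;
  if (flakyCalls < 3) {
    throw new Error(`transient failure #${flakyCalls}`);
  }
  return "OK";
}

withRetry(flaky, 3).then((result) => {
  console.log(result, "after", flakyCalls, "calls"); // OK after 3 calls
});
```

With `maxAttempts` set to 1, as in the example above, the first failure is surfaced immediately.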
Tasks are useful in the following scenarios:
Checkpointing: When you need to save the result of a long-running operation to a checkpoint, so you don't need to recompute it when resuming the workflow.
Human-in-the-loop: If you're building a workflow that requires human intervention, you MUST use tasks to encapsulate any randomness (e.g., API calls) to ensure that the workflow can be resumed correctly. See the determinism section for more details.
Parallel Execution: For I/O-bound tasks, tasks enable parallel execution, allowing multiple operations to run concurrently without blocking (e.g., calling multiple APIs).
Observability: Wrapping operations in tasks provides a way to track the progress of the workflow and monitor the execution of individual operations using LangSmith.
Retryable Work: When work needs to be retried to handle failures or inconsistencies, tasks provide a way to encapsulate and manage the retry logic.
There are two key aspects to serialization in LangGraph:
entrypoint inputs and outputs must be JSON-serializable.
task outputs must be JSON-serializable.
These requirements are necessary for enabling checkpointing and workflow resumption. Use plain JavaScript values such as objects, arrays, strings, numbers, and booleans to ensure that your inputs and outputs are serializable.
Serialization ensures that workflow state, such as task results and intermediate values, can be reliably saved and restored. This is critical for enabling human-in-the-loop interactions, fault tolerance, and parallel execution.
Providing non-serializable inputs or outputs will result in a runtime error when a workflow is configured with a checkpointer.
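As a rough aid, a value can be checked before it is returned from a task or entrypoint. The helper below is a conservative sketch, not LangGraph's serializer: `isPlainJson` is a hypothetical name, it accepts only values that survive a JSON round trip unchanged, and it does not detect cyclic references. The real checkpointer may accept more types than this check does.

```typescript
// Conservative check (an assumption for illustration, not LangGraph's serializer):
// accept only values that survive JSON.stringify / JSON.parse unchanged.
function isPlainJson(value: unknown): boolean {
  if (value === null) return true;
  switch (typeof value) {
    case "string":
    case "boolean":
      return true;
    case "number":
      return Number.isFinite(value as number); // NaN and Infinity become null in JSON
    case "object": {
      if (Array.isArray(value)) return value.every((item) => isPlainJson(item));
      const proto = Object.getPrototypeOf(value);
      if (proto !== Object.prototype && proto !== null) {
        return false; // Date, Map, Set, class instances
      }
      const record = value as Record<string, unknown>;
      return Object.keys(record).every((key) => isPlainJson(record[key]));
    }
    default:
      return false; // undefined, function, symbol, bigint
  }
}

console.log(isPlainJson({ essay: "text", tags: ["a", 1, true, null] })); // true
console.log(isPlainJson({ createdAt: new Date(0) })); // false
console.log(isPlainJson(new Map())); // false
```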
To utilize features like human-in-the-loop, any randomness should be encapsulated inside of tasks. This guarantees that when execution is halted (e.g., for human in the loop) and then resumed, it will follow the same sequence of steps, even if task results are non-deterministic.
LangGraph achieves this behavior by persisting task and subgraph results as they execute. A well-designed workflow ensures that resuming execution follows the same sequence of steps, allowing previously computed results to be retrieved correctly without having to re-execute them. This is particularly useful for long-running tasks or tasks with non-deterministic results, as it avoids repeating previously done work and allows resuming from essentially the same point.
While different runs of a workflow can produce different results, resuming a specific run should always follow the same sequence of recorded steps. This allows LangGraph to efficiently look up task and subgraph results that were executed prior to the graph being interrupted and avoid recomputing them.
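The lookup of recorded results can be pictured with a toy replay log. This is a deliberately simplified sketch: `ReplayLog` is a hypothetical class that records results in call order, which is an assumption made for illustration and far simpler than LangGraph's runtime.

```typescript
// Toy model only: `ReplayLog` is hypothetical and much simpler than the real runtime.
// Task results are recorded in call order; a resumed run reads them back
// instead of recomputing them.
class ReplayLog {
  private results: unknown[] = [];
  private cursor = 0;

  // Run `fn` the first time; on later passes return the recorded result.
  task<T>(fn: () => T): T {
    if (this.cursor < this.results.length) {
      return this.results[this.cursor++] as T;
    }
    const result = fn();
    this.results.push(result);
    this.cursor++;
    return result;
  }

  // Start the workflow body again from the top, keeping recorded results.
  rewind(): void {
    this.cursor = 0;
  }
}

let executions = 0;
function workflowBody(log: ReplayLog): number {
  return log.task(() => {
    executions += 1;
    return Math.random(); // non-deterministic, but recorded once
  });
}

const replayLog = new ReplayLog();
const firstPass = workflowBody(replayLog);
replayLog.rewind(); // simulate resuming after an interrupt
const secondPass = workflowBody(replayLog);

console.log(firstPass === secondPass, executions); // true 1
```

The body runs twice, but the wrapped function runs once. This only works because the second pass asks for results in the same order as the first.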
Idempotency ensures that running the same operation multiple times produces the same result. This helps prevent duplicate API calls and redundant processing if a step is rerun due to a failure. Always place API calls inside task functions for checkpointing, and design them to be idempotent in case of re-execution. Re-execution can occur if a task starts but does not complete successfully; if the workflow is then resumed, the task will run again. Use idempotency keys or verify existing results to avoid duplication.
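A minimal sketch of the idempotency-key idea, using only plain TypeScript. All names are hypothetical: `processedCharges` stands in for a durable store such as a database table, and `chargeCustomer` for any external call that must not be duplicated.

```typescript
// Hypothetical sketch: `processedCharges` stands in for a durable store and
// `chargeCustomer` for an external call that must not be duplicated.
const processedCharges = new Map<string, string>();
let chargesSent = 0;

function chargeCustomer(idempotencyKey: string, amountCents: number): string {
  const existing = processedCharges.get(idempotencyKey);
  if (existing !== undefined) {
    return existing; // already done: return the earlier result, send nothing
  }
  chargesSent += 1; // the real side effect would happen here
  const receipt = `receipt-${idempotencyKey}-${amountCents}`;
  processedCharges.set(idempotencyKey, receipt);
  return receipt;
}

const firstCharge = chargeCustomer("order-42", 1999);
const retriedCharge = chargeCustomer("order-42", 1999); // e.g. the task re-ran after a crash

console.log(firstCharge === retriedCharge, chargesSent); // true 1
```

The key should identify the logical operation (here, the order), so that a re-executed task maps to the same key as the original attempt.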
The Functional API and the Graph API (StateGraph) provide two different paradigms for building applications with LangGraph. Here are some key differences:
Control flow: The Functional API does not require thinking about graph structure. You can use standard JavaScript/TypeScript constructs to define workflows. This will usually trim the amount of code you need to write.
State management: The Graph API requires declaring a State and may require defining reducers to manage updates to the graph state. entrypoint and task do not require explicit state management, as their state is scoped to the function and is not shared across functions.
Checkpointing: Both APIs generate and use checkpoints. In the Graph API a new checkpoint is generated after every superstep. In the Functional API, when tasks are executed, their results are saved to an existing checkpoint associated with the given entrypoint instead of creating a new checkpoint.
Visualization: The Graph API makes it easy to visualize the workflow as a graph which can be useful for debugging, understanding the workflow, and sharing with others. The Functional API does not support visualization as the graph is dynamically generated during runtime.
Encapsulate side effects (e.g., writing to a file, sending an email) in tasks to ensure they are not executed multiple times when resuming a workflow.
In this example, a side effect (writing to a file) is directly included in the workflow, so it will be executed a second time when resuming the workflow.
```typescript
import * as fs from "node:fs/promises";

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    // This code will be executed a second time when resuming the workflow.
    // Which is likely not what you want.
    await fs.writeFile("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);
```
In this example, the side effect is encapsulated in a task, ensuring consistent execution upon resumption.
```typescript
import * as fs from "node:fs/promises";
import { task } from "@langchain/langgraph";

const writeToFile = task("writeToFile", async () => {
  await fs.writeFile("output.txt", "Side effect executed");
});

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    // The side effect is now encapsulated in a task.
    await writeToFile();
    const value = interrupt("question");
    return value;
  }
);
```
Operations that might give different results each time (like getting current time or random numbers) should be encapsulated in tasks to ensure that on resume, the same result is returned.
In a task: Get random number (5) → interrupt → resume → (returns 5 again) → ...
Not in a task: Get random number (5) → interrupt → resume → get new random number (7) → ...
This is especially important when using human-in-the-loop workflows with multiple interrupt calls. LangGraph keeps a list of resume values for each task/entrypoint. When an interrupt is encountered, it is matched with the corresponding resume value. This matching is strictly index-based, so the order of the resume values should match the order of the interrupts.
If the order of execution is not maintained when resuming, one interrupt call may be matched with the wrong resume value, leading to incorrect results.
Please read the section on determinism for more details.
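The consequence of index-based matching can be shown with a toy model. `runWithResumes` is a hypothetical function that models only the positional pairing described above; it is a sketch and says nothing about how LangGraph stores resume values internally.

```typescript
// Toy model only: resume values are consumed strictly by position.
function runWithResumes(
  questions: string[],
  resumeValues: string[]
): Record<string, string> {
  const answers: Record<string, string> = {};
  questions.forEach((question, index) => {
    // The i-th interrupt call receives the i-th resume value.
    answers[question] = resumeValues[index];
  });
  return answers;
}

const suppliedValues = ["Ada", "36"]; // provided for: name first, then age

// Same order as the original run: the values line up.
console.log(JSON.stringify(runWithResumes(["name?", "age?"], suppliedValues)));
// {"name?":"Ada","age?":"36"}

// Order changed on resume (e.g. a branch depended on the clock): values are swapped.
console.log(JSON.stringify(runWithResumes(["age?", "name?"], suppliedValues)));
// {"age?":"Ada","name?":"36"}
```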
In this example, the workflow uses the current time to determine which task to execute. This is non-deterministic because the result of the workflow depends on the time at which it is executed.
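A minimal sketch of the problem, written with plain functions rather than a real entrypoint so that it runs on its own. All names are hypothetical, and the clock is passed in as `now` so that the effect of time passing can be simulated.

```typescript
// Sketch with plain functions (no LangGraph imports). `now` is injected so the
// effect of the clock can be demonstrated; all names are hypothetical.
function pickTaskUnstable(t0: number, now: () => number): "taskA" | "taskB" {
  const elapsedMs = now() - t0; // read from the clock on every (re-)execution
  return elapsedMs > 1000 ? "taskA" : "taskB";
}

const unstableFirstRun = pickTaskUnstable(0, () => 500); // original run, 0.5 s after t0
const unstableOnResume = pickTaskUnstable(0, () => 5000); // re-executed on resume, 5 s after t0

console.log(unstableFirstRun, unstableOnResume); // taskB taskA
```

The resumed run takes a different branch from the original run, so recorded results and resume values no longer line up with the calls that ask for them.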
In this example, the workflow uses the input t0 to determine which task to execute. This is deterministic because the result of the workflow depends only on the input.
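One way to sketch the deterministic variant without the library: the clock is read once and the value is recorded, playing the role of a checkpointed task result. The names `recordedTime`, `getTimeOnce`, and `pickTaskStable` are hypothetical, and the single module-level variable is a simplification of per-run storage.

```typescript
// Hypothetical sketch: `recordedTime` plays the role of a checkpointed task result.
let recordedTime: number | undefined;

function getTimeOnce(now: () => number): number {
  if (recordedTime === undefined) {
    recordedTime = now(); // only the first execution reads the clock
  }
  return recordedTime;
}

function pickTaskStable(t0: number, now: () => number): "taskA" | "taskB" {
  const elapsedMs = getTimeOnce(now) - t0;
  return elapsedMs > 1000 ? "taskA" : "taskB";
}

const stableFirstRun = pickTaskStable(0, () => 500);
const stableOnResume = pickTaskStable(0, () => 5000); // clock moved on, recorded value did not

console.log(stableFirstRun, stableOnResume); // taskB taskB
```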
Tasks can be executed in parallel by invoking them concurrently and waiting for the results. This is useful for improving performance in IO bound tasks (e.g., calling APIs for LLMs).
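The general pattern is to start every call first and only then wait for the results. The sketch below uses plain async functions in place of tasks so that it is self-contained; `fakeApiCall` is a hypothetical stand-in for an I/O-bound operation, and the counters exist only to show that the calls overlap.

```typescript
// Plain async functions stand in for tasks here; `fakeApiCall` is hypothetical.
let inFlight = 0;
let maxInFlight = 0;

async function fakeApiCall(id: number): Promise<string> {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 50)); // simulated I/O wait
  inFlight -= 1;
  return `result-${id}`;
}

async function runAll(): Promise<string[]> {
  // Start every call first, then wait for all of them together.
  const pending = [1, 2, 3].map((id) => fakeApiCall(id));
  return Promise.all(pending);
}

const allDone = runAll();
allDone.then((results) => {
  console.log(results.join(", "), "| max concurrent:", maxInFlight);
  // result-1, result-2, result-3 | max concurrent: 3
});
```

Awaiting each call inside the loop instead would run them one after another, so the total time would be the sum of the individual waits.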
The Functional API and the Graph API can be used together in the same application as they share the same underlying runtime.
```typescript
import { entrypoint, StateGraph } from "@langchain/langgraph";

const builder = new StateGraph();
...
const someGraph = builder.compile();

const someWorkflow = entrypoint(
  { name: "someWorkflow" },
  async (someInput: Record<string, any>) => {
    // Call a graph defined using the graph API
    const result1 = await someGraph.invoke(...);
    // Call another graph defined using the graph API
    const result2 = await anotherGraph.invoke(...);
    return {
      result1,
      result2,
    };
  }
);
```
You can call other entrypoints from within an entrypoint or a task.
```typescript
const someOtherWorkflow = entrypoint(
  { name: "someOtherWorkflow" }, // Will automatically use the checkpointer from the parent entrypoint
  async (inputs: { value: number }) => {
    return inputs.value;
  }
);

const myWorkflow = entrypoint(
  { checkpointer, name: "myWorkflow" },
  async (inputs: Record<string, any>) => {
    // Pass a single object that matches the input type of someOtherWorkflow.
    const value = await someOtherWorkflow.invoke({ value: 1 });
    return value;
  }
);
```
You can stream custom data from an entrypoint by using the writer method on config. This allows you to write custom data to the custom stream.
```typescript
import {
  entrypoint,
  task,
  MemorySaver,
  LangGraphRunnableConfig,
} from "@langchain/langgraph";

const addOne = task("addOne", (x: number) => x + 1);
const addTwo = task("addTwo", (x: number) => x + 2);

const checkpointer = new MemorySaver();

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: { number: number }, config: LangGraphRunnableConfig) => {
    config.writer?.("hello"); // Write some data to the `custom` stream
    await addOne(inputs.number); // Will write data to the `updates` stream
    config.writer?.("world"); // Write some more data to the `custom` stream
    await addTwo(inputs.number); // Will write data to the `updates` stream
    return 5;
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

const stream = await main.stream(
  { number: 1 },
  { streamMode: ["custom", "updates"], ...config }
);

for await (const chunk of stream) {
  console.log(chunk);
}
```

```typescript
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

// Global variable to track the number of attempts
let attempts = 0;

const getInfo = task("getInfo", () => {
  /*
   * Simulates a task that fails once before succeeding.
   * Throws an error on the first attempt, then returns "OK" on subsequent tries.
   */
  attempts += 1;

  if (attempts < 2) {
    throw new Error("Failure"); // Simulate a failure on the first attempt
  }
  return "OK";
});

// Initialize an in-memory checkpointer for persistence
const checkpointer = new MemorySaver();

const slowTask = task("slowTask", async () => {
  /*
   * Simulates a slow-running task by introducing a 1-second delay.
   */
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return "Ran slow task.";
});

const main = entrypoint(
  { checkpointer, name: "main" },
  async (inputs: Record<string, any>) => {
    /*
     * Main workflow function that runs the slowTask and getInfo tasks sequentially.
     *
     * Parameters:
     * - inputs: Record<string, any> containing workflow input values.
     *
     * The workflow first executes `slowTask` and then attempts to execute `getInfo`,
     * which will fail on the first invocation.
     */
    const slowTaskResult = await slowTask(); // Blocking call to slowTask
    await getInfo(); // Error will be thrown here on the first attempt
    return slowTaskResult;
  }
);

// Workflow execution configuration with a unique thread identifier
const config = {
  configurable: {
    thread_id: "1", // Unique identifier to track workflow execution
  },
};

// This invocation will take ~1 second due to the slowTask execution
try {
  // First invocation will throw an error due to the `getInfo` task failing
  await main.invoke({ anyInput: "foobar" }, config);
} catch (err) {
  // Handle the failure gracefully
}
```
When we resume execution, we won't need to re-run the slowTask as its result is already saved in the checkpoint.
Long-term memory allows storing information across different thread ids. This could be useful for learning information about a given user in one conversation and using it in another.
Please see the following how-to guides for more details: