
How to wait for user input (Functional API)

Prerequisites

This guide assumes familiarity with the following:

  1. Implementing human-in-the-loop workflows with interrupt
  2. How to create a ReAct agent using the Functional API

Human-in-the-loop (HIL) interactions are crucial for agentic systems. Waiting for human input is a common HIL interaction pattern, allowing the agent to ask the user clarifying questions and await input before proceeding.

We can implement this in LangGraph using the interrupt() function. interrupt allows us to stop graph execution to collect input from a user and continue execution with collected input.

This guide demonstrates how to implement human-in-the-loop workflows using LangGraph's Functional API. Specifically, we will demonstrate:

  1. A simple usage example
  2. How to use with a ReAct agent

Setup

First, let's install the required packages and set our API keys:

%%capture --no-stderr
%pip install -U langgraph langchain-openai
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

Set up LangSmith for better debugging

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. Read more about how to get started in the docs.
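
If you want traces for the runs below, one simple way is to set the standard LangSmith tracing environment variables (a minimal sketch, assuming the default LangSmith setup and reusing the _set_env helper defined above):

os.environ["LANGCHAIN_TRACING_V2"] = "true"
_set_env("LANGCHAIN_API_KEY")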

Simple usage

Let's demonstrate a simple usage example. We will create three tasks:

  1. Append "bar".
  2. Pause for human input. When resuming, append human input.
  3. Append "qux".
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """Append bar."""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """Append user input."""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """Append qux."""
    return f"{input_query} qux"

API Reference: entrypoint | task | Command | interrupt

We can now compose these tasks in a simple entrypoint:

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3

API Reference: MemorySaver

All we have done to enable human-in-the-loop workflows is call interrupt() inside a task.

Tip

The results of prior tasks (in this case step_1) are persisted, so that they are not run again following the interrupt.

Let's send in a query string:

config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
{'step_1': 'foo bar'}


{'__interrupt__': (Interrupt(value='Please provide feedback: foo bar', resumable=True, ns=['graph:d66b2e35-0ee3-d8d6-1a22-aec9d58f13b9', 'human_feedback:e0cd4ee2-b874-e1d2-8bc4-3f7ddc06bcc2'], when='during'),)}
Note that we've paused with an interrupt after step_1. The Interrupt surfaces the value we passed to interrupt() (here, a prompt for the user) along with metadata for resuming the run. To resume, we issue a Command containing the data expected by the human_feedback task.

# Continue execution
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
{'human_feedback': 'foo bar baz'}


{'step_3': 'foo bar baz qux'}


{'graph': 'foo bar baz qux'}
After resuming, the run proceeds through the remaining steps and terminates as expected. Note that step_1 does not appear in the resumed stream: its persisted result was reused rather than recomputed.
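
The same pause-and-resume flow also works with invoke instead of stream. Here is a minimal sketch that reuses the graph defined above; the thread ID "2" is arbitrary and simply keeps this run separate from the one above.

config_2 = {"configurable": {"thread_id": "2"}}

# The first call runs step_1 and pauses inside human_feedback at the interrupt
graph.invoke("foo", config_2)

# Resuming with a Command returns the entrypoint's final value
print(graph.invoke(Command(resume="baz"), config_2))  # 'foo bar baz qux'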

Agent

We will build off of the agent created in the How to create a ReAct agent using the Functional API guide.

Here we will extend the agent by allowing it to reach out to a human for assistance when needed.

Define model and tools

Let's first define the tools and model we will use for our example. As in the ReAct agent guide, we will use a single placeholder tool that gets a description of the weather for a location.

We will use an OpenAI chat model for this example, but any model supporting tool-calling will suffice.

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

model = ChatOpenAI(model="gpt-4o-mini")


@tool
def get_weather(location: str):
    """Call to get the weather from a specific location."""
    # This is a placeholder for the actual implementation
    if any(city in location.lower() for city in ["sf", "san francisco"]):
        return "It's sunny!"
    elif "boston" in location.lower():
        return "It's rainy!"
    else:
        return f"I am not sure what the weather is in {location}"

API Reference: ChatOpenAI | tool
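
Because any tool-calling chat model suffices, you could swap in a different provider here. For example, assuming the langchain-anthropic package is installed (the model name below is illustrative):

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")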

To reach out to a human for assistance, we can simply add a tool that calls interrupt:

from langgraph.types import Command, interrupt


@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]


tools = [get_weather, human_assistance]

API Reference: Command | interrupt
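
Because the tool returns human_response["data"], whatever we later supply in Command(resume=...) for this interrupt must be a dict with a "data" key; we will see this in the Usage section below.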

Define tasks

Our tasks are otherwise unchanged from the ReAct agent guide:

  1. Call model: We want to query our chat model with a list of messages.
  2. Call tool: If our model generates tool calls, we want to execute them.

We just have one more tool accessible to the model.

from langchain_core.messages import ToolMessage
from langgraph.func import entrypoint, task

tools_by_name = {tool.name: tool for tool in tools}


@task
def call_model(messages):
    """Call model with a sequence of messages."""
    response = model.bind_tools(tools).invoke(messages)
    return response


@task
def call_tool(tool_call):
    """Execute a tool call requested by the model and wrap the result in a ToolMessage."""
    tool = tools_by_name[tool_call["name"]]
    observation = tool.invoke(tool_call)
    return ToolMessage(content=observation, tool_call_id=tool_call["id"])

API Reference: ToolMessage | entrypoint | task

Define entrypoint

Our entrypoint is also unchanged from the ReAct agent guide:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.message import add_messages

checkpointer = MemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    llm_response = call_model(messages).result()
    while True:
        if not llm_response.tool_calls:
            break

        # Execute tools
        tool_result_futures = [
            call_tool(tool_call) for tool_call in llm_response.tool_calls
        ]
        tool_results = [fut.result() for fut in tool_result_futures]

        # Append to message list
        messages = add_messages(messages, [llm_response, *tool_results])

        # Call model again
        llm_response = call_model(messages).result()

    # Generate final response
    messages = add_messages(messages, llm_response)
    return entrypoint.final(value=llm_response, save=messages)

API Reference: MemorySaver | add_messages
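
As before, entrypoint.final returns llm_response to the caller while saving the accumulated message list, so the saved messages are what come back in as previous on the next invocation of this thread.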

Usage

Let's invoke our model with a question that requires human assistance. Our question will also require an invocation of the get_weather tool:

def _print_step(step: dict) -> None:
    for task_name, result in step.items():
        if task_name == "agent":
            continue  # just stream from tasks
        print(f"\n{task_name}:")
        if task_name == "__interrupt__":
            print(result)
        else:
            result.pretty_print()
config = {"configurable": {"thread_id": "1"}}

user_message = {
    "role": "user",
    "content": (
        "Can you reach out for human assistance: what should I feed my cat? "
        "Separately, can you check the weather in San Francisco?"
    ),
}
print(user_message)

for step in agent.stream([user_message], config):
    _print_step(step)
{'role': 'user', 'content': 'Can you reach out for human assistance: what should I feed my cat? Separately, can you check the weather in San Francisco?'}

call_model:
================================== Ai Message ==================================
Tool Calls:
  human_assistance (call_joAEBVX7Abfm7TsZ0k95ZkVx)
 Call ID: call_joAEBVX7Abfm7TsZ0k95ZkVx
  Args:
    query: What should I feed my cat?
  get_weather (call_ut7zfHFCcms63BOZLrRHszGH)
 Call ID: call_ut7zfHFCcms63BOZLrRHszGH
  Args:
    location: San Francisco

call_tool:
================================= Tool Message =================================

content="It's sunny!" name='get_weather' tool_call_id='call_ut7zfHFCcms63BOZLrRHszGH'

__interrupt__:
(Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)
Note that we generate two tool calls, and although our run is interrupted, we did not block the execution of the get_weather tool.

Let's inspect where we're interrupted:

print(step)
{'__interrupt__': (Interrupt(value={'query': 'What should I feed my cat?'}, resumable=True, ns=['agent:aa676ccc-b038-25e3-9c8a-18e81d4e1372', 'call_tool:059d53d2-3344-13bc-e170-48b632c2dd97'], when='during'),)}
We can resume execution by issuing a Command. Note that the data we supply in the Command can be customized to your needs based on the implementation of human_assistance.

human_response = "You should feed your cat a fish."
human_command = Command(resume={"data": human_response})

for step in agent.stream(human_command, config):
    _print_step(step)
call_tool:
================================= Tool Message =================================

content='You should feed your cat a fish.' name='human_assistance' tool_call_id='call_joAEBVX7Abfm7TsZ0k95ZkVx'

call_model:
================================== Ai Message ==================================

For human assistance, you should feed your cat fish. 

Regarding the weather in San Francisco, it's sunny!
Above, when we resume, we provide the final tool message, allowing the model to generate its response. Check out the LangSmith traces to see a full breakdown of the runs:

  1. Trace from initial query
  2. Trace after resuming
