How to wait for user input¶
Human-in-the-loop (HIL) interactions are crucial for agentic systems. Waiting for human input is a common HIL interaction pattern, allowing the agent to ask the user clarifying questions and await input before proceeding.
We can implement this in LangGraph using a breakpoint: breakpoints allow us to stop graph execution at a specific step. At this breakpoint, we can wait for human input. Once we have input from the user, we can add it to the graph state and proceed.
Setup¶
First we need to install the packages required
Next, we need to set API keys for Anthropic and / or OpenAI (the LLM(s) we will use)
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
_set_env("ANTHROPIC_API_KEY")
Set up LangSmith for LangGraph development
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here.
Simple Usage¶
Let's look at very basic usage of this. One intuitive approach is simply to create a node, human_feedback
, that will get user feedback. This allows us to place our feedback gathering at a specific, chosen point in our graph.
1) We specify the breakpoint using interrupt_before
our human_feedback
node.
2) We set up a checkpointer to save the state of the graph up until this node.
3) We use .update_state
to update the state of the graph with the human response we get.
- We use the
as_node
parameter to apply this state update as the specified node,human_feedback
. - The graph will then resume execution as if the
human_feedback
node just acted.
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display
class State(TypedDict):
input: str
user_feedback: str
def step_1(state):
print("---Step 1---")
pass
def human_feedback(state):
print("---human_feedback---")
pass
def step_3(state):
print("---Step 3---")
pass
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)
# Set up memory
memory = MemorySaver()
# Add
graph = builder.compile(checkpointer=memory, interrupt_before=["human_feedback"])
# View
display(Image(graph.get_graph().draw_mermaid_png()))
Run until our breakpoint at human_feedback
-
# Input
initial_input = {"input": "hello world"}
# Thread
thread = {"configurable": {"thread_id": "1"}}
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
print(event)
# Get user input
try:
user_input = input("Tell me how you want to update the state: ")
except:
user_input = "go to step 3!"
# We now update the state as if we are the human_feedback node
graph.update_state(thread, {"user_feedback": user_input}, as_node="human_feedback")
# We can check the state
print("--State after update--")
print(graph.get_state(thread))
# We can check the next node, showing that it is node 3 (which follows human_feedback)
graph.get_state(thread).next
--State after update--
StateSnapshot(values={'input': 'hello world', 'user_feedback': 'go to step 3!'}, next=('step_3',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef7830e-b807-6142-8002-1b511e4caf96'}}, metadata={'source': 'update', 'step': 2, 'writes': {'human_feedback': {'user_feedback': 'go to step 3!'}}, 'parents': {}}, created_at='2024-09-21T15:48:17.660131+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef7830e-36d1-6f1e-8001-4d4c913ae8a8'}}, tasks=(PregelTask(id='6b5486bf-eb6c-0e27-4784-cad2a69b86a2', name='step_3', path=('__pregel_pull', 'step_3'), error=None, interrupts=(), state=None),))
We can proceed after our breakpoint -
# Continue the graph execution
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
Agent¶
In the context of agents, waiting for user feedback is useful to ask clarifying questions.
To show this, we will build a relatively simple ReAct-style agent that does tool calling.
We will use OpenAI and / or Anthropic's models and a fake tool (just for demo purposes).
Using Pydantic with LangChain
This notebook uses Pydantic v2 BaseModel
, which requires langchain-core >= 0.3
. Using langchain-core < 0.3
will result in errors due to mixing of Pydantic v1 and v2 BaseModels
.
# Set up the state
from langgraph.graph import MessagesState, START
# Set up the tool
# We will have one real tool - a search tool
# We'll also have one "fake" tool - a "ask_human" tool
# Here we define any ACTUAL tools
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
@tool
def search(query: str):
"""Call to surf the web."""
# This is a placeholder for the actual implementation
# Don't let the LLM know this though 😊
return f"I looked up: {query}. Result: It's sunny in San Francisco, but you better look out if you're a Gemini 😈."
tools = [search]
tool_node = ToolNode(tools)
# Set up the model
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
model = ChatAnthropic(model="claude-3-5-sonnet-20240620")
model = ChatOpenAI(model="gpt-4o")
from pydantic import BaseModel
# We are going "bind" all tools to the model
# We have the ACTUAL tools from above, but we also need a mock tool to ask a human
# Since `bind_tools` takes in tools but also just tool definitions,
# We can define a tool definition for `ask_human`
class AskHuman(BaseModel):
"""Ask the human a question"""
question: str
model = model.bind_tools(tools + [AskHuman])
# Define nodes and conditional edges
# Define the function that determines whether to continue or not
def should_continue(state):
messages = state["messages"]
last_message = messages[-1]
# If there is no function call, then we finish
if not last_message.tool_calls:
return "end"
# If tool call is asking Human, we return that node
# You could also add logic here to let some system know that there's something that requires Human input
# For example, send a slack message, etc
elif last_message.tool_calls[0]["name"] == "AskHuman":
return "ask_human"
# Otherwise if there is, we continue
else:
return "continue"
# Define the function that calls the model
def call_model(state):
messages = state["messages"]
response = model.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
# We define a fake node to ask the human
def ask_human(state):
pass
# Build the graph
from langgraph.graph import END, StateGraph
# Define a new graph
workflow = StateGraph(MessagesState)
# Define the three nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_node("ask_human", ask_human)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")
# We now add a conditional edge
workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
# Finally we pass in a mapping.
# The keys are strings, and the values are other nodes.
# END is a special node marking that the graph should finish.
# What will happen is we will call `should_continue`, and then the output of that
# will be matched against the keys in this mapping.
# Based on which one it matches, that node will then be called.
{
# If `tools`, then we call the tool node.
"continue": "action",
# We may ask the human
"ask_human": "ask_human",
# Otherwise we finish.
"end": END,
},
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")
# After we get back the human response, we go back to the agent
workflow.add_edge("ask_human", "agent")
# Set up memory
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
# We add a breakpoint BEFORE the `ask_human` node so it never executes
app = workflow.compile(checkpointer=memory, interrupt_before=["ask_human"])
display(Image(app.get_graph().draw_mermaid_png()))
Interacting with the Agent¶
We can now interact with the agent. Let's ask it to ask the user where they are, then tell them the weather.
This should make it use the ask_human
tool first, then use the normal tool.
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(
content="Use the search tool to ask the user where they are, then look up the weather there"
)
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
event["messages"][-1].pretty_print()
================================[1m Human Message [0m=================================
Use the search tool to ask the user where they are, then look up the weather there
==================================[1m Ai Message [0m==================================
Tool Calls:
AskHuman (call_LDo62KBPQKZWxPI5IHxPBF0w)
Call ID: call_LDo62KBPQKZWxPI5IHxPBF0w
Args:
question: Can you tell me where you are located?
We now want to update this thread with a response from the user. We then can kick off another run.
Because we are treating this as a tool call, we will need to update the state as if it is a response from a tool call. In order to do this, we will need to check the state to get the ID of the tool call.
tool_call_id = app.get_state(config).values["messages"][-1].tool_calls[0]["id"]
# We now create the tool call with the id and the response we want
tool_message = [
{"tool_call_id": tool_call_id, "type": "tool", "content": "san francisco"}
]
# # This is equivalent to the below, either one works
# from langchain_core.messages import ToolMessage
# tool_message = [ToolMessage(tool_call_id=tool_call_id, content="san francisco")]
# We now update the state
# Notice that we are also specifying `as_node="ask_human"`
# This will apply this update as this node,
# which will make it so that afterwards it continues as normal
app.update_state(config, {"messages": tool_message}, as_node="ask_human")
# We can check the state
# We can see that the state currently has the `agent` node next
# This is based on how we define our graph,
# where after the `ask_human` node goes (which we just triggered)
# there is an edge to the `agent` node
app.get_state(config).next
We can now tell the agent to continue. We can just pass in None
as the input to the graph, since no additional input is needed
==================================[1m Ai Message [0m==================================
Tool Calls:
search (call_LJlkCFfHvAS2taKHTaMmORE5)
Call ID: call_LJlkCFfHvAS2taKHTaMmORE5
Args:
query: current weather in San Francisco
=================================[1m Tool Message [0m=================================
Name: search
["I looked up: current weather in San Francisco. Result: It's sunny in San Francisco, but you better look out if you're a Gemini \ud83d\ude08."]
==================================[1m Ai Message [0m==================================
The current weather in San Francisco is sunny. Enjoy the good weather! 🌞