In [ ]:
Copied!
%%capture --no-stderr
%pip install -U langgraph langchain_anthropic langchain_community
%%capture --no-stderr
%pip install -U langgraph langchain_anthropic langchain_community
In [ ]:
Copied!
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
In order to configure the retry policy, you have to pass the retry
parameter to the add_node
function. The retry
parameter takes in a RetryPolicy
named tuple object. Below we instantiate a RetryPolicy
object with the default parameters:
In [15]:
Copied!
from langgraph.pregel import RetryPolicy
RetryPolicy()
from langgraph.pregel import RetryPolicy
RetryPolicy()
Out[15]:
RetryPolicy(initial_interval=0.5, backoff_factor=2.0, max_interval=128.0, max_attempts=3, jitter=True, retry_on=<function default_retry_on at 0x1157419e0>)
In [21]:
Copied!
import operator
import sqlite3
from typing import Annotated, Sequence, TypedDict
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import BaseMessage
from langgraph.graph import END, StateGraph, START
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage
db = SQLDatabase.from_uri("sqlite:///:memory:")
model = ChatAnthropic(model_name="claude-2.1")
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
def query_database(state):
query_result = db.run("SELECT * FROM Artist LIMIT 10;")
return {"messages": [AIMessage(content=query_result)]}
def call_model(state):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Define a new graph
workflow = StateGraph(AgentState)
workflow.add_node(
"query_database",
query_database,
retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
workflow.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
workflow.add_edge(START, "model")
workflow.add_edge("model", "query_database")
workflow.add_edge("query_database", END)
app = workflow.compile()
import operator
import sqlite3
from typing import Annotated, Sequence, TypedDict
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import BaseMessage
from langgraph.graph import END, StateGraph, START
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage
db = SQLDatabase.from_uri("sqlite:///:memory:")
model = ChatAnthropic(model_name="claude-2.1")
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
def query_database(state):
query_result = db.run("SELECT * FROM Artist LIMIT 10;")
return {"messages": [AIMessage(content=query_result)]}
def call_model(state):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Define a new graph
workflow = StateGraph(AgentState)
workflow.add_node(
"query_database",
query_database,
retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
workflow.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
workflow.add_edge(START, "model")
workflow.add_edge("model", "query_database")
workflow.add_edge("query_database", END)
app = workflow.compile()