How to create a custom checkpointer using Postgres¶
When creating LangGraph agents, you can also set them up so that they persist their state. This allows you to do things like interact with an agent multiple times and have it remember previous interactions.
This example shows how to use Postgres
as the backend for persisting checkpoint state.
NOTE: this is just an example implementation. You can implement your own checkpointer using a different database or modify this one as long as it conforms to the BaseCheckpointSaver
interface.
Checkpointer implementation¶
In [1]:
Copied!
%%capture --no-stderr
%pip install -U psycopg psycopg-pool langgraph
%%capture --no-stderr
%pip install -U psycopg psycopg-pool langgraph
In [2]:
Copied!
"""Implementation of a langgraph checkpoint saver using Postgres."""
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Generator, Optional, Union, Tuple, List
import psycopg
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint import BaseCheckpointSaver
from langgraph.serde.jsonplus import JsonPlusSerializer
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, CheckpointTuple
from psycopg_pool import AsyncConnectionPool, ConnectionPool
class JsonAndBinarySerializer(JsonPlusSerializer):
    """Serializer that passes raw binary payloads through untouched.

    bytes/bytearray values are tagged and stored as-is; everything else is
    delegated to the JSON-based parent serializer.
    """

    def _default(self, obj):
        # Binary values embedded inside JSON documents are encoded through a
        # hex-based constructor so the parent serializer can round-trip them.
        if isinstance(obj, (bytes, bytearray)):
            return self._encode_constructor_args(
                obj.__class__, method="fromhex", args=[obj.hex()]
            )
        return super()._default(obj)

    def dumps(self, obj: Any) -> tuple[str, bytes]:
        """Serialize ``obj`` to a ``(type_tag, payload)`` pair."""
        if isinstance(obj, bytes):
            return "bytes", obj
        if isinstance(obj, bytearray):
            return "bytearray", obj
        return "json", super().dumps(obj)

    def loads(self, s: tuple[str, bytes]) -> Any:
        """Deserialize a ``(type_tag, payload)`` pair produced by ``dumps``."""
        tag, payload = s
        if tag == "bytes":
            return payload
        if tag == "bytearray":
            return bytearray(payload)
        if tag == "json":
            return super().loads(payload)
        raise NotImplementedError(f"Unknown serialization type: {s[0]}")
@contextmanager
def _get_sync_connection(
    connection: Union[psycopg.Connection, ConnectionPool, None],
) -> Generator[psycopg.Connection, None, None]:
    """Yield a sync Postgres connection from a raw connection or a pool."""
    # A raw connection is handed back directly; a pool checks one out for
    # the duration of the context. Anything else is a configuration error.
    if isinstance(connection, psycopg.Connection):
        yield connection
        return
    if isinstance(connection, ConnectionPool):
        with connection.connection() as conn:
            yield conn
        return
    raise ValueError(
        "Invalid sync connection object. Please initialize the check pointer "
        f"with an appropriate sync connection object. "
        f"Got {type(connection)}."
    )
@asynccontextmanager
async def _get_async_connection(
    connection: Union[psycopg.AsyncConnection, AsyncConnectionPool, None],
) -> AsyncGenerator[psycopg.AsyncConnection, None]:
    """Yield an async Postgres connection from a raw connection or a pool."""
    # Mirrors _get_sync_connection: raw connections pass through, pools lend
    # a connection for the duration of the context.
    if isinstance(connection, psycopg.AsyncConnection):
        yield connection
        return
    if isinstance(connection, AsyncConnectionPool):
        async with connection.connection() as conn:
            yield conn
        return
    raise ValueError(
        "Invalid async connection object. Please initialize the check pointer "
        f"with an appropriate async connection object. "
        f"Got {type(connection)}."
    )
class PostgresSaver(BaseCheckpointSaver):
    """Checkpoint saver that persists LangGraph state in a Postgres table.

    Either a synchronous or an asynchronous connection (or pool) may be
    supplied; only the matching set of methods (put/list/get_tuple vs.
    aput/alist/aget_tuple) is usable with a given connection kind.
    """

    sync_connection: Optional[Union[psycopg.Connection, ConnectionPool]] = None
    """The synchronous connection or pool to the Postgres database.

    If providing a connection object, please ensure that the connection is open
    and remember to close the connection when done.
    """
    async_connection: Optional[
        Union[psycopg.AsyncConnection, AsyncConnectionPool]
    ] = None
    """The asynchronous connection or pool to the Postgres database.

    If providing a connection object, please ensure that the connection is open
    and remember to close the connection when done.
    """

    def __init__(
        self,
        sync_connection: Optional[Union[psycopg.Connection, ConnectionPool]] = None,
        async_connection: Optional[
            Union[psycopg.AsyncConnection, AsyncConnectionPool]
        ] = None
    ):
        # NOTE(review): the JsonAndBinarySerializer defined in this file is
        # never used — the saver is wired to the plain JsonPlusSerializer.
        # Confirm whether that is intentional before changing it; stored
        # checkpoints are written in this serializer's format.
        super().__init__(serde=JsonPlusSerializer())
        self.sync_connection = sync_connection
        self.async_connection = async_connection
@contextmanager
def _get_sync_connection(self) -> Generator[psycopg.Connection, None, None]:
"""Get the connection to the Postgres database."""
with _get_sync_connection(self.sync_connection) as connection:
yield connection
@asynccontextmanager
async def _get_async_connection(
self,
) -> AsyncGenerator[psycopg.AsyncConnection, None]:
"""Get the connection to the Postgres database."""
async with _get_async_connection(self.async_connection) as connection:
yield connection
    # DDL for the single table backing this saver. The composite primary key
    # (thread_id, thread_ts) allows many checkpoints per thread; parent_ts
    # links a checkpoint to its predecessor within the same thread.
    CREATE_TABLES_QUERY = """
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    thread_ts TEXT NOT NULL,
    parent_ts TEXT,
    checkpoint BYTEA NOT NULL,
    metadata BYTEA NOT NULL,
    PRIMARY KEY (thread_id, thread_ts)
);
"""

    @staticmethod
    def create_tables(connection: Union[psycopg.Connection, ConnectionPool], /) -> None:
        """Create the schema for the checkpoint saver."""
        with _get_sync_connection(connection) as conn:
            with conn.cursor() as cur:
                cur.execute(PostgresSaver.CREATE_TABLES_QUERY)

    @staticmethod
    async def acreate_tables(
        connection: Union[psycopg.AsyncConnection, AsyncConnectionPool], /
    ) -> None:
        """Create the schema for the checkpoint saver (async)."""
        async with _get_async_connection(connection) as conn:
            async with conn.cursor() as cur:
                await cur.execute(PostgresSaver.CREATE_TABLES_QUERY)

    @staticmethod
    def drop_tables(connection: psycopg.Connection, /) -> None:
        """Drop the table for the checkpoint saver."""
        # NOTE: unlike create_tables, this accepts only a raw connection,
        # not a pool.
        with connection.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS checkpoints;")

    @staticmethod
    async def adrop_tables(connection: psycopg.AsyncConnection, /) -> None:
        """Drop the table for the checkpoint saver (async)."""
        async with connection.cursor() as cur:
            await cur.execute("DROP TABLE IF EXISTS checkpoints;")
    # Insert-or-update a checkpoint row keyed on (thread_id, thread_ts).
    UPSERT_CHECKPOINT_QUERY = """
INSERT INTO checkpoints
    (thread_id, thread_ts, parent_ts, checkpoint, metadata)
VALUES
    (%s, %s, %s, %s, %s)
ON CONFLICT (thread_id, thread_ts)
DO UPDATE SET checkpoint = EXCLUDED.checkpoint,
              metadata = EXCLUDED.metadata;
"""

    def put(
        self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Put the checkpoint for the given configuration.

        Args:
            config: The configuration for the checkpoint.
                A dict with a `configurable` key which is a dict with
                a `thread_id` key and an optional `thread_ts` key.
                For example, { 'configurable': { 'thread_id': 'test_thread' } }
            checkpoint: The checkpoint to persist.
            metadata: Metadata to store alongside the checkpoint.

        Returns:
            The RunnableConfig that describes the checkpoint that was just created.
            It'll contain the `thread_id` and `thread_ts` of the checkpoint.
        """
        thread_id = config["configurable"]["thread_id"]
        # The caller's current thread_ts (if any) becomes the new row's
        # parent_ts, chaining checkpoints within the thread.
        parent_ts = config["configurable"].get("thread_ts")
        with self._get_sync_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    self.UPSERT_CHECKPOINT_QUERY,
                    (
                        thread_id,
                        checkpoint["ts"],
                        parent_ts if parent_ts else None,
                        self.serde.dumps(checkpoint),
                        self.serde.dumps(metadata),
                    ),
                )
        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": checkpoint["ts"],
            },
        }
    async def aput(
        self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Put the checkpoint for the given configuration (async).

        Args:
            config: The configuration for the checkpoint.
                A dict with a `configurable` key which is a dict with
                a `thread_id` key and an optional `thread_ts` key.
                For example, { 'configurable': { 'thread_id': 'test_thread' } }
            checkpoint: The checkpoint to persist.
            metadata: Metadata to store alongside the checkpoint.

        Returns:
            The RunnableConfig that describes the checkpoint that was just created.
            It'll contain the `thread_id` and `thread_ts` of the checkpoint.
        """
        thread_id = config["configurable"]["thread_id"]
        # The caller's current thread_ts (if any) becomes the new row's
        # parent_ts, chaining checkpoints within the thread.
        parent_ts = config["configurable"].get("thread_ts")
        async with self._get_async_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    self.UPSERT_CHECKPOINT_QUERY,
                    (
                        thread_id,
                        checkpoint["ts"],
                        parent_ts if parent_ts else None,
                        self.serde.dumps(checkpoint),
                        self.serde.dumps(metadata),
                    ),
                )
        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": checkpoint["ts"],
            },
        }
LIST_CHECKPOINTS_QUERY_STR = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
{where}
ORDER BY thread_ts DESC
"""
def list(
self,
config: Optional[RunnableConfig],
*,
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> Generator[CheckpointTuple, None, None]:
"""Get all the checkpoints for the given configuration."""
where, args = self._search_where(config, filter, before)
query = self.LIST_CHECKPOINTS_QUERY_STR.format(where=where)
if limit:
query += f" LIMIT {limit}"
with self._get_sync_connection() as conn:
with conn.cursor() as cur:
thread_id = config["configurable"]["thread_id"]
cur.execute(query, tuple(args))
for value in cur:
checkpoint, metadata, thread_ts, parent_ts = value
yield CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if parent_ts
else None,
)
async def alist(
self,
config: Optional[RunnableConfig],
*,
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> AsyncIterator[CheckpointTuple]:
"""Get all the checkpoints for the given configuration."""
where, args = self._search_where(config, filter, before)
query = self.LIST_CHECKPOINTS_QUERY_STR.format(where=where)
if limit:
query += f" LIMIT {limit}"
async with self._get_async_connection() as conn:
async with conn.cursor() as cur:
thread_id = config["configurable"]["thread_id"]
await cur.execute(query, tuple(args))
async for value in cur:
checkpoint, metadata, thread_ts, parent_ts = value
yield CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if parent_ts
else None,
)
GET_CHECKPOINT_BY_TS_QUERY = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
WHERE thread_id = %(thread_id)s AND thread_ts = %(thread_ts)s
"""
GET_CHECKPOINT_QUERY = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
WHERE thread_id = %(thread_id)s
ORDER BY thread_ts DESC LIMIT 1
"""
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get the checkpoint tuple for the given configuration.
Args:
config: The configuration for the checkpoint.
A dict with a `configurable` key which is a dict with
a `thread_id` key and an optional `thread_ts` key.
For example, { 'configurable': { 'thread_id': 'test_thread' } }
Returns:
The checkpoint tuple for the given configuration if it exists,
otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
"""
thread_id = config["configurable"]["thread_id"]
thread_ts = config["configurable"].get("thread_ts")
with self._get_sync_connection() as conn:
with conn.cursor() as cur:
if thread_ts:
cur.execute(
self.GET_CHECKPOINT_BY_TS_QUERY,
{
"thread_id": thread_id,
"thread_ts": thread_ts,
},
)
value = cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config=config,
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if thread_ts
else None,
)
else:
cur.execute(
self.GET_CHECKPOINT_QUERY,
{
"thread_id": thread_id,
},
)
value = cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": parent_ts,
}
}
if parent_ts
else None,
)
return None
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get the checkpoint tuple for the given configuration.
Args:
config: The configuration for the checkpoint.
A dict with a `configurable` key which is a dict with
a `thread_id` key and an optional `thread_ts` key.
For example, { 'configurable': { 'thread_id': 'test_thread' } }
Returns:
The checkpoint tuple for the given configuration if it exists,
otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
"""
thread_id = config["configurable"]["thread_id"]
thread_ts = config["configurable"].get("thread_ts")
async with self._get_async_connection() as conn:
async with conn.cursor() as cur:
if thread_ts:
await cur.execute(
self.GET_CHECKPOINT_BY_TS_QUERY,
{
"thread_id": thread_id,
"thread_ts": thread_ts,
},
)
value = await cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config=config,
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if thread_ts
else None,
)
else:
await cur.execute(
self.GET_CHECKPOINT_QUERY,
{
"thread_id": thread_id,
},
)
value = await cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": parent_ts,
}
}
if parent_ts
else None,
)
return None
def _search_where(
self,
config: Optional[RunnableConfig],
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
) -> Tuple[str, List[Any]]:
"""Return WHERE clause predicates for given config, filter, and before parameters.
Args:
config (Optional[RunnableConfig]): The config to use for filtering.
filter (Optional[Dict[str, Any]]): Additional filtering criteria.
before (Optional[RunnableConfig]): A config to limit results before a certain timestamp.
Returns:
Tuple[str, Sequence[Any]]: A tuple containing the WHERE clause and parameter values.
"""
wheres = []
param_values = []
# Add predicate for config
if config is not None:
wheres.append("thread_id = %s ")
param_values.append(config["configurable"]["thread_id"])
if filter:
raise NotImplementedError()
# Add predicate for limiting results before a certain timestamp
if before is not None:
wheres.append("thread_ts < %s")
param_values.append(before["configurable"]["thread_ts"])
where_clause = "WHERE " + " AND ".join(wheres) if wheres else ""
return where_clause, param_values
"""Implementation of a langgraph checkpoint saver using Postgres."""
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Generator, Optional, Union, Tuple, List
import psycopg
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint import BaseCheckpointSaver
from langgraph.serde.jsonplus import JsonPlusSerializer
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, CheckpointTuple
from psycopg_pool import AsyncConnectionPool, ConnectionPool
class JsonAndBinarySerializer(JsonPlusSerializer):
    """Serializer that passes raw binary payloads through untouched.

    bytes/bytearray values are tagged and stored as-is; everything else is
    delegated to the JSON-based parent serializer.
    """

    def _default(self, obj):
        # Binary values embedded inside JSON documents are encoded through a
        # hex-based constructor so the parent serializer can round-trip them.
        if isinstance(obj, (bytes, bytearray)):
            return self._encode_constructor_args(
                obj.__class__, method="fromhex", args=[obj.hex()]
            )
        return super()._default(obj)

    def dumps(self, obj: Any) -> tuple[str, bytes]:
        """Serialize ``obj`` to a ``(type_tag, payload)`` pair."""
        if isinstance(obj, bytes):
            return "bytes", obj
        if isinstance(obj, bytearray):
            return "bytearray", obj
        return "json", super().dumps(obj)

    def loads(self, s: tuple[str, bytes]) -> Any:
        """Deserialize a ``(type_tag, payload)`` pair produced by ``dumps``."""
        tag, payload = s
        if tag == "bytes":
            return payload
        if tag == "bytearray":
            return bytearray(payload)
        if tag == "json":
            return super().loads(payload)
        raise NotImplementedError(f"Unknown serialization type: {s[0]}")
@contextmanager
def _get_sync_connection(
    connection: Union[psycopg.Connection, ConnectionPool, None],
) -> Generator[psycopg.Connection, None, None]:
    """Yield a sync Postgres connection from a raw connection or a pool."""
    # A raw connection is handed back directly; a pool checks one out for
    # the duration of the context. Anything else is a configuration error.
    if isinstance(connection, psycopg.Connection):
        yield connection
        return
    if isinstance(connection, ConnectionPool):
        with connection.connection() as conn:
            yield conn
        return
    raise ValueError(
        "Invalid sync connection object. Please initialize the check pointer "
        f"with an appropriate sync connection object. "
        f"Got {type(connection)}."
    )
@asynccontextmanager
async def _get_async_connection(
    connection: Union[psycopg.AsyncConnection, AsyncConnectionPool, None],
) -> AsyncGenerator[psycopg.AsyncConnection, None]:
    """Yield an async Postgres connection from a raw connection or a pool."""
    # Mirrors _get_sync_connection: raw connections pass through, pools lend
    # a connection for the duration of the context.
    if isinstance(connection, psycopg.AsyncConnection):
        yield connection
        return
    if isinstance(connection, AsyncConnectionPool):
        async with connection.connection() as conn:
            yield conn
        return
    raise ValueError(
        "Invalid async connection object. Please initialize the check pointer "
        f"with an appropriate async connection object. "
        f"Got {type(connection)}."
    )
class PostgresSaver(BaseCheckpointSaver):
    """Checkpoint saver that persists LangGraph state in a Postgres table.

    Either a synchronous or an asynchronous connection (or pool) may be
    supplied; only the matching set of methods (put/list/get_tuple vs.
    aput/alist/aget_tuple) is usable with a given connection kind.
    """

    sync_connection: Optional[Union[psycopg.Connection, ConnectionPool]] = None
    """The synchronous connection or pool to the Postgres database.

    If providing a connection object, please ensure that the connection is open
    and remember to close the connection when done.
    """
    async_connection: Optional[
        Union[psycopg.AsyncConnection, AsyncConnectionPool]
    ] = None
    """The asynchronous connection or pool to the Postgres database.

    If providing a connection object, please ensure that the connection is open
    and remember to close the connection when done.
    """

    def __init__(
        self,
        sync_connection: Optional[Union[psycopg.Connection, ConnectionPool]] = None,
        async_connection: Optional[
            Union[psycopg.AsyncConnection, AsyncConnectionPool]
        ] = None
    ):
        # NOTE(review): the JsonAndBinarySerializer defined in this file is
        # never used — the saver is wired to the plain JsonPlusSerializer.
        # Confirm whether that is intentional before changing it; stored
        # checkpoints are written in this serializer's format.
        super().__init__(serde=JsonPlusSerializer())
        self.sync_connection = sync_connection
        self.async_connection = async_connection
@contextmanager
def _get_sync_connection(self) -> Generator[psycopg.Connection, None, None]:
"""Get the connection to the Postgres database."""
with _get_sync_connection(self.sync_connection) as connection:
yield connection
@asynccontextmanager
async def _get_async_connection(
self,
) -> AsyncGenerator[psycopg.AsyncConnection, None]:
"""Get the connection to the Postgres database."""
async with _get_async_connection(self.async_connection) as connection:
yield connection
    # DDL for the single table backing this saver. The composite primary key
    # (thread_id, thread_ts) allows many checkpoints per thread; parent_ts
    # links a checkpoint to its predecessor within the same thread.
    CREATE_TABLES_QUERY = """
CREATE TABLE IF NOT EXISTS checkpoints (
    thread_id TEXT NOT NULL,
    thread_ts TEXT NOT NULL,
    parent_ts TEXT,
    checkpoint BYTEA NOT NULL,
    metadata BYTEA NOT NULL,
    PRIMARY KEY (thread_id, thread_ts)
);
"""

    @staticmethod
    def create_tables(connection: Union[psycopg.Connection, ConnectionPool], /) -> None:
        """Create the schema for the checkpoint saver."""
        with _get_sync_connection(connection) as conn:
            with conn.cursor() as cur:
                cur.execute(PostgresSaver.CREATE_TABLES_QUERY)

    @staticmethod
    async def acreate_tables(
        connection: Union[psycopg.AsyncConnection, AsyncConnectionPool], /
    ) -> None:
        """Create the schema for the checkpoint saver (async)."""
        async with _get_async_connection(connection) as conn:
            async with conn.cursor() as cur:
                await cur.execute(PostgresSaver.CREATE_TABLES_QUERY)

    @staticmethod
    def drop_tables(connection: psycopg.Connection, /) -> None:
        """Drop the table for the checkpoint saver."""
        # NOTE: unlike create_tables, this accepts only a raw connection,
        # not a pool.
        with connection.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS checkpoints;")

    @staticmethod
    async def adrop_tables(connection: psycopg.AsyncConnection, /) -> None:
        """Drop the table for the checkpoint saver (async)."""
        async with connection.cursor() as cur:
            await cur.execute("DROP TABLE IF EXISTS checkpoints;")
    # Insert-or-update a checkpoint row keyed on (thread_id, thread_ts).
    UPSERT_CHECKPOINT_QUERY = """
INSERT INTO checkpoints
    (thread_id, thread_ts, parent_ts, checkpoint, metadata)
VALUES
    (%s, %s, %s, %s, %s)
ON CONFLICT (thread_id, thread_ts)
DO UPDATE SET checkpoint = EXCLUDED.checkpoint,
              metadata = EXCLUDED.metadata;
"""

    def put(
        self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Put the checkpoint for the given configuration.

        Args:
            config: The configuration for the checkpoint.
                A dict with a `configurable` key which is a dict with
                a `thread_id` key and an optional `thread_ts` key.
                For example, { 'configurable': { 'thread_id': 'test_thread' } }
            checkpoint: The checkpoint to persist.
            metadata: Metadata to store alongside the checkpoint.

        Returns:
            The RunnableConfig that describes the checkpoint that was just created.
            It'll contain the `thread_id` and `thread_ts` of the checkpoint.
        """
        thread_id = config["configurable"]["thread_id"]
        # The caller's current thread_ts (if any) becomes the new row's
        # parent_ts, chaining checkpoints within the thread.
        parent_ts = config["configurable"].get("thread_ts")
        with self._get_sync_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    self.UPSERT_CHECKPOINT_QUERY,
                    (
                        thread_id,
                        checkpoint["ts"],
                        parent_ts if parent_ts else None,
                        self.serde.dumps(checkpoint),
                        self.serde.dumps(metadata),
                    ),
                )
        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": checkpoint["ts"],
            },
        }
    async def aput(
        self, config: RunnableConfig, checkpoint: Checkpoint, metadata: CheckpointMetadata
    ) -> RunnableConfig:
        """Put the checkpoint for the given configuration (async).

        Args:
            config: The configuration for the checkpoint.
                A dict with a `configurable` key which is a dict with
                a `thread_id` key and an optional `thread_ts` key.
                For example, { 'configurable': { 'thread_id': 'test_thread' } }
            checkpoint: The checkpoint to persist.
            metadata: Metadata to store alongside the checkpoint.

        Returns:
            The RunnableConfig that describes the checkpoint that was just created.
            It'll contain the `thread_id` and `thread_ts` of the checkpoint.
        """
        thread_id = config["configurable"]["thread_id"]
        # The caller's current thread_ts (if any) becomes the new row's
        # parent_ts, chaining checkpoints within the thread.
        parent_ts = config["configurable"].get("thread_ts")
        async with self._get_async_connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    self.UPSERT_CHECKPOINT_QUERY,
                    (
                        thread_id,
                        checkpoint["ts"],
                        parent_ts if parent_ts else None,
                        self.serde.dumps(checkpoint),
                        self.serde.dumps(metadata),
                    ),
                )
        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": checkpoint["ts"],
            },
        }
LIST_CHECKPOINTS_QUERY_STR = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
{where}
ORDER BY thread_ts DESC
"""
def list(
self,
config: Optional[RunnableConfig],
*,
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> Generator[CheckpointTuple, None, None]:
"""Get all the checkpoints for the given configuration."""
where, args = self._search_where(config, filter, before)
query = self.LIST_CHECKPOINTS_QUERY_STR.format(where=where)
if limit:
query += f" LIMIT {limit}"
with self._get_sync_connection() as conn:
with conn.cursor() as cur:
thread_id = config["configurable"]["thread_id"]
cur.execute(query, tuple(args))
for value in cur:
checkpoint, metadata, thread_ts, parent_ts = value
yield CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if parent_ts
else None,
)
async def alist(
self,
config: Optional[RunnableConfig],
*,
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
limit: Optional[int] = None,
) -> AsyncIterator[CheckpointTuple]:
"""Get all the checkpoints for the given configuration."""
where, args = self._search_where(config, filter, before)
query = self.LIST_CHECKPOINTS_QUERY_STR.format(where=where)
if limit:
query += f" LIMIT {limit}"
async with self._get_async_connection() as conn:
async with conn.cursor() as cur:
thread_id = config["configurable"]["thread_id"]
await cur.execute(query, tuple(args))
async for value in cur:
checkpoint, metadata, thread_ts, parent_ts = value
yield CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if parent_ts
else None,
)
GET_CHECKPOINT_BY_TS_QUERY = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
WHERE thread_id = %(thread_id)s AND thread_ts = %(thread_ts)s
"""
GET_CHECKPOINT_QUERY = """
SELECT checkpoint, metadata, thread_ts, parent_ts
FROM checkpoints
WHERE thread_id = %(thread_id)s
ORDER BY thread_ts DESC LIMIT 1
"""
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get the checkpoint tuple for the given configuration.
Args:
config: The configuration for the checkpoint.
A dict with a `configurable` key which is a dict with
a `thread_id` key and an optional `thread_ts` key.
For example, { 'configurable': { 'thread_id': 'test_thread' } }
Returns:
The checkpoint tuple for the given configuration if it exists,
otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
"""
thread_id = config["configurable"]["thread_id"]
thread_ts = config["configurable"].get("thread_ts")
with self._get_sync_connection() as conn:
with conn.cursor() as cur:
if thread_ts:
cur.execute(
self.GET_CHECKPOINT_BY_TS_QUERY,
{
"thread_id": thread_id,
"thread_ts": thread_ts,
},
)
value = cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config=config,
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if thread_ts
else None,
)
else:
cur.execute(
self.GET_CHECKPOINT_QUERY,
{
"thread_id": thread_id,
},
)
value = cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": parent_ts,
}
}
if parent_ts
else None,
)
return None
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Get the checkpoint tuple for the given configuration.
Args:
config: The configuration for the checkpoint.
A dict with a `configurable` key which is a dict with
a `thread_id` key and an optional `thread_ts` key.
For example, { 'configurable': { 'thread_id': 'test_thread' } }
Returns:
The checkpoint tuple for the given configuration if it exists,
otherwise None.
If thread_ts is None, the latest checkpoint is returned if it exists.
"""
thread_id = config["configurable"]["thread_id"]
thread_ts = config["configurable"].get("thread_ts")
async with self._get_async_connection() as conn:
async with conn.cursor() as cur:
if thread_ts:
await cur.execute(
self.GET_CHECKPOINT_BY_TS_QUERY,
{
"thread_id": thread_id,
"thread_ts": thread_ts,
},
)
value = await cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config=config,
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
}
if thread_ts
else None,
)
else:
await cur.execute(
self.GET_CHECKPOINT_QUERY,
{
"thread_id": thread_id,
},
)
value = await cur.fetchone()
if value:
checkpoint, metadata, thread_ts, parent_ts = value
return CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"thread_ts": thread_ts,
}
},
checkpoint=self.serde.loads(checkpoint),
metadata=self.serde.loads(metadata),
parent_config={
"configurable": {
"thread_id": thread_id,
"thread_ts": parent_ts,
}
}
if parent_ts
else None,
)
return None
def _search_where(
self,
config: Optional[RunnableConfig],
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
) -> Tuple[str, List[Any]]:
"""Return WHERE clause predicates for given config, filter, and before parameters.
Args:
config (Optional[RunnableConfig]): The config to use for filtering.
filter (Optional[Dict[str, Any]]): Additional filtering criteria.
before (Optional[RunnableConfig]): A config to limit results before a certain timestamp.
Returns:
Tuple[str, Sequence[Any]]: A tuple containing the WHERE clause and parameter values.
"""
wheres = []
param_values = []
# Add predicate for config
if config is not None:
wheres.append("thread_id = %s ")
param_values.append(config["configurable"]["thread_id"])
if filter:
raise NotImplementedError()
# Add predicate for limiting results before a certain timestamp
if before is not None:
wheres.append("thread_ts < %s")
param_values.append(before["configurable"]["thread_ts"])
where_clause = "WHERE " + " AND ".join(wheres) if wheres else ""
return where_clause, param_values
Setup environment¶
In [3]:
Copied!
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
OPENAI_API_KEY: ········
Setup model and tools for the graph¶
In [4]:
Copied!
from typing import Literal
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
@tool
def get_weather(city: Literal["nyc", "sf"]):
    """Use this to get weather information."""
    # Canned responses keyed by city; any other city is a programming error.
    responses = {
        "nyc": "It might be cloudy in nyc",
        "sf": "It's always sunny in sf",
    }
    if city not in responses:
        raise AssertionError("Unknown city")
    return responses[city]
tools = [get_weather]
model = ChatOpenAI(model_name="gpt-4o", temperature=0)
from typing import Literal
from langchain_core.runnables import ConfigurableField
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
@tool
def get_weather(city: Literal["nyc", "sf"]):
    """Use this to get weather information."""
    # Canned responses keyed by city; any other city is a programming error.
    responses = {
        "nyc": "It might be cloudy in nyc",
        "sf": "It's always sunny in sf",
    }
    if city not in responses:
        raise AssertionError("Unknown city")
    return responses[city]
tools = [get_weather]
model = ChatOpenAI(model_name="gpt-4o", temperature=0)
Use sync connection¶
In [5]:
Copied!
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable"
With a connection pool¶
In [6]:
Copied!
from psycopg_pool import ConnectionPool
pool = ConnectionPool(
# Example configuration
conninfo=DB_URI,
max_size=20,
)
checkpointer = PostgresSaver(
sync_connection=pool
)
checkpointer.create_tables(pool)
from psycopg_pool import ConnectionPool
pool = ConnectionPool(
# Example configuration
conninfo=DB_URI,
max_size=20,
)
checkpointer = PostgresSaver(
sync_connection=pool
)
checkpointer.create_tables(pool)
In [7]:
Copied!
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
In [8]:
Copied!
res
res
Out[8]:
{'messages': [HumanMessage(content="what's the weather in sf", id='8481d24b-d505-40fc-8c55-347277d50f2a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_cGApaEshW0YNp5WsirAcFTBv', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bf85107a-02a4-48d1-83ba-5448498f5320-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_cGApaEshW0YNp5WsirAcFTBv'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='f685da3d-b196-44a1-9fe5-c7ce4d3d7d1d', tool_call_id='call_cGApaEshW0YNp5WsirAcFTBv'), AIMessage(content='The weather in San Francisco is currently sunny.', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'stop', 'logprobs': None}, id='run-0d681a28-8116-4cb2-a25f-e4f7949481eb-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})]}
In [9]:
Copied!
checkpointer.get(config)
checkpointer.get(config)
Out[9]:
{'v': 1, 'ts': '2024-06-27T01:06:18.970784+00:00', 'id': '1ef34217-5fac-63f8-8003-9f1ea087dd14', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='8481d24b-d505-40fc-8c55-347277d50f2a'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_cGApaEshW0YNp5WsirAcFTBv', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-bf85107a-02a4-48d1-83ba-5448498f5320-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_cGApaEshW0YNp5WsirAcFTBv'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='f685da3d-b196-44a1-9fe5-c7ce4d3d7d1d', tool_call_id='call_cGApaEshW0YNp5WsirAcFTBv'), AIMessage(content='The weather in San Francisco is currently sunny.', response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 84, 'total_tokens': 94}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'stop', 'logprobs': None}, id='run-0d681a28-8116-4cb2-a25f-e4f7949481eb-0', usage_metadata={'input_tokens': 84, 'output_tokens': 10, 'total_tokens': 94})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 3, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': []}
With a connection¶
In [10]:
Copied!
from psycopg import Connection
with Connection.connect(DB_URI) as conn:
checkpointer = PostgresSaver(
sync_connection=conn
)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "2"}}
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
checkpoint_tuple = checkpointer.get_tuple(config)
from psycopg import Connection
with Connection.connect(DB_URI) as conn:
checkpointer = PostgresSaver(
sync_connection=conn
)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "2"}}
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
checkpoint_tuple = checkpointer.get_tuple(config)
In [11]:
Copied!
checkpoint_tuple
checkpoint_tuple
Out[11]:
CheckpointTuple(config={'configurable': {'thread_id': '2', 'thread_ts': '2024-06-27T01:06:20.104263+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:20.104263+00:00', 'id': '1ef34217-6a7b-687e-8003-ab84b18fa9dc', 'channel_values': {'messages': [HumanMessage(content="what's the weather in sf", id='3ef79bb9-4249-419d-b142-8d18de63c2eb'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_zqfYbNrD30Qa9YHFSI7nJVJN', 'function': {'arguments': '{"city":"sf"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 57, 'total_tokens': 71}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5dd2d9d1-e291-43e4-86cd-96a50a546f48-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'sf'}, 'id': 'call_zqfYbNrD30Qa9YHFSI7nJVJN'}], usage_metadata={'input_tokens': 57, 'output_tokens': 14, 'total_tokens': 71}), ToolMessage(content="It's always sunny in sf", name='get_weather', id='f3eadc9a-9340-41d3-a0fb-521a78e6c438', tool_call_id='call_zqfYbNrD30Qa9YHFSI7nJVJN'), AIMessage(content='The weather in San Francisco is sunny! 
🌞', response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 84, 'total_tokens': 95}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'stop', 'logprobs': None}, id='run-f897e17e-8f0b-4140-be91-35f351609d82-0', usage_metadata={'input_tokens': 84, 'output_tokens': 11, 'total_tokens': 95})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 3, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 3, 'writes': {'agent': {'messages': [AIMessage(content='The weather in San Francisco is sunny! 🌞', response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 84, 'total_tokens': 95}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'stop', 'logprobs': None}, id='run-f897e17e-8f0b-4140-be91-35f351609d82-0', usage_metadata={'input_tokens': 84, 'output_tokens': 11, 'total_tokens': 95})]}}}, parent_config={'configurable': {'thread_id': '2', 'thread_ts': '1ef34217-659f-62ec-8002-7f9b6b4c2a2f'}})
Use an async connection¶
With a connection pool¶
In [12]:
Copied!
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool(
# Example configuration
conninfo=DB_URI,
max_size=20,
)
checkpointer = PostgresSaver(
async_connection=pool
)
await checkpointer.acreate_tables(pool)
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool(
# Example configuration
conninfo=DB_URI,
max_size=20,
)
checkpointer = PostgresSaver(
async_connection=pool
)
await checkpointer.acreate_tables(pool)
/Users/vadymbarda/.virtualenvs/langgraph-postgres/lib/python3.11/site-packages/psycopg_pool/pool_async.py:138: RuntimeWarning: opening the async pool AsyncConnectionPool in the constructor is deprecated and will not be supported anymore in a future release. Please use `await pool.open()`, or use the pool as context manager using: `async with AsyncConnectionPool(...) as pool: `... warnings.warn(
In [13]:
Copied!
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "3"}}
res = await graph.ainvoke({"messages": [("human", "what's the weather in nyc")]}, config)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "3"}}
res = await graph.ainvoke({"messages": [("human", "what's the weather in nyc")]}, config)
In [14]:
Copied!
checkpoint_tuple = await checkpointer.aget_tuple(config)
checkpoint_tuple = await checkpointer.aget_tuple(config)
In [15]:
Copied!
checkpoint_tuple
checkpoint_tuple
Out[15]:
CheckpointTuple(config={'configurable': {'thread_id': '3', 'thread_ts': '2024-06-27T01:06:21.430879+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:21.430879+00:00', 'id': '1ef34217-7722-6438-8003-0a35c0f65f25', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='cb5a1b8c-f329-4bee-857a-5d2c3d020147'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_gPUiz98ETlDNXttk98w6iThc', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_ce0793330f', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-2d35ac1c-0a99-4538-a8fc-4c182d9df69a-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_gPUiz98ETlDNXttk98w6iThc'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='f5f80d01-92e8-4961-b2b0-3b4d190c4d26', tool_call_id='call_gPUiz98ETlDNXttk98w6iThc'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'stop', 'logprobs': None}, id='run-aea6306e-f6cc-4f87-aaf4-b0dcfbdf9684-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 3, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 3, 'writes': {'agent': {'messages': [AIMessage(content='The weather in NYC might be cloudy.', 
response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'stop', 'logprobs': None}, id='run-aea6306e-f6cc-4f87-aaf4-b0dcfbdf9684-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}}, parent_config={'configurable': {'thread_id': '3', 'thread_ts': '1ef34217-7099-68be-8002-1a2b75cf9196'}})
With a connection¶
In [16]:
Copied!
from psycopg import AsyncConnection
async with await AsyncConnection.connect(DB_URI) as conn:
checkpointer = PostgresSaver(
async_connection=conn
)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "4"}}
res = await graph.ainvoke({"messages": [("human", "what's the weather in nyc")]}, config)
checkpoint_tuples = [c async for c in checkpointer.alist(config)]
from psycopg import AsyncConnection
async with await AsyncConnection.connect(DB_URI) as conn:
checkpointer = PostgresSaver(
async_connection=conn
)
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
config = {"configurable": {"thread_id": "4"}}
res = await graph.ainvoke({"messages": [("human", "what's the weather in nyc")]}, config)
checkpoint_tuples = [c async for c in checkpointer.alist(config)]
In [17]:
Copied!
checkpoint_tuples
checkpoint_tuples
Out[17]:
[CheckpointTuple(config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.556635+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:22.556635+00:00', 'id': '1ef34217-81de-6c5a-8003-04d6ed02b672', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='9df9dda1-5e80-4938-bf24-9207738d9f5f'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TJWuPJovXruJVafVcI540OfS', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5ba0db1c-45e7-4389-975f-489135ff50b4-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TJWuPJovXruJVafVcI540OfS'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='04dd3f8a-c063-4885-bfcd-901aaeb63797', tool_call_id='call_TJWuPJovXruJVafVcI540OfS'), AIMessage(content='The weather in NYC might be cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'stop', 'logprobs': None}, id='run-de373cba-dc06-4603-bed1-ba4534a2a45a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})], 'agent': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 5, 'start:agent': 3, 'agent': 5, 'branch:agent:should_continue:tools': 4, 'tools': 5}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 3, 'tools': 4}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 3, 'writes': {'agent': {'messages': [AIMessage(content='The weather in NYC might be 
cloudy.', response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 88, 'total_tokens': 97}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'stop', 'logprobs': None}, id='run-de373cba-dc06-4603-bed1-ba4534a2a45a-0', usage_metadata={'input_tokens': 88, 'output_tokens': 9, 'total_tokens': 97})]}}}, parent_config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.556635+00:00'}}), CheckpointTuple(config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.010058+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:22.010058+00:00', 'id': '1ef34217-7ca8-64ca-8002-c710e3b07a9c', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='9df9dda1-5e80-4938-bf24-9207738d9f5f'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TJWuPJovXruJVafVcI540OfS', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5ba0db1c-45e7-4389-975f-489135ff50b4-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TJWuPJovXruJVafVcI540OfS'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73}), ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='04dd3f8a-c063-4885-bfcd-901aaeb63797', tool_call_id='call_TJWuPJovXruJVafVcI540OfS')], 'tools': 'tools'}, 'channel_versions': {'__start__': 2, 'messages': 4, 'start:agent': 3, 'agent': 4, 'branch:agent:should_continue:tools': 4, 'tools': 4}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 2}, 'tools': {'branch:agent:should_continue:tools': 3}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 2, 'writes': {'tools': {'messages': 
[ToolMessage(content='It might be cloudy in nyc', name='get_weather', id='04dd3f8a-c063-4885-bfcd-901aaeb63797', tool_call_id='call_TJWuPJovXruJVafVcI540OfS')]}}}, parent_config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.010058+00:00'}}), CheckpointTuple(config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.004456+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:22.004456+00:00', 'id': '1ef34217-7c9a-6a64-8001-65cb5b71ff36', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='9df9dda1-5e80-4938-bf24-9207738d9f5f'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TJWuPJovXruJVafVcI540OfS', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5ba0db1c-45e7-4389-975f-489135ff50b4-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TJWuPJovXruJVafVcI540OfS'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})], 'agent': 'agent', 'branch:agent:should_continue:tools': 'agent'}, 'channel_versions': {'__start__': 2, 'messages': 3, 'start:agent': 3, 'agent': 3, 'branch:agent:should_continue:tools': 3}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {'start:agent': 2}, 'tools': {}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 1, 'writes': {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_TJWuPJovXruJVafVcI540OfS', 'function': {'arguments': '{"city":"nyc"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73}, 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_4008e3b719', 
'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5ba0db1c-45e7-4389-975f-489135ff50b4-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'nyc'}, 'id': 'call_TJWuPJovXruJVafVcI540OfS'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73})]}}}, parent_config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:22.004456+00:00'}}), CheckpointTuple(config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:21.494039+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:21.494039+00:00', 'id': '1ef34217-77bc-67e0-8000-0dcc15e3e392', 'channel_values': {'messages': [HumanMessage(content="what's the weather in nyc", id='9df9dda1-5e80-4938-bf24-9207738d9f5f')], 'start:agent': '__start__'}, 'channel_versions': {'__start__': 2, 'messages': 2, 'start:agent': 2}, 'versions_seen': {'__start__': {'__start__': 1}, 'agent': {}, 'tools': {}}, 'pending_sends': []}, metadata={'source': 'loop', 'step': 0, 'writes': None}, parent_config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:21.494039+00:00'}}), CheckpointTuple(config={'configurable': {'thread_id': '4', 'thread_ts': '2024-06-27T01:06:21.491815+00:00'}}, checkpoint={'v': 1, 'ts': '2024-06-27T01:06:21.491815+00:00', 'id': '1ef34217-77b7-6114-bfff-cceb4fa6453c', 'channel_values': {'messages': [], '__start__': {'messages': [['human', "what's the weather in nyc"]]}}, 'channel_versions': {'__start__': 1}, 'versions_seen': {}, 'pending_sends': []}, metadata={'source': 'input', 'step': -1, 'writes': {'messages': [['human', "what's the weather in nyc"]]}}, parent_config=None)]
In [ ]:
Copied!