How to add cross-thread persistence (functional API)¶
Prerequisites
This guide assumes familiarity with the following:
LangGraph allows you to persist data across different threads. For instance, you can store information about users (their names or preferences) in a shared (cross-thread) memory and reuse them in the new threads (e.g., new conversations).
When using the functional API, you can set it up to store and retrieve memories by using the Store interface:
-
Create an instance of a
Store
-
Pass the
store
instance to theentrypoint()
wrapper function. It will be passed to the workflow asconfig.store
.
In this guide, we will show how to construct and use a workflow that has a shared memory implemented using the Store interface.
Note
If you need to add cross-thread persistence to a StateGraph
, check out this how-to guide.
Setup¶
Note
This guide requires @langchain/langgraph>=0.2.42
.
First, install the required dependencies for this example:
Next, we need to set API keys for Anthropic and OpenAI (the LLM and embeddings we will use):
Set up LangSmith for LangGraph development
Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here
Example: simple chatbot with long-term memory¶
Define store¶
In this example we will create a workflow that will be able to retrieve information about a user's preferences. We will do so by defining an InMemoryStore
- an object that can store data in memory and query that data.
When storing objects using the Store
interface you define two things:
- the namespace for the object, a tuple (similar to directories)
- the object key (similar to filenames)
In our example, we'll be using ["memories", <user_id>]
as namespace and random UUID as key for each new memory.
Let's first define our store:
import { InMemoryStore } from "@langchain/langgraph";
import { OpenAIEmbeddings } from "@langchain/openai";
const inMemoryStore = new InMemoryStore({
index: {
embeddings: new OpenAIEmbeddings({
model: "text-embedding-3-small",
}),
dims: 1536,
},
});
Create workflow¶
Now let's create our workflow:
import { v4 } from "uuid";
import { ChatAnthropic } from "@langchain/anthropic";
import {
entrypoint,
task,
MemorySaver,
addMessages,
type BaseStore,
getStore,
} from "@langchain/langgraph";
import type { BaseMessage, BaseMessageLike } from "@langchain/core/messages";
const model = new ChatAnthropic({
model: "claude-3-5-sonnet-latest",
});
const callModel = task("callModel", async (
messages: BaseMessage[],
memoryStore: BaseStore,
userId: string
) => {
const namespace = ["memories", userId];
const lastMessage = messages.at(-1);
if (typeof lastMessage?.content !== "string") {
throw new Error("Received non-string message content.");
}
const memories = await memoryStore.search(namespace, {
query: lastMessage.content,
});
const info = memories.map((memory) => memory.value.data).join("\n");
const systemMessage = `You are a helpful assistant talking to the user. User info: ${info}`;
// Store new memories if the user asks the model to remember
if (lastMessage.content.toLowerCase().includes("remember")) {
// Hard-coded for demo
const memory = `Username is Bob`;
await memoryStore.put(namespace, v4(), { data: memory });
}
const response = await model.invoke([
{
role: "system",
content: systemMessage
},
...messages
]);
return response;
});
// NOTE: we're passing the store object here when creating a workflow via entrypoint()
const workflow = entrypoint({
checkpointer: new MemorySaver(),
store: inMemoryStore,
name: "workflow",
}, async (params: {
messages: BaseMessageLike[];
userId: string;
}, config) => {
const messages = addMessages([], params.messages)
const response = await callModel(messages, config.store, params.userId);
return entrypoint.final({
value: response,
save: addMessages(messages, response),
});
});
The current store is passed in as part of the entrypoint's second argument, as config.store
.
Note
If you're using LangGraph Cloud or LangGraph Studio, you don't need to pass store into the entrypoint, since it's done automatically.
Run the workflow!¶
Now let's specify a user ID in the config and tell the model our name:
const config = {
configurable: {
thread_id: "1",
},
streamMode: "values" as const,
};
const inputMessage = {
role: "user",
content: "Hi! Remember: my name is Bob",
};
const stream = await workflow.stream({ messages: [inputMessage], userId: "1" }, config);
for await (const chunk of stream) {
console.log(chunk);
}
AIMessage {
"id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
"content": "Hi Bob! Nice to meet you. I'll remember that your name is Bob. How can I help you today?",
"additional_kwargs": {
"id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
"type": "message",
"role": "assistant",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 28,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 27
}
},
"response_metadata": {
"id": "msg_01U4xHvf4REPSCGWzpLeh1qJ",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 28,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 27
},
"type": "message",
"role": "assistant"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 28,
"output_tokens": 27,
"total_tokens": 55,
"input_token_details": {
"cache_creation": 0,
"cache_read": 0
}
}
}
const config2 = {
configurable: {
thread_id: "2",
},
streamMode: "values" as const,
};
const followupStream = await workflow.stream({
messages: [{
role: "user",
content: "what is my name?",
}],
userId: "1"
}, config2);
for await (const chunk of followupStream) {
console.log(chunk);
}
AIMessage {
"id": "msg_01LB4YapkFawBUbpiu3oeWbF",
"content": "Your name is Bob.",
"additional_kwargs": {
"id": "msg_01LB4YapkFawBUbpiu3oeWbF",
"type": "message",
"role": "assistant",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 28,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 8
}
},
"response_metadata": {
"id": "msg_01LB4YapkFawBUbpiu3oeWbF",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 28,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 8
},
"type": "message",
"role": "assistant"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 28,
"output_tokens": 8,
"total_tokens": 36,
"input_token_details": {
"cache_creation": 0,
"cache_read": 0
}
}
}
const memories = await inMemoryStore.search(["memories", "1"]);
for (const memory of memories) {
console.log(memory.value);
}
const config3 = {
configurable: {
thread_id: "3",
},
streamMode: "values" as const,
};
const otherUserStream = await workflow.stream({
messages: [{
role: "user",
content: "what is my name?",
}],
userId: "2"
}, config3);
for await (const chunk of otherUserStream) {
console.log(chunk);
}
AIMessage {
"id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
"content": "I don't have any information about your name. While I aim to be helpful, I can only know what you directly tell me during our conversation.",
"additional_kwargs": {
"id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
"type": "message",
"role": "assistant",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 25,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 33
}
},
"response_metadata": {
"id": "msg_01KK7CweVY4ZdHxU5bPa4skv",
"model": "claude-3-5-sonnet-20241022",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 25,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"output_tokens": 33
},
"type": "message",
"role": "assistant"
},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": {
"input_tokens": 25,
"output_tokens": 33,
"total_tokens": 58,
"input_token_details": {
"cache_creation": 0,
"cache_read": 0
}
}
}