Delayed Background Memory Processing
When a conversation is active, an agent may receive many messages in quick succession. Instead of processing each message immediately for long-term memory management, you can wait for conversation activity to settle. This guide shows how to use ReflectionExecutor to debounce memory processing.
Problem
Processing memories on every message has drawbacks:
- Redundant work when messages arrive in quick succession
- Incomplete context when processing mid-conversation
- Unnecessary token consumption
ReflectionExecutor defers memory processing and cancels redundant work:
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint
from langgraph.store.memory import InMemoryStore
from langmem import ReflectionExecutor, create_memory_store_manager

# Chat model used to answer the user (same model string as the memory manager)
llm = init_chat_model("anthropic:claude-3-5-sonnet-latest")

# Create memory manager to extract memories from conversations (1)
memory_manager = create_memory_store_manager(
    "anthropic:claude-3-5-sonnet-latest",
    namespace=("memories",),
)
# Wrap memory_manager to handle deferred background processing (2)
executor = ReflectionExecutor(memory_manager)
store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": "openai:text-embedding-3-small",
    }
)


@entrypoint(store=store)
def chat(message: str):
    response = llm.invoke(message)
    # Format conversation for memory processing
    # Must follow OpenAI's message format
    to_process = {"messages": [{"role": "user", "content": message}] + [response]}
    # Defer processing until the delay has elapsed.
    # If new messages arrive before then:
    # 1. Cancel pending processing task
    # 2. Reschedule with new messages included
    delay = 0.5  # Seconds. In practice would choose longer (30-60 min)
    # depending on app context.
    executor.submit(to_process, after_seconds=delay)
    return response.content
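The to_process payload in the example above is a dict with a "messages" list of role/content entries. As a minimal sketch of that shape using plain dicts only, the snippet below builds such a payload from (role, content) pairs. The helper name build_payload is hypothetical and is not part of langmem or LangChain.

```python
from typing import Dict, List, Tuple


def build_payload(turns: List[Tuple[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Hypothetical helper: turn (role, content) pairs into a
    {"messages": [...]} dict with OpenAI-style role/content entries."""
    messages = [{"role": role, "content": content} for role, content in turns]
    return {"messages": messages}


payload = build_payload(
    [
        ("user", "I prefer dark mode."),
        ("assistant", "Noted, I will remember that."),
    ]
)
print(payload)
```

A payload built this way has the same outer structure as to_process, with every turn represented as a plain dict.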
(1) create_memory_store_manager creates a Runnable that extracts memories from conversations. It processes messages in OpenAI's format: a list of role/content entries under a "messages" key, as in to_process above.
(2) ReflectionExecutor handles background processing of memories. For each conversation thread, it:
- Maintains a queue of pending memory tasks
- Cancels old tasks when new messages arrive
- Only processes after the specified delay
This debouncing ensures you process complete conversation context instead of fragments.
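To make the cancel-and-reschedule behavior concrete, here is a toy debouncer built on the standard library's threading.Timer. It is only an illustration of the general technique, not how ReflectionExecutor is implemented; the class name DebouncedExecutor and its thread_id parameter are assumptions made for this sketch.

```python
import threading
import time
from typing import Any, Callable, Dict


class DebouncedExecutor:
    """Toy debouncer for illustration; NOT langmem's ReflectionExecutor."""

    def __init__(self, process: Callable[[Any], None]) -> None:
        self._process = process
        self._timers: Dict[str, threading.Timer] = {}
        self._lock = threading.Lock()

    def submit(self, payload: Any, after_seconds: float, thread_id: str = "default") -> None:
        """Schedule payload, replacing any task still pending for thread_id."""
        with self._lock:
            pending = self._timers.pop(thread_id, None)
            if pending is not None:
                pending.cancel()  # Drop the older, now redundant task
            timer = threading.Timer(after_seconds, self._run, args=(thread_id, payload))
            timer.daemon = True
            self._timers[thread_id] = timer
            timer.start()

    def _run(self, thread_id: str, payload: Any) -> None:
        with self._lock:
            # threading.Timer is a Thread, so the current thread is this timer.
            # Skip the work if a newer submit has already replaced it.
            if self._timers.get(thread_id) is not threading.current_thread():
                return
            del self._timers[thread_id]
        self._process(payload)


processed = []
executor = DebouncedExecutor(processed.append)

# Two submissions in quick succession: only the second should be processed.
executor.submit({"messages": ["hi"]}, after_seconds=0.2)
executor.submit({"messages": ["hi", "are you there?"]}, after_seconds=0.2)

time.sleep(0.6)  # Let the delay elapse
print(processed)  # [{'messages': ['hi', 'are you there?']}]
```

The first submission is cancelled before its timer fires, so only the payload containing both messages is processed.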