Python SDK Reference¶
The LangGraph client implementations connect to the LangGraph API.
This module provides both asynchronous (get_client(url="http://localhost:2024")) or LangGraphClient) and synchronous (get_sync_client(url="http://localhost:2024")) or SyncLanggraphClient) clients to interacting with the LangGraph API's core resources such as Assistants, Threads, Runs, and Cron jobs, as well as its persistent document Store.
Classes:
Name | Description |
---|---|
LangGraphClient |
Top-level client for LangGraph API. |
HttpClient |
Handle async requests to the LangGraph API. |
AssistantsClient |
Client for managing assistants in LangGraph. |
ThreadsClient |
Client for managing threads in LangGraph. |
RunsClient |
Client for managing runs in LangGraph. |
CronClient |
Client for managing recurrent runs (cron jobs) in LangGraph. |
StoreClient |
Client for interacting with the graph's shared storage. |
SyncLangGraphClient |
Synchronous client for interacting with the LangGraph API. |
SyncHttpClient |
Handle synchronous requests to the LangGraph API. |
SyncAssistantsClient |
Client for managing assistants in LangGraph synchronously. |
SyncThreadsClient |
Synchronous client for managing threads in LangGraph. |
SyncRunsClient |
Synchronous client for managing runs in LangGraph. |
SyncCronClient |
Synchronous client for managing cron jobs in LangGraph. |
SyncStoreClient |
A client for synchronous operations on a key-value store. |
Functions:
Name | Description |
---|---|
get_client |
Get a LangGraphClient instance. |
get_sync_client |
Get a synchronous LangGraphClient instance. |
LangGraphClient
¶
Top-level client for LangGraph API.
Attributes:
Name | Type | Description |
---|---|---|
assistants |
Manages versioned configuration for your graphs. |
|
threads |
Handles (potentially) multi-turn interactions, such as conversational threads. |
|
runs |
Controls individual invocations of the graph. |
|
crons |
Manages scheduled operations. |
|
store |
Interfaces with persistent, shared data storage. |
HttpClient
¶
Handle async requests to the LangGraph API.
Adds additional error messaging & content handling above the provided httpx client.
Attributes:
Name | Type | Description |
---|---|---|
client |
AsyncClient
|
Underlying HTTPX async client. |
Methods:
Name | Description |
---|---|
get |
Send a GET request. |
post |
Send a POST request. |
put |
Send a PUT request. |
patch |
Send a PATCH request. |
delete |
Send a DELETE request. |
stream |
Stream results using SSE. |
get
async
¶
get(
path: str,
*,
params: Optional[QueryParamTypes] = None,
headers: Optional[dict[str, str]] = None
) -> Any
Send a GET request.
post
async
¶
Send a POST request.
put
async
¶
Send a PUT request.
patch
async
¶
Send a PATCH request.
AssistantsClient
¶
Client for managing assistants in LangGraph.
This class provides methods to interact with assistants, which are versioned configurations of your graph.
Example
Methods:
Name | Description |
---|---|
get |
Get an assistant by ID. |
get_graph |
Get the graph of an assistant by ID. |
get_schemas |
Get the schemas of an assistant by ID. |
get_subgraphs |
Get the schemas of an assistant by ID. |
create |
Create a new assistant. |
update |
Update an assistant. |
delete |
Delete an assistant. |
search |
Search for assistants. |
get_versions |
List all versions of an assistant. |
set_latest |
Change the version of an assistant. |
get
async
¶
Get an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
Assistant Object. |
Example Usage
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'},
'version': 1,
'name': 'my_assistant'
}
get_graph
async
¶
get_graph(
assistant_id: str,
*,
xray: Union[int, bool] = False,
headers: Optional[dict[str, str]] = None
) -> dict[str, list[dict[str, Any]]]
Get the graph of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the graph of. |
required |
xray
|
Union[int, bool]
|
Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Graph |
dict[str, list[dict[str, Any]]]
|
The graph information for the assistant in JSON format. |
Example Usage
client = get_client(url="http://localhost:2024")
graph_info = await client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas
async
¶
Get the schemas of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the schema of. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
GraphSchema |
GraphSchema
|
The graph schema for the assistant. |
Example Usage
client = get_client(url="http://localhost:2024")
schema = await client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs
async
¶
get_subgraphs(
assistant_id: str,
namespace: Optional[str] = None,
recurse: bool = False,
*,
headers: Optional[dict[str, str]] = None
) -> Subgraphs
Get the schemas of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the schema of. |
required |
namespace
|
Optional[str]
|
Optional namespace to filter by. |
None
|
recurse
|
bool
|
Whether to recursively get subgraphs. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Subgraphs |
Subgraphs
|
The graph schema for the assistant. |
create
async
¶
create(
graph_id: Optional[str],
config: Optional[Config] = None,
*,
metadata: Json = None,
assistant_id: Optional[str] = None,
if_exists: Optional[OnConflictBehavior] = None,
name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
description: Optional[str] = None
) -> Assistant
Create a new assistant.
Useful when graph is configurable and you want to create different assistants based on different configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph_id
|
Optional[str]
|
The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. |
required |
config
|
Optional[Config]
|
Configuration to use for the graph. |
None
|
metadata
|
Json
|
Metadata to add to assistant. |
None
|
assistant_id
|
Optional[str]
|
Assistant ID to use, will default to a random UUID if not provided. |
None
|
if_exists
|
Optional[OnConflictBehavior]
|
How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing assistant). |
None
|
name
|
Optional[str]
|
The name of the assistant. Defaults to 'Untitled' under the hood. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
description
|
Optional[str]
|
Optional description of the assistant. The description field is available for langgraph-api server version>=0.0.45 |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
The created assistant. |
update
async
¶
update(
assistant_id: str,
*,
graph_id: Optional[str] = None,
config: Optional[Config] = None,
metadata: Json = None,
name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
description: Optional[str] = None
) -> Assistant
Update an assistant.
Use this to point to a different graph, update the configuration, or change the metadata of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
Assistant to update. |
required |
graph_id
|
Optional[str]
|
The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. If None, assistant will keep pointing to same graph. |
None
|
config
|
Optional[Config]
|
Configuration to use for the graph. |
None
|
metadata
|
Json
|
Metadata to merge with existing assistant metadata. |
None
|
name
|
Optional[str]
|
The new name for the assistant. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
description
|
Optional[str]
|
Optional description of the assistant. The description field is available for langgraph-api server version>=0.0.45 |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
The updated assistant. |
delete
async
¶
search
async
¶
search(
*,
metadata: Json = None,
graph_id: Optional[str] = None,
limit: int = 10,
offset: int = 0,
sort_by: Optional[AssistantSortBy] = None,
sort_order: Optional[SortOrder] = None,
headers: Optional[dict[str, str]] = None
) -> list[Assistant]
Search for assistants.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Metadata to filter by. Exact match filter for each KV pair. |
None
|
graph_id
|
Optional[str]
|
The ID of the graph to filter by. The graph ID is normally set in your langgraph.json configuration. |
None
|
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
sort_by
|
Optional[AssistantSortBy]
|
The field to sort by. |
None
|
sort_order
|
Optional[SortOrder]
|
The order to sort by. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Assistant]
|
list[Assistant]: A list of assistants. |
get_versions
async
¶
get_versions(
assistant_id: str,
metadata: Json = None,
limit: int = 10,
offset: int = 0,
*,
headers: Optional[dict[str, str]] = None
) -> list[AssistantVersion]
List all versions of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID to get versions for. |
required |
metadata
|
Json
|
Metadata to filter versions by. Exact match filter for each KV pair. |
None
|
limit
|
int
|
The maximum number of versions to return. |
10
|
offset
|
int
|
The number of versions to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[AssistantVersion]
|
list[AssistantVersion]: A list of assistant versions. |
set_latest
async
¶
set_latest(
assistant_id: str,
version: int,
*,
headers: Optional[dict[str, str]] = None
) -> Assistant
Change the version of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID to delete. |
required |
version
|
int
|
The version to change to. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
Assistant Object. |
ThreadsClient
¶
Client for managing threads in LangGraph.
A thread maintains the state of a graph across multiple interactions/invocations (aka runs). It accumulates and persists the graph's state, allowing for continuity between separate invocations of the graph.
Example
Methods:
Name | Description |
---|---|
get |
Get a thread by ID. |
create |
Create a new thread. |
update |
Update a thread. |
delete |
Delete a thread. |
search |
Search for threads. |
copy |
Copy a thread. |
get_state |
Get the state of a thread. |
update_state |
Update the state of a thread. |
get_history |
Get the state history of a thread. |
get
async
¶
Get a thread by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
Thread object. |
create
async
¶
create(
*,
metadata: Json = None,
thread_id: Optional[str] = None,
if_exists: Optional[OnConflictBehavior] = None,
supersteps: Optional[
Sequence[dict[str, Sequence[dict[str, Any]]]]
] = None,
graph_id: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Thread
Create a new thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Metadata to add to thread. |
None
|
thread_id
|
Optional[str]
|
ID of thread. If None, ID will be a randomly generated UUID. |
None
|
if_exists
|
Optional[OnConflictBehavior]
|
How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing thread). |
None
|
supersteps
|
Optional[Sequence[dict[str, Sequence[dict[str, Any]]]]]
|
Apply a list of supersteps when creating a thread, each containing a sequence of updates.
Each update has |
None
|
graph_id
|
Optional[str]
|
Optional graph ID to associate with the thread. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
The created thread. |
update
async
¶
update(
thread_id: str,
*,
metadata: dict[str, Any],
headers: Optional[dict[str, str]] = None
) -> Thread
Update a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
ID of thread to update. |
required |
metadata
|
dict[str, Any]
|
Metadata to merge with existing thread metadata. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
The created thread. |
delete
async
¶
search
async
¶
search(
*,
metadata: Json = None,
values: Json = None,
status: Optional[ThreadStatus] = None,
limit: int = 10,
offset: int = 0,
sort_by: Optional[ThreadSortBy] = None,
sort_order: Optional[SortOrder] = None,
headers: Optional[dict[str, str]] = None
) -> list[Thread]
Search for threads.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Thread metadata to filter on. |
None
|
values
|
Json
|
State values to filter on. |
None
|
status
|
Optional[ThreadStatus]
|
Thread status to filter on. Must be one of 'idle', 'busy', 'interrupted' or 'error'. |
None
|
limit
|
int
|
Limit on number of threads to return. |
10
|
offset
|
int
|
Offset in threads table to start search from. |
0
|
sort_by
|
Optional[ThreadSortBy]
|
Sort by field. |
None
|
sort_order
|
Optional[SortOrder]
|
Sort order. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Thread]
|
list[Thread]: List of the threads matching the search parameters. |
copy
async
¶
get_state
async
¶
get_state(
thread_id: str,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
*,
subgraphs: bool = False,
headers: Optional[dict[str, str]] = None
) -> ThreadState
Get the state of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get the state of. |
required |
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to get the state of. |
None
|
checkpoint_id
|
Optional[str]
|
(deprecated) The checkpoint ID to get the state of. |
None
|
subgraphs
|
bool
|
Include subgraphs states. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
ThreadState |
ThreadState
|
the thread of the state. |
Example Usage
client = get_client(url="http://localhost:2024)
thread_state = await client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state
async
¶
update_state(
thread_id: str,
values: Optional[Union[dict, Sequence[dict]]],
*,
as_node: Optional[str] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> ThreadUpdateStateResponse
Update the state of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to update. |
required |
values
|
Optional[Union[dict, Sequence[dict]]]
|
The values to update the state with. |
required |
as_node
|
Optional[str]
|
Update the state as if this node had just executed. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to update the state of. |
None
|
checkpoint_id
|
Optional[str]
|
(deprecated) The checkpoint ID to update the state of. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
ThreadUpdateStateResponse |
ThreadUpdateStateResponse
|
Response after updating a thread's state. |
Example Usage
client = get_client(url="http://localhost:2024)
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history
async
¶
get_history(
thread_id: str,
*,
limit: int = 10,
before: Optional[str | Checkpoint] = None,
metadata: Optional[dict] = None,
checkpoint: Optional[Checkpoint] = None,
headers: Optional[dict[str, str]] = None
) -> list[ThreadState]
Get the state history of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get the state history for. |
required |
checkpoint
|
Optional[Checkpoint]
|
Return states for this subgraph. If empty defaults to root. |
None
|
limit
|
int
|
The maximum number of states to return. |
10
|
before
|
Optional[str | Checkpoint]
|
Return states before this checkpoint. |
None
|
metadata
|
Optional[dict]
|
Filter states by metadata key-value pairs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[ThreadState]
|
list[ThreadState]: the state history of the thread. |
RunsClient
¶
Client for managing runs in LangGraph.
A run is a single assistant invocation with optional input, config, and metadata. This client manages runs, which can be stateful (on threads) or stateless.
Example
Methods:
Name | Description |
---|---|
stream |
Create a run and stream the results. |
create |
Create a background run. |
create_batch |
Create a batch of stateless background runs. |
wait |
Create a run, wait until it finishes and return the final state. |
list |
List runs. |
get |
Get a run. |
cancel |
Get a run. |
join |
Block until a run is done. Returns the final state of the thread. |
join_stream |
Stream output from a run in real-time, until the run is done. |
delete |
Delete a run. |
stream
¶
stream(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
stream_mode: Union[
StreamMode, Sequence[StreamMode]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
feedback_keys: Optional[Sequence[str]] = None,
on_disconnect: Optional[DisconnectMode] = None,
on_completion: Optional[OnCompletionBehavior] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
if_not_exists: Optional[IfNotExists] = None,
after_seconds: Optional[int] = None,
headers: Optional[dict[str, str]] = None
) -> AsyncIterator[StreamPart]
Create a run and stream the results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to assign to the thread. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
A command to execute. Cannot be combined with input. |
None
|
stream_mode
|
Union[StreamMode, Sequence[StreamMode]]
|
The stream mode(s) to use. |
'values'
|
stream_subgraphs
|
bool
|
Whether to stream output from subgraphs. |
False
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
feedback_keys
|
Optional[Sequence[str]]
|
Feedback keys to assign to run. |
None
|
on_disconnect
|
Optional[DisconnectMode]
|
The disconnect mode to use. Must be one of 'cancel' or 'continue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
Returns:
Type | Description |
---|---|
AsyncIterator[StreamPart]
|
AsyncIterator[StreamPart]: Asynchronous iterator of stream results. |
Example Usage
client = get_client(url="http://localhost:2024)
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create
async
¶
create(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
stream_mode: Union[
StreamMode, Sequence[StreamMode]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
if_not_exists: Optional[IfNotExists] = None,
on_completion: Optional[OnCompletionBehavior] = None,
after_seconds: Optional[int] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a background run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to assign to the thread. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
A command to execute. Cannot be combined with input. |
None
|
stream_mode
|
Union[StreamMode, Sequence[StreamMode]]
|
The stream mode(s) to use. |
'values'
|
stream_subgraphs
|
bool
|
Whether to stream output from subgraphs. |
False
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The created background run. |
Example Usage
background_run = await client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch
async
¶
Create a batch of stateless background runs.
wait
async
¶
wait(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
webhook: Optional[str] = None,
on_disconnect: Optional[DisconnectMode] = None,
on_completion: Optional[OnCompletionBehavior] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
if_not_exists: Optional[IfNotExists] = None,
after_seconds: Optional[int] = None,
raise_error: bool = True,
headers: Optional[dict[str, str]] = None
) -> Union[list[dict], dict[str, Any]]
Create a run, wait until it finishes and return the final state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to create the run on. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to run. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
A command to execute. Cannot be combined with input. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
on_disconnect
|
Optional[DisconnectMode]
|
The disconnect mode to use. Must be one of 'cancel' or 'continue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
Union[list[dict], dict[str, Any]]
|
Union[list[dict], dict[str, Any]]: The output of the run. |
Example Usage
client = get_client(url="http://localhost:2024")
final_state_of_run = await client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list
async
¶
list(
thread_id: str,
*,
limit: int = 10,
offset: int = 0,
status: Optional[RunStatus] = None,
headers: Optional[dict[str, str]] = None
) -> List[Run]
List runs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to list runs for. |
required |
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
status
|
Optional[RunStatus]
|
The status of the run to filter by. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
List[Run]
|
List[Run]: The runs for the thread. |
get
async
¶
cancel
async
¶
cancel(
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Optional[dict[str, str]] = None
) -> None
Get a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to cancel. |
required |
run_id
|
str
|
The run ID to cancel. |
required |
wait
|
bool
|
Whether to wait until run has completed. |
False
|
action
|
CancelAction
|
Action to take when cancelling the run. Possible values
are |
'interrupt'
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
join
async
¶
Block until a run is done. Returns the final state of the thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to join. |
required |
run_id
|
str
|
The run ID to join. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
dict
|
None |
join_stream
¶
join_stream(
thread_id: str,
run_id: str,
*,
cancel_on_disconnect: bool = False,
stream_mode: Optional[
Union[StreamMode, Sequence[StreamMode]]
] = None,
headers: Optional[dict[str, str]] = None
) -> AsyncIterator[StreamPart]
Stream output from a run in real-time, until the run is done. Output is not buffered, so any output produced before this call will not be received here.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to join. |
required |
run_id
|
str
|
The run ID to join. |
required |
cancel_on_disconnect
|
bool
|
Whether to cancel the run when the stream is disconnected. |
False
|
stream_mode
|
Optional[Union[StreamMode, Sequence[StreamMode]]]
|
The stream mode(s) to use. Must be a subset of the stream modes passed when creating the run. Background runs default to having the union of all stream modes. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
AsyncIterator[StreamPart]
|
None |
CronClient
¶
Client for managing recurrent runs (cron jobs) in LangGraph.
A run is a single invocation of an assistant with optional input and config. This client allows scheduling recurring runs to occur automatically.
Example Usage
Feature Availability
The crons client functionality is not supported on all licenses. Please check the relevant license documentation for the most up-to-date details on feature availability.
Methods:
Name | Description |
---|---|
create_for_thread |
Create a cron job for a thread. |
create |
Create a cron run. |
delete |
Delete a cron. |
search |
Get a list of cron jobs. |
create_for_thread
async
¶
create_for_thread(
thread_id: str,
assistant_id: str,
*,
schedule: str,
input: Optional[dict] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, list[str]]
] = None,
interrupt_after: Optional[Union[All, list[str]]] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a cron job for a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
the thread ID to run the cron job on. |
required |
assistant_id
|
str
|
The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph. |
required |
schedule
|
str
|
The cron schedule to execute this job on. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the cron job runs. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, list[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, list[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[str]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The cron run. |
Example Usage
client = get_client(url="http://localhost:2024")
cron_run = await client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create
async
¶
create(
assistant_id: str,
*,
schedule: str,
input: Optional[dict] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, list[str]]
] = None,
interrupt_after: Optional[Union[All, list[str]]] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a cron run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph. |
required |
schedule
|
str
|
The cron schedule to execute this job on. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the cron job runs. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, list[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, list[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[str]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The cron run. |
Example Usage
client = get_client(url="http://localhost:2024")
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete
async
¶
search
async
¶
search(
*,
assistant_id: Optional[str] = None,
thread_id: Optional[str] = None,
limit: int = 10,
offset: int = 0,
headers: Optional[dict[str, str]] = None
) -> list[Cron]
Get a list of cron jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
Optional[str]
|
The assistant ID or graph name to search for. |
None
|
thread_id
|
Optional[str]
|
the thread ID to search for. |
None
|
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Cron]
|
list[Cron]: The list of cron jobs returned by the search, |
Example Usage
client = get_client(url="http://localhost:2024")
cron_jobs = await client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
StoreClient
¶
Client for interacting with the graph's shared storage.
The Store provides a key-value storage system for persisting data across graph executions, allowing for stateful operations and data sharing across threads.
Example
Methods:
Name | Description |
---|---|
put_item |
Store or update an item. |
get_item |
Retrieve a single item. |
delete_item |
Delete an item. |
search_items |
Search for items within a namespace prefix. |
list_namespaces |
List namespaces with optional match conditions. |
put_item
async
¶
put_item(
namespace: Sequence[str],
/,
key: str,
value: dict[str, Any],
index: Optional[
Union[Literal[False], list[str]]
] = None,
ttl: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
) -> None
Store or update an item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace
|
Sequence[str]
|
A list of strings representing the namespace path. |
required |
key
|
str
|
The unique identifier for the item within the namespace. |
required |
value
|
dict[str, Any]
|
A dictionary containing the item's data. |
required |
index
|
Optional[Union[Literal[False], list[str]]]
|
Controls search indexing - None (use defaults), False (disable), or list of field paths to index. |
None
|
ttl
|
Optional[int]
|
Optional time-to-live in minutes for the item, or None for no expiration. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
get_item
async
¶
get_item(
namespace: Sequence[str],
/,
key: str,
*,
refresh_ttl: Optional[bool] = None,
headers: Optional[dict[str, str]] = None,
) -> Item
Retrieve a single item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key
|
str
|
The unique identifier for the item. |
required |
namespace
|
Sequence[str]
|
Optional list of strings representing the namespace path. |
required |
refresh_ttl
|
Optional[bool]
|
Whether to refresh the TTL on this read operation. If None, uses the store's default behavior. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Item |
Item
|
The retrieved item. |
headers |
Item
|
Optional custom headers to include with the request. |
delete_item
async
¶
delete_item(
namespace: Sequence[str],
/,
key: str,
headers: Optional[dict[str, str]] = None,
) -> None
Delete an item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key
|
str
|
The unique identifier for the item. |
required |
namespace
|
Sequence[str]
|
Optional list of strings representing the namespace path. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
search_items
async
¶
search_items(
namespace_prefix: Sequence[str],
/,
filter: Optional[dict[str, Any]] = None,
limit: int = 10,
offset: int = 0,
query: Optional[str] = None,
refresh_ttl: Optional[bool] = None,
headers: Optional[dict[str, str]] = None,
) -> SearchItemsResponse
Search for items within a namespace prefix.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace_prefix
|
Sequence[str]
|
List of strings representing the namespace prefix. |
required |
filter
|
Optional[dict[str, Any]]
|
Optional dictionary of key-value pairs to filter results. |
None
|
limit
|
int
|
Maximum number of items to return (default is 10). |
10
|
offset
|
int
|
Number of items to skip before returning results (default is 0). |
0
|
query
|
Optional[str]
|
Optional query for natural language search. |
None
|
refresh_ttl
|
Optional[bool]
|
Whether to refresh the TTL on items returned by this search. If None, uses the store's default behavior. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
SearchItemsResponse
|
List[Item]: A list of items matching the search criteria. |
Example Usage
client = get_client(url="http://localhost:2024")
items = await client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces
async
¶
list_namespaces(
prefix: Optional[List[str]] = None,
suffix: Optional[List[str]] = None,
max_depth: Optional[int] = None,
limit: int = 100,
offset: int = 0,
headers: Optional[dict[str, str]] = None,
) -> ListNamespaceResponse
List namespaces with optional match conditions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix
|
Optional[List[str]]
|
Optional list of strings representing the prefix to filter namespaces. |
None
|
suffix
|
Optional[List[str]]
|
Optional list of strings representing the suffix to filter namespaces. |
None
|
max_depth
|
Optional[int]
|
Optional integer specifying the maximum depth of namespaces to return. |
None
|
limit
|
int
|
Maximum number of namespaces to return (default is 100). |
100
|
offset
|
int
|
Number of namespaces to skip before returning results (default is 0). |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
ListNamespaceResponse
|
List[List[str]]: A list of namespaces matching the criteria. |
Example Usage
client = get_client(url="http://localhost:2024")
namespaces = await client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
SyncLangGraphClient
¶
Synchronous client for interacting with the LangGraph API.
This class provides synchronous access to LangGraph API endpoints for managing assistants, threads, runs, cron jobs, and data storage.
Example
SyncHttpClient
¶
Handle synchronous requests to the LangGraph API.
Provides error messaging and content handling enhancements above the underlying httpx client, mirroring the interface of HttpClient but for sync usage.
Attributes:
Name | Type | Description |
---|---|---|
client |
Client
|
Underlying HTTPX sync client. |
Methods:
Name | Description |
---|---|
get |
Send a GET request. |
post |
Send a POST request. |
put |
Send a PUT request. |
patch |
Send a PATCH request. |
delete |
Send a DELETE request. |
stream |
Stream the results of a request using SSE. |
get
¶
get(
path: str,
*,
params: Optional[QueryParamTypes] = None,
headers: Optional[dict[str, str]] = None
) -> Any
Send a GET request.
post
¶
Send a POST request.
put
¶
Send a PUT request.
patch
¶
Send a PATCH request.
SyncAssistantsClient
¶
Client for managing assistants in LangGraph synchronously.
This class provides methods to interact with assistants, which are versioned configurations of your graph.
Examples
Methods:
Name | Description |
---|---|
get |
Get an assistant by ID. |
get_graph |
Get the graph of an assistant by ID. |
get_schemas |
Get the schemas of an assistant by ID. |
get_subgraphs |
Get the schemas of an assistant by ID. |
create |
Create a new assistant. |
update |
Update an assistant. |
delete |
Delete an assistant. |
search |
Search for assistants. |
get_versions |
List all versions of an assistant. |
set_latest |
Change the version of an assistant. |
get
¶
Get an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get OR the name of the graph (to use the default assistant). |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
Assistant Object. |
Example Usage
get_graph
¶
get_graph(
assistant_id: str,
*,
xray: Union[int, bool] = False,
headers: Optional[dict[str, str]] = None
) -> dict[str, list[dict[str, Any]]]
Get the graph of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the graph of. |
required |
xray
|
Union[int, bool]
|
Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Graph |
dict[str, list[dict[str, Any]]]
|
The graph information for the assistant in JSON format. |
Example Usage
client = get_sync_client(url="http://localhost:2024")
graph_info = client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas
¶
Get the schemas of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the schema of. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
GraphSchema |
GraphSchema
|
The graph schema for the assistant. |
Example Usage
client = get_sync_client(url="http://localhost:2024")
schema = client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs
¶
get_subgraphs(
assistant_id: str,
namespace: Optional[str] = None,
recurse: bool = False,
*,
headers: Optional[dict[str, str]] = None
) -> Subgraphs
Get the schemas of an assistant by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The ID of the assistant to get the schema of. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Subgraphs |
Subgraphs
|
The graph schema for the assistant. |
create
¶
create(
graph_id: Optional[str],
config: Optional[Config] = None,
*,
metadata: Json = None,
assistant_id: Optional[str] = None,
if_exists: Optional[OnConflictBehavior] = None,
name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
description: Optional[str] = None
) -> Assistant
Create a new assistant.
Useful when graph is configurable and you want to create different assistants based on different configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph_id
|
Optional[str]
|
The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. |
required |
config
|
Optional[Config]
|
Configuration to use for the graph. |
None
|
metadata
|
Json
|
Metadata to add to assistant. |
None
|
assistant_id
|
Optional[str]
|
Assistant ID to use, will default to a random UUID if not provided. |
None
|
if_exists
|
Optional[OnConflictBehavior]
|
How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing assistant). |
None
|
name
|
Optional[str]
|
The name of the assistant. Defaults to 'Untitled' under the hood. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
description
|
Optional[str]
|
Optional description of the assistant. The description field is available for langgraph-api server version>=0.0.45 |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
The created assistant. |
update
¶
update(
assistant_id: str,
*,
graph_id: Optional[str] = None,
config: Optional[Config] = None,
metadata: Json = None,
name: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
description: Optional[str] = None
) -> Assistant
Update an assistant.
Use this to point to a different graph, update the configuration, or change the metadata of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
Assistant to update. |
required |
graph_id
|
Optional[str]
|
The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. If None, assistant will keep pointing to same graph. |
None
|
config
|
Optional[Config]
|
Configuration to use for the graph. |
None
|
metadata
|
Json
|
Metadata to merge with existing assistant metadata. |
None
|
name
|
Optional[str]
|
The new name for the assistant. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
description
|
Optional[str]
|
Optional description of the assistant. The description field is available for langgraph-api server version>=0.0.45 |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
The updated assistant. |
delete
¶
search
¶
search(
*,
metadata: Json = None,
graph_id: Optional[str] = None,
limit: int = 10,
offset: int = 0,
headers: Optional[dict[str, str]] = None
) -> list[Assistant]
Search for assistants.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Metadata to filter by. Exact match filter for each KV pair. |
None
|
graph_id
|
Optional[str]
|
The ID of the graph to filter by. The graph ID is normally set in your langgraph.json configuration. |
None
|
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Assistant]
|
list[Assistant]: A list of assistants. |
get_versions
¶
get_versions(
assistant_id: str,
metadata: Json = None,
limit: int = 10,
offset: int = 0,
*,
headers: Optional[dict[str, str]] = None
) -> list[AssistantVersion]
List all versions of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID to get versions for. |
required |
metadata
|
Json
|
Metadata to filter versions by. Exact match filter for each KV pair. |
None
|
limit
|
int
|
The maximum number of versions to return. |
10
|
offset
|
int
|
The number of versions to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[AssistantVersion]
|
list[Assistant]: A list of assistants. |
set_latest
¶
set_latest(
assistant_id: str,
version: int,
*,
headers: Optional[dict[str, str]] = None
) -> Assistant
Change the version of an assistant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID to delete. |
required |
version
|
int
|
The version to change to. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Assistant |
Assistant
|
Assistant Object. |
SyncThreadsClient
¶
Synchronous client for managing threads in LangGraph.
This class provides methods to create, retrieve, and manage threads, which represent conversations or stateful interactions.
Example
Methods:
Name | Description |
---|---|
get |
Get a thread by ID. |
create |
Create a new thread. |
update |
Update a thread. |
delete |
Delete a thread. |
search |
Search for threads. |
copy |
Copy a thread. |
get_state |
Get the state of a thread. |
update_state |
Update the state of a thread. |
get_history |
Get the state history of a thread. |
get
¶
Get a thread by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
Thread object. |
create
¶
create(
*,
metadata: Json = None,
thread_id: Optional[str] = None,
if_exists: Optional[OnConflictBehavior] = None,
supersteps: Optional[
Sequence[dict[str, Sequence[dict[str, Any]]]]
] = None,
graph_id: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Thread
Create a new thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Metadata to add to thread. |
None
|
thread_id
|
Optional[str]
|
ID of thread. If None, ID will be a randomly generated UUID. |
None
|
if_exists
|
Optional[OnConflictBehavior]
|
How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing thread). |
None
|
supersteps
|
Optional[Sequence[dict[str, Sequence[dict[str, Any]]]]]
|
Apply a list of supersteps when creating a thread, each containing a sequence of updates.
Each update has |
None
|
graph_id
|
Optional[str]
|
Optional graph ID to associate with the thread. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
The created thread. |
update
¶
update(
thread_id: str,
*,
metadata: dict[str, Any],
headers: Optional[dict[str, str]] = None
) -> Thread
Update a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
ID of thread to update. |
required |
metadata
|
dict[str, Any]
|
Metadata to merge with existing thread metadata. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Thread |
Thread
|
The created thread. |
delete
¶
search
¶
search(
*,
metadata: Json = None,
values: Json = None,
status: Optional[ThreadStatus] = None,
limit: int = 10,
offset: int = 0,
headers: Optional[dict[str, str]] = None
) -> list[Thread]
Search for threads.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata
|
Json
|
Thread metadata to filter on. |
None
|
values
|
Json
|
State values to filter on. |
None
|
status
|
Optional[ThreadStatus]
|
Thread status to filter on. Must be one of 'idle', 'busy', 'interrupted' or 'error'. |
None
|
limit
|
int
|
Limit on number of threads to return. |
10
|
offset
|
int
|
Offset in threads table to start search from. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Thread]
|
list[Thread]: List of the threads matching the search parameters. |
copy
¶
get_state
¶
get_state(
thread_id: str,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
*,
subgraphs: bool = False,
headers: Optional[dict[str, str]] = None
) -> ThreadState
Get the state of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get the state of. |
required |
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to get the state of. |
None
|
subgraphs
|
bool
|
Include subgraphs states. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
ThreadState |
ThreadState
|
the thread of the state. |
Example Usage
client = get_sync_client(url="http://localhost:2024")
thread_state = client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state
¶
update_state(
thread_id: str,
values: Optional[Union[dict, Sequence[dict]]],
*,
as_node: Optional[str] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> ThreadUpdateStateResponse
Update the state of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to update. |
required |
values
|
Optional[Union[dict, Sequence[dict]]]
|
The values to update the state with. |
required |
as_node
|
Optional[str]
|
Update the state as if this node had just executed. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to update the state of. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
ThreadUpdateStateResponse |
ThreadUpdateStateResponse
|
Response after updating a thread's state. |
Example Usage
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history
¶
get_history(
thread_id: str,
*,
limit: int = 10,
before: Optional[str | Checkpoint] = None,
metadata: Optional[dict] = None,
checkpoint: Optional[Checkpoint] = None,
headers: Optional[dict[str, str]] = None
) -> list[ThreadState]
Get the state history of a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The ID of the thread to get the state history for. |
required |
checkpoint
|
Optional[Checkpoint]
|
Return states for this subgraph. If empty defaults to root. |
None
|
limit
|
int
|
The maximum number of states to return. |
10
|
before
|
Optional[str | Checkpoint]
|
Return states before this checkpoint. |
None
|
metadata
|
Optional[dict]
|
Filter states by metadata key-value pairs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[ThreadState]
|
list[ThreadState]: the state history of the thread. |
SyncRunsClient
¶
Synchronous client for managing runs in LangGraph.
This class provides methods to create, retrieve, and manage runs, which represent individual executions of graphs.
Example
Methods:
Name | Description |
---|---|
stream |
Create a run and stream the results. |
create |
Create a background run. |
create_batch |
Create a batch of stateless background runs. |
wait |
Create a run, wait until it finishes and return the final state. |
list |
List runs. |
get |
Get a run. |
cancel |
Get a run. |
join |
Block until a run is done. Returns the final state of the thread. |
join_stream |
Stream output from a run in real-time, until the run is done. |
delete |
Delete a run. |
stream
¶
stream(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
stream_mode: Union[
StreamMode, Sequence[StreamMode]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
feedback_keys: Optional[Sequence[str]] = None,
on_disconnect: Optional[DisconnectMode] = None,
on_completion: Optional[OnCompletionBehavior] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
if_not_exists: Optional[IfNotExists] = None,
after_seconds: Optional[int] = None,
headers: Optional[dict[str, str]] = None
) -> Iterator[StreamPart]
Create a run and stream the results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to assign to the thread. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
The command to execute. |
None
|
stream_mode
|
Union[StreamMode, Sequence[StreamMode]]
|
The stream mode(s) to use. |
'values'
|
stream_subgraphs
|
bool
|
Whether to stream output from subgraphs. |
False
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
feedback_keys
|
Optional[Sequence[str]]
|
Feedback keys to assign to run. |
None
|
on_disconnect
|
Optional[DisconnectMode]
|
The disconnect mode to use. Must be one of 'cancel' or 'continue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
Iterator[StreamPart]
|
Iterator[StreamPart]: Iterator of stream results. |
Example Usage
client = get_sync_client(url="http://localhost:2024")
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create
¶
create(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
stream_mode: Union[
StreamMode, Sequence[StreamMode]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
on_completion: Optional[OnCompletionBehavior] = None,
if_not_exists: Optional[IfNotExists] = None,
after_seconds: Optional[int] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a background run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to assign to the thread. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
The command to execute. |
None
|
stream_mode
|
Union[StreamMode, Sequence[StreamMode]]
|
The stream mode(s) to use. |
'values'
|
stream_subgraphs
|
bool
|
Whether to stream output from subgraphs. |
False
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The created background run. |
Example Usage
client = get_sync_client(url="http://localhost:2024")
background_run = client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch
¶
Create a batch of stateless background runs.
wait
¶
wait(
thread_id: Optional[str],
assistant_id: str,
*,
input: Optional[dict] = None,
command: Optional[Command] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint_during: Optional[bool] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[
Union[All, Sequence[str]]
] = None,
interrupt_after: Optional[
Union[All, Sequence[str]]
] = None,
webhook: Optional[str] = None,
on_disconnect: Optional[DisconnectMode] = None,
on_completion: Optional[OnCompletionBehavior] = None,
multitask_strategy: Optional[MultitaskStrategy] = None,
if_not_exists: Optional[IfNotExists] = None,
after_seconds: Optional[int] = None,
headers: Optional[dict[str, str]] = None
) -> Union[list[dict], dict[str, Any]]
Create a run, wait until it finishes and return the final state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
Optional[str]
|
the thread ID to create the run on. If None will create a stateless run. |
required |
assistant_id
|
str
|
The assistant ID or graph name to run. If using graph name, will default to first assistant created from that graph. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
command
|
Optional[Command]
|
The command to execute. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the run. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint
|
Optional[Checkpoint]
|
The checkpoint to resume from. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, Sequence[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, Sequence[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
on_disconnect
|
Optional[DisconnectMode]
|
The disconnect mode to use. Must be one of 'cancel' or 'continue'. |
None
|
on_completion
|
Optional[OnCompletionBehavior]
|
Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'. |
None
|
multitask_strategy
|
Optional[MultitaskStrategy]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
if_not_exists
|
Optional[IfNotExists]
|
How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread). |
None
|
after_seconds
|
Optional[int]
|
The number of seconds to wait before starting the run. Use to schedule future runs. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
Union[list[dict], dict[str, Any]]
|
Union[list[dict], dict[str, Any]]: The output of the run. |
Example Usage
final_state_of_run = client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list
¶
list(
thread_id: str,
*,
limit: int = 10,
offset: int = 0,
headers: Optional[dict[str, str]] = None
) -> List[Run]
List runs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to list runs for. |
required |
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
List[Run]
|
List[Run]: The runs for the thread. |
get
¶
cancel
¶
cancel(
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Optional[dict[str, str]] = None
) -> None
Get a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to cancel. |
required |
run_id
|
str
|
The run ID to cancel. |
required |
wait
|
bool
|
Whether to wait until run has completed. |
False
|
action
|
CancelAction
|
Action to take when cancelling the run. Possible values
are |
'interrupt'
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
join
¶
Block until a run is done. Returns the final state of the thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to join. |
required |
run_id
|
str
|
The run ID to join. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
dict
|
None |
join_stream
¶
join_stream(
thread_id: str,
run_id: str,
*,
stream_mode: Optional[
Union[StreamMode, Sequence[StreamMode]]
] = None,
cancel_on_disconnect: bool = False,
headers: Optional[dict[str, str]] = None
) -> Iterator[StreamPart]
Stream output from a run in real-time, until the run is done. Output is not buffered, so any output produced before this call will not be received here.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
The thread ID to join. |
required |
run_id
|
str
|
The run ID to join. |
required |
stream_mode
|
Optional[Union[StreamMode, Sequence[StreamMode]]]
|
The stream mode(s) to use. Must be a subset of the stream modes passed when creating the run. Background runs default to having the union of all stream modes. |
None
|
cancel_on_disconnect
|
bool
|
Whether to cancel the run when the stream is disconnected. |
False
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
Iterator[StreamPart]
|
None |
SyncCronClient
¶
Synchronous client for managing cron jobs in LangGraph.
This class provides methods to create and manage scheduled tasks (cron jobs) for automated graph executions.
Example
Feature Availability
The crons client functionality is not supported on all licenses. Please check the relevant license documentation for the most up-to-date details on feature availability.
Methods:
Name | Description |
---|---|
create_for_thread |
Create a cron job for a thread. |
create |
Create a cron run. |
delete |
Delete a cron. |
search |
Get a list of cron jobs. |
create_for_thread
¶
create_for_thread(
thread_id: str,
assistant_id: str,
*,
schedule: str,
input: Optional[dict] = None,
metadata: Optional[dict] = None,
checkpoint_during: Optional[bool] = None,
config: Optional[Config] = None,
interrupt_before: Optional[
Union[All, list[str]]
] = None,
interrupt_after: Optional[Union[All, list[str]]] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a cron job for a thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
thread_id
|
str
|
the thread ID to run the cron job on. |
required |
assistant_id
|
str
|
The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph. |
required |
schedule
|
str
|
The cron schedule to execute this job on. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the cron job runs. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, list[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, list[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[str]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The cron run. |
Example Usage
client = get_sync_client(url="http://localhost:8123")
cron_run = client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create
¶
create(
assistant_id: str,
*,
schedule: str,
input: Optional[dict] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint_during: Optional[bool] = None,
interrupt_before: Optional[
Union[All, list[str]]
] = None,
interrupt_after: Optional[Union[All, list[str]]] = None,
webhook: Optional[str] = None,
multitask_strategy: Optional[str] = None,
headers: Optional[dict[str, str]] = None
) -> Run
Create a cron run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
str
|
The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph. |
required |
schedule
|
str
|
The cron schedule to execute this job on. |
required |
input
|
Optional[dict]
|
The input to the graph. |
None
|
metadata
|
Optional[dict]
|
Metadata to assign to the cron job runs. |
None
|
config
|
Optional[Config]
|
The configuration for the assistant. |
None
|
checkpoint_during
|
Optional[bool]
|
Whether to checkpoint during the run (or only at the end/interruption). |
None
|
interrupt_before
|
Optional[Union[All, list[str]]]
|
Nodes to interrupt immediately before they get executed. |
None
|
interrupt_after
|
Optional[Union[All, list[str]]]
|
Nodes to Nodes to interrupt immediately after they get executed. |
None
|
webhook
|
Optional[str]
|
Webhook to call after LangGraph API call is done. |
None
|
multitask_strategy
|
Optional[str]
|
Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Run |
Run
|
The cron run. |
Example Usage
client = get_sync_client(url="http://localhost:8123")
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
checkpoint_during=True,
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete
¶
search
¶
search(
*,
assistant_id: Optional[str] = None,
thread_id: Optional[str] = None,
limit: int = 10,
offset: int = 0,
headers: Optional[dict[str, str]] = None
) -> list[Cron]
Get a list of cron jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
assistant_id
|
Optional[str]
|
The assistant ID or graph name to search for. |
None
|
thread_id
|
Optional[str]
|
the thread ID to search for. |
None
|
limit
|
int
|
The maximum number of results to return. |
10
|
offset
|
int
|
The number of results to skip. |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
list[Cron]
|
list[Cron]: The list of cron jobs returned by the search, |
Example Usage
client = get_sync_client(url="http://localhost:8123")
cron_jobs = client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
SyncStoreClient
¶
A client for synchronous operations on a key-value store.
Provides methods to interact with a remote key-value store, allowing storage and retrieval of items within namespaced hierarchies.
Example
Methods:
Name | Description |
---|---|
put_item |
Store or update an item. |
get_item |
Retrieve a single item. |
delete_item |
Delete an item. |
search_items |
Search for items within a namespace prefix. |
list_namespaces |
List namespaces with optional match conditions. |
put_item
¶
put_item(
namespace: Sequence[str],
/,
key: str,
value: dict[str, Any],
index: Optional[
Union[Literal[False], list[str]]
] = None,
ttl: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
) -> None
Store or update an item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace
|
Sequence[str]
|
A list of strings representing the namespace path. |
required |
key
|
str
|
The unique identifier for the item within the namespace. |
required |
value
|
dict[str, Any]
|
A dictionary containing the item's data. |
required |
index
|
Optional[Union[Literal[False], list[str]]]
|
Controls search indexing - None (use defaults), False (disable), or list of field paths to index. |
None
|
ttl
|
Optional[int]
|
Optional time-to-live in minutes for the item, or None for no expiration. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
get_item
¶
get_item(
namespace: Sequence[str],
/,
key: str,
*,
refresh_ttl: Optional[bool] = None,
headers: Optional[dict[str, str]] = None,
) -> Item
Retrieve a single item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key
|
str
|
The unique identifier for the item. |
required |
namespace
|
Sequence[str]
|
Optional list of strings representing the namespace path. |
required |
refresh_ttl
|
Optional[bool]
|
Whether to refresh the TTL on this read operation. If None, uses the store's default behavior. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Name | Type | Description |
---|---|---|
Item |
Item
|
The retrieved item. |
delete_item
¶
delete_item(
namespace: Sequence[str],
/,
key: str,
headers: Optional[dict[str, str]] = None,
) -> None
Delete an item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key
|
str
|
The unique identifier for the item. |
required |
namespace
|
Sequence[str]
|
Optional list of strings representing the namespace path. |
required |
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
None
|
None |
search_items
¶
search_items(
namespace_prefix: Sequence[str],
/,
filter: Optional[dict[str, Any]] = None,
limit: int = 10,
offset: int = 0,
query: Optional[str] = None,
refresh_ttl: Optional[bool] = None,
headers: Optional[dict[str, str]] = None,
) -> SearchItemsResponse
Search for items within a namespace prefix.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace_prefix
|
Sequence[str]
|
List of strings representing the namespace prefix. |
required |
filter
|
Optional[dict[str, Any]]
|
Optional dictionary of key-value pairs to filter results. |
None
|
limit
|
int
|
Maximum number of items to return (default is 10). |
10
|
offset
|
int
|
Number of items to skip before returning results (default is 0). |
0
|
query
|
Optional[str]
|
Optional query for natural language search. |
None
|
refresh_ttl
|
Optional[bool]
|
Whether to refresh the TTL on items returned by this search. If None, uses the store's default behavior. |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
SearchItemsResponse
|
List[Item]: A list of items matching the search criteria. |
Example Usage
client = get_sync_client(url="http://localhost:8123")
items = client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces
¶
list_namespaces(
prefix: Optional[List[str]] = None,
suffix: Optional[List[str]] = None,
max_depth: Optional[int] = None,
limit: int = 100,
offset: int = 0,
headers: Optional[dict[str, str]] = None,
) -> ListNamespaceResponse
List namespaces with optional match conditions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix
|
Optional[List[str]]
|
Optional list of strings representing the prefix to filter namespaces. |
None
|
suffix
|
Optional[List[str]]
|
Optional list of strings representing the suffix to filter namespaces. |
None
|
max_depth
|
Optional[int]
|
Optional integer specifying the maximum depth of namespaces to return. |
None
|
limit
|
int
|
Maximum number of namespaces to return (default is 100). |
100
|
offset
|
int
|
Number of namespaces to skip before returning results (default is 0). |
0
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers to include with the request. |
None
|
Returns:
Type | Description |
---|---|
ListNamespaceResponse
|
List[List[str]]: A list of namespaces matching the criteria. |
get_client
¶
get_client(
*,
url: Optional[str] = None,
api_key: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
timeout: Optional[TimeoutTypes] = None
) -> LangGraphClient
Get a LangGraphClient instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url
|
Optional[str]
|
The URL of the LangGraph API. |
None
|
api_key
|
Optional[str]
|
The API key. If not provided, it will be read from the environment. Precedence: 1. explicit argument 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers |
None
|
timeout
|
Optional[TimeoutTypes]
|
Optional timeout configuration for the HTTP client. Accepts an httpx.Timeout instance, a float (seconds), or a tuple of timeouts. Tuple format is (connect, read, write, pool) If not provided, defaults to connect=5s, read=300s, write=300s, and pool=5s. |
None
|
Returns:
Name | Type | Description |
---|---|---|
LangGraphClient |
LangGraphClient
|
The top-level client for accessing AssistantsClient, |
LangGraphClient
|
ThreadsClient, RunsClient, and CronClient. |
get_sync_client
¶
get_sync_client(
*,
url: Optional[str] = None,
api_key: Optional[str] = None,
headers: Optional[dict[str, str]] = None,
timeout: Optional[TimeoutTypes] = None
) -> SyncLangGraphClient
Get a synchronous LangGraphClient instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url
|
Optional[str]
|
The URL of the LangGraph API. |
None
|
api_key
|
Optional[str]
|
The API key. If not provided, it will be read from the environment. Precedence: 1. explicit argument 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY |
None
|
headers
|
Optional[dict[str, str]]
|
Optional custom headers |
None
|
timeout
|
Optional[TimeoutTypes]
|
Optional timeout configuration for the HTTP client. Accepts an httpx.Timeout instance, a float (seconds), or a tuple of timeouts. Tuple format is (connect, read, write, pool) If not provided, defaults to connect=5s, read=300s, write=300s, and pool=5s. |
None
|
Returns: SyncLangGraphClient: The top-level synchronous client for accessing AssistantsClient, ThreadsClient, RunsClient, and CronClient.
Data models for interacting with the LangGraph API.
Classes:
Name | Description |
---|---|
Config |
Configuration options for a call. |
Checkpoint |
Represents a checkpoint in the execution process. |
GraphSchema |
Defines the structure and properties of a graph. |
AssistantBase |
Base model for an assistant. |
AssistantVersion |
Represents a specific version of an assistant. |
Assistant |
Represents an assistant with additional properties. |
Interrupt |
Represents an interruption in the execution flow. |
Thread |
Represents a conversation thread. |
ThreadTask |
Represents a task within a thread. |
ThreadState |
Represents the state of a thread. |
ThreadUpdateStateResponse |
Represents the response from updating a thread's state. |
Run |
Represents a single execution run. |
Cron |
Represents a scheduled task. |
RunCreate |
Defines the parameters for initiating a background run. |
Item |
Represents a single document or data entry in the graph's Store. |
ListNamespaceResponse |
Response structure for listing namespaces. |
SearchItem |
Item with an optional relevance score from search operations. |
SearchItemsResponse |
Response structure for searching items. |
StreamPart |
Represents a part of a stream response. |
Send |
Represents a message to be sent to a specific node in the graph. |
Command |
Represents one or more commands to control graph execution flow and state. |
Attributes:
Name | Type | Description |
---|---|---|
Json |
Represents a JSON-like structure, which can be None or a dictionary with string keys and any values. |
|
RunStatus |
Represents the status of a run: |
|
ThreadStatus |
Represents the status of a thread: |
|
StreamMode |
Defines the mode of streaming: |
|
DisconnectMode |
Specifies behavior on disconnection: |
|
MultitaskStrategy |
Defines how to handle multiple tasks: |
|
OnConflictBehavior |
Specifies behavior on conflict: |
|
OnCompletionBehavior |
Defines action after completion: |
|
All |
Represents a wildcard or 'all' selector. |
|
IfNotExists |
Specifies behavior if the thread doesn't exist: |
|
CancelAction |
Action to take when cancelling the run. |
|
AssistantSortBy |
The field to sort by. |
|
ThreadSortBy |
The field to sort by. |
|
SortOrder |
The order to sort by. |
Json
module-attribute
¶
Represents a JSON-like structure, which can be None or a dictionary with string keys and any values.
RunStatus
module-attribute
¶
RunStatus = Literal[
"pending",
"running",
"error",
"success",
"timeout",
"interrupted",
]
Represents the status of a run: - "pending": The run is waiting to start. - "running": The run is currently executing. - "error": The run encountered an error and stopped. - "success": The run completed successfully. - "timeout": The run exceeded its time limit. - "interrupted": The run was manually stopped or interrupted.
ThreadStatus
module-attribute
¶
ThreadStatus = Literal[
"idle", "busy", "interrupted", "error"
]
Represents the status of a thread: - "idle": The thread is not currently processing any task. - "busy": The thread is actively processing a task. - "interrupted": The thread's execution was interrupted. - "error": An exception occurred during task processing.
StreamMode
module-attribute
¶
StreamMode = Literal[
"values",
"messages",
"updates",
"events",
"debug",
"custom",
"messages-tuple",
]
Defines the mode of streaming: - "values": Stream only the values. - "messages": Stream complete messages. - "updates": Stream updates to the state. - "events": Stream events occurring during execution. - "debug": Stream detailed debug information. - "custom": Stream custom events.
DisconnectMode
module-attribute
¶
DisconnectMode = Literal['cancel', 'continue']
Specifies behavior on disconnection: - "cancel": Cancel the operation on disconnection. - "continue": Continue the operation even if disconnected.
MultitaskStrategy
module-attribute
¶
MultitaskStrategy = Literal[
"reject", "interrupt", "rollback", "enqueue"
]
Defines how to handle multiple tasks: - "reject": Reject new tasks when busy. - "interrupt": Interrupt current task for new ones. - "rollback": Roll back current task and start new one. - "enqueue": Queue new tasks for later execution.
OnConflictBehavior
module-attribute
¶
OnConflictBehavior = Literal['raise', 'do_nothing']
Specifies behavior on conflict: - "raise": Raise an exception when a conflict occurs. - "do_nothing": Ignore conflicts and proceed.
OnCompletionBehavior
module-attribute
¶
OnCompletionBehavior = Literal['delete', 'keep']
Defines action after completion: - "delete": Delete resources after completion. - "keep": Retain resources after completion.
IfNotExists
module-attribute
¶
IfNotExists = Literal['create', 'reject']
Specifies behavior if the thread doesn't exist: - "create": Create a new thread if it doesn't exist. - "reject": Reject the operation if the thread doesn't exist.
CancelAction
module-attribute
¶
CancelAction = Literal['interrupt', 'rollback']
Action to take when cancelling the run. - "interrupt": Simply cancel the run. - "rollback": Cancel the run. Then delete the run and associated checkpoints.
AssistantSortBy
module-attribute
¶
AssistantSortBy = Literal[
"assistant_id",
"graph_id",
"name",
"created_at",
"updated_at",
]
The field to sort by.
ThreadSortBy
module-attribute
¶
ThreadSortBy = Literal[
"thread_id", "status", "created_at", "updated_at"
]
The field to sort by.
Config
¶
Bases: TypedDict
Configuration options for a call.
Attributes:
Name | Type | Description |
---|---|---|
tags |
list[str]
|
Tags for this call and any sub-calls (eg. a Chain calling an LLM). |
recursion_limit |
int
|
Maximum number of times a call can recurse. If not provided, defaults to 25. |
configurable |
dict[str, Any]
|
Runtime values for attributes previously made configurable on this Runnable, |
tags
instance-attribute
¶
Tags for this call and any sub-calls (eg. a Chain calling an LLM). You can use these to filter calls.
recursion_limit
instance-attribute
¶
recursion_limit: int
Maximum number of times a call can recurse. If not provided, defaults to 25.
configurable
instance-attribute
¶
Runtime values for attributes previously made configurable on this Runnable, or sub-Runnables, through .configurable_fields() or .configurable_alternatives(). Check .output_schema() for a description of the attributes that have been made configurable.
Checkpoint
¶
Bases: TypedDict
Represents a checkpoint in the execution process.
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
str
|
Unique identifier for the thread associated with this checkpoint. |
checkpoint_ns |
str
|
Namespace for the checkpoint; used internally to manage subgraph state. |
checkpoint_id |
Optional[str]
|
Optional unique identifier for the checkpoint itself. |
checkpoint_map |
Optional[dict[str, Any]]
|
Optional dictionary containing checkpoint-specific data. |
thread_id
instance-attribute
¶
thread_id: str
Unique identifier for the thread associated with this checkpoint.
checkpoint_ns
instance-attribute
¶
checkpoint_ns: str
Namespace for the checkpoint; used internally to manage subgraph state.
checkpoint_id
instance-attribute
¶
Optional unique identifier for the checkpoint itself.
GraphSchema
¶
Bases: TypedDict
Defines the structure and properties of a graph.
Attributes:
Name | Type | Description |
---|---|---|
graph_id |
str
|
The ID of the graph. |
input_schema |
Optional[dict]
|
The schema for the graph input. |
output_schema |
Optional[dict]
|
The schema for the graph output. |
state_schema |
Optional[dict]
|
The schema for the graph state. |
config_schema |
Optional[dict]
|
The schema for the graph config. |
input_schema
instance-attribute
¶
The schema for the graph input. Missing if unable to generate JSON schema from graph.
output_schema
instance-attribute
¶
The schema for the graph output. Missing if unable to generate JSON schema from graph.
state_schema
instance-attribute
¶
The schema for the graph state. Missing if unable to generate JSON schema from graph.
AssistantBase
¶
Bases: TypedDict
Base model for an assistant.
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
str
|
The ID of the assistant. |
graph_id |
str
|
The ID of the graph. |
config |
Config
|
The assistant config. |
created_at |
datetime
|
The time the assistant was created. |
metadata |
Json
|
The assistant metadata. |
version |
int
|
The version of the assistant |
name |
str
|
The name of the assistant |
description |
Optional[str]
|
The description of the assistant |
AssistantVersion
¶
Bases: AssistantBase
Represents a specific version of an assistant.
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
str
|
The ID of the assistant. |
graph_id |
str
|
The ID of the graph. |
config |
Config
|
The assistant config. |
created_at |
datetime
|
The time the assistant was created. |
metadata |
Json
|
The assistant metadata. |
version |
int
|
The version of the assistant |
name |
str
|
The name of the assistant |
description |
Optional[str]
|
The description of the assistant |
Assistant
¶
Bases: AssistantBase
Represents an assistant with additional properties.
Attributes:
Name | Type | Description |
---|---|---|
updated_at |
datetime
|
The last time the assistant was updated. |
assistant_id |
str
|
The ID of the assistant. |
graph_id |
str
|
The ID of the graph. |
config |
Config
|
The assistant config. |
created_at |
datetime
|
The time the assistant was created. |
metadata |
Json
|
The assistant metadata. |
version |
int
|
The version of the assistant |
name |
str
|
The name of the assistant |
description |
Optional[str]
|
The description of the assistant |
Interrupt
¶
Bases: TypedDict
Represents an interruption in the execution flow.
Attributes:
Name | Type | Description |
---|---|---|
value |
Any
|
The value associated with the interrupt. |
when |
Literal['during']
|
When the interrupt occurred. |
resumable |
bool
|
Whether the interrupt can be resumed. |
ns |
Optional[list[str]]
|
Optional namespace for the interrupt. |
Thread
¶
Bases: TypedDict
Represents a conversation thread.
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
str
|
The ID of the thread. |
created_at |
datetime
|
The time the thread was created. |
updated_at |
datetime
|
The last time the thread was updated. |
metadata |
Json
|
The thread metadata. |
status |
ThreadStatus
|
The status of the thread, one of 'idle', 'busy', 'interrupted'. |
values |
Json
|
The current state of the thread. |
interrupts |
Dict[str, list[Interrupt]]
|
Interrupts which were thrown in this thread |
status
instance-attribute
¶
status: ThreadStatus
The status of the thread, one of 'idle', 'busy', 'interrupted'.
ThreadState
¶
Bases: TypedDict
Represents the state of a thread.
Attributes:
Name | Type | Description |
---|---|---|
values |
Union[list[dict], dict[str, Any]]
|
The state values. |
next |
Sequence[str]
|
The next nodes to execute. If empty, the thread is done until new input is |
checkpoint |
Checkpoint
|
The ID of the checkpoint. |
metadata |
Json
|
Metadata for this state |
created_at |
Optional[str]
|
Timestamp of state creation |
parent_checkpoint |
Optional[Checkpoint]
|
The ID of the parent checkpoint. If missing, this is the root checkpoint. |
tasks |
Sequence[ThreadTask]
|
Tasks to execute in this step. If already attempted, may contain an error. |
next
instance-attribute
¶
The next nodes to execute. If empty, the thread is done until new input is received.
parent_checkpoint
instance-attribute
¶
parent_checkpoint: Optional[Checkpoint]
The ID of the parent checkpoint. If missing, this is the root checkpoint.
tasks
instance-attribute
¶
tasks: Sequence[ThreadTask]
Tasks to execute in this step. If already attempted, may contain an error.
ThreadUpdateStateResponse
¶
Bases: TypedDict
Represents the response from updating a thread's state.
Attributes:
Name | Type | Description |
---|---|---|
checkpoint |
Checkpoint
|
Checkpoint of the latest state. |
Run
¶
Bases: TypedDict
Represents a single execution run.
Attributes:
Name | Type | Description |
---|---|---|
run_id |
str
|
The ID of the run. |
thread_id |
str
|
The ID of the thread. |
assistant_id |
str
|
The assistant that was used for this run. |
created_at |
datetime
|
The time the run was created. |
updated_at |
datetime
|
The last time the run was updated. |
status |
RunStatus
|
The status of the run. One of 'pending', 'running', "error", 'success', "timeout", "interrupted". |
metadata |
Json
|
The run metadata. |
multitask_strategy |
MultitaskStrategy
|
Strategy to handle concurrent runs on the same thread. |
status
instance-attribute
¶
status: RunStatus
The status of the run. One of 'pending', 'running', "error", 'success', "timeout", "interrupted".
multitask_strategy
instance-attribute
¶
multitask_strategy: MultitaskStrategy
Strategy to handle concurrent runs on the same thread.
Cron
¶
Bases: TypedDict
Represents a scheduled task.
Attributes:
Name | Type | Description |
---|---|---|
cron_id |
str
|
The ID of the cron. |
thread_id |
Optional[str]
|
The ID of the thread. |
end_time |
Optional[datetime]
|
The end date to stop running the cron. |
schedule |
str
|
The schedule to run, cron format. |
created_at |
datetime
|
The time the cron was created. |
updated_at |
datetime
|
The last time the cron was updated. |
payload |
dict
|
The run payload to use for creating new run. |
RunCreate
¶
Bases: TypedDict
Defines the parameters for initiating a background run.
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
Optional[str]
|
The identifier of the thread to run. If not provided, the run is stateless. |
assistant_id |
str
|
The identifier of the assistant to use for this run. |
input |
Optional[dict]
|
Initial input data for the run. |
metadata |
Optional[dict]
|
Additional metadata to associate with the run. |
config |
Optional[Config]
|
Configuration options for the run. |
checkpoint_id |
Optional[str]
|
The identifier of a checkpoint to resume from. |
interrupt_before |
Optional[list[str]]
|
List of node names to interrupt execution before. |
interrupt_after |
Optional[list[str]]
|
List of node names to interrupt execution after. |
webhook |
Optional[str]
|
URL to send webhook notifications about the run's progress. |
multitask_strategy |
Optional[MultitaskStrategy]
|
Strategy for handling concurrent runs on the same thread. |
thread_id
instance-attribute
¶
The identifier of the thread to run. If not provided, the run is stateless.
assistant_id
instance-attribute
¶
assistant_id: str
The identifier of the assistant to use for this run.
metadata
instance-attribute
¶
Additional metadata to associate with the run.
checkpoint_id
instance-attribute
¶
The identifier of a checkpoint to resume from.
interrupt_before
instance-attribute
¶
List of node names to interrupt execution before.
interrupt_after
instance-attribute
¶
List of node names to interrupt execution after.
webhook
instance-attribute
¶
URL to send webhook notifications about the run's progress.
multitask_strategy
instance-attribute
¶
multitask_strategy: Optional[MultitaskStrategy]
Strategy for handling concurrent runs on the same thread.
Item
¶
Bases: TypedDict
Represents a single document or data entry in the graph's Store.
Items are used to store cross-thread memories.
Attributes:
Name | Type | Description |
---|---|---|
namespace |
list[str]
|
The namespace of the item. A namespace is analogous to a document's directory. |
key |
str
|
The unique identifier of the item within its namespace. |
value |
dict[str, Any]
|
The value stored in the item. This is the document itself. |
created_at |
datetime
|
The timestamp when the item was created. |
updated_at |
datetime
|
The timestamp when the item was last updated. |
namespace
instance-attribute
¶
The namespace of the item. A namespace is analogous to a document's directory.
key
instance-attribute
¶
key: str
The unique identifier of the item within its namespace.
In general, keys needn't be globally unique.
value
instance-attribute
¶
The value stored in the item. This is the document itself.
ListNamespaceResponse
¶
Bases: TypedDict
Response structure for listing namespaces.
Attributes:
Name | Type | Description |
---|---|---|
namespaces |
list[list[str]]
|
A list of namespace paths, where each path is a list of strings. |
SearchItem
¶
Bases: Item
Item with an optional relevance score from search operations.
Attributes:
Name | Type | Description |
---|---|---|
score |
Optional[float]
|
Relevance/similarity score. Included when searching a compatible store with a natural language query. |
namespace
instance-attribute
¶
The namespace of the item. A namespace is analogous to a document's directory.
key
instance-attribute
¶
key: str
The unique identifier of the item within its namespace.
In general, keys needn't be globally unique.
value
instance-attribute
¶
The value stored in the item. This is the document itself.
SearchItemsResponse
¶
Bases: TypedDict
Response structure for searching items.
Attributes:
Name | Type | Description |
---|---|---|
items |
list[SearchItem]
|
A list of items matching the search criteria. |
StreamPart
¶
Bases: NamedTuple
Represents a part of a stream response.
Attributes:
Name | Type | Description |
---|---|---|
event |
str
|
The type of event for this stream part. |
data |
dict
|
The data payload associated with the event. |
Send
¶
Bases: TypedDict
Represents a message to be sent to a specific node in the graph.
This type is used to explicitly send messages to nodes in the graph, typically used within Command objects to control graph execution flow.
Attributes:
Name | Type | Description |
---|---|---|
node |
str
|
The name of the target node to send the message to. |
input |
Optional[dict[str, Any]]
|
Optional dictionary containing the input data to be passed to the node. |
Command
¶
Bases: TypedDict
Represents one or more commands to control graph execution flow and state.
This type defines the control commands that can be returned by nodes to influence graph execution. It lets you navigate to other nodes, update graph state, and resume from interruptions.
Attributes:
Name | Type | Description |
---|---|---|
goto |
Union[Send, str, Sequence[Union[Send, str]]]
|
Specifies where execution should continue. Can be: |
update |
Union[dict[str, Any], Sequence[Tuple[str, Any]]]
|
Updates to apply to the graph's state. Can be: |
resume |
Any
|
Value to resume execution with after an interruption. |
goto
instance-attribute
¶
Specifies where execution should continue. Can be:
- A string node name to navigate to
- A Send object to execute a node with specific input
- A sequence of node names or Send objects to execute in order
update
instance-attribute
¶
Updates to apply to the graph's state. Can be:
- A dictionary of state updates to merge
- A sequence of (key, value) tuples for ordered updates
Modules:
Name | Description |
---|---|
exceptions |
Exceptions used in the auth system. |
types |
Authentication and authorization types for LangGraph. |
Classes:
Name | Description |
---|---|
Auth |
Add custom authentication and authorization management to your LangGraph application. |
Auth
¶
Add custom authentication and authorization management to your LangGraph application.
The Auth class provides a unified system for handling authentication and authorization in LangGraph applications. It supports custom user authentication protocols and fine-grained authorization rules for different resources and actions.
To use, create a separate python file and add the path to the file to your
LangGraph API configuration file (langgraph.json
). Within that file, create
an instance of the Auth class and register authentication and authorization
handlers as needed.
Example langgraph.json
file:
{
"dependencies": ["."],
"graphs": {
"agent": "./my_agent/agent.py:graph"
},
"env": ".env",
"auth": {
"path": "./auth.py:my_auth"
}
Then the LangGraph server will load your auth file and run it server-side whenever a request comes in.
Basic Usage
from langgraph_sdk import Auth
my_auth = Auth()
async def verify_token(token: str) -> str:
# Verify token and return user_id
# This would typically be a call to your auth server
return "user_id"
@auth.authenticate
async def authenticate(authorization: str) -> str:
# Verify token and return user_id
result = await verify_token(authorization)
if result != "user_id":
raise Auth.exceptions.HTTPException(
status_code=401, detail="Unauthorized"
)
return result
# Global fallback handler
@auth.on
async def authorize_default(params: Auth.on.value):
return False # Reject all requests (default behavior)
@auth.on.threads.create
async def authorize_thread_create(params: Auth.on.threads.create.value):
# Allow the allowed user to create a thread
assert params.get("metadata", {}).get("owner") == "allowed_user"
@auth.on.store
async def authorize_store(ctx: Auth.types.AuthContext, value: Auth.types.on):
assert ctx.user.identity in value["namespace"], "Not authorized"
Request Processing Flow
- Authentication (your
@auth.authenticate
handler) is performed first on every request - For authorization, the most specific matching handler is called:
- If a handler exists for the exact resource and action, it is used (e.g.,
@auth.on.threads.create
) - Otherwise, if a handler exists for the resource with any action, it is used (e.g.,
@auth.on.threads
) - Finally, if no specific handlers match, the global handler is used (e.g.,
@auth.on
) - If no global handler is set, the request is accepted
- If a handler exists for the exact resource and action, it is used (e.g.,
This allows you to set default behavior with a global handler while overriding specific routes as needed.
Methods:
Name | Description |
---|---|
authenticate |
Register an authentication handler function. |
Attributes:
Name | Type | Description |
---|---|---|
types |
Reference to auth type definitions. |
|
exceptions |
Reference to auth exception definitions. |
|
on |
Entry point for authorization handlers that control access to specific resources. |
types
class-attribute
instance-attribute
¶
types = types
Reference to auth type definitions.
Provides access to all type definitions used in the auth system, like ThreadsCreate, AssistantsRead, etc.
exceptions
class-attribute
instance-attribute
¶
exceptions = exceptions
Reference to auth exception definitions.
Provides access to all exception definitions used in the auth system, like HTTPException, etc.
on
instance-attribute
¶
Entry point for authorization handlers that control access to specific resources.
The on class provides a flexible way to define authorization rules for different resources and actions in your application. It supports three main usage patterns:
- Global handlers that run for all resources and actions
- Resource-specific handlers that run for all actions on a resource
- Resource and action specific handlers for fine-grained control
Each handler must be an async function that accepts two parameters
- ctx (AuthContext): Contains request context and authenticated user info
- value: The data being authorized (type varies by endpoint)
The handler should return one of:
- None or True: Accept the request
- False: Reject with 403 error
- FilterType: Apply filtering rules to the response
Examples
Global handler for all requests:
@auth.on
async def reject_unhandled_requests(ctx: AuthContext, value: Any) -> None:
print(f"Request to {ctx.path} by {ctx.user.identity}")
return False
Resource-specific handler. This would take precedence over the global handler
for all actions on the threads
resource:
@auth.on.threads
async def check_thread_access(ctx: AuthContext, value: Any) -> bool:
# Allow access only to threads created by the user
return value.get("created_by") == ctx.user.identity
Resource and action specific handler:
@auth.on.threads.delete
async def prevent_thread_deletion(ctx: AuthContext, value: Any) -> bool:
# Only admins can delete threads
return "admin" in ctx.user.permissions
Multiple resources or actions:
@auth.on(resources=["threads", "runs"], actions=["create", "update"])
async def rate_limit_writes(ctx: AuthContext, value: Any) -> bool:
# Implement rate limiting for write operations
return await check_rate_limit(ctx.user.identity)
Auth for the store
resource is a bit different since its structure is developer defined.
You typically want to enforce user creds in the namespace. Y
authenticate
¶
Register an authentication handler function.
The authentication handler is responsible for verifying credentials and returning user scopes. It can accept any of the following parameters by name:
- request (Request): The raw ASGI request object
- body (dict): The parsed request body
- path (str): The request path, e.g., "/threads/abcd-1234-abcd-1234/runs/abcd-1234-abcd-1234/stream"
- method (str): The HTTP method, e.g., "GET"
- path_params (dict[str, str]): URL path parameters, e.g., {"thread_id": "abcd-1234-abcd-1234", "run_id": "abcd-1234-abcd-1234"}
- query_params (dict[str, str]): URL query parameters, e.g., {"stream": "true"}
- headers (dict[bytes, bytes]): Request headers
- authorization (str | None): The Authorization header value (e.g., "Bearer <token>")
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn
|
AH
|
The authentication handler function to register. Must return a representation of the user. This could be a: - string (the user id) - dict containing {"identity": str, "permissions": list[str]} - or an object with identity and permissions properties Permissions can be optionally used by your handlers downstream. |
required |
Returns:
Type | Description |
---|---|
AH
|
The registered handler function. |
Raises:
Type | Description |
---|---|
ValueError
|
If an authentication handler is already registered. |
Examples
Basic token authentication:
@auth.authenticate
async def authenticate(authorization: str) -> str:
user_id = verify_token(authorization)
return user_id
Accept the full request context:
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> str:
user = await verify_request(method, path, headers)
return user
Return user name and permissions:
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
permissions, user = await verify_request(method, path, headers)
# Permissions could be things like ["runs:read", "runs:write", "threads:read", "threads:write"]
return {
"identity": user["id"],
"permissions": permissions,
"display_name": user["name"],
}
Authentication and authorization types for LangGraph.
This module defines the core types used for authentication, authorization, and request handling in LangGraph. It includes user protocols, authentication contexts, and typed dictionaries for various API operations.
Note
All typing.TypedDict classes use total=False to make all fields typing.Optional by default.
Classes:
Name | Description |
---|---|
ThreadsCreate |
Parameters for creating a new thread. |
ThreadsRead |
Parameters for reading thread state or run information. |
ThreadsUpdate |
Parameters for updating a thread or run. |
ThreadsDelete |
Parameters for deleting a thread. |
ThreadsSearch |
Parameters for searching threads. |
RunsCreate |
Payload for creating a run. |
AssistantsCreate |
Payload for creating an assistant. |
AssistantsRead |
Payload for reading an assistant. |
AssistantsUpdate |
Payload for updating an assistant. |
AssistantsDelete |
Payload for deleting an assistant. |
AssistantsSearch |
Payload for searching assistants. |
StoreGet |
Operation to retrieve a specific item by its namespace and key. |
StoreSearch |
Operation to search for items within a specified namespace hierarchy. |
StoreListNamespaces |
Operation to list and filter namespaces in the store. |
StorePut |
Operation to store, update, or delete an item in the store. |
StoreDelete |
Operation to delete an item from the store. |
on |
Namespace for type definitions of different API operations. |
Attributes:
Name | Type | Description |
---|---|---|
MetadataInput |
Type for arbitrary metadata attached to entities. |
RunStatus
module-attribute
¶
RunStatus = Literal[
"pending", "error", "success", "timeout", "interrupted"
]
Status of a run execution.
Values
- pending: Run is queued or in progress
- error: Run failed with an error
- success: Run completed successfully
- timeout: Run exceeded time limit
- interrupted: Run was manually interrupted
MultitaskStrategy
module-attribute
¶
MultitaskStrategy = Literal[
"reject", "rollback", "interrupt", "enqueue"
]
Strategy for handling multiple concurrent tasks.
Values
- reject: Reject new tasks while one is in progress
- rollback: Cancel current task and start new one
- interrupt: Interrupt current task and start new one
- enqueue: Queue new tasks to run after current one
OnConflictBehavior
module-attribute
¶
OnConflictBehavior = Literal['raise', 'do_nothing']
Behavior when encountering conflicts.
Values
- raise: Raise an exception on conflict
- do_nothing: Silently ignore conflicts
IfNotExists
module-attribute
¶
IfNotExists = Literal['create', 'reject']
Behavior when an entity doesn't exist.
Values
- create: Create the entity
- reject: Reject the operation
FilterType
module-attribute
¶
FilterType = Union[
Dict[
str,
Union[str, Dict[Literal["$eq", "$contains"], str]],
],
Dict[str, str],
]
Response type for authorization handlers.
Supports exact matches and operators
- Exact match shorthand: {"field": "value"}
- Exact match: {"field": {"$eq": "value"}}
- Contains: {"field": {"$contains": "value"}}
Examples
Simple exact match filter for the resource owner:
Explicit version of the exact match filter:
Containment:
Combining filters (treated as a logical AND
):
ThreadStatus
module-attribute
¶
ThreadStatus = Literal[
"idle", "busy", "interrupted", "error"
]
Status of a thread.
Values
- idle: Thread is available for work
- busy: Thread is currently processing
- interrupted: Thread was interrupted
- error: Thread encountered an error
MetadataInput
module-attribute
¶
HandlerResult
module-attribute
¶
HandlerResult = Union[None, bool, FilterType]
The result of a handler can be: * None | True: accept the request. * False: reject the request with a 403 error * FilterType: filter to apply
Authenticator
module-attribute
¶
Authenticator = Callable[
...,
Awaitable[
Union[
MinimalUser,
str,
BaseUser,
MinimalUserDict,
Mapping[str, Any],
],
],
]
Type for authentication functions.
An authenticator can return either: 1. A string (user_id) 2. A dict containing {"identity": str, "permissions": list[str]} 3. An object with identity and permissions properties
Permissions can be used downstream by your authorization logic to determine access permissions to different resources.
The authenticate decorator will automatically inject any of the following parameters by name if they are included in your function signature:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request
|
Request
|
The raw ASGI request object |
required |
body
|
dict
|
The parsed request body |
required |
path
|
str
|
The request path |
required |
method
|
str
|
The HTTP method (GET, POST, etc.) |
required |
path_params
|
dict[str, str] | None
|
URL path parameters |
required |
query_params
|
dict[str, str] | None
|
URL query parameters |
required |
headers
|
dict[str, bytes] | None
|
Request headers |
required |
authorization
|
str | None
|
The Authorization header value (e.g. "Bearer |
required |
Examples
Basic authentication with token:
from langgraph_sdk import Auth
auth = Auth()
@auth.authenticate
async def authenticate1(authorization: str) -> Auth.types.MinimalUserDict:
return await get_user(authorization)
Authentication with multiple parameters:
@auth.authenticate
async def authenticate2(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
# Custom auth logic using method, path and headers
user = verify_request(method, path, headers)
return user
Accepting the raw ASGI request:
MY_SECRET = "my-secret-key"
@auth.authenticate
async def get_current_user(request: Request) -> Auth.types.MinimalUserDict:
try:
token = (request.headers.get("authorization") or "").split(" ", 1)[1]
payload = jwt.decode(token, MY_SECRET, algorithms=["HS256"])
except (IndexError, InvalidTokenError):
raise HTTPException(
status_code=401,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.myauth-provider.com/auth/v1/user",
headers={"Authorization": f"Bearer {MY_SECRET}"}
)
if response.status_code != 200:
raise HTTPException(status_code=401, detail="User not found")
user_data = response.json()
return {
"identity": user_data["id"],
"display_name": user_data.get("name"),
"permissions": user_data.get("permissions", []),
"is_authenticated": True,
}
MinimalUser
¶
MinimalUserDict
¶
Bases: TypedDict
The dictionary representation of a user.
Attributes:
Name | Type | Description |
---|---|---|
identity |
Required[str]
|
The required unique identifier for the user. |
display_name |
str
|
The typing.Optional display name for the user. |
is_authenticated |
bool
|
Whether the user is authenticated. Defaults to True. |
permissions |
Sequence[str]
|
A list of permissions associated with the user. |
BaseUser
¶
Bases: Protocol
The base ASGI user protocol
Methods:
Name | Description |
---|---|
__getitem__ |
Get a key from your minimal user dict. |
__contains__ |
Check if a property exists. |
__iter__ |
Iterate over the keys of the user. |
Attributes:
Name | Type | Description |
---|---|---|
is_authenticated |
bool
|
Whether the user is authenticated. |
display_name |
str
|
The display name of the user. |
identity |
str
|
The unique identifier for the user. |
permissions |
Sequence[str]
|
The permissions associated with the user. |
StudioUser
¶
A user object that's populated from authenticated requests from the LangGraph studio.
Note: Studio auth can be disabled in your langgraph.json
config.
You can use isinstance
checks in your authorization handlers (@auth.on
) to control access specifically
for developers accessing the instance from the LangGraph Studio UI.
Examples
BaseAuthContext
¶
Base class for authentication context.
Provides the fundamental authentication information needed for authorization decisions.
Attributes:
Name | Type | Description |
---|---|---|
permissions |
Sequence[str]
|
The permissions granted to the authenticated user. |
user |
BaseUser
|
The authenticated user. |
AuthContext
¶
Bases: BaseAuthContext
Complete authentication context with resource and action information.
Extends BaseAuthContext with specific resource and action being accessed, allowing for fine-grained access control decisions.
Attributes:
Name | Type | Description |
---|---|---|
resource |
Literal['runs', 'threads', 'crons', 'assistants', 'store']
|
The resource being accessed. |
action |
Literal['create', 'read', 'update', 'delete', 'search', 'create_run', 'put', 'get', 'list_namespaces']
|
The action being performed on the resource. |
permissions |
Sequence[str]
|
The permissions granted to the authenticated user. |
user |
BaseUser
|
The authenticated user. |
resource
instance-attribute
¶
resource: Literal[
"runs", "threads", "crons", "assistants", "store"
]
The resource being accessed.
action
instance-attribute
¶
action: Literal[
"create",
"read",
"update",
"delete",
"search",
"create_run",
"put",
"get",
"list_namespaces",
]
The action being performed on the resource.
Most resources support the following actions: - create: Create a new resource - read: Read information about a resource - update: Update an existing resource - delete: Delete a resource - search: Search for resources
The store supports the following actions: - put: Add or update a document in the store - get: Get a document from the store - list_namespaces: List the namespaces in the store
permissions
instance-attribute
¶
The permissions granted to the authenticated user.
ThreadsCreate
¶
Bases: TypedDict
Parameters for creating a new thread.
Examples
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
UUID
|
Unique identifier for the thread. |
metadata |
MetadataInput
|
typing.Optional metadata to attach to the thread. |
if_exists |
OnConflictBehavior
|
Behavior when a thread with the same ID already exists. |
metadata
instance-attribute
¶
metadata: MetadataInput
typing.Optional metadata to attach to the thread.
if_exists
instance-attribute
¶
if_exists: OnConflictBehavior
Behavior when a thread with the same ID already exists.
ThreadsRead
¶
Bases: TypedDict
Parameters for reading thread state or run information.
This type is used in three contexts: 1. Reading thread, thread version, or thread state information: Only thread_id is provided 2. Reading run information: Both thread_id and run_id are provided
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
UUID
|
Unique identifier for the thread. |
run_id |
Optional[UUID]
|
Run ID to filter by. Only used when reading run information within a thread. |
ThreadsUpdate
¶
Bases: TypedDict
Parameters for updating a thread or run.
Called for updates to a thread, thread version, or run cancellation.
Attributes:
Name | Type | Description |
---|---|---|
thread_id |
UUID
|
Unique identifier for the thread. |
metadata |
MetadataInput
|
typing.Optional metadata to update. |
action |
Optional[Literal['interrupt', 'rollback']]
|
typing.Optional action to perform on the thread. |
ThreadsDelete
¶
ThreadsSearch
¶
Bases: TypedDict
Parameters for searching threads.
Called for searches to threads or runs.
Attributes:
Name | Type | Description |
---|---|---|
metadata |
MetadataInput
|
typing.Optional metadata to filter by. |
values |
MetadataInput
|
typing.Optional values to filter by. |
status |
Optional[ThreadStatus]
|
typing.Optional status to filter by. |
limit |
int
|
Maximum number of results to return. |
offset |
int
|
Offset for pagination. |
thread_id |
Optional[UUID]
|
typing.Optional thread ID to filter by. |
RunsCreate
¶
Bases: TypedDict
Payload for creating a run.
Examples
create_params = {
"assistant_id": UUID("123e4567-e89b-12d3-a456-426614174000"),
"thread_id": UUID("123e4567-e89b-12d3-a456-426614174001"),
"run_id": UUID("123e4567-e89b-12d3-a456-426614174002"),
"status": "pending",
"metadata": {"owner": "user123"},
"prevent_insert_if_inflight": True,
"multitask_strategy": "reject",
"if_not_exists": "create",
"after_seconds": 10,
"kwargs": {"key": "value"},
"action": "interrupt"
}
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
Optional[UUID]
|
typing.Optional assistant ID to use for this run. |
thread_id |
Optional[UUID]
|
typing.Optional thread ID to use for this run. |
run_id |
Optional[UUID]
|
typing.Optional run ID to use for this run. |
status |
Optional[RunStatus]
|
typing.Optional status for this run. |
metadata |
MetadataInput
|
typing.Optional metadata for the run. |
prevent_insert_if_inflight |
bool
|
Prevent inserting a new run if one is already in flight. |
multitask_strategy |
MultitaskStrategy
|
Multitask strategy for this run. |
if_not_exists |
IfNotExists
|
IfNotExists for this run. |
after_seconds |
int
|
Number of seconds to wait before creating the run. |
kwargs |
Dict[str, Any]
|
Keyword arguments to pass to the run. |
action |
Optional[Literal['interrupt', 'rollback']]
|
Action to take if updating an existing run. |
assistant_id
instance-attribute
¶
typing.Optional assistant ID to use for this run.
thread_id
instance-attribute
¶
typing.Optional thread ID to use for this run.
prevent_insert_if_inflight
instance-attribute
¶
prevent_insert_if_inflight: bool
Prevent inserting a new run if one is already in flight.
multitask_strategy
instance-attribute
¶
multitask_strategy: MultitaskStrategy
Multitask strategy for this run.
AssistantsCreate
¶
Bases: TypedDict
Payload for creating an assistant.
Examples
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
UUID
|
Unique identifier for the assistant. |
graph_id |
str
|
Graph ID to use for this assistant. |
config |
Optional[Union[Dict[str, Any], Any]]
|
typing.Optional configuration for the assistant. |
metadata |
MetadataInput
|
typing.Optional metadata to attach to the assistant. |
if_exists |
OnConflictBehavior
|
Behavior when an assistant with the same ID already exists. |
name |
str
|
Name of the assistant. |
config
instance-attribute
¶
typing.Optional configuration for the assistant.
metadata
instance-attribute
¶
metadata: MetadataInput
typing.Optional metadata to attach to the assistant.
if_exists
instance-attribute
¶
if_exists: OnConflictBehavior
Behavior when an assistant with the same ID already exists.
AssistantsRead
¶
Bases: TypedDict
Payload for reading an assistant.
Examples
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
UUID
|
Unique identifier for the assistant. |
metadata |
MetadataInput
|
typing.Optional metadata to filter by. |
AssistantsUpdate
¶
Bases: TypedDict
Payload for updating an assistant.
Examples
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
UUID
|
Unique identifier for the assistant. |
graph_id |
Optional[str]
|
typing.Optional graph ID to update. |
config |
Optional[Union[Dict[str, Any], Any]]
|
typing.Optional configuration to update. |
metadata |
MetadataInput
|
typing.Optional metadata to update. |
name |
Optional[str]
|
typing.Optional name to update. |
version |
Optional[int]
|
typing.Optional version to update. |
config
instance-attribute
¶
typing.Optional configuration to update.
AssistantsDelete
¶
Bases: TypedDict
Payload for deleting an assistant.
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
UUID
|
Unique identifier for the assistant. |
AssistantsSearch
¶
CronsCreate
¶
Bases: TypedDict
Payload for creating a cron job.
Examples
Attributes:
Name | Type | Description |
---|---|---|
payload |
Dict[str, Any]
|
Payload for the cron job. |
schedule |
str
|
Schedule for the cron job. |
cron_id |
Optional[UUID]
|
typing.Optional unique identifier for the cron job. |
thread_id |
Optional[UUID]
|
typing.Optional thread ID to use for this cron job. |
user_id |
Optional[str]
|
typing.Optional user ID to use for this cron job. |
end_time |
Optional[datetime]
|
typing.Optional end time for the cron job. |
CronsDelete
¶
CronsRead
¶
CronsUpdate
¶
CronsSearch
¶
Bases: TypedDict
Payload for searching cron jobs.
Examples
Attributes:
Name | Type | Description |
---|---|---|
assistant_id |
Optional[UUID]
|
typing.Optional assistant ID to filter by. |
thread_id |
Optional[UUID]
|
typing.Optional thread ID to filter by. |
limit |
int
|
Maximum number of results to return. |
offset |
int
|
Offset for pagination. |
assistant_id
instance-attribute
¶
typing.Optional assistant ID to filter by.
StoreGet
¶
StoreSearch
¶
Bases: TypedDict
Operation to search for items within a specified namespace hierarchy.
Attributes:
Name | Type | Description |
---|---|---|
namespace |
tuple[str, ...]
|
Prefix filter for defining the search scope. |
filter |
Optional[dict[str, Any]]
|
Key-value pairs for filtering results based on exact matches or comparison operators. |
limit |
int
|
Maximum number of items to return in the search results. |
offset |
int
|
Number of matching items to skip for pagination. |
query |
Optional[str]
|
Naturalj language search query for semantic search capabilities. |
StoreListNamespaces
¶
Bases: TypedDict
Operation to list and filter namespaces in the store.
Attributes:
Name | Type | Description |
---|---|---|
namespace |
Optional[tuple[str, ...]]
|
Prefix filter namespaces. |
suffix |
Optional[tuple[str, ...]]
|
Optional conditions for filtering namespaces. |
max_depth |
Optional[int]
|
Maximum depth of namespace hierarchy to return. |
limit |
int
|
Maximum number of namespaces to return. |
offset |
int
|
Number of namespaces to skip for pagination. |
StorePut
¶
Bases: TypedDict
Operation to store, update, or delete an item in the store.
Attributes:
Name | Type | Description |
---|---|---|
namespace |
tuple[str, ...]
|
Hierarchical path that identifies the location of the item. |
key |
str
|
Unique identifier for the item within its namespace. |
value |
Optional[dict[str, Any]]
|
The data to store, or None to mark the item for deletion. |
index |
Optional[Union[Literal[False], list[str]]]
|
Optional index configuration for full-text search. |
StoreDelete
¶
on
¶
Namespace for type definitions of different API operations.
This class organizes type definitions for create, read, update, delete, and search operations across different resources (threads, assistants, crons).
Usage
from langgraph_sdk import Auth
auth = Auth()
@auth.on
def handle_all(params: Auth.on.value):
raise Exception("Not authorized")
@auth.on.threads.create
def handle_thread_create(params: Auth.on.threads.create.value):
# Handle thread creation
pass
@auth.on.assistants.search
def handle_assistant_search(params: Auth.on.assistants.search.value):
# Handle assistant search
pass
Classes:
Name | Description |
---|---|
threads |
Types for thread-related operations. |
assistants |
Types for assistant-related operations. |
crons |
Types for cron-related operations. |
store |
Types for store-related operations. |
threads
¶
Types for thread-related operations.
Classes:
Name | Description |
---|---|
create |
Type for thread creation parameters. |
create_run |
Type for creating or streaming a run. |
read |
Type for thread read parameters. |
update |
Type for thread update parameters. |
delete |
Type for thread deletion parameters. |
search |
Type for thread search parameters. |
assistants
¶
crons
¶
Exceptions used in the auth system.
Classes:
Name | Description |
---|---|
HTTPException |
HTTP exception that you can raise to return a specific HTTP error response. |
HTTPException
¶
Bases: Exception
HTTP exception that you can raise to return a specific HTTP error response.
Since this is defined in the auth module, we default to a 401 status code.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status_code
|
int
|
HTTP status code for the error. Defaults to 401 "Unauthorized". |
401
|
detail
|
Optional[str]
|
Detailed error message. If None, uses a default message based on the status code. |
None
|
headers
|
Optional[Mapping[str, str]]
|
Additional HTTP headers to include in the error response. |
None
|
Example
Default:
Add headers:
raise HTTPException(headers={"X-Custom-Header": "Custom Value"})
# HTTPException(status_code=401, detail='Unauthorized', headers={"WWW-Authenticate": "Bearer"})
Custom error: