Python SDK Reference¶
The LangGraph client implementations connect to the LangGraph API.
This module provides both asynchronous (LangGraphClient) and synchronous (SyncLangGraphClient) clients for interacting with the LangGraph API's core resources such as Assistants, Threads, Runs, and Cron jobs, as well as its persistent document Store.
LangGraphClient
¶
Top-level client for LangGraph API.
Attributes:
- assistants – Manages versioned configuration for your graphs.
- threads – Handles (potentially) multi-turn interactions, such as conversational threads.
- runs – Controls individual invocations of the graph.
- crons – Manages scheduled operations.
- store – Interfaces with persistent, shared data storage.
HttpClient
¶
Handle async requests to the LangGraph API.
Adds additional error messaging & content handling above the provided httpx client.
Attributes:
- client (AsyncClient) – Underlying HTTPX async client.
get
async
¶
get(
    path: str,
    *,
    params: Optional[QueryParamTypes] = None,
    headers: Optional[dict[str, str]] = None
) -> Any
Send a GET request.
post
async
¶
Send a POST request.
put
async
¶
Send a PUT request.
patch
async
¶
Send a PATCH request.
delete
async
¶
delete(
    path: str,
    *,
    json: Optional[Any] = None,
    headers: Optional[dict[str, str]] = None
) -> None
Send a DELETE request.
stream
async
¶
stream(
    path: str,
    method: str,
    *,
    json: Optional[dict] = None,
    params: Optional[QueryParamTypes] = None,
    headers: Optional[dict[str, str]] = None
) -> AsyncIterator[StreamPart]
Stream results using SSE.
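The SSE payloads consumed by stream() are plain text lines pairing event: and data: fields, which the client surfaces as StreamPart(event, data) objects. As a rough, SDK-independent sketch of that decoding (parse_sse and its behavior here are illustrative, not the client's actual implementation):

```python
import json
from typing import Any, Iterator, Optional, Tuple


def parse_sse(raw: str) -> Iterator[Tuple[str, Optional[Any]]]:
    """Parse a server-sent-events payload into (event, data) pairs.

    Mirrors the shape of StreamPart(event, data): blank lines delimit
    individual events, and data lines carry JSON.
    """
    event, data_lines = None, []
    for line in raw.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif not line.strip() and (event is not None or data_lines):
            # Blank line ends the current event; decode accumulated data.
            data = json.loads("\n".join(data_lines)) if data_lines else None
            yield event or "message", data
            event, data_lines = None, []
```

The real client handles reconnection, chunked transport, and error events on top of this framing.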
AssistantsClient
¶
Client for managing assistants in LangGraph.
This class provides methods to interact with assistants, which are versioned configurations of your graph.
Example:
client = get_client()
assistant = await client.assistants.get("assistant_id_123")
get
async
¶
Get an assistant by ID.
Parameters:
- assistant_id (str) – The ID of the assistant to get.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Assistant – The assistant object.
Example Usage:
assistant = await client.assistants.get(
assistant_id="my_assistant_id"
)
print(assistant)
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'},
'version': 1,
'name': 'my_assistant'
}
get_graph
async
¶
get_graph(
    assistant_id: str,
    *,
    xray: Union[int, bool] = False,
    headers: Optional[dict[str, str]] = None
) -> dict[str, list[dict[str, Any]]]
Get the graph of an assistant by ID.
Parameters:
- assistant_id (str) – The ID of the assistant to get the graph of.
- xray (Union[int, bool], default: False) – Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Graph (dict[str, list[dict[str, Any]]]) – The graph information for the assistant in JSON format.
Example Usage:
graph_info = await client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas
async
¶
Get the schemas of an assistant by ID.
Parameters:
- assistant_id (str) – The ID of the assistant to get the schema of.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- GraphSchema – The graph schema for the assistant.
Example Usage:
schema = await client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
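Since the returned schemas are plain JSON Schema, inputs can be sanity-checked client-side before starting a run. A minimal stdlib-only sketch that handles just the local #/definitions/... refs shown above (a full validator such as the jsonschema package covers far more):

```python
def missing_required(schema: dict, payload: dict) -> list:
    """Resolve the schema's top-level $ref and report required keys
    absent from the payload. Only handles local '#/definitions/...'
    references like those in the state_schema printed above."""
    defs = schema.get("definitions", {})
    target = schema
    while "$ref" in target:
        target = defs[target["$ref"].rsplit("/", 1)[-1]]
    return [key for key in target.get("required", []) if key not in payload]


# Trimmed-down copy of the state_schema shown in the example output.
state_schema = {
    "title": "LangGraphInput",
    "$ref": "#/definitions/AgentState",
    "definitions": {
        "AgentState": {
            "type": "object",
            "required": ["messages"],
            "properties": {"messages": {"type": "array"}},
        }
    },
}
```

For example, `missing_required(state_schema, {})` flags the absent `messages` key, while a payload containing `messages` passes.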
get_subgraphs
async
¶
get_subgraphs(
    assistant_id: str,
    namespace: Optional[str] = None,
    recurse: bool = False,
    *,
    headers: Optional[dict[str, str]] = None
) -> Subgraphs
Get the subgraphs of an assistant by ID.
Parameters:
- assistant_id (str) – The ID of the assistant to get the subgraphs of.
- namespace (Optional[str], default: None) – Optional namespace to filter by.
- recurse (bool, default: False) – Whether to recursively get subgraphs.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Subgraphs – The subgraphs of the assistant.
create
async
¶
create(
    graph_id: Optional[str],
    config: Optional[Config] = None,
    *,
    metadata: Json = None,
    assistant_id: Optional[str] = None,
    if_exists: Optional[OnConflictBehavior] = None,
    name: Optional[str] = None,
    headers: Optional[dict[str, str]] = None,
    description: Optional[str] = None
) -> Assistant
Create a new assistant.
Useful when the graph is configurable and you want to create different assistants based on different configurations.
Parameters:
- graph_id (Optional[str]) – The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration.
- config (Optional[Config], default: None) – Configuration to use for the graph.
- metadata (Json, default: None) – Metadata to add to the assistant.
- assistant_id (Optional[str], default: None) – Assistant ID to use; defaults to a random UUID if not provided.
- if_exists (Optional[OnConflictBehavior], default: None) – How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate) or 'do_nothing' (return existing assistant).
- name (Optional[str], default: None) – The name of the assistant. Defaults to 'Untitled' under the hood.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
- description (Optional[str], default: None) – Optional description of the assistant. The description field is available for langgraph-api server version >= 0.0.45.
Returns:
- Assistant – The created assistant.
Example Usage:
assistant = await client.assistants.create(
graph_id="agent",
config={"configurable": {"model_name": "openai"}},
metadata={"number":1},
assistant_id="my-assistant-id",
if_exists="do_nothing",
name="my_name"
)
update
async
¶
update(
    assistant_id: str,
    *,
    graph_id: Optional[str] = None,
    config: Optional[Config] = None,
    metadata: Json = None,
    name: Optional[str] = None,
    headers: Optional[dict[str, str]] = None,
    description: Optional[str] = None
) -> Assistant
Update an assistant.
Use this to point to a different graph, update the configuration, or change the metadata of an assistant.
Parameters:
- assistant_id (str) – Assistant to update.
- graph_id (Optional[str], default: None) – The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. If None, the assistant will keep pointing to the same graph.
- config (Optional[Config], default: None) – Configuration to use for the graph.
- metadata (Json, default: None) – Metadata to merge with existing assistant metadata.
- name (Optional[str], default: None) – The new name for the assistant.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
- description (Optional[str], default: None) – Optional description of the assistant. The description field is available for langgraph-api server version >= 0.0.45.
Returns:
- Assistant – The updated assistant.
Example Usage:
assistant = await client.assistants.update(
assistant_id='e280dad7-8618-443f-87f1-8e41841c180f',
graph_id="other-graph",
config={"configurable": {"model_name": "anthropic"}},
metadata={"number":2}
)
delete
async
¶
Delete an assistant.
Parameters:
- assistant_id (str) – The assistant ID to delete.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- None
Example Usage:
await client.assistants.delete(
assistant_id="my_assistant_id"
)
search
async
¶
search(
    *,
    metadata: Json = None,
    graph_id: Optional[str] = None,
    limit: int = 10,
    offset: int = 0,
    headers: Optional[dict[str, str]] = None
) -> list[Assistant]
Search for assistants.
Parameters:
- metadata (Json, default: None) – Metadata to filter by. Exact match filter for each KV pair.
- graph_id (Optional[str], default: None) – The ID of the graph to filter by. The graph ID is normally set in your langgraph.json configuration.
- limit (int, default: 10) – The maximum number of results to return.
- offset (int, default: 0) – The number of results to skip.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- list[Assistant] – A list of assistants.
Example Usage:
assistants = await client.assistants.search(
metadata = {"name":"my_name"},
graph_id="my_graph_id",
limit=5,
offset=5
)
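Because search uses limit/offset paging, collecting every match means advancing offset until a page comes back shorter than the requested limit. A sketch of that loop against a stand-in coroutine (fake_search mimics only the pagination contract; it is not the SDK call):

```python
import asyncio


async def search_all(search, page_size: int = 10, **filters) -> list:
    """Collect every result from a limit/offset endpoint by paging
    until a page comes back shorter than the requested limit."""
    results, offset = [], 0
    while True:
        page = await search(limit=page_size, offset=offset, **filters)
        results.extend(page)
        if len(page) < page_size:
            return results
        offset += page_size


# Stand-in for client.assistants.search, backed by a fixed list.
async def fake_search(*, limit, offset, **filters):
    data = [{"assistant_id": f"a{i}"} for i in range(23)]
    return data[offset : offset + limit]
```

Running `asyncio.run(search_all(fake_search))` pages through all 23 stub records in three requests; the same helper would accept `client.assistants.search` in its place.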
get_versions
async
¶
get_versions(
    assistant_id: str,
    metadata: Json = None,
    limit: int = 10,
    offset: int = 0,
    *,
    headers: Optional[dict[str, str]] = None
) -> list[AssistantVersion]
List all versions of an assistant.
Parameters:
- assistant_id (str) – The assistant ID to get versions for.
- metadata (Json, default: None) – Metadata to filter versions by. Exact match filter for each KV pair.
- limit (int, default: 10) – The maximum number of versions to return.
- offset (int, default: 0) – The number of versions to skip.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- list[AssistantVersion] – A list of assistant versions.
Example Usage:
assistant_versions = await client.assistants.get_versions(
assistant_id="my_assistant_id"
)
set_latest
async
¶
set_latest(
    assistant_id: str,
    version: int,
    *,
    headers: Optional[dict[str, str]] = None
) -> Assistant
Change the version of an assistant.
Parameters:
- assistant_id (str) – The assistant ID to change the version of.
- version (int) – The version to change to.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Assistant – The assistant object.
Example Usage:
new_version_assistant = await client.assistants.set_latest(
assistant_id="my_assistant_id",
version=3
)
ThreadsClient
¶
Client for managing threads in LangGraph.
A thread maintains the state of a graph across multiple interactions/invocations (aka runs). It accumulates and persists the graph's state, allowing for continuity between separate invocations of the graph.
Example:
client = get_client()
new_thread = await client.threads.create(metadata={"user_id": "123"})
get
async
¶
Get a thread by ID.
Parameters:
- thread_id (str) – The ID of the thread to get.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Thread – Thread object.
Example Usage:
thread = await client.threads.get(
thread_id="my_thread_id"
)
print(thread)
-----------------------------------------------------
{
'thread_id': 'my_thread_id',
'created_at': '2024-07-18T18:35:15.540834+00:00',
'updated_at': '2024-07-18T18:35:15.540834+00:00',
'metadata': {'graph_id': 'agent'}
}
create
async
¶
create(
    *,
    metadata: Json = None,
    thread_id: Optional[str] = None,
    if_exists: Optional[OnConflictBehavior] = None,
    supersteps: Optional[
        Sequence[dict[str, Sequence[dict[str, Any]]]]
    ] = None,
    graph_id: Optional[str] = None,
    headers: Optional[dict[str, str]] = None
) -> Thread
Create a new thread.
Parameters:
- metadata (Json, default: None) – Metadata to add to the thread.
- thread_id (Optional[str], default: None) – ID of the thread. If None, the ID will be a randomly generated UUID.
- if_exists (Optional[OnConflictBehavior], default: None) – How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate) or 'do_nothing' (return existing thread).
- supersteps (Optional[Sequence[dict[str, Sequence[dict[str, Any]]]]], default: None) – Apply a list of supersteps when creating a thread, each containing a sequence of updates. Each update has values or command, and as_node. Used for copying a thread between deployments.
- graph_id (Optional[str], default: None) – Optional graph ID to associate with the thread.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Thread – The created thread.
Example Usage:
thread = await client.threads.create(
metadata={"number":1},
thread_id="my-thread-id",
if_exists="raise"
)
update
async
¶
update(
    thread_id: str,
    *,
    metadata: dict[str, Any],
    headers: Optional[dict[str, str]] = None
) -> Thread
Update a thread.
Parameters:
- thread_id (str) – ID of the thread to update.
- metadata (dict[str, Any]) – Metadata to merge with existing thread metadata.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Thread – The updated thread.
Example Usage:
thread = await client.threads.update(
thread_id="my-thread-id",
metadata={"number":1},
)
delete
async
¶
Delete a thread.
Parameters:
- thread_id (str) – The ID of the thread to delete.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- None
Example Usage:
await client.threads.delete(
thread_id="my_thread_id"
)
search
async
¶
search(
    *,
    metadata: Json = None,
    values: Json = None,
    status: Optional[ThreadStatus] = None,
    limit: int = 10,
    offset: int = 0,
    headers: Optional[dict[str, str]] = None
) -> list[Thread]
Search for threads.
Parameters:
- metadata (Json, default: None) – Thread metadata to filter on.
- values (Json, default: None) – State values to filter on.
- status (Optional[ThreadStatus], default: None) – Thread status to filter on. Must be one of 'idle', 'busy', 'interrupted' or 'error'.
- limit (int, default: 10) – Limit on the number of threads to return.
- offset (int, default: 0) – Offset in the threads table to start the search from.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- list[Thread] – List of threads matching the search parameters.
Example Usage:
threads = await client.threads.search(
metadata={"number":1},
status="interrupted",
limit=15,
offset=5
)
copy
async
¶
Copy a thread.
Parameters:
- thread_id (str) – The ID of the thread to copy.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- None
Example Usage:
await client.threads.copy(
thread_id="my_thread_id"
)
get_state
async
¶
get_state(
    thread_id: str,
    checkpoint: Optional[Checkpoint] = None,
    checkpoint_id: Optional[str] = None,
    *,
    subgraphs: bool = False,
    headers: Optional[dict[str, str]] = None
) -> ThreadState
Get the state of a thread.
Parameters:
- thread_id (str) – The ID of the thread to get the state of.
- checkpoint (Optional[Checkpoint], default: None) – The checkpoint to get the state of.
- checkpoint_id (Optional[str], default: None) – (deprecated) The checkpoint ID to get the state of.
- subgraphs (bool, default: False) – Include subgraph states.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- ThreadState – The state of the thread.
Example Usage:
thread_state = await client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
},
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
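Each state's checkpoint and parent_config entries link successive checkpoints, so a set of fetched states can be walked from newest to oldest. A purely illustrative sketch over ThreadState-like dicts (no server involved; field names follow the output above):

```python
def checkpoint_chain(state: dict, states_by_id: dict) -> list:
    """Follow parent_config links backwards from a thread state,
    returning checkpoint_ids from newest to oldest. states_by_id maps
    checkpoint_id -> already-fetched state dicts."""
    chain = []
    current = state
    while current is not None:
        chain.append(current["checkpoint"]["checkpoint_id"])
        parent = current.get("parent_config")
        current = states_by_id.get(parent["checkpoint_id"]) if parent else None
    return chain


# Two minimal states shaped like the example output above.
newer = {
    "checkpoint": {"checkpoint_id": "cp-2"},
    "parent_config": {"checkpoint_id": "cp-1"},
}
older = {"checkpoint": {"checkpoint_id": "cp-1"}, "parent_config": None}
states = {"cp-1": older, "cp-2": newer}
```

In practice get_history returns this chain directly; the sketch only shows how the checkpoint and parent_config fields relate.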
update_state
async
¶
update_state(
    thread_id: str,
    values: Optional[Union[dict, Sequence[dict]]],
    *,
    as_node: Optional[str] = None,
    checkpoint: Optional[Checkpoint] = None,
    checkpoint_id: Optional[str] = None,
    headers: Optional[dict[str, str]] = None
) -> ThreadUpdateStateResponse
Update the state of a thread.
Parameters:
- thread_id (str) – The ID of the thread to update.
- values (Optional[Union[dict, Sequence[dict]]]) – The values to update the state with.
- as_node (Optional[str], default: None) – Update the state as if this node had just executed.
- checkpoint (Optional[Checkpoint], default: None) – The checkpoint to update the state of.
- checkpoint_id (Optional[str], default: None) – (deprecated) The checkpoint ID to update the state of.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- ThreadUpdateStateResponse – Response after updating a thread's state.
Example Usage:
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history
async
¶
get_history(
    thread_id: str,
    *,
    limit: int = 10,
    before: Optional[str | Checkpoint] = None,
    metadata: Optional[dict] = None,
    checkpoint: Optional[Checkpoint] = None,
    headers: Optional[dict[str, str]] = None
) -> list[ThreadState]
Get the state history of a thread.
Parameters:
- thread_id (str) – The ID of the thread to get the state history for.
- checkpoint (Optional[Checkpoint], default: None) – Return states for this subgraph. If empty, defaults to the root graph.
- limit (int, default: 10) – The maximum number of states to return.
- before (Optional[str | Checkpoint], default: None) – Return states before this checkpoint.
- metadata (Optional[dict], default: None) – Filter states by metadata key-value pairs.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- list[ThreadState] – The state history of the thread.
Example Usage:
thread_state = await client.threads.get_history(
thread_id="my_thread_id",
limit=5,
)
RunsClient
¶
Client for managing runs in LangGraph.
A run is a single assistant invocation with optional input, config, and metadata. This client manages runs, which can be stateful (on threads) or stateless.
Example:
client = get_client()
run = await client.runs.create(assistant_id="asst_123", thread_id="thread_456", input={"query": "Hello"})
stream
¶
stream(
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    command: Optional[Command] = None,
    stream_mode: Union[StreamMode, Sequence[StreamMode]] = "values",
    stream_subgraphs: bool = False,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint: Optional[Checkpoint] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    feedback_keys: Optional[Sequence[str]] = None,
    on_disconnect: Optional[DisconnectMode] = None,
    on_completion: Optional[OnCompletionBehavior] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
    if_not_exists: Optional[IfNotExists] = None,
    after_seconds: Optional[int] = None,
    headers: Optional[dict[str, str]] = None
) -> AsyncIterator[StreamPart]
Create a run and stream the results.
Parameters:
- thread_id (Optional[str]) – The thread ID to create the run on. If None, will create a stateless run.
- assistant_id (str) – The assistant ID or graph name to stream from. If using the graph name, will default to the first assistant created from that graph.
- input (Optional[dict], default: None) – The input to the graph.
- command (Optional[Command], default: None) – A command to execute. Cannot be combined with input.
- stream_mode (Union[StreamMode, Sequence[StreamMode]], default: 'values') – The stream mode(s) to use.
- stream_subgraphs (bool, default: False) – Whether to stream output from subgraphs.
- metadata (Optional[dict], default: None) – Metadata to assign to the run.
- config (Optional[Config], default: None) – The configuration for the assistant.
- checkpoint (Optional[Checkpoint], default: None) – The checkpoint to resume from.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Nodes to interrupt immediately before they get executed.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Nodes to interrupt immediately after they get executed.
- feedback_keys (Optional[Sequence[str]], default: None) – Feedback keys to assign to the run.
- on_disconnect (Optional[DisconnectMode], default: None) – The disconnect mode to use. Must be one of 'cancel' or 'continue'.
- on_completion (Optional[OnCompletionBehavior], default: None) – Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
- webhook (Optional[str], default: None) – Webhook to call after the LangGraph API call is done.
- multitask_strategy (Optional[MultitaskStrategy], default: None) – Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
- if_not_exists (Optional[IfNotExists], default: None) – How to handle a missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing) or 'create' (create new thread).
- after_seconds (Optional[int], default: None) – The number of seconds to wait before starting the run. Use to schedule future runs.
Returns:
- AsyncIterator[StreamPart] – Asynchronous iterator of stream results.
Example Usage:
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
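When stream_mode='values', each values event carries the full graph state, so the final state is simply the last values part. A sketch of draining a stream that way, with a stub async iterator standing in for client.runs.stream (StreamPart here merely mirrors the event/data shape printed above):

```python
import asyncio
from typing import Any, AsyncIterator, NamedTuple, Optional


class StreamPart(NamedTuple):
    """Same (event, data) shape as the SDK's StreamPart."""
    event: str
    data: Optional[Any]


async def final_values(parts: AsyncIterator[StreamPart]) -> Optional[dict]:
    """Drain a run stream, keeping only the last 'values' event --
    the final graph state when stream_mode='values'."""
    last = None
    async for part in parts:
        if part.event == "values":
            last = part.data
    return last


async def fake_stream():
    # Stand-in for client.runs.stream(...), echoing the output above.
    yield StreamPart("metadata", {"run_id": "r1"})
    yield StreamPart("values", {"messages": [{"type": "human"}]})
    yield StreamPart("values", {"messages": [{"type": "human"}, {"type": "ai"}]})
    yield StreamPart("end", None)
```

If you only need the final state and not the intermediate events, the wait method below returns it directly without streaming.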
create
async
¶
create(
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    command: Optional[Command] = None,
    stream_mode: Union[StreamMode, Sequence[StreamMode]] = "values",
    stream_subgraphs: bool = False,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint: Optional[Checkpoint] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
    if_not_exists: Optional[IfNotExists] = None,
    on_completion: Optional[OnCompletionBehavior] = None,
    after_seconds: Optional[int] = None,
    headers: Optional[dict[str, str]] = None
) -> Run
Create a background run.
Parameters:
- thread_id (Optional[str]) – The thread ID to create the run on. If None, will create a stateless run.
- assistant_id (str) – The assistant ID or graph name to run. If using the graph name, will default to the first assistant created from that graph.
- input (Optional[dict], default: None) – The input to the graph.
- command (Optional[Command], default: None) – A command to execute. Cannot be combined with input.
- stream_mode (Union[StreamMode, Sequence[StreamMode]], default: 'values') – The stream mode(s) to use.
- stream_subgraphs (bool, default: False) – Whether to stream output from subgraphs.
- metadata (Optional[dict], default: None) – Metadata to assign to the run.
- config (Optional[Config], default: None) – The configuration for the assistant.
- checkpoint (Optional[Checkpoint], default: None) – The checkpoint to resume from.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Nodes to interrupt immediately before they get executed.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Nodes to interrupt immediately after they get executed.
- webhook (Optional[str], default: None) – Webhook to call after the LangGraph API call is done.
- multitask_strategy (Optional[MultitaskStrategy], default: None) – Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
- on_completion (Optional[OnCompletionBehavior], default: None) – Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
- if_not_exists (Optional[IfNotExists], default: None) – How to handle a missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing) or 'create' (create new thread).
- after_seconds (Optional[int], default: None) – The number of seconds to wait before starting the run. Use to schedule future runs.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Run – The created background run.
Example Usage:
background_run = await client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
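A background run starts in status 'pending', so callers that need the result can poll until the status settles. A hedged sketch with a stand-in fetcher (get_run and the 'success' status here are illustrative; the SDK's wait method avoids manual polling entirely):

```python
import asyncio


async def wait_until_done(get_run, run_id: str, interval: float = 0.0) -> dict:
    """Poll a background run until it leaves the 'pending'/'running'
    states, then return the final run record. get_run is any coroutine
    taking a run_id and returning a run dict with a 'status' key."""
    while True:
        run = await get_run(run_id)
        if run["status"] not in ("pending", "running"):
            return run
        await asyncio.sleep(interval)


# Stand-in fetcher that walks through a fixed status sequence.
_statuses = iter(["pending", "running", "success"])


async def fake_get_run(run_id):
    return {"run_id": run_id, "status": next(_statuses)}
```

With a real deployment you would pass the appropriate runs getter in place of fake_get_run and a non-zero interval.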
create_batch
async
¶
Create a batch of stateless background runs.
wait
async
¶
wait(
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    command: Optional[Command] = None,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint: Optional[Checkpoint] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    webhook: Optional[str] = None,
    on_disconnect: Optional[DisconnectMode] = None,
    on_completion: Optional[OnCompletionBehavior] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
    if_not_exists: Optional[IfNotExists] = None,
    after_seconds: Optional[int] = None,
    raise_error: bool = True,
    headers: Optional[dict[str, str]] = None
) -> Union[list[dict], dict[str, Any]]
Create a run, wait until it finishes and return the final state.
Parameters:
-
thread_id
(
) –Optional [str ]The thread ID to create the run on. If None, will create a stateless run.
-
assistant_id
(
) –str The assistant ID or graph name to run. If using graph name, will default to first assistant created from that graph.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
command
(
, default:Optional [Command ]None
) –A command to execute. Cannot be combined with input.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the run.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to resume from.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
on_disconnect
(
, default:Optional [DisconnectMode ]None
) –The disconnect mode to use. Must be one of 'cancel' or 'continue'.
-
on_completion
(
, default:Optional [OnCompletionBehavior ]None
) –Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
-
multitask_strategy
(
, default:Optional [MultitaskStrategy ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
if_not_exists
(
, default:Optional [IfNotExists ]None
) –How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread).
-
after_seconds
(
, default:Optional [int ]None
) –The number of seconds to wait before starting the run. Use to schedule future runs.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–Union [list [dict ],dict [str ,Any ]]Union[list[dict], dict[str, Any]]: The output of the run.
Example Usage:
final_state_of_run = await client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list
async
¶
list(
thread_id: str ,
*,
limit: int = 10,
offset: int = 0,
status: Optional [RunStatus ] = None,
headers: Optional [dict [str , str ]] = None
) -> List [Run ]
List runs.
Parameters:
-
thread_id
(
) –str The thread ID to list runs for.
-
limit
(
, default:int 10
) –The maximum number of results to return.
-
offset
(
, default:int 0
) –The number of results to skip.
-
status
(
, default:Optional [RunStatus ]None
) –The status of the run to filter by.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–List [Run ]List[Run]: The runs for the thread.
Example Usage:
await client.runs.list(
thread_id="thread_id",
limit=5,
offset=5,
)
get
async
¶
Get a run.
Parameters:
-
thread_id
(
) –str The thread ID to get.
-
run_id
(
) –str The run ID to get.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run Run object.
Example Usage:
run = await client.runs.get(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete",
)
cancel
async
¶
cancel(
thread_id: str ,
run_id: str ,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Optional [dict [str , str ]] = None
) -> None
Cancel a run.
Parameters:
-
thread_id
(
) –str The thread ID to cancel.
-
run_id
(
) –str The run ID to cancel.
-
wait
(
, default:bool False
) –Whether to wait until run has completed.
-
action
(
, default:CancelAction 'interrupt'
) –Action to take when cancelling the run. Possible values are
interrupt
orrollback
. Default isinterrupt
. -
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
await client.runs.cancel(
thread_id="thread_id_to_cancel",
run_id="run_id_to_cancel",
wait=True,
action="interrupt"
)
join
async
¶
Block until a run is done. Returns the final state of the thread.
Parameters:
-
thread_id
(
) –str The thread ID to join.
-
run_id
(
) –str The run ID to join.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–dict The final state of the thread.
Example Usage:
result = await client.runs.join(
thread_id="thread_id_to_join",
run_id="run_id_to_join"
)
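Since join blocks until the run finishes, a caller that needs a deadline can wrap it with a plain asyncio timeout. This is a generic asyncio pattern, not an SDK feature:

```python
import asyncio

async def join_with_timeout(join_coro, seconds: float):
    """Bound a blocking join with a deadline. Raises asyncio.TimeoutError
    if the run does not finish within `seconds`. Generic asyncio pattern,
    not part of the LangGraph SDK."""
    return await asyncio.wait_for(join_coro, timeout=seconds)

# result = await join_with_timeout(
#     client.runs.join(thread_id="thread_id_to_join", run_id="run_id_to_join"),
#     seconds=60,
# )
```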
join_stream
¶
join_stream(
thread_id: str ,
run_id: str ,
*,
cancel_on_disconnect: bool = False,
stream_mode: Optional [
Union [StreamMode , Sequence [StreamMode ]]
] = None,
headers: Optional [dict [str , str ]] = None
) -> AsyncIterator [StreamPart ]
Stream output from a run in real-time, until the run is done. Output is not buffered, so any output produced before this call will not be received here.
Parameters:
-
thread_id
(
) –str The thread ID to join.
-
run_id
(
) –str The run ID to join.
-
cancel_on_disconnect
(
, default:bool False
) –Whether to cancel the run when the stream is disconnected.
-
stream_mode
(
, default:Optional [Union [StreamMode ,Sequence [StreamMode ]]]None
) –The stream mode(s) to use. Must be a subset of the stream modes passed when creating the run. Background runs default to having the union of all stream modes.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–AsyncIterator [StreamPart ]An async iterator of stream parts from the run.
Example Usage:
async for chunk in client.runs.join_stream(
thread_id="thread_id_to_join",
run_id="run_id_to_join",
stream_mode=["values", "debug"]
):
    print(chunk)
delete
async
¶
Delete a run.
Parameters:
-
thread_id
(
) –str The thread ID to delete.
-
run_id
(
) –str The run ID to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
await client.runs.delete(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete"
)
CronClient
¶
Client for managing recurrent runs (cron jobs) in LangGraph.
A run is a single invocation of an assistant with optional input and config. This client allows scheduling recurring runs to occur automatically.
Example:
client = get_client()
cron_job = await client.crons.create_for_thread(
thread_id="thread_123",
assistant_id="asst_456",
schedule="0 9 * * *",
input={"message": "Daily update"}
)
create_for_thread
async
¶
create_for_thread(
thread_id: str ,
assistant_id: str ,
*,
schedule: str ,
input: Optional [dict ] = None,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
interrupt_before: Optional [
Union [All , list [str ]]
] = None,
interrupt_after: Optional [Union [All , list [str ]]] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> Run
Create a cron job for a thread.
Parameters:
-
thread_id
(
) –str The thread ID to run the cron job on.
-
assistant_id
(
) –str The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph.
-
schedule
(
) –str The cron schedule to execute this job on.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the cron job runs.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
interrupt_before
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [str ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run The cron run.
Example Usage:
cron_run = await client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
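The schedule argument uses standard five-field cron syntax (minute, hour, day-of-month, month, day-of-week), so "27 15 * * *" above fires daily at 15:27. A rough shape check, purely illustrative and not part of the SDK:

```python
def looks_like_cron(schedule: str) -> bool:
    """Rough sanity check: five whitespace-separated cron fields
    (minute, hour, day-of-month, month, day-of-week).
    Illustrative only; the server performs the real validation."""
    return len(schedule.split()) == 5

# looks_like_cron("27 15 * * *") -> True  (daily at 15:27)
```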
create
async
¶
create(
assistant_id: str ,
*,
schedule: str ,
input: Optional [dict ] = None,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
interrupt_before: Optional [
Union [All , list [str ]]
] = None,
interrupt_after: Optional [Union [All , list [str ]]] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> Run
Create a cron run.
Parameters:
-
assistant_id
(
) –str The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph.
-
schedule
(
) –str The cron schedule to execute this job on.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the cron job runs.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
interrupt_before
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [str ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run The cron run.
Example Usage:
cron_run = await client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete
async
¶
Delete a cron.
Parameters:
-
cron_id
(
) –str The cron ID to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
await client.crons.delete(
cron_id="cron_to_delete"
)
search
async
¶
search(
*,
assistant_id: Optional [str ] = None,
thread_id: Optional [str ] = None,
limit: int = 10,
offset: int = 0,
headers: Optional [dict [str , str ]] = None
) -> list [Cron ]
Get a list of cron jobs.
Parameters:
-
assistant_id
(
, default:Optional [str ]None
) –The assistant ID or graph name to search for.
-
thread_id
(
, default:Optional [str ]None
) –The thread ID to search for.
-
limit
(
, default:int 10
) –The maximum number of results to return.
-
offset
(
, default:int 0
) –The number of results to skip.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [Cron ]list[Cron]: The list of cron jobs returned by the search.
Example Usage:
cron_jobs = await client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
StoreClient
¶
Client for interacting with the graph's shared storage.
The Store provides a key-value storage system for persisting data across graph executions, allowing for stateful operations and data sharing across threads.
Example:
client = get_client()
await client.store.put_item(["users", "user123"], "mem-123451342", {"name": "Alice", "score": 100})
put_item
async
¶
put_item(
namespace: Sequence [str ],
/,
key: str ,
value: dict [str , Any ],
index: Optional [
Union [Literal [False], list [str ]]
] = None,
ttl: Optional [int ] = None,
headers: Optional [dict [str , str ]] = None,
) -> None
Store or update an item.
Parameters:
-
namespace
(
) –Sequence [str ]A list of strings representing the namespace path.
-
key
(
) –str The unique identifier for the item within the namespace.
-
value
(
) –dict [str ,Any ]A dictionary containing the item's data.
-
index
(
, default:Optional [Union [Literal [False],list [str ]]]None
) –Controls search indexing - None (use defaults), False (disable), or list of field paths to index.
-
ttl
(
, default:Optional [int ]None
) –Optional time-to-live in minutes for the item, or None for no expiration.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
await client.store.put_item(
["documents", "user123"],
key="item456",
value={"title": "My Document", "content": "Hello World"}
)
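Because ttl is expressed in minutes, an item written with ttl=60 would lapse an hour after its last write (or last read, when TTL refresh on read is enabled). A sketch of that arithmetic; expiry is actually computed server-side:

```python
from datetime import datetime, timedelta, timezone

def expiry_for(written_at: datetime, ttl_minutes: int) -> datetime:
    """When an item with the given TTL would lapse.
    Illustrative only; the store computes expiry server-side."""
    return written_at + timedelta(minutes=ttl_minutes)

written = datetime(2024, 7, 30, 12, 0, tzinfo=timezone.utc)
# expiry_for(written, 60) -> 2024-07-30 13:00 UTC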
get_item
async
¶
get_item(
namespace: Sequence [str ],
/,
key: str ,
*,
refresh_ttl: Optional [bool ] = None,
headers: Optional [dict [str , str ]] = None,
) -> Item
Retrieve a single item.
Parameters:
-
key
(
) –str The unique identifier for the item.
-
namespace
(
) –Sequence [str ]Optional list of strings representing the namespace path.
-
refresh_ttl
(
, default:Optional [bool ]None
) –Whether to refresh the TTL on this read operation. If None, uses the store's default behavior.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Item
(
) –Item The retrieved item.
Example Usage:
item = await client.store.get_item(
["documents", "user123"],
key="item456",
)
print(item)
----------------------------------------------------------------
{
'namespace': ['documents', 'user123'],
'key': 'item456',
'value': {'title': 'My Document', 'content': 'Hello World'},
'created_at': '2024-07-30T12:00:00Z',
'updated_at': '2024-07-30T12:00:00Z'
}
delete_item
async
¶
delete_item(
namespace: Sequence [str ],
/,
key: str ,
headers: Optional [dict [str , str ]] = None,
) -> None
Delete an item.
Parameters:
-
key
(
) –str The unique identifier for the item.
-
namespace
(
) –Sequence [str ]Optional list of strings representing the namespace path.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
await client.store.delete_item(
["documents", "user123"],
key="item456",
)
search_items
async
¶
search_items(
namespace_prefix: Sequence [str ],
/,
filter: Optional [dict [str , Any ]] = None,
limit: int = 10,
offset: int = 0,
query: Optional [str ] = None,
refresh_ttl: Optional [bool ] = None,
headers: Optional [dict [str , str ]] = None,
) -> SearchItemsResponse
Search for items within a namespace prefix.
Parameters:
-
namespace_prefix
(
) –Sequence [str ]List of strings representing the namespace prefix.
-
filter
(
, default:Optional [dict [str ,Any ]]None
) –Optional dictionary of key-value pairs to filter results.
-
limit
(
, default:int 10
) –Maximum number of items to return (default is 10).
-
offset
(
, default:int 0
) –Number of items to skip before returning results (default is 0).
-
query
(
, default:Optional [str ]None
) –Optional query for natural language search.
-
refresh_ttl
(
, default:Optional [bool ]None
) –Whether to refresh the TTL on items returned by this search. If None, uses the store's default behavior.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–SearchItemsResponse SearchItemsResponse: A response containing the list of items matching the search criteria.
Example Usage:
items = await client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
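To collect more than one page of results, advance offset by limit until a short page comes back. The helper below takes any page-fetching callable so it can stand in for client.store.search_items; the callable is a stand-in, not an SDK API:

```python
def collect_all(fetch_page, page_size: int = 10):
    """Drain a paginated endpoint by stepping `offset` until a page
    shorter than `page_size` signals the end. `fetch_page` is any
    callable accepting limit/offset keywords (a stand-in for a bound
    client.store.search_items call)."""
    items, offset = [], 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        items.extend(page)
        if len(page) < page_size:
            return items
        offset += page_size
```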
list_namespaces
async
¶
list_namespaces(
prefix: Optional [List [str ]] = None,
suffix: Optional [List [str ]] = None,
max_depth: Optional [int ] = None,
limit: int = 100,
offset: int = 0,
headers: Optional [dict [str , str ]] = None,
) -> ListNamespaceResponse
List namespaces with optional match conditions.
Parameters:
-
prefix
(
, default:Optional [List [str ]]None
) –Optional list of strings representing the prefix to filter namespaces.
-
suffix
(
, default:Optional [List [str ]]None
) –Optional list of strings representing the suffix to filter namespaces.
-
max_depth
(
, default:Optional [int ]None
) –Optional integer specifying the maximum depth of namespaces to return.
-
limit
(
, default:int 100
) –Maximum number of namespaces to return (default is 100).
-
offset
(
, default:int 0
) –Number of namespaces to skip before returning results (default is 0).
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–ListNamespaceResponse ListNamespaceResponse: A response containing the list of namespaces matching the criteria.
Example Usage:
namespaces = await client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
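The prefix, suffix, and max_depth filters compose as plain list operations on the namespace path. The sketch below is one illustrative reading of the parameter descriptions above, not the server implementation:

```python
from typing import Optional, Sequence

def namespace_matches(
    namespace: Sequence[str],
    prefix: Optional[Sequence[str]] = None,
    suffix: Optional[Sequence[str]] = None,
    max_depth: Optional[int] = None,
) -> bool:
    """Whether a namespace path satisfies the list_namespaces filters.
    Illustrative reading of the documented parameters, not server code."""
    if prefix is not None and list(namespace[: len(prefix)]) != list(prefix):
        return False
    if suffix is not None and list(namespace[-len(suffix):]) != list(suffix):
        return False
    if max_depth is not None and len(namespace) > max_depth:
        return False
    return True
```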
SyncLangGraphClient
¶
Synchronous client for interacting with the LangGraph API.
This class provides synchronous access to LangGraph API endpoints for managing assistants, threads, runs, cron jobs, and data storage.
Example:
client = get_sync_client()
assistant = client.assistants.get("asst_123")
SyncHttpClient
¶
get
¶
get(
path: str ,
*,
params: Optional [QueryParamTypes ] = None,
headers: Optional [dict [str , str ]] = None
) -> Any
Send a GET request.
post
¶
Send a POST request.
put
¶
Send a PUT request.
patch
¶
Send a PATCH request.
delete
¶
delete(
path: str ,
*,
json: Optional [Any ] = None,
headers: Optional [dict [str , str ]] = None
) -> None
Send a DELETE request.
stream
¶
stream(
path: str ,
method: str ,
*,
json: Optional [dict ] = None,
params: Optional [QueryParamTypes ] = None,
headers: Optional [dict [str , str ]] = None
) -> Iterator [StreamPart ]
Stream the results of a request using SSE.
SyncAssistantsClient
¶
Client for managing assistants in LangGraph synchronously.
This class provides methods to interact with assistants, which are versioned configurations of your graph.
Example:
client = get_sync_client()
assistant = client.assistants.get("assistant_id_123")
get
¶
Get an assistant by ID.
Parameters:
-
assistant_id
(
) –str The ID of the assistant to get.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Assistant
(
) –Assistant Assistant Object.
Example Usage:
assistant = client.assistants.get(
assistant_id="my_assistant_id"
)
print(assistant)
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'}
}
get_graph
¶
get_graph(
assistant_id: str ,
*,
xray: Union [int , bool ] = False,
headers: Optional [dict [str , str ]] = None
) -> dict [str , list [dict [str , Any ]]]
Get the graph of an assistant by ID.
Parameters:
-
assistant_id
(
) –str The ID of the assistant to get the graph of.
-
xray
(
, default:Union [int ,bool ]False
) –Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Graph
(
) –dict [str ,list [dict [str ,Any ]]]The graph information for the assistant in JSON format.
Example Usage:
graph_info = client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas
¶
Get the schemas of an assistant by ID.
Parameters:
-
assistant_id
(
) –str The ID of the assistant to get the schema of.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
GraphSchema
(
) –GraphSchema The graph schema for the assistant.
Example Usage:
schema = client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs
¶
get_subgraphs(
assistant_id: str ,
namespace: Optional [str ] = None,
recurse: bool = False,
*,
headers: Optional [dict [str , str ]] = None
) -> Subgraphs
Get the subgraphs of an assistant by ID.
Parameters:
-
assistant_id
(
) –str The ID of the assistant to get the subgraphs of.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Subgraphs
(
) –Subgraphs The subgraphs of the assistant.
create
¶
create(
graph_id: Optional [str ],
config: Optional [Config ] = None,
*,
metadata: Json = None,
assistant_id: Optional [str ] = None,
if_exists: Optional [OnConflictBehavior ] = None,
name: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None,
description: Optional [str ] = None
) -> Assistant
Create a new assistant.
Useful when graph is configurable and you want to create different assistants based on different configurations.
Parameters:
-
graph_id
(
) –Optional [str ]The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration.
-
config
(
, default:Optional [Config ]None
) –Configuration to use for the graph.
-
metadata
(
, default:Json None
) –Metadata to add to assistant.
-
assistant_id
(
, default:Optional [str ]None
) –Assistant ID to use, will default to a random UUID if not provided.
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing assistant).
-
name
(
, default:Optional [str ]None
) –The name of the assistant. Defaults to 'Untitled' under the hood.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
-
description
(
, default:Optional [str ]None
) –Optional description of the assistant. The description field is available for langgraph-api server version >= 0.0.45.
Returns:
-
Assistant
(
) –Assistant The created assistant.
Example Usage:
assistant = client.assistants.create(
graph_id="agent",
config={"configurable": {"model_name": "openai"}},
metadata={"number":1},
assistant_id="my-assistant-id",
if_exists="do_nothing",
name="my_name"
)
update
¶
update(
assistant_id: str ,
*,
graph_id: Optional [str ] = None,
config: Optional [Config ] = None,
metadata: Json = None,
name: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None,
description: Optional [str ] = None
) -> Assistant
Update an assistant.
Use this to point to a different graph, update the configuration, or change the metadata of an assistant.
Parameters:
-
assistant_id
(
) –str Assistant to update.
-
graph_id
(
, default:Optional [str ]None
) –The ID of the graph the assistant should use. The graph ID is normally set in your langgraph.json configuration. If None, assistant will keep pointing to same graph.
-
config
(
, default:Optional [Config ]None
) –Configuration to use for the graph.
-
metadata
(
, default:Json None
) –Metadata to merge with existing assistant metadata.
-
name
(
, default:Optional [str ]None
) –The new name for the assistant.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
-
description
(
, default:Optional [str ]None
) –Optional description of the assistant. The description field is available for langgraph-api server version >= 0.0.45.
Returns:
-
Assistant
(
) –Assistant The updated assistant.
Example Usage:
assistant = client.assistants.update(
assistant_id='e280dad7-8618-443f-87f1-8e41841c180f',
graph_id="other-graph",
config={"configurable": {"model_name": "anthropic"}},
metadata={"number":2}
)
delete
¶
Delete an assistant.
Parameters:
-
assistant_id
(
) –str The assistant ID to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.assistants.delete(
assistant_id="my_assistant_id"
)
search
¶
search(
*,
metadata: Json = None,
graph_id: Optional [str ] = None,
limit: int = 10,
offset: int = 0,
headers: Optional [dict [str , str ]] = None
) -> list [Assistant ]
Search for assistants.
Parameters:
-
metadata
(
, default:Json None
) –Metadata to filter by. Exact match filter for each KV pair.
-
graph_id
(
, default:Optional [str ]None
) –The ID of the graph to filter by. The graph ID is normally set in your langgraph.json configuration.
-
limit
(
, default:int 10
) –The maximum number of results to return.
-
offset
(
, default:int 0
) –The number of results to skip.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [Assistant ]list[Assistant]: A list of assistants.
Example Usage:
assistants = client.assistants.search(
metadata = {"name":"my_name"},
graph_id="my_graph_id",
limit=5,
offset=5
)
get_versions
¶
get_versions(
assistant_id: str ,
metadata: Json = None,
limit: int = 10,
offset: int = 0,
*,
headers: Optional [dict [str , str ]] = None
) -> list [AssistantVersion ]
List all versions of an assistant.
Parameters:
-
assistant_id
(
) –str The assistant ID to get versions for.
-
metadata
(
, default:Json None
) –Metadata to filter versions by. Exact match filter for each KV pair.
-
limit
(
, default:int 10
) –The maximum number of versions to return.
-
offset
(
, default:int 0
) –The number of versions to skip.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [AssistantVersion ]list[AssistantVersion]: A list of assistant versions.
Example Usage:
assistant_versions = client.assistants.get_versions(
assistant_id="my_assistant_id"
)
set_latest
¶
set_latest(
assistant_id: str ,
version: int ,
*,
headers: Optional [dict [str , str ]] = None
) -> Assistant
Change the version of an assistant.
Parameters:
-
assistant_id
(
) –str The assistant ID to set the version for.
-
version
(
) –int The version to change to.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Assistant
(
) –Assistant Assistant Object.
Example Usage:
new_version_assistant = client.assistants.set_latest(
assistant_id="my_assistant_id",
version=3
)
SyncThreadsClient
¶
Synchronous client for managing threads in LangGraph.
This class provides methods to create, retrieve, and manage threads, which represent conversations or stateful interactions.
Example:
client = get_sync_client()
thread = client.threads.create(metadata={"user_id": "123"})
get
¶
Get a thread by ID.
Parameters:
-
thread_id
(
) –str The ID of the thread to get.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Thread
(
) –Thread Thread object.
Example Usage:
thread = client.threads.get(
thread_id="my_thread_id"
)
print(thread)
-----------------------------------------------------
{
'thread_id': 'my_thread_id',
'created_at': '2024-07-18T18:35:15.540834+00:00',
'updated_at': '2024-07-18T18:35:15.540834+00:00',
'metadata': {'graph_id': 'agent'}
}
create
¶
create(
*,
metadata: Json = None,
thread_id: Optional [str ] = None,
if_exists: Optional [OnConflictBehavior ] = None,
supersteps: Optional [
Sequence [dict [str , Sequence [dict [str , Any ]]]]
] = None,
graph_id: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> Thread
Create a new thread.
Parameters:
-
metadata
(
, default:Json None
) –Metadata to add to thread.
-
thread_id
(
, default:Optional [str ]None
) –ID of thread. If None, ID will be a randomly generated UUID.
-
if_exists
(
, default:Optional [OnConflictBehavior ]None
) –How to handle duplicate creation. Defaults to 'raise' under the hood. Must be either 'raise' (raise error if duplicate), or 'do_nothing' (return existing thread).
-
supersteps
(
, default:Optional [Sequence [dict [str ,Sequence [dict [str ,Any ]]]]]None
) –Apply a list of supersteps when creating a thread, each containing a sequence of updates. Each update has
values
orcommand
andas_node
. Used for copying a thread between deployments. -
graph_id
(
, default:Optional [str ]None
) –Optional graph ID to associate with the thread.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Thread
(
) –Thread The created thread.
Example Usage:
thread = client.threads.create(
metadata={"number":1},
thread_id="my-thread-id",
if_exists="raise"
)
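The supersteps payload described above can be built as plain dicts. The sketch below shows its assumed shape (keys inferred from the parameter description: each superstep holds a sequence of updates, each with `values` or `command` plus `as_node`); treat it as illustrative, not an authoritative schema:

```python
# Hypothetical supersteps payload for copying a thread between deployments.
# Shape inferred from the parameter description above, not an official schema.
supersteps = [
    {
        "updates": [
            {
                "values": {"messages": [{"role": "user", "content": "hi"}]},
                "as_node": "__start__",
            }
        ]
    },
    {
        "updates": [
            {
                "values": {"messages": [{"role": "assistant", "content": "hello!"}]},
                "as_node": "agent",
            }
        ]
    },
]

# A create call would then look like (requires a running deployment):
# thread = client.threads.create(supersteps=supersteps)
```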
update
¶
update(
thread_id: str ,
*,
metadata: dict [str , Any ],
headers: Optional [dict [str , str ]] = None
) -> Thread
Update a thread.
Parameters:
-
thread_id
(
) –str ID of thread to update.
-
metadata
(
) –dict [str ,Any ]Metadata to merge with existing thread metadata.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Thread
(
) –Thread The updated thread.
Example Usage:
thread = client.threads.update(
thread_id="my-thread-id",
metadata={"number":1},
)
delete
¶
Delete a thread.
Parameters:
-
thread_id
(
) –str The ID of the thread to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.threads.delete(
thread_id="my_thread_id"
)
search
¶
search(
*,
metadata: Json = None,
values: Json = None,
status: Optional [ThreadStatus ] = None,
limit: int = 10,
offset: int = 0,
headers: Optional [dict [str , str ]] = None
) -> list [Thread ]
Search for threads.
Parameters:
-
metadata
(
, default:Json None
) –Thread metadata to filter on.
-
values
(
, default:Json None
) –State values to filter on.
-
status
(
, default:Optional [ThreadStatus ]None
) –Thread status to filter on. Must be one of 'idle', 'busy', 'interrupted' or 'error'.
-
limit
(
, default:int 10
) –Limit on number of threads to return.
-
offset
(
, default:int 0
) –Offset in threads table to start search from.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [Thread ]list[Thread]: List of the threads matching the search parameters.
Example Usage:
threads = client.threads.search(
metadata={"number":1},
status="interrupted",
limit=15,
offset=5
)
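Because search returns at most limit threads per call, collecting every match takes repeated calls with a growing offset. A small offset-based pager (a hypothetical helper, not part of the SDK) might look like:

```python
def paginate(search, page_size=100):
    """Collect all results from an offset-based search callable.

    `search` is any callable accepting `limit` and `offset` keyword
    arguments and returning a list, such as a wrapped SDK search method.
    """
    offset, results = 0, []
    while True:
        page = search(limit=page_size, offset=offset)
        results.extend(page)
        if len(page) < page_size:  # short page means we reached the end
            return results
        offset += page_size

# Usage with the SDK would look like (requires a running deployment):
# threads = paginate(lambda **kw: client.threads.search(status="idle", **kw))
```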
copy
¶
Copy a thread.
Parameters:
-
thread_id
(
) –str The ID of the thread to copy.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.threads.copy(
thread_id="my_thread_id"
)
get_state
¶
get_state(
thread_id: str ,
checkpoint: Optional [Checkpoint ] = None,
checkpoint_id: Optional [str ] = None,
*,
subgraphs: bool = False,
headers: Optional [dict [str , str ]] = None
) -> ThreadState
Get the state of a thread.
Parameters:
-
thread_id
(
) –str The ID of the thread to get the state of.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to get the state of.
-
subgraphs
(
, default:bool False
) –Include subgraphs states.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
ThreadState
(
) –ThreadState The state of the thread.
Example Usage:
thread_state = client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
},
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state
¶
update_state(
thread_id: str ,
values: Optional [Union [dict , Sequence [dict ]]],
*,
as_node: Optional [str ] = None,
checkpoint: Optional [Checkpoint ] = None,
checkpoint_id: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> ThreadUpdateStateResponse
Update the state of a thread.
Parameters:
-
thread_id
(
) –str The ID of the thread to update.
-
values
(
) –Optional [Union [dict ,Sequence [dict ]]]The values to update the state with.
-
as_node
(
, default:Optional [str ]None
) –Update the state as if this node had just executed.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to update the state of.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
ThreadUpdateStateResponse
(
) –ThreadUpdateStateResponse Response after updating a thread's state.
Example Usage:
response = client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history
¶
get_history(
thread_id: str ,
*,
limit: int = 10,
before: Optional [str | Checkpoint ] = None,
metadata: Optional [dict ] = None,
checkpoint: Optional [Checkpoint ] = None,
headers: Optional [dict [str , str ]] = None
) -> list [ThreadState ]
Get the state history of a thread.
Parameters:
-
thread_id
(
) –str The ID of the thread to get the state history for.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –Return states for this subgraph. If empty, defaults to the root graph.
-
limit
(
, default:int 10
) –The maximum number of states to return.
-
before
(
, default:Optional [str |Checkpoint ]None
) –Return states before this checkpoint.
-
metadata
(
, default:Optional [dict ]None
) –Filter states by metadata key-value pairs.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [ThreadState ]list[ThreadState]: the state history of the thread.
Example Usage:
thread_state = client.threads.get_history(
thread_id="my_thread_id",
limit=5,
before="my_checkpoint_id",
metadata={"name":"my_name"}
)
SyncRunsClient
¶
Synchronous client for managing runs in LangGraph.
This class provides methods to create, retrieve, and manage runs, which represent individual executions of graphs.
Example:
client = get_sync_client()
run = client.runs.create(thread_id="thread_123", assistant_id="asst_456")
stream
¶
stream(
thread_id: Optional [str ],
assistant_id: str ,
*,
input: Optional [dict ] = None,
command: Optional [Command ] = None,
stream_mode: Union [
StreamMode , Sequence [StreamMode ]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
checkpoint: Optional [Checkpoint ] = None,
checkpoint_id: Optional [str ] = None,
interrupt_before: Optional [
Union [All , Sequence [str ]]
] = None,
interrupt_after: Optional [
Union [All , Sequence [str ]]
] = None,
feedback_keys: Optional [Sequence [str ]] = None,
on_disconnect: Optional [DisconnectMode ] = None,
on_completion: Optional [OnCompletionBehavior ] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [MultitaskStrategy ] = None,
if_not_exists: Optional [IfNotExists ] = None,
after_seconds: Optional [int ] = None,
headers: Optional [dict [str , str ]] = None
) -> Iterator [StreamPart ]
Create a run and stream the results.
Parameters:
-
thread_id
(
) –Optional [str ]The thread ID to assign to the run. If None, will create a stateless run.
-
assistant_id
(
) –str The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
command
(
, default:Optional [Command ]None
) –The command to execute.
-
stream_mode
(
, default:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –The stream mode(s) to use.
-
stream_subgraphs
(
, default:bool False
) –Whether to stream output from subgraphs.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the run.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to resume from.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
feedback_keys
(
, default:Optional [Sequence [str ]]None
) –Feedback keys to assign to run.
-
on_disconnect
(
, default:Optional [DisconnectMode ]None
) –The disconnect mode to use. Must be one of 'cancel' or 'continue'.
-
on_completion
(
, default:Optional [OnCompletionBehavior ]None
) –Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [MultitaskStrategy ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
if_not_exists
(
, default:Optional [IfNotExists ]None
) –How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread).
-
after_seconds
(
, default:Optional [int ]None
) –The number of seconds to wait before starting the run. Use to schedule future runs.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–Iterator [StreamPart ]Iterator[StreamPart]: Iterator of stream results.
Example Usage:
for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
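Each StreamPart in the output above carries an event name and a data payload. A small helper (not part of the SDK; the StreamPart shape here is mirrored from the example output) can pull the final values event out of a consumed stream:

```python
from typing import Any, Iterable, NamedTuple, Optional

class StreamPart(NamedTuple):
    # Shape mirrored from the example output above; the SDK ships its own type.
    event: str
    data: Any

def final_values(parts: Iterable[StreamPart]) -> Optional[Any]:
    """Return the data of the last 'values' event, or None if there was none."""
    last = None
    for part in parts:
        if part.event == "values":
            last = part.data
    return last

# With a live client this would consume the stream directly:
# state = final_values(client.runs.stream(thread_id=None, assistant_id="agent", input=...))
```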
create
¶
create(
thread_id: Optional [str ],
assistant_id: str ,
*,
input: Optional [dict ] = None,
command: Optional [Command ] = None,
stream_mode: Union [
StreamMode , Sequence [StreamMode ]
] = "values",
stream_subgraphs: bool = False,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
checkpoint: Optional [Checkpoint ] = None,
checkpoint_id: Optional [str ] = None,
interrupt_before: Optional [
Union [All , Sequence [str ]]
] = None,
interrupt_after: Optional [
Union [All , Sequence [str ]]
] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [MultitaskStrategy ] = None,
on_completion: Optional [OnCompletionBehavior ] = None,
if_not_exists: Optional [IfNotExists ] = None,
after_seconds: Optional [int ] = None,
headers: Optional [dict [str , str ]] = None
) -> Run
Create a background run.
Parameters:
-
thread_id
(
) –Optional [str ]The thread ID to assign to the run. If None, will create a stateless run.
-
assistant_id
(
) –str The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
command
(
, default:Optional [Command ]None
) –The command to execute.
-
stream_mode
(
, default:Union [StreamMode ,Sequence [StreamMode ]]'values'
) –The stream mode(s) to use.
-
stream_subgraphs
(
, default:bool False
) –Whether to stream output from subgraphs.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the run.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to resume from.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [MultitaskStrategy ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
on_completion
(
, default:Optional [OnCompletionBehavior ]None
) –Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
-
if_not_exists
(
, default:Optional [IfNotExists ]None
) –How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread).
-
after_seconds
(
, default:Optional [int ]None
) –The number of seconds to wait before starting the run. Use to schedule future runs.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run The created background run.
Example Usage:
background_run = client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'model_name': "openai",
'assistant_id': 'my_assistant_id'
}
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch
¶
Create a batch of stateless background runs.
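create_batch takes a list of run-creation payloads. The sketch below builds such a list as plain dicts (field names assumed to mirror the keyword arguments of create above; treat the shape as illustrative, not an authoritative schema), since the actual call requires a running deployment:

```python
# Build payloads for a batch of stateless runs. Field names are assumed to
# mirror the keyword arguments of create() above; illustrative only.
questions = ["how are you?", "what is LangGraph?"]
payloads = [
    {
        "assistant_id": "agent",
        "input": {"messages": [{"role": "user", "content": q}]},
    }
    for q in questions
]

# runs = client.runs.create_batch(payloads)  # requires a live client
```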
wait
¶
wait(
thread_id: Optional [str ],
assistant_id: str ,
*,
input: Optional [dict ] = None,
command: Optional [Command ] = None,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
checkpoint: Optional [Checkpoint ] = None,
checkpoint_id: Optional [str ] = None,
interrupt_before: Optional [
Union [All , Sequence [str ]]
] = None,
interrupt_after: Optional [
Union [All , Sequence [str ]]
] = None,
webhook: Optional [str ] = None,
on_disconnect: Optional [DisconnectMode ] = None,
on_completion: Optional [OnCompletionBehavior ] = None,
multitask_strategy: Optional [MultitaskStrategy ] = None,
if_not_exists: Optional [IfNotExists ] = None,
after_seconds: Optional [int ] = None,
headers: Optional [dict [str , str ]] = None
) -> Union [list [dict ], dict [str , Any ]]
Create a run, wait until it finishes and return the final state.
Parameters:
-
thread_id
(
) –Optional [str ]The thread ID to create the run on. If None, will create a stateless run.
-
assistant_id
(
) –str The assistant ID or graph name to run. If using graph name, will default to first assistant created from that graph.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
command
(
, default:Optional [Command ]None
) –The command to execute.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the run.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
checkpoint
(
, default:Optional [Checkpoint ]None
) –The checkpoint to resume from.
-
interrupt_before
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,Sequence [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
on_disconnect
(
, default:Optional [DisconnectMode ]None
) –The disconnect mode to use. Must be one of 'cancel' or 'continue'.
-
on_completion
(
, default:Optional [OnCompletionBehavior ]None
) –Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.
-
multitask_strategy
(
, default:Optional [MultitaskStrategy ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
if_not_exists
(
, default:Optional [IfNotExists ]None
) –How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread).
-
after_seconds
(
, default:Optional [int ]None
) –The number of seconds to wait before starting the run. Use to schedule future runs.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–Union [list [dict ],dict [str ,Any ]]Union[list[dict], dict[str, Any]]: The output of the run.
Example Usage:
final_state_of_run = client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list
¶
list(
thread_id: str ,
*,
limit: int = 10,
offset: int = 0,
headers: Optional [dict [str , str ]] = None
) -> List [Run ]
List runs.
Parameters:
-
thread_id
(
) –str The thread ID to list runs for.
-
limit
(
, default:int 10
) –The maximum number of results to return.
-
offset
(
, default:int 0
) –The number of results to skip.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–List [Run ]List[Run]: The runs for the thread.
Example Usage:
client.runs.list(
thread_id="thread_id",
limit=5,
offset=5,
)
get
¶
Get a run.
Parameters:
-
thread_id
(
) –str The thread ID the run belongs to.
-
run_id
(
) –str The run ID to get.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run Run object.
Example Usage:
run = client.runs.get(
thread_id="my_thread_id",
run_id="my_run_id",
)
cancel
¶
cancel(
thread_id: str ,
run_id: str ,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Optional [dict [str , str ]] = None
) -> None
Cancel a run.
Parameters:
-
thread_id
(
) –str The thread ID to cancel.
-
run_id
(
) –str The run ID to cancel.
-
wait
(
, default:bool False
) –Whether to wait until run has completed.
-
action
(
, default:CancelAction 'interrupt'
) –Action to take when cancelling the run. Possible values are
interrupt
orrollback
. Default isinterrupt
. -
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.runs.cancel(
thread_id="thread_id_to_cancel",
run_id="run_id_to_cancel",
wait=True,
action="interrupt"
)
join
¶
Block until a run is done. Returns the final state of the thread.
Parameters:
-
thread_id
(
) –str The thread ID to join.
-
run_id
(
) –str The run ID to join.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–dict The final state of the thread.
Example Usage:
client.runs.join(
thread_id="thread_id_to_join",
run_id="run_id_to_join"
)
join_stream
¶
join_stream(
thread_id: str ,
run_id: str ,
*,
stream_mode: Optional [
Union [StreamMode , Sequence [StreamMode ]]
] = None,
cancel_on_disconnect: bool = False,
headers: Optional [dict [str , str ]] = None
) -> Iterator [StreamPart ]
Stream output from a run in real-time, until the run is done. Output is not buffered, so any output produced before this call will not be received here.
Parameters:
-
thread_id
(
) –str The thread ID to join.
-
run_id
(
) –str The run ID to join.
-
stream_mode
(
, default:Optional [Union [StreamMode ,Sequence [StreamMode ]]]None
) –The stream mode(s) to use. Must be a subset of the stream modes passed when creating the run. Background runs default to having the union of all stream modes.
-
cancel_on_disconnect
(
, default:bool False
) –Whether to cancel the run when the stream is disconnected.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–Iterator [StreamPart ]Iterator[StreamPart]: Iterator of stream results.
Example Usage:
client.runs.join_stream(
thread_id="thread_id_to_join",
run_id="run_id_to_join",
stream_mode=["values", "debug"]
)
delete
¶
Delete a run.
Parameters:
-
thread_id
(
) –str The thread ID to delete.
-
run_id
(
) –str The run ID to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.runs.delete(
thread_id="thread_id_to_delete",
run_id="run_id_to_delete"
)
SyncCronClient
¶
Synchronous client for managing cron jobs in LangGraph.
This class provides methods to create and manage scheduled tasks (cron jobs) for automated graph executions.
Example:
client = get_sync_client()
cron_job = client.crons.create_for_thread(thread_id="thread_123", assistant_id="asst_456", schedule="0 * * * *")
create_for_thread
¶
create_for_thread(
thread_id: str ,
assistant_id: str ,
*,
schedule: str ,
input: Optional [dict ] = None,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
interrupt_before: Optional [
Union [All , list [str ]]
] = None,
interrupt_after: Optional [Union [All , list [str ]]] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> Run
Create a cron job for a thread.
Parameters:
-
thread_id
(
) –str The thread ID to run the cron job on.
-
assistant_id
(
) –str The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph.
-
schedule
(
) –str The cron schedule to execute this job on.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the cron job runs.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
interrupt_before
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [str ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run The cron run.
Example Usage:
cron_run = client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create
¶
create(
assistant_id: str ,
*,
schedule: str ,
input: Optional [dict ] = None,
metadata: Optional [dict ] = None,
config: Optional [Config ] = None,
interrupt_before: Optional [
Union [All , list [str ]]
] = None,
interrupt_after: Optional [Union [All , list [str ]]] = None,
webhook: Optional [str ] = None,
multitask_strategy: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> Run
Create a cron run.
Parameters:
-
assistant_id
(
) –str The assistant ID or graph name to use for the cron job. If using graph name, will default to first assistant created from that graph.
-
schedule
(
) –str The cron schedule to execute this job on.
-
input
(
, default:Optional [dict ]None
) –The input to the graph.
-
metadata
(
, default:Optional [dict ]None
) –Metadata to assign to the cron job runs.
-
config
(
, default:Optional [Config ]None
) –The configuration for the assistant.
-
interrupt_before
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately before they get executed.
-
interrupt_after
(
, default:Optional [Union [All ,list [str ]]]None
) –Nodes to interrupt immediately after they get executed.
-
webhook
(
, default:Optional [str ]None
) –Webhook to call after LangGraph API call is done.
-
multitask_strategy
(
, default:Optional [str ]None
) –Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
Run
(
) –Run The cron run.
Example Usage:
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
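The schedule argument uses standard five-field cron syntax (minute, hour, day-of-month, month, day-of-week). A rough client-side sanity check (a hypothetical helper, not part of the SDK) can catch malformed expressions before creating a job:

```python
def looks_like_cron(expr: str) -> bool:
    """Rough five-field cron check: minute hour day-of-month month day-of-week.

    Accepts '*', plain numbers, ranges (a-b), lists (a,b), and step
    suffixes (*/n). Not a full crontab parser; a server-side validator
    remains authoritative.
    """
    fields = expr.split()
    if len(fields) != 5:
        return False
    ranges = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]
    for field, (lo, hi) in zip(fields, ranges):
        for part in field.split(","):
            part = part.split("/")[0]  # drop step suffix like */5
            if part == "*":
                continue
            bounds = part.split("-")
            if not all(p.isdigit() and lo <= int(p) <= hi for p in bounds):
                return False
    return True
```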
delete
¶
Delete a cron.
Parameters:
-
cron_id
(
) –str The cron ID to delete.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
None
–None
Example Usage:
client.crons.delete(
cron_id="cron_to_delete"
)
search
¶
search(
*,
assistant_id: Optional [str ] = None,
thread_id: Optional [str ] = None,
limit: int = 10,
offset: int = 0,
headers: Optional [dict [str , str ]] = None
) -> list [Cron ]
Get a list of cron jobs.
Parameters:
-
assistant_id
(
, default:Optional [str ]None
) –The assistant ID or graph name to search for.
-
thread_id
(
, default:Optional [str ]None
) –The thread ID to search for.
-
limit
(
, default:int 10
) –The maximum number of results to return.
-
offset
(
, default:int 0
) –The number of results to skip.
-
headers
(
, default:Optional [dict [str ,str ]]None
) –Optional custom headers to include with the request.
Returns:
-
–list [Cron ]list[Cron]: The list of cron jobs returned by the search.
Example Usage:
cron_jobs = client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
SyncStoreClient
¶
A client for synchronous operations on a key-value store.
Provides methods to interact with a remote key-value store, allowing storage and retrieval of items within namespaced hierarchies.
Example:
client = get_sync_client()
client.store.put_item(["users", "profiles"], "user123", {"name": "Alice", "age": 30})
put_item
¶
put_item(
namespace: Sequence [str ],
/,
key: str ,
value: dict [str , Any ],
index: Optional [
Union [Literal [False], list [str ]]
] = None,
ttl: Optional [int ] = None,
headers: Optional [dict [str , str ]] = None,
) -> None
Store or update an item.
Parameters:
- namespace (Sequence[str]) – A list of strings representing the namespace path.
- key (str) – The unique identifier for the item within the namespace.
- value (dict[str, Any]) – A dictionary containing the item's data.
- index (Optional[Union[Literal[False], list[str]]], default: None) – Controls search indexing: None (use defaults), False (disable), or a list of field paths to index.
- ttl (Optional[int], default: None) – Optional time-to-live in minutes for the item, or None for no expiration.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- None
Example Usage:
client.store.put_item(
["documents", "user123"],
key="item456",
value={"title": "My Document", "content": "Hello World"}
)
get_item
¶
get_item(
namespace: Sequence [str ],
/,
key: str ,
*,
refresh_ttl: Optional [bool ] = None,
headers: Optional [dict [str , str ]] = None,
) -> Item
Retrieve a single item.
Parameters:
- namespace (Sequence[str]) – A list of strings representing the namespace path.
- key (str) – The unique identifier for the item.
- refresh_ttl (Optional[bool], default: None) – Whether to refresh the TTL on this read operation. If None, uses the store's default behavior.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- Item – The retrieved item.
Example Usage:
item = client.store.get_item(
["documents", "user123"],
key="item456",
)
print(item)
----------------------------------------------------------------
{
'namespace': ['documents', 'user123'],
'key': 'item456',
'value': {'title': 'My Document', 'content': 'Hello World'},
'created_at': '2024-07-30T12:00:00Z',
'updated_at': '2024-07-30T12:00:00Z'
}
delete_item
¶
delete_item(
namespace: Sequence [str ],
/,
key: str ,
headers: Optional [dict [str , str ]] = None,
) -> None
Delete an item.
Parameters:
- namespace (Sequence[str]) – A list of strings representing the namespace path.
- key (str) – The unique identifier for the item.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- None
Example Usage:
client.store.delete_item(
["documents", "user123"],
key="item456",
)
search_items
¶
search_items(
namespace_prefix: Sequence [str ],
/,
filter: Optional [dict [str , Any ]] = None,
limit: int = 10,
offset: int = 0,
query: Optional [str ] = None,
refresh_ttl: Optional [bool ] = None,
headers: Optional [dict [str , str ]] = None,
) -> SearchItemsResponse
Search for items within a namespace prefix.
Parameters:
- namespace_prefix (Sequence[str]) – List of strings representing the namespace prefix.
- filter (Optional[dict[str, Any]], default: None) – Optional dictionary of key-value pairs to filter results.
- limit (int, default: 10) – Maximum number of items to return.
- offset (int, default: 0) – Number of items to skip before returning results.
- query (Optional[str], default: None) – Optional query for natural language search.
- refresh_ttl (Optional[bool], default: None) – Whether to refresh the TTL on items returned by this search. If None, uses the store's default behavior.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- SearchItemsResponse – A response containing the list of items matching the search criteria.
Example Usage:
items = client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces
¶
list_namespaces(
prefix: Optional [List [str ]] = None,
suffix: Optional [List [str ]] = None,
max_depth: Optional [int ] = None,
limit: int = 100,
offset: int = 0,
headers: Optional [dict [str , str ]] = None,
) -> ListNamespaceResponse
List namespaces with optional match conditions.
Parameters:
- prefix (Optional[List[str]], default: None) – Optional list of strings representing the prefix to filter namespaces.
- suffix (Optional[List[str]], default: None) – Optional list of strings representing the suffix to filter namespaces.
- max_depth (Optional[int], default: None) – Optional integer specifying the maximum depth of namespaces to return.
- limit (int, default: 100) – Maximum number of namespaces to return.
- offset (int, default: 0) – Number of namespaces to skip before returning results.
- headers (Optional[dict[str, str]], default: None) – Optional custom headers to include with the request.
Returns:
- ListNamespaceResponse – A response containing the list of namespace paths matching the criteria.
Example Usage:
namespaces = client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
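The prefix, suffix, and max_depth matching described above can be sketched in plain Python. This is a simplified local model of the matching semantics, not the server implementation; `matches` and `list_namespaces_local` are hypothetical helper names:

```python
from typing import Optional, Sequence

def matches(
    namespace: Sequence[str],
    prefix: Optional[Sequence[str]] = None,
    suffix: Optional[Sequence[str]] = None,
) -> bool:
    # A namespace matches when it starts with `prefix` and ends with `suffix`.
    if prefix and list(namespace[: len(prefix)]) != list(prefix):
        return False
    if suffix and list(namespace[-len(suffix):]) != list(suffix):
        return False
    return True

def list_namespaces_local(
    namespaces: list[list[str]],
    prefix: Optional[list[str]] = None,
    suffix: Optional[list[str]] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[list[str]]:
    # Filter by prefix/suffix, truncate each namespace to `max_depth`
    # segments, deduplicate, then paginate with offset/limit.
    results: list[list[str]] = []
    seen: set[tuple[str, ...]] = set()
    for ns in namespaces:
        if not matches(ns, prefix, suffix):
            continue
        truncated = ns[:max_depth] if max_depth is not None else ns
        key = tuple(truncated)
        if key not in seen:
            seen.add(key)
            results.append(list(truncated))
    return results[offset : offset + limit]

namespaces = [
    ["documents", "user123", "reports"],
    ["documents", "user456", "invoices"],
    ["cache", "user123"],
]
print(list_namespaces_local(namespaces, prefix=["documents"], max_depth=2))
```
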
get_headers
¶
Combine api_key and custom user-provided headers.
get_client
¶
get_client(
*,
url: Optional [str ] = None,
api_key: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> LangGraphClient
Get a LangGraphClient instance.
Parameters:
- url (Optional[str], default: None) – The URL of the LangGraph API.
- api_key (Optional[str], default: None) – The API key. If not provided, it will be read from the environment. Precedence: 1. explicit argument 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY
- headers (Optional[dict[str, str]], default: None) – Optional custom headers.
Returns:
- LangGraphClient – The top-level client for accessing AssistantsClient, ThreadsClient, RunsClient, and CronClient.
Example:
from langgraph_sdk import get_client
# get top-level LangGraphClient
client = get_client(url="http://localhost:8123")
# example usage: client.<model>.<method_name>()
assistant = await client.assistants.get(assistant_id="some_uuid")
get_sync_client
¶
get_sync_client(
*,
url: Optional [str ] = None,
api_key: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None
) -> SyncLangGraphClient
Get a synchronous LangGraphClient instance.
Parameters:
- url (Optional[str], default: None) – The URL of the LangGraph API.
- api_key (Optional[str], default: None) – The API key. If not provided, it will be read from the environment. Precedence: 1. explicit argument 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY
- headers (Optional[dict[str, str]], default: None) – Optional custom headers.
Returns:
- SyncLangGraphClient – The top-level synchronous client for accessing AssistantsClient, ThreadsClient, RunsClient, and CronClient.
Example:
from langgraph_sdk import get_sync_client
# get top-level synchronous LangGraphClient
client = get_sync_client(url="http://localhost:8123")
# example usage: client.<model>.<method_name>()
assistant = client.assistants.get(assistant_id="some_uuid")
Data models for interacting with the LangGraph API.
Json
module-attribute
¶
Represents a JSON-like structure, which can be None or a dictionary with string keys and any values.
RunStatus
module-attribute
¶
Represents the status of a run: - "pending": The run is waiting to start. - "error": The run encountered an error and stopped. - "success": The run completed successfully. - "timeout": The run exceeded its time limit. - "interrupted": The run was manually stopped or interrupted.
ThreadStatus
module-attribute
¶
Represents the status of a thread: - "idle": The thread is not currently processing any task. - "busy": The thread is actively processing a task. - "interrupted": The thread's execution was interrupted. - "error": An exception occurred during task processing.
StreamMode
module-attribute
¶
StreamMode = Literal [
"values",
"messages",
"updates",
"events",
"debug",
"custom",
"messages-tuple",
]
Defines the mode of streaming: - "values": Stream only the values. - "messages": Stream complete messages. - "messages-tuple": Stream (message chunk, metadata) tuples. - "updates": Stream updates to the state. - "events": Stream events occurring during execution. - "debug": Stream detailed debug information. - "custom": Stream custom events.
DisconnectMode
module-attribute
¶
Specifies behavior on disconnection: - "cancel": Cancel the operation on disconnection. - "continue": Continue the operation even if disconnected.
MultitaskStrategy
module-attribute
¶
Defines how to handle multiple tasks: - "reject": Reject new tasks when busy. - "interrupt": Interrupt current task for new ones. - "rollback": Roll back current task and start new one. - "enqueue": Queue new tasks for later execution.
OnConflictBehavior
module-attribute
¶
Specifies behavior on conflict: - "raise": Raise an exception when a conflict occurs. - "do_nothing": Ignore conflicts and proceed.
OnCompletionBehavior
module-attribute
¶
Defines action after completion: - "delete": Delete resources after completion. - "keep": Retain resources after completion.
IfNotExists
module-attribute
¶
Specifies behavior if the thread doesn't exist: - "create": Create a new thread if it doesn't exist. - "reject": Reject the operation if the thread doesn't exist.
CancelAction
module-attribute
¶
Action to take when cancelling the run. - "interrupt": Simply cancel the run. - "rollback": Cancel the run. Then delete the run and associated checkpoints.
Config
¶
Bases:
Configuration options for a call.
tags
instance-attribute
¶
Tags for this call and any sub-calls (eg. a Chain calling an LLM). You can use these to filter calls.
recursion_limit
instance-attribute
¶
Maximum number of times a call can recurse. If not provided, defaults to 25.
configurable
instance-attribute
¶
Runtime values for attributes previously made configurable on this Runnable, or sub-Runnables, through .configurable_fields() or .configurable_alternatives(). Check .output_schema() for a description of the attributes that have been made configurable.
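Since a Config is passed as a plain dictionary, the fields above can be illustrated with a minimal sketch (the `configurable` key shown is hypothetical and depends on what your graph makes configurable):

```python
# Illustrative Config payload (field values are hypothetical examples).
config = {
    "tags": ["production", "billing"],          # tags for this call and sub-calls
    "recursion_limit": 25,                      # max recursion depth (defaults to 25)
    "configurable": {"model_name": "gpt-4o"},   # runtime values for configurable fields
}
print(config["recursion_limit"])
```
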
Checkpoint
¶
Bases:
Represents a checkpoint in the execution process.
thread_id
instance-attribute
¶
Unique identifier for the thread associated with this checkpoint.
checkpoint_ns
instance-attribute
¶
Namespace for the checkpoint; used internally to manage subgraph state.
checkpoint_id
instance-attribute
¶
Optional unique identifier for the checkpoint itself.
checkpoint_map
instance-attribute
¶
Optional dictionary containing checkpoint-specific data.
GraphSchema
¶
Bases:
Defines the structure and properties of a graph.
input_schema
instance-attribute
¶
The schema for the graph input. Missing if unable to generate JSON schema from graph.
output_schema
instance-attribute
¶
The schema for the graph output. Missing if unable to generate JSON schema from graph.
state_schema
instance-attribute
¶
The schema for the graph state. Missing if unable to generate JSON schema from graph.
config_schema
instance-attribute
¶
The schema for the graph config. Missing if unable to generate JSON schema from graph.
AssistantBase
¶
Bases:
Base model for an assistant.
AssistantVersion
¶
Bases:
Represents a specific version of an assistant.
Assistant
¶
Bases:
Represents an assistant with additional properties.
Interrupt
¶
Bases:
Represents an interruption in the execution flow.
Thread
¶
ThreadTask
¶
Bases:
Represents a task within a thread.
ThreadState
¶
Bases:
Represents the state of a thread.
next
instance-attribute
¶
The next nodes to execute. If empty, the thread is done until new input is received.
parent_checkpoint
instance-attribute
¶
The ID of the parent checkpoint. If missing, this is the root checkpoint.
tasks
instance-attribute
¶
Tasks to execute in this step. If already attempted, may contain an error.
ThreadUpdateStateResponse
¶
Bases:
Represents the response from updating a thread's state.
Run
¶
Cron
¶
Bases:
Represents a scheduled task.
RunCreate
¶
Bases:
Defines the parameters for initiating a background run.
thread_id
instance-attribute
¶
The identifier of the thread to run. If not provided, the run is stateless.
assistant_id
instance-attribute
¶
The identifier of the assistant to use for this run.
metadata
instance-attribute
¶
Additional metadata to associate with the run.
checkpoint_id
instance-attribute
¶
The identifier of a checkpoint to resume from.
interrupt_before
instance-attribute
¶
List of node names to interrupt execution before.
interrupt_after
instance-attribute
¶
List of node names to interrupt execution after.
webhook
instance-attribute
¶
URL to send webhook notifications about the run's progress.
multitask_strategy
instance-attribute
¶
Strategy for handling concurrent runs on the same thread.
Item
¶
Bases:
Represents a single document or data entry in the graph's Store.
Items are used to store cross-thread memories.
namespace
instance-attribute
¶
The namespace of the item. A namespace is analogous to a document's directory.
key
instance-attribute
¶
The unique identifier of the item within its namespace.
In general, keys needn't be globally unique.
value
instance-attribute
¶
The value stored in the item. This is the document itself.
ListNamespaceResponse
¶
Bases:
Response structure for listing namespaces.
namespaces
instance-attribute
¶
A list of namespace paths, where each path is a list of strings.
SearchItem
¶
Bases:
Item with an optional relevance score from search operations.
Attributes:
- score (Optional[float]) – Relevance/similarity score. Included when searching a compatible store with a natural language query.
namespace
instance-attribute
¶
The namespace of the item. A namespace is analogous to a document's directory.
key
instance-attribute
¶
The unique identifier of the item within its namespace.
In general, keys needn't be globally unique.
value
instance-attribute
¶
The value stored in the item. This is the document itself.
SearchItemsResponse
¶
Bases:
Response structure for searching items.
StreamPart
¶
Send
¶
Bases:
Represents a message to be sent to a specific node in the graph.
This type is used to explicitly send messages to nodes in the graph, typically used within Command objects to control graph execution flow.
Command
¶
Bases:
Represents one or more commands to control graph execution flow and state.
This type defines the control commands that can be returned by nodes to influence graph execution. It lets you navigate to other nodes, update graph state, and resume from interruptions.
goto
instance-attribute
¶
Specifies where execution should continue. Can be:
- A string node name to navigate to
- A Send object to execute a node with specific input
- A sequence of node names or Send objects to execute in order
update
instance-attribute
¶
Updates to apply to the graph's state. Can be:
- A dictionary of state updates to merge
- A sequence of (key, value) tuples for ordered updates
resume
instance-attribute
¶
Value to resume execution with after an interruption. Used in conjunction with interrupt() to implement control flow.
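As a rough sketch of the two `update` shapes described above (a simplified local model; the server applies updates through the graph's state reducers, which may differ):

```python
from typing import Any, Sequence, Union

def apply_update(
    state: dict[str, Any],
    update: Union[dict[str, Any], Sequence[tuple[str, Any]]],
) -> dict[str, Any]:
    # A dict merges all keys at once; a sequence of (key, value)
    # tuples is applied in order, so later entries win.
    new_state = dict(state)
    items = update.items() if isinstance(update, dict) else update
    for key, value in items:
        new_state[key] = value
    return new_state

state = {"count": 1, "messages": []}
print(apply_update(state, {"count": 2}))
print(apply_update(state, [("count", 3), ("count", 4)]))
```
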
Auth
¶
Add custom authentication and authorization management to your LangGraph application.
The Auth class provides a unified system for handling authentication and authorization in LangGraph applications. It supports custom user authentication protocols and fine-grained authorization rules for different resources and actions.
To use, create a separate python file and add the path to the file to your
LangGraph API configuration file (langgraph.json
). Within that file, create
an instance of the Auth class and register authentication and authorization
handlers as needed.
Example langgraph.json
file:
{
"dependencies": ["."],
"graphs": {
"agent": "./my_agent/agent.py:graph"
},
"env": ".env",
"auth": {
"path": "./auth.py:my_auth"
}
}
Then the LangGraph server will load your auth file and run it server-side whenever a request comes in.
Basic Usage
from langgraph_sdk import Auth

my_auth = Auth()

async def verify_token(token: str) -> str:
    # Verify token and return user_id
    # This would typically be a call to your auth server
    return "user_id"

@my_auth.authenticate
async def authenticate(authorization: str) -> str:
    # Verify token and return user_id
    result = await verify_token(authorization)
    if result != "user_id":
        raise Auth.exceptions.HTTPException(
            status_code=401, detail="Unauthorized"
        )
    return result

# Global fallback handler
@my_auth.on
async def authorize_default(params: Auth.on.value):
    return False  # Reject all requests (default behavior)

@my_auth.on.threads.create
async def authorize_thread_create(params: Auth.on.threads.create.value):
    # Allow the allowed user to create a thread
    assert params.get("metadata", {}).get("owner") == "allowed_user"

@my_auth.on.store
async def authorize_store(ctx: Auth.types.AuthContext, value: Auth.types.on):
    assert ctx.user.identity in value["namespace"], "Not authorized"
Request Processing Flow
- Authentication (your
@auth.authenticate
handler) is performed first on every request - For authorization, the most specific matching handler is called:
- If a handler exists for the exact resource and action, it is used (e.g.,
@auth.on.threads.create
) - Otherwise, if a handler exists for the resource with any action, it is used (e.g.,
@auth.on.threads
) - Finally, if no specific handlers match, the global handler is used (e.g.,
@auth.on
) - If no global handler is set, the request is accepted
- If a handler exists for the exact resource and action, it is used (e.g.,
This allows you to set default behavior with a global handler while overriding specific routes as needed.
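The most-specific-handler-wins resolution described above can be sketched as a lookup. This is a simplified model of the dispatch order, not the server's implementation; `resolve_handler` is a hypothetical helper:

```python
from typing import Callable, Optional

def resolve_handler(
    handlers: dict[tuple[Optional[str], Optional[str]], Callable],
    resource: str,
    action: str,
) -> Optional[Callable]:
    # Most specific first: (resource, action), then (resource, any action),
    # then the global handler. None means no handler: the request is accepted.
    for key in [(resource, action), (resource, None), (None, None)]:
        if key in handlers:
            return handlers[key]
    return None

handlers = {
    ("threads", "create"): lambda ctx, value: True,   # @auth.on.threads.create
    ("threads", None): lambda ctx, value: False,      # @auth.on.threads
    (None, None): lambda ctx, value: False,           # @auth.on
}
print(resolve_handler(handlers, "threads", "create")(None, None))
```
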
types
class-attribute
instance-attribute
¶
Reference to auth type definitions.
Provides access to all type definitions used in the auth system, like ThreadsCreate, AssistantsRead, etc.
exceptions
class-attribute
instance-attribute
¶
Reference to auth exception definitions.
Provides access to all exception definitions used in the auth system, like HTTPException, etc.
on
instance-attribute
¶
Entry point for authorization handlers that control access to specific resources.
The on class provides a flexible way to define authorization rules for different resources and actions in your application. It supports three main usage patterns:
- Global handlers that run for all resources and actions
- Resource-specific handlers that run for all actions on a resource
- Resource and action specific handlers for fine-grained control
Each handler must be an async function that accepts two parameters:
- ctx (AuthContext): Contains request context and authenticated user info
- value: The data being authorized (type varies by endpoint)
The handler should return one of:
- None or True: Accept the request
- False: Reject with 403 error
- FilterType: Apply filtering rules to the response
Examples
Global handler for all requests:
@auth.on
async def reject_unhandled_requests(ctx: AuthContext, value: Any) -> None:
    print(f"Request to {ctx.path} by {ctx.user.identity}")
    return False
Resource-specific handler. This would take precedence over the global handler
for all actions on the threads
resource:
@auth.on.threads
async def check_thread_access(ctx: AuthContext, value: Any) -> bool:
    # Allow access only to threads created by the user
    return value.get("created_by") == ctx.user.identity
Resource and action specific handler:
@auth.on.threads.delete
async def prevent_thread_deletion(ctx: AuthContext, value: Any) -> bool:
    # Only admins can delete threads
    return "admin" in ctx.user.permissions
Multiple resources or actions:
@auth.on(resources=["threads", "runs"], actions=["create", "update"])
async def rate_limit_writes(ctx: AuthContext, value: Any) -> bool:
    # Implement rate limiting for write operations
    return await check_rate_limit(ctx.user.identity)
Auth for the store
resource is a bit different since its structure is developer defined.
You typically want to enforce user credentials in the namespace.
authenticate
¶
Register an authentication handler function.
The authentication handler is responsible for verifying credentials and returning user scopes. It can accept any of the following parameters by name:
- request (Request): The raw ASGI request object
- body (dict): The parsed request body
- path (str): The request path, e.g., "/threads/abcd-1234-abcd-1234/runs/abcd-1234-abcd-1234/stream"
- method (str): The HTTP method, e.g., "GET"
- path_params (dict[str, str]): URL path parameters, e.g., {"thread_id": "abcd-1234-abcd-1234", "run_id": "abcd-1234-abcd-1234"}
- query_params (dict[str, str]): URL query parameters, e.g., {"stream": "true"}
- headers (dict[bytes, bytes]): Request headers
- authorization (str | None): The Authorization header value (e.g., "Bearer <token>")
Parameters:
- fn (Callable) – The authentication handler function to register. Must return a representation of the user. This could be a: string (the user id), dict containing {"identity": str, "permissions": list[str]}, or an object with identity and permissions properties. Permissions can be optionally used by your handlers downstream.
Returns:
- AH – The registered handler function.
Raises:
- ValueError – If an authentication handler is already registered.
Examples
Basic token authentication:
@auth.authenticate
async def authenticate(authorization: str) -> str:
    user_id = verify_token(authorization)
    return user_id
Accept the full request context:
@auth.authenticate
async def authenticate(
    method: str,
    path: str,
    headers: dict[str, bytes]
) -> str:
    user = await verify_request(method, path, headers)
    return user
Return user name and permissions:
@auth.authenticate
async def authenticate(
    method: str,
    path: str,
    headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
    permissions, user = await verify_request(method, path, headers)
    # Permissions could be things like ["runs:read", "runs:write", "threads:read", "threads:write"]
    return {
        "identity": user["id"],
        "permissions": permissions,
        "display_name": user["name"],
    }
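The by-name parameter injection described above can be sketched with the standard-library `inspect` module. This is a simplified, synchronous model of what the decorator does; `call_with_requested_params` is a hypothetical helper:

```python
import inspect
from typing import Any, Callable

def call_with_requested_params(fn: Callable, available: dict[str, Any]) -> Any:
    # Inspect the handler's signature and pass only the parameters it names.
    params = inspect.signature(fn).parameters
    kwargs = {name: available[name] for name in params if name in available}
    return fn(**kwargs)

# Request context the framework could make available to handlers.
available = {
    "method": "GET",
    "path": "/threads",
    "authorization": "Bearer abc123",
    "headers": {b"host": b"localhost"},
}

def handler(authorization: str) -> str:
    # Handler only asks for `authorization`, so only that is injected.
    return authorization.split(" ", 1)[1]

print(call_with_requested_params(handler, available))
```
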
Authentication and authorization types for LangGraph.
This module defines the core types used for authentication, authorization, and request handling in LangGraph. It includes user protocols, authentication contexts, and typed dictionaries for various API operations.
Note
All typing.TypedDict classes use total=False to make all fields optional by default.
RunStatus
module-attribute
¶
Status of a run execution.
Values
- pending: Run is queued or in progress
- error: Run failed with an error
- success: Run completed successfully
- timeout: Run exceeded time limit
- interrupted: Run was manually interrupted
MultitaskStrategy
module-attribute
¶
Strategy for handling multiple concurrent tasks.
Values
- reject: Reject new tasks while one is in progress
- rollback: Cancel current task and start new one
- interrupt: Interrupt current task and start new one
- enqueue: Queue new tasks to run after current one
OnConflictBehavior
module-attribute
¶
Behavior when encountering conflicts.
Values
- raise: Raise an exception on conflict
- do_nothing: Silently ignore conflicts
IfNotExists
module-attribute
¶
Behavior when an entity doesn't exist.
Values
- create: Create the entity
- reject: Reject the operation
FilterType
module-attribute
¶
FilterType = Union [
Dict [
str ,
Union [str , Dict [Literal ["$eq", "$contains"], str ]],
],
Dict [str , str ],
]
Response type for authorization handlers.
Supports exact matches and operators
- Exact match shorthand: {"field": "value"}
- Exact match: {"field": {"$eq": "value"}}
- Contains: {"field": {"$contains": "value"}}
Examples
Simple exact match filter for the resource owner:
{"owner": "user123"}
Explicit version of the exact match filter:
{"owner": {"$eq": "user123"}}
Containment:
{"participants": {"$contains": "user123"}}
Combining filters (treated as a logical AND):
{"owner": "user123", "participants": {"$contains": "user123"}}
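A simplified sketch of how such a filter could be evaluated against a resource's metadata (a local model of the semantics, not the server implementation; `matches_filter` is a hypothetical helper):

```python
from typing import Any

def matches_filter(metadata: dict[str, Any], criteria: dict[str, Any]) -> bool:
    # Every field must match (logical AND across keys).
    for field, condition in criteria.items():
        value = metadata.get(field)
        if isinstance(condition, dict):
            # Operator form: {"$eq": ...} exact match, {"$contains": ...} membership.
            if "$eq" in condition and value != condition["$eq"]:
                return False
            if "$contains" in condition and condition["$contains"] not in (value or []):
                return False
        elif value != condition:
            # Shorthand form: exact match.
            return False
    return True

meta = {"owner": "user123", "participants": ["user123", "user456"]}
print(matches_filter(meta, {"owner": "user123"}))
```
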
ThreadStatus
module-attribute
¶
Status of a thread.
Values
- idle: Thread is available for work
- busy: Thread is currently processing
- interrupted: Thread was interrupted
- error: Thread encountered an error
MetadataInput
module-attribute
¶
HandlerResult
module-attribute
¶
The result of a handler can be: - None | True: accept the request. - False: reject the request with a 403 error. - FilterType: filter to apply to the response.
Authenticator
module-attribute
¶
Authenticator = Callable [
...,
Awaitable [
Union [
MinimalUser ,
str ,
BaseUser ,
MinimalUserDict ,
Mapping [str , Any ],
],
],
]
Type for authentication functions.
An authenticator can return one of: 1. A string (user_id) 2. A dict containing {"identity": str, "permissions": list[str]} 3. An object with identity and permissions properties
Permissions can be used downstream by your authorization logic to determine access permissions to different resources.
The authenticate decorator will automatically inject any of the following parameters by name if they are included in your function signature:
Parameters:
- request (Request) – The raw ASGI request object
- body (dict) – The parsed request body
- path (str) – The request path
- method (str) – The HTTP method (GET, POST, etc.)
- path_params (dict[str, str] | None) – URL path parameters
- query_params (dict[str, str] | None) – URL query parameters
- headers (dict[str, bytes] | None) – Request headers
- authorization (str | None) – The Authorization header value (e.g. "Bearer <token>")
Examples
Basic authentication with token:
from langgraph_sdk import Auth

auth = Auth()

@auth.authenticate
async def authenticate1(authorization: str) -> Auth.types.MinimalUserDict:
    return await get_user(authorization)
Authentication with multiple parameters:
@auth.authenticate
async def authenticate2(
    method: str,
    path: str,
    headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
    # Custom auth logic using method, path and headers
    user = verify_request(method, path, headers)
    return user
Accepting the raw ASGI request:
MY_SECRET = "my-secret-key"

@auth.authenticate
async def get_current_user(request: Request) -> Auth.types.MinimalUserDict:
    try:
        token = (request.headers.get("authorization") or "").split(" ", 1)[1]
        payload = jwt.decode(token, MY_SECRET, algorithms=["HS256"])
    except (IndexError, InvalidTokenError):
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.myauth-provider.com/auth/v1/user",
            headers={"Authorization": f"Bearer {MY_SECRET}"}
        )
        if response.status_code != 200:
            raise HTTPException(status_code=401, detail="User not found")
        user_data = response.json()
    return {
        "identity": user_data["id"],
        "display_name": user_data.get("name"),
        "permissions": user_data.get("permissions", []),
        "is_authenticated": True,
    }
MinimalUser
¶
Bases:
User objects must at least expose the identity property.
identity
property
¶
The unique identifier for the user.
This could be a username, email, or any other unique identifier used to distinguish between different users in the system.
MinimalUserDict
¶
Bases:
The dictionary representation of a user.
BaseUser
¶
Bases:
The base ASGI user protocol
StudioUser
¶
A user object that's populated from authenticated requests from the LangGraph studio.
Note: Studio auth can be disabled in your langgraph.json
config.
You can use isinstance
checks in your authorization handlers (@auth.on
) to control access specifically
for developers accessing the instance from the LangGraph Studio UI.
Examples
BaseAuthContext
¶
Base class for authentication context.
Provides the fundamental authentication information needed for authorization decisions.
AuthContext
¶
Bases:
Complete authentication context with resource and action information.
Extends BaseAuthContext with specific resource and action being accessed, allowing for fine-grained access control decisions.
permissions
instance-attribute
¶
The permissions granted to the authenticated user.
resource
instance-attribute
¶
The resource being accessed.
action
instance-attribute
¶
action: Literal [
"create",
"read",
"update",
"delete",
"search",
"create_run",
"put",
"get",
"list_namespaces",
]
The action being performed on the resource.
Most resources support the following actions: - create: Create a new resource - read: Read information about a resource - update: Update an existing resource - delete: Delete a resource - search: Search for resources
The store supports the following actions: - put: Add or update a document in the store - get: Get a document from the store - list_namespaces: List the namespaces in the store
ThreadsCreate
¶
ThreadsRead
¶
Bases:
Parameters for reading thread state or run information.
This type is used in three contexts: 1. Reading thread, thread version, or thread state information: Only thread_id is provided 2. Reading run information: Both thread_id and run_id are provided
ThreadsUpdate
¶
Bases:
Parameters for updating a thread or run.
Called for updates to a thread, thread version, or run cancellation.
ThreadsDelete
¶
ThreadsSearch
¶
Bases:
Parameters for searching threads.
Called for searches to threads or runs.
RunsCreate
¶
Bases:
Payload for creating a run.
Examples
create_params = {
"assistant_id": UUID("123e4567-e89b-12d3-a456-426614174000"),
"thread_id": UUID("123e4567-e89b-12d3-a456-426614174001"),
"run_id": UUID("123e4567-e89b-12d3-a456-426614174002"),
"status": "pending",
"metadata": {"owner": "user123"},
"prevent_insert_if_inflight": True,
"multitask_strategy": "reject",
"if_not_exists": "create",
"after_seconds": 10,
"kwargs": {"key": "value"},
"action": "interrupt"
}
assistant_id
instance-attribute
¶
Optional assistant ID to use for this run.
thread_id
instance-attribute
¶
Optional thread ID to use for this run.
prevent_insert_if_inflight
instance-attribute
¶
Prevent inserting a new run if one is already in flight.
multitask_strategy
instance-attribute
¶
Multitask strategy for this run.
after_seconds
instance-attribute
¶
Number of seconds to wait before creating the run.
action
instance-attribute
¶
Action to take if updating an existing run.
AssistantsCreate
¶
Bases:
Payload for creating an assistant.
Examples
AssistantsRead
¶
Bases:
Payload for reading an assistant.
Examples
AssistantsUpdate
¶
Bases:
Payload for updating an assistant.
Examples
config
instance-attribute
¶
Optional configuration to update.
AssistantsDelete
¶
Bases:
Payload for deleting an assistant.
AssistantsSearch
¶
Bases:
Payload for searching assistants.
Examples
CronsCreate
¶
Bases:
Payload for creating a cron job.
Examples
cron_id
instance-attribute
¶
Optional unique identifier for the cron job.
thread_id
instance-attribute
¶
Optional thread ID to use for this cron job.
user_id
instance-attribute
¶
Optional user ID to use for this cron job.
end_time
instance-attribute
¶
Optional end time for the cron job.
CronsDelete
¶
Bases:
Payload for deleting a cron job.
CronsRead
¶
Bases:
Payload for reading a cron job.
CronsUpdate
¶
Bases:
Payload for updating a cron job.
Examples
CronsSearch
¶
Bases:
Payload for searching cron jobs.
Examples
assistant_id
instance-attribute
¶
Optional assistant ID to filter by.
StoreGet
¶
StoreSearch
¶
Bases:
Operation to search for items within a specified namespace hierarchy.
StoreListNamespaces
¶
StorePut
¶
Bases:
Operation to store, update, or delete an item in the store.
StoreDelete
¶
on
¶
Namespace for type definitions of different API operations.
This class organizes type definitions for create, read, update, delete, and search operations across different resources (threads, assistants, crons).
Usage
from langgraph_sdk import Auth

auth = Auth()

@auth.on
async def handle_all(params: Auth.on.value):
    raise Exception("Not authorized")

@auth.on.threads.create
async def handle_thread_create(params: Auth.on.threads.create.value):
    # Handle thread creation
    pass

@auth.on.assistants.search
async def handle_assistant_search(params: Auth.on.assistants.search.value):
    # Handle assistant search
    pass
Exceptions used in the auth system.
HTTPException
¶
Bases:
HTTP exception that you can raise to return a specific HTTP error response.
Since this is defined in the auth module, we default to a 401 status code.
Parameters:
- status_code (int, default: 401) – HTTP status code for the error. Defaults to 401 "Unauthorized".
- detail (str | None, default: None) – Detailed error message. If None, uses a default message based on the status code.
- headers (Mapping[str, str] | None, default: None) – Additional HTTP headers to include in the error response.
Example
Default:
raise HTTPException()
# HTTPException(status_code=401, detail='Unauthorized')
Add headers:
raise HTTPException(headers={"X-Custom-Header": "Custom Value"})
# HTTPException(status_code=401, detail='Unauthorized', headers={'X-Custom-Header': 'Custom Value'})
Custom error:
raise HTTPException(status_code=404, detail="Not found")