Skip to content

Python SDK Reference

The Python SDK provides four underlying clients (AssistantsClient, ThreadsClient, RunsClient, CronClient) that correspond to each of the core API models and one top-level client (LangGraphClient) to access them.

get_client()

The get_client() function returns the top-level LangGraphClient client.

from langgraph_sdk import get_client

# get top-level LangGraphClient
client = get_client(url="http://localhost:8123")

# example usage: client.<model>.<method_name>()
assistants = await client.assistants.get(assistant_id="some_uuid")

Get a LangGraphClient instance.

Parameters:

  • url (str, default: 'http://localhost:8123' ) –

    The URL of the LangGraph API. Defaults to "http://localhost:8123".

  • api_key (str, default: None ) –

    The API key. If not provided, it will be read from the environment. Precedence: 1. explicit argument 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY

Source code in libs/sdk-py/langgraph_sdk/client.py
def get_client(
    *, url: str = "http://localhost:8123", api_key: Optional[str] = None
) -> LangGraphClient:
    """Get a LangGraphClient instance.

    Args:
        url (str, optional): The URL of the LangGraph API. Defaults to "http://localhost:8123".
        api_key (str, optional): The API key. If not provided, it will be read from the environment.
            Precedence:
                1. explicit argument
                2. LANGGRAPH_API_KEY
                3. LANGSMITH_API_KEY
                4. LANGCHAIN_API_KEY
    """
    headers = {
        "User-Agent": f"langgraph-sdk-py/{langgraph_sdk.__version__}",
    }
    api_key = _get_api_key(api_key)
    if api_key:
        headers["x-api-key"] = api_key
    client = httpx.AsyncClient(
        base_url=url,
        transport=httpx.AsyncHTTPTransport(retries=5),
        timeout=httpx.Timeout(connect=5, read=60, write=60, pool=5),
        headers=headers,
    )
    return LangGraphClient(client)

LangGraphClient

LangGraphClient is the top-level client for accessing AssistantsClient, ThreadsClient, RunsClient, and CronClient.

Source code in libs/sdk-py/langgraph_sdk/client.py
class LangGraphClient:
    def __init__(self, client: httpx.AsyncClient) -> None:
        self.http = HttpClient(client)
        self.assistants = AssistantsClient(self.http)
        self.threads = ThreadsClient(self.http)
        self.runs = RunsClient(self.http)
        self.crons = CronClient(self.http)

AssistantsClient

Access the AssistantsClient via the LangGraphClient.assistants attribute.

from langgraph_sdk import get_client

client = get_client(url="http://localhost:8123")
await client.assistants.<method_name>()

Source code in libs/sdk-py/langgraph_sdk/client.py
class AssistantsClient:
    def __init__(self, http: HttpClient) -> None:
        self.http = http

    async def get(self, assistant_id: str) -> Assistant:
        """Get an assistant by ID."""
        return await self.http.get(f"/assistants/{assistant_id}")

    async def get_graph(self, assistant_id: str) -> dict[str, list[dict[str, Any]]]:
        """Get the graph of an assistant by ID."""
        return await self.http.get(f"/assistants/{assistant_id}/graph")

    async def get_schemas(self, assistant_id: str) -> GraphSchema:
        """Get the schemas of an assistant by ID."""
        return await self.http.get(f"/assistants/{assistant_id}/schemas")

    async def create(
        self,
        graph_id: Optional[str],
        config: Optional[Config] = None,
        *,
        metadata: Metadata = None,
        assistant_id: Optional[str] = None,
    ) -> Assistant:
        """Create a new assistant."""
        payload: Dict[str, Any] = {
            "graph_id": graph_id,
        }
        if config:
            payload["config"] = config
        if metadata:
            payload["metadata"] = metadata
        if assistant_id:
            payload["assistant_id"] = assistant_id
        return await self.http.post("/assistants", json=payload)

    async def update(
        self,
        assistant_id: str,
        *,
        graph_id: Optional[str] = None,
        config: Optional[Config] = None,
        metadata: Metadata = None,
    ) -> Assistant:
        """Update an assistant."""
        payload: Dict[str, Any] = {}
        if graph_id:
            payload["graph_id"] = graph_id
        if config:
            payload["config"] = config
        if metadata:
            payload["metadata"] = metadata
        return await self.http.patch(
            f"/assistants/{assistant_id}",
            json=payload,
        )

    async def delete(
        self,
        assistant_id: str,
    ) -> None:
        """Delete an assistant."""
        await self.http.delete(f"/assistants/{assistant_id}")

    async def search(
        self,
        *,
        metadata: Metadata = None,
        graph_id: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Assistant]:
        """Search for assistants.

        Args:
            metadata (dict, optional): Metadata to filter by. Defaults to None.
            graph_id (str, optional): The ID of the graph to filter by. Defaults to None.
                The graph ID is normally set in your langgraph.json configuration.
            limit (int, optional): The maximum number of results to return. Defaults to 10.
            offset (int, optional): The number of results to skip. Defaults to 0.

        Returns:
            list[Assistant]: A list of assistants.
        """
        payload: Dict[str, Any] = {
            "limit": limit,
            "offset": offset,
        }
        if metadata:
            payload["metadata"] = metadata
        if graph_id:
            payload["graph_id"] = graph_id
        return await self.http.post(
            "/assistants/search",
            json=payload,
        )

get(assistant_id) async

Get an assistant by ID.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get(self, assistant_id: str) -> Assistant:
    """Get an assistant by ID."""
    return await self.http.get(f"/assistants/{assistant_id}")

get_graph(assistant_id) async

Get the graph of an assistant by ID.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get_graph(self, assistant_id: str) -> dict[str, list[dict[str, Any]]]:
    """Get the graph of an assistant by ID."""
    return await self.http.get(f"/assistants/{assistant_id}/graph")

get_schemas(assistant_id) async

Get the schemas of an assistant by ID.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get_schemas(self, assistant_id: str) -> GraphSchema:
    """Get the schemas of an assistant by ID."""
    return await self.http.get(f"/assistants/{assistant_id}/schemas")

create(graph_id, config=None, *, metadata=None, assistant_id=None) async

Create a new assistant.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def create(
    self,
    graph_id: Optional[str],
    config: Optional[Config] = None,
    *,
    metadata: Metadata = None,
    assistant_id: Optional[str] = None,
) -> Assistant:
    """Create a new assistant."""
    payload: Dict[str, Any] = {
        "graph_id": graph_id,
    }
    if config:
        payload["config"] = config
    if metadata:
        payload["metadata"] = metadata
    if assistant_id:
        payload["assistant_id"] = assistant_id
    return await self.http.post("/assistants", json=payload)

update(assistant_id, *, graph_id=None, config=None, metadata=None) async

Update an assistant.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def update(
    self,
    assistant_id: str,
    *,
    graph_id: Optional[str] = None,
    config: Optional[Config] = None,
    metadata: Metadata = None,
) -> Assistant:
    """Update an assistant."""
    payload: Dict[str, Any] = {}
    if graph_id:
        payload["graph_id"] = graph_id
    if config:
        payload["config"] = config
    if metadata:
        payload["metadata"] = metadata
    return await self.http.patch(
        f"/assistants/{assistant_id}",
        json=payload,
    )

delete(assistant_id) async

Delete an assistant.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def delete(
    self,
    assistant_id: str,
) -> None:
    """Delete an assistant."""
    await self.http.delete(f"/assistants/{assistant_id}")

search(*, metadata=None, graph_id=None, limit=10, offset=0) async

Search for assistants.

Parameters:

  • metadata (dict, default: None ) –

    Metadata to filter by. Defaults to None.

  • graph_id (str, default: None ) –

    The ID of the graph to filter by. Defaults to None. The graph ID is normally set in your langgraph.json configuration.

  • limit (int, default: 10 ) –

    The maximum number of results to return. Defaults to 10.

  • offset (int, default: 0 ) –

    The number of results to skip. Defaults to 0.

Returns:

  • list[Assistant]

    list[Assistant]: A list of assistants.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def search(
    self,
    *,
    metadata: Metadata = None,
    graph_id: Optional[str] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Assistant]:
    """Search for assistants.

    Args:
        metadata (dict, optional): Metadata to filter by. Defaults to None.
        graph_id (str, optional): The ID of the graph to filter by. Defaults to None.
            The graph ID is normally set in your langgraph.json configuration.
        limit (int, optional): The maximum number of results to return. Defaults to 10.
        offset (int, optional): The number of results to skip. Defaults to 0.

    Returns:
        list[Assistant]: A list of assistants.
    """
    payload: Dict[str, Any] = {
        "limit": limit,
        "offset": offset,
    }
    if metadata:
        payload["metadata"] = metadata
    if graph_id:
        payload["graph_id"] = graph_id
    return await self.http.post(
        "/assistants/search",
        json=payload,
    )

ThreadsClient

Access the ThreadsClient via the LangGraphClient.threads attribute.

from langgraph_sdk import get_client

client = get_client(url="http://localhost:8123")
await client.threads.<method_name>()

Source code in libs/sdk-py/langgraph_sdk/client.py
class ThreadsClient:
    def __init__(self, http: HttpClient) -> None:
        self.http = http

    async def get(self, thread_id: str) -> Thread:
        """Get a thread by ID."""
        return await self.http.get(f"/threads/{thread_id}")

    async def create(
        self,
        *,
        metadata: Metadata = None,
        thread_id: Optional[str] = None,
    ) -> Thread:
        """Create a new thread."""
        payload: Dict[str, Any] = {}
        if thread_id:
            payload["thread_id"] = thread_id
        if metadata:
            payload["metadata"] = metadata
        return await self.http.post("/threads", json=payload)

    async def update(self, thread_id: str, *, metadata: dict[str, Any]) -> Thread:
        """Update a thread."""
        return await self.http.patch(
            f"/threads/{thread_id}", json={"metadata": metadata}
        )

    async def delete(self, thread_id: str) -> None:
        """Delete a thread."""
        await self.http.delete(f"/threads/{thread_id}")

    async def search(
        self,
        *,
        metadata: Metadata = None,
        status: Optional[ThreadStatus] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Thread]:
        """Search for threads."""
        payload: Dict[str, Any] = {
            "limit": limit,
            "offset": offset,
        }
        if metadata:
            payload["metadata"] = metadata
        if status:
            payload["status"] = status
        return await self.http.post(
            "/threads/search",
            json=payload,
        )

    async def get_state(
        self, thread_id: str, checkpoint_id: Optional[str] = None
    ) -> ThreadState:
        """Get the state of a thread."""
        if checkpoint_id:
            return await self.http.get(f"/threads/{thread_id}/state/{checkpoint_id}")
        else:
            return await self.http.get(f"/threads/{thread_id}/state")

    async def update_state(
        self,
        thread_id: str,
        values: dict,
        *,
        as_node: Optional[str] = None,
        checkpoint_id: Optional[str] = None,
    ) -> None:
        """Update the state of a thread."""
        payload: Dict[str, Any] = {
            "values": values,
        }
        if checkpoint_id:
            payload["checkpoint_id"] = checkpoint_id
        if as_node:
            payload["as_node"] = as_node
        return await self.http.post(f"/threads/{thread_id}/state", json=payload)

    async def patch_state(
        self,
        thread_id: Union[str, Config],
        metadata: dict,
    ) -> None:
        """Patch the state of a thread."""
        if isinstance(thread_id, dict):
            thread_id_: str = thread_id["configurable"]["thread_id"]
        else:
            thread_id_ = thread_id
        return await self.http.patch(
            f"/threads/{thread_id_}/state",
            json={"metadata": metadata},
        )

    async def get_history(
        self,
        thread_id: str,
        limit: int = 10,
        before: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> list[ThreadState]:
        """Get the history of a thread."""
        payload: Dict[str, Any] = {
            "limit": limit,
        }
        if before:
            payload["before"] = before
        if metadata:
            payload["metadata"] = metadata
        return await self.http.post(f"/threads/{thread_id}/history", json=payload)

get(thread_id) async

Get a thread by ID.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get(self, thread_id: str) -> Thread:
    """Get a thread by ID."""
    return await self.http.get(f"/threads/{thread_id}")

create(*, metadata=None, thread_id=None) async

Create a new thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def create(
    self,
    *,
    metadata: Metadata = None,
    thread_id: Optional[str] = None,
) -> Thread:
    """Create a new thread."""
    payload: Dict[str, Any] = {}
    if thread_id:
        payload["thread_id"] = thread_id
    if metadata:
        payload["metadata"] = metadata
    return await self.http.post("/threads", json=payload)

update(thread_id, *, metadata) async

Update a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def update(self, thread_id: str, *, metadata: dict[str, Any]) -> Thread:
    """Update a thread."""
    return await self.http.patch(
        f"/threads/{thread_id}", json={"metadata": metadata}
    )

delete(thread_id) async

Delete a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def delete(self, thread_id: str) -> None:
    """Delete a thread."""
    await self.http.delete(f"/threads/{thread_id}")

search(*, metadata=None, status=None, limit=10, offset=0) async

Search for threads.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def search(
    self,
    *,
    metadata: Metadata = None,
    status: Optional[ThreadStatus] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Thread]:
    """Search for threads."""
    payload: Dict[str, Any] = {
        "limit": limit,
        "offset": offset,
    }
    if metadata:
        payload["metadata"] = metadata
    if status:
        payload["status"] = status
    return await self.http.post(
        "/threads/search",
        json=payload,
    )

get_state(thread_id, checkpoint_id=None) async

Get the state of a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get_state(
    self, thread_id: str, checkpoint_id: Optional[str] = None
) -> ThreadState:
    """Get the state of a thread."""
    if checkpoint_id:
        return await self.http.get(f"/threads/{thread_id}/state/{checkpoint_id}")
    else:
        return await self.http.get(f"/threads/{thread_id}/state")

update_state(thread_id, values, *, as_node=None, checkpoint_id=None) async

Update the state of a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def update_state(
    self,
    thread_id: str,
    values: dict,
    *,
    as_node: Optional[str] = None,
    checkpoint_id: Optional[str] = None,
) -> None:
    """Update the state of a thread."""
    payload: Dict[str, Any] = {
        "values": values,
    }
    if checkpoint_id:
        payload["checkpoint_id"] = checkpoint_id
    if as_node:
        payload["as_node"] = as_node
    return await self.http.post(f"/threads/{thread_id}/state", json=payload)

patch_state(thread_id, metadata) async

Patch the state of a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def patch_state(
    self,
    thread_id: Union[str, Config],
    metadata: dict,
) -> None:
    """Patch the state of a thread."""
    if isinstance(thread_id, dict):
        thread_id_: str = thread_id["configurable"]["thread_id"]
    else:
        thread_id_ = thread_id
    return await self.http.patch(
        f"/threads/{thread_id_}/state",
        json={"metadata": metadata},
    )

get_history(thread_id, limit=10, before=None, metadata=None) async

Get the history of a thread.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get_history(
    self,
    thread_id: str,
    limit: int = 10,
    before: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> list[ThreadState]:
    """Get the history of a thread."""
    payload: Dict[str, Any] = {
        "limit": limit,
    }
    if before:
        payload["before"] = before
    if metadata:
        payload["metadata"] = metadata
    return await self.http.post(f"/threads/{thread_id}/history", json=payload)

RunsClient

Access the RunsClient via the LangGraphClient.runs attribute.

from langgraph_sdk import get_client

client = get_client(url="http://localhost:8123")
await client.runs.<method_name>()

Source code in libs/sdk-py/langgraph_sdk/client.py
class RunsClient:
    def __init__(self, http: HttpClient) -> None:
        self.http = http

    @overload
    def stream(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        stream_mode: Union[StreamMode, list[StreamMode]] = "values",
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        feedback_keys: Optional[list[str]] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> AsyncIterator[StreamPart]:
        ...

    @overload
    def stream(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        stream_mode: Union[StreamMode, list[StreamMode]] = "values",
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        feedback_keys: Optional[list[str]] = None,
    ) -> AsyncIterator[StreamPart]:
        ...

    def stream(
        self,
        thread_id: Optional[str],
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        stream_mode: Union[StreamMode, list[StreamMode]] = "values",
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        feedback_keys: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> AsyncIterator[StreamPart]:
        """Create a run and stream the results."""
        payload = {
            "input": input,
            "config": config,
            "metadata": metadata,
            "stream_mode": stream_mode,
            "assistant_id": assistant_id,
            "interrupt_before": interrupt_before,
            "interrupt_after": interrupt_after,
            "feedback_keys": feedback_keys,
            "webhook": webhook,
            "checkpoint_id": checkpoint_id,
            "multitask_strategy": multitask_strategy,
        }
        endpoint = (
            f"/threads/{thread_id}/runs/stream"
            if thread_id is not None
            else "/runs/stream"
        )
        return self.http.stream(
            endpoint, "POST", json={k: v for k, v in payload.items() if v is not None}
        )

    @overload
    async def create(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
    ) -> Run:
        ...

    @overload
    async def create(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> Run:
        ...

    async def create(
        self,
        thread_id: Optional[str],
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> Run:
        """Create a background run."""
        payload = {
            "input": input,
            "config": config,
            "metadata": metadata,
            "assistant_id": assistant_id,
            "interrupt_before": interrupt_before,
            "interrupt_after": interrupt_after,
            "webhook": webhook,
            "checkpoint_id": checkpoint_id,
            "multitask_strategy": multitask_strategy,
        }
        payload = {k: v for k, v in payload.items() if v is not None}
        if thread_id:
            return await self.http.post(f"/threads/{thread_id}/runs", json=payload)
        else:
            return await self.http.post("/runs", json=payload)

    @overload
    async def wait(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> Union[list[dict], dict[str, Any]]:
        ...

    @overload
    async def wait(
        self,
        thread_id: None,
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
    ) -> Union[list[dict], dict[str, Any]]:
        ...

    async def wait(
        self,
        thread_id: Optional[str],
        assistant_id: str,
        *,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        checkpoint_id: Optional[str] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[MultitaskStrategy] = None,
    ) -> Union[list[dict], dict[str, Any]]:
        """Create a run, wait for and return the final state."""
        payload = {
            "input": input,
            "config": config,
            "metadata": metadata,
            "assistant_id": assistant_id,
            "interrupt_before": interrupt_before,
            "interrupt_after": interrupt_after,
            "webhook": webhook,
            "checkpoint_id": checkpoint_id,
            "multitask_strategy": multitask_strategy,
        }
        endpoint = (
            f"/threads/{thread_id}/runs/wait" if thread_id is not None else "/runs/wait"
        )
        return await self.http.post(
            endpoint, json={k: v for k, v in payload.items() if v is not None}
        )

    async def list(
        self, thread_id: str, *, limit: int = 10, offset: int = 0
    ) -> List[Run]:
        """List runs."""
        return await self.http.get(
            f"/threads/{thread_id}/runs?limit={limit}&offset={offset}"
        )

    async def get(self, thread_id: str, run_id: str) -> Run:
        """Get a run."""
        return await self.http.get(f"/threads/{thread_id}/runs/{run_id}")

    async def cancel(self, thread_id: str, run_id: str, *, wait: bool = False) -> None:
        """Get a run."""
        return await self.http.post(
            f"/threads/{thread_id}/runs/{run_id}/cancel?wait={1 if wait else 0}",
            json=None,
        )

    async def join(self, thread_id: str, run_id: str) -> None:
        """Block until a run is done."""
        return await self.http.get(f"/threads/{thread_id}/runs/{run_id}/join")

    async def delete(self, thread_id: str, run_id: str) -> None:
        """Delete a run."""
        await self.http.delete(f"/threads/{thread_id}/runs/{run_id}")

stream(thread_id, assistant_id, *, input=None, stream_mode='values', metadata=None, config=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, feedback_keys=None, webhook=None, multitask_strategy=None)

Create a run and stream the results.

Source code in libs/sdk-py/langgraph_sdk/client.py
def stream(
    self,
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    stream_mode: Union[StreamMode, list[StreamMode]] = "values",
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    feedback_keys: Optional[list[str]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
) -> AsyncIterator[StreamPart]:
    """Create a run and stream the results."""
    payload = {
        "input": input,
        "config": config,
        "metadata": metadata,
        "stream_mode": stream_mode,
        "assistant_id": assistant_id,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "feedback_keys": feedback_keys,
        "webhook": webhook,
        "checkpoint_id": checkpoint_id,
        "multitask_strategy": multitask_strategy,
    }
    endpoint = (
        f"/threads/{thread_id}/runs/stream"
        if thread_id is not None
        else "/runs/stream"
    )
    return self.http.stream(
        endpoint, "POST", json={k: v for k, v in payload.items() if v is not None}
    )

create(thread_id, assistant_id, *, input=None, metadata=None, config=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, webhook=None, multitask_strategy=None) async

Create a background run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def create(
    self,
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
) -> Run:
    """Create a background run."""
    payload = {
        "input": input,
        "config": config,
        "metadata": metadata,
        "assistant_id": assistant_id,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "webhook": webhook,
        "checkpoint_id": checkpoint_id,
        "multitask_strategy": multitask_strategy,
    }
    payload = {k: v for k, v in payload.items() if v is not None}
    if thread_id:
        return await self.http.post(f"/threads/{thread_id}/runs", json=payload)
    else:
        return await self.http.post("/runs", json=payload)

wait(thread_id, assistant_id, *, input=None, metadata=None, config=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, webhook=None, multitask_strategy=None) async

Create a run, wait for and return the final state.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def wait(
    self,
    thread_id: Optional[str],
    assistant_id: str,
    *,
    input: Optional[dict] = None,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    checkpoint_id: Optional[str] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[MultitaskStrategy] = None,
) -> Union[list[dict], dict[str, Any]]:
    """Create a run, wait for and return the final state."""
    payload = {
        "input": input,
        "config": config,
        "metadata": metadata,
        "assistant_id": assistant_id,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "webhook": webhook,
        "checkpoint_id": checkpoint_id,
        "multitask_strategy": multitask_strategy,
    }
    endpoint = (
        f"/threads/{thread_id}/runs/wait" if thread_id is not None else "/runs/wait"
    )
    return await self.http.post(
        endpoint, json={k: v for k, v in payload.items() if v is not None}
    )

list(thread_id, *, limit=10, offset=0) async

List runs.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def list(
    self, thread_id: str, *, limit: int = 10, offset: int = 0
) -> List[Run]:
    """List runs."""
    return await self.http.get(
        f"/threads/{thread_id}/runs?limit={limit}&offset={offset}"
    )

get(thread_id, run_id) async

Get a run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def get(self, thread_id: str, run_id: str) -> Run:
    """Get a run."""
    return await self.http.get(f"/threads/{thread_id}/runs/{run_id}")

cancel(thread_id, run_id, *, wait=False) async

Get a run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def cancel(self, thread_id: str, run_id: str, *, wait: bool = False) -> None:
    """Get a run."""
    return await self.http.post(
        f"/threads/{thread_id}/runs/{run_id}/cancel?wait={1 if wait else 0}",
        json=None,
    )

join(thread_id, run_id) async

Block until a run is done.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def join(self, thread_id: str, run_id: str) -> None:
    """Block until a run is done."""
    return await self.http.get(f"/threads/{thread_id}/runs/{run_id}/join")

delete(thread_id, run_id) async

Delete a run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def delete(self, thread_id: str, run_id: str) -> None:
    """Delete a run."""
    await self.http.delete(f"/threads/{thread_id}/runs/{run_id}")

CronClient

Access the CronClient via the LangGraphClient.crons attribute.

from langgraph_sdk import get_client

client = get_client(url="http://localhost:8123")
await client.crons.<method_name>()

Source code in libs/sdk-py/langgraph_sdk/client.py
class CronClient:
    def __init__(self, http_client: HttpClient) -> None:
        self.http = http_client

    async def create_for_thread(
        self,
        thread_id: str,
        assistant_id: str,
        *,
        schedule: str,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[str] = None,
    ) -> Run:
        """Create a background run."""
        payload = {
            "schedule": schedule,
            "input": input,
            "config": config,
            "metadata": metadata,
            "assistant_id": assistant_id,
            "interrupt_before": interrupt_before,
            "interrupt_after": interrupt_after,
            "webhook": webhook,
        }
        if multitask_strategy:
            payload["multitask_strategy"] = multitask_strategy
        payload = {k: v for k, v in payload.items() if v is not None}
        return await self.http.post(f"/threads/{thread_id}/runs/crons", json=payload)

    async def create(
        self,
        assistant_id: str,
        *,
        schedule: str,
        input: Optional[dict] = None,
        metadata: Optional[dict] = None,
        config: Optional[Config] = None,
        interrupt_before: Optional[list[str]] = None,
        interrupt_after: Optional[list[str]] = None,
        webhook: Optional[str] = None,
        multitask_strategy: Optional[str] = None,
    ) -> Run:
        """Create a background run."""
        payload = {
            "schedule": schedule,
            "input": input,
            "config": config,
            "metadata": metadata,
            "assistant_id": assistant_id,
            "interrupt_before": interrupt_before,
            "interrupt_after": interrupt_after,
            "webhook": webhook,
        }
        if multitask_strategy:
            payload["multitask_strategy"] = multitask_strategy
        payload = {k: v for k, v in payload.items() if v is not None}
        return await self.http.post("/runs/crons", json=payload)

    async def delete(self, cron_id: str) -> None:
        """Delete a cron."""
        await self.http.delete(f"/runs/crons/{cron_id}")

    async def search(
        self,
        *,
        assistant_id: Optional[str] = None,
        thread_id: Optional[str] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Cron]:
        """Get a list of cron jobs."""
        payload = {
            "assistant_id": assistant_id,
            "thread_id": thread_id,
            "limit": limit,
            "offset": offset,
        }
        payload = {k: v for k, v in payload.items() if v is not None}
        return await self.http.post("/runs/crons/search", json=payload)

create_for_thread(thread_id, assistant_id, *, schedule, input=None, metadata=None, config=None, interrupt_before=None, interrupt_after=None, webhook=None, multitask_strategy=None) async

Create a background run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def create_for_thread(
    self,
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Optional[dict] = None,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[str] = None,
) -> Run:
    """Create a background run."""
    payload = {
        "schedule": schedule,
        "input": input,
        "config": config,
        "metadata": metadata,
        "assistant_id": assistant_id,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "webhook": webhook,
    }
    if multitask_strategy:
        payload["multitask_strategy"] = multitask_strategy
    payload = {k: v for k, v in payload.items() if v is not None}
    return await self.http.post(f"/threads/{thread_id}/runs/crons", json=payload)

create(assistant_id, *, schedule, input=None, metadata=None, config=None, interrupt_before=None, interrupt_after=None, webhook=None, multitask_strategy=None) async

Create a background run.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def create(
    self,
    assistant_id: str,
    *,
    schedule: str,
    input: Optional[dict] = None,
    metadata: Optional[dict] = None,
    config: Optional[Config] = None,
    interrupt_before: Optional[list[str]] = None,
    interrupt_after: Optional[list[str]] = None,
    webhook: Optional[str] = None,
    multitask_strategy: Optional[str] = None,
) -> Run:
    """Create a background run."""
    payload = {
        "schedule": schedule,
        "input": input,
        "config": config,
        "metadata": metadata,
        "assistant_id": assistant_id,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "webhook": webhook,
    }
    if multitask_strategy:
        payload["multitask_strategy"] = multitask_strategy
    payload = {k: v for k, v in payload.items() if v is not None}
    return await self.http.post("/runs/crons", json=payload)

delete(cron_id) async

Delete a cron.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def delete(self, cron_id: str) -> None:
    """Delete a cron."""
    await self.http.delete(f"/runs/crons/{cron_id}")

search(*, assistant_id=None, thread_id=None, limit=10, offset=0) async

Get a list of cron jobs.

Source code in libs/sdk-py/langgraph_sdk/client.py
async def search(
    self,
    *,
    assistant_id: Optional[str] = None,
    thread_id: Optional[str] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Cron]:
    """Get a list of cron jobs."""
    payload = {
        "assistant_id": assistant_id,
        "thread_id": thread_id,
        "limit": limit,
        "offset": offset,
    }
    payload = {k: v for k, v in payload.items() if v is not None}
    return await self.http.post("/runs/crons/search", json=payload)

Comments