Skip to content

Storage

Base classes and types for persistent key-value stores.

Stores enable persistence and memory that can be shared across threads, scoped to user IDs, assistant IDs, or other arbitrary namespaces.

Item

Represents a stored item with metadata.

Parameters:

  • value (dict[str, Any]) –

    The stored data as a dictionary. Keys are filterable.

  • key (str) –

    Unique identifier within the namespace.

  • namespace (tuple[str, ...]) –

    Hierarchical path defining the collection in which this document resides. Represented as a tuple of strings, allowing for nested categorization. For example: ("documents", "user123")

  • created_at (datetime) –

    Timestamp of item creation.

  • updated_at (datetime) –

    Timestamp of last update.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class Item:
    """Represents a stored item with metadata.

    Timestamps may be passed either as `datetime` objects or as ISO-8601
    strings (e.g. the output of `Item.dict()`); strings are parsed with
    `datetime.fromisoformat`.

    Args:
        value (dict[str, Any]): The stored data as a dictionary. Keys are filterable.
        key (str): Unique identifier within the namespace.
        namespace (tuple[str, ...]): Hierarchical path defining the collection in which this document resides.
            Represented as a tuple of strings, allowing for nested categorization.
            For example: ("documents", "user123")
        created_at (datetime): Timestamp of item creation.
        updated_at (datetime): Timestamp of last update.
    """

    __slots__ = ("value", "key", "namespace", "created_at", "updated_at")

    def __init__(
        self,
        *,
        value: dict[str, Any],
        key: str,
        namespace: tuple[str, ...],
        created_at: datetime,
        updated_at: datetime,
    ):
        self.value = value
        self.key = key
        # The casting from json-like types is for if this object is
        # deserialized (namespace arrives as a list, timestamps as strings).
        self.namespace = tuple(namespace)
        self.created_at = (
            datetime.fromisoformat(cast(str, created_at))
            if isinstance(created_at, str)
            else created_at
        )
        # Parse `updated_at` from its own value; previously `created_at` was
        # parsed here, which lost the real update time on deserialization.
        self.updated_at = (
            datetime.fromisoformat(cast(str, updated_at))
            if isinstance(updated_at, str)
            else updated_at
        )

    def __eq__(self, other: object) -> bool:
        """Return True when `other` is an Item with identical field values."""
        if not isinstance(other, Item):
            return False
        return (
            self.value == other.value
            and self.key == other.key
            and self.namespace == other.namespace
            and self.created_at == other.created_at
            and self.updated_at == other.updated_at
        )

    def __hash__(self) -> int:
        """Hash on (namespace, key) only; `value` is a dict and is unhashable."""
        return hash((self.namespace, self.key))

    def dict(self) -> dict:
        """Return a JSON-friendly dict that `Item(**d)` can rebuild the item from.

        The namespace is emitted as a list and both timestamps as ISO-8601
        strings.
        """
        return {
            "value": self.value,
            "key": self.key,
            "namespace": list(self.namespace),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

GetOp

Bases: NamedTuple

Operation to retrieve an item by namespace and key.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class GetOp(NamedTuple):
    """Request to fetch a single item, addressed by namespace and key."""

    namespace: tuple[str, ...]
    """Hierarchical path of the item to fetch."""
    key: str
    """Identifier of the item, unique within its namespace."""

namespace: tuple[str, ...] instance-attribute

Hierarchical path for the item.

key: str instance-attribute

Unique identifier within the namespace.

SearchOp

Bases: NamedTuple

Operation to search for items within a namespace prefix.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class SearchOp(NamedTuple):
    """Request to search for items stored under a namespace prefix."""

    namespace_prefix: tuple[str, ...]
    """Hierarchical path prefix that bounds the search."""
    filter: Optional[dict[str, Any]] = None
    """Optional key-value pairs used to filter the results."""
    limit: int = 10
    """Upper bound on the number of items returned."""
    offset: int = 0
    """How many items to skip before results start (for pagination)."""

namespace_prefix: tuple[str, ...] instance-attribute

Hierarchical path prefix to search within.

filter: Optional[dict[str, Any]] = None class-attribute instance-attribute

Key-value pairs to filter results.

limit: int = 10 class-attribute instance-attribute

Maximum number of items to return.

offset: int = 0 class-attribute instance-attribute

Number of items to skip before returning results.

PutOp

Bases: NamedTuple

Operation to store, update, or delete an item.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class PutOp(NamedTuple):
    """Request to store, update, or delete a single item."""

    namespace: tuple[str, ...]
    """Hierarchical path of the item.

    A tuple of strings, which allows nested categorization,
    e.g. ("documents", "user123").
    """

    key: str
    """Identifier of the document.

    Expected to be distinct within its namespace.
    """

    value: Optional[dict[str, Any]]
    """Payload to store, or None to request deletion of the item.

    Schema:
    - A dictionary where:
      - keys are strings naming the fields
      - values may be of any serializable type
    - None signals that the item should be deleted
    """

namespace: tuple[str, ...] instance-attribute

Hierarchical path for the item.

Represented as a tuple of strings, allowing for nested categorization. For example: ("documents", "user123")

key: str instance-attribute

Unique identifier for the document.

Should be distinct within its namespace.

value: Optional[dict[str, Any]] instance-attribute

Data to be stored, or None to delete the item.

Schema: the value should be a dictionary whose keys are strings representing field names and whose values can be of any serializable type. If the value is None, it indicates that the item should be deleted.

MatchCondition

Bases: NamedTuple

Represents a single match condition.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class MatchCondition(NamedTuple):
    """A single namespace-matching rule: a match type paired with a path."""

    match_type: NamespaceMatchType
    """How `path` is matched; `BaseStore` builds these with "prefix" or "suffix"."""
    path: NameSpacePath
    """The namespace path to match against."""

ListNamespacesOp

Bases: NamedTuple

Operation to list namespaces with optional match conditions.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class ListNamespacesOp(NamedTuple):
    """Request to list namespaces, optionally narrowed by match conditions."""

    match_conditions: Optional[tuple[MatchCondition, ...]] = None
    """Match conditions applied to the namespaces, as a tuple."""

    max_depth: Optional[int] = None
    """Depth in the hierarchy up to which namespaces are returned."""

    limit: int = 100
    """Upper bound on the number of namespaces returned."""

    offset: int = 0
    """How many namespaces to skip before results start (for pagination)."""

match_conditions: Optional[tuple[MatchCondition, ...]] = None class-attribute instance-attribute

A tuple of match conditions to apply to namespaces.

max_depth: Optional[int] = None class-attribute instance-attribute

Return namespaces up to this depth in the hierarchy.

limit: int = 100 class-attribute instance-attribute

Maximum number of namespaces to return.

offset: int = 0 class-attribute instance-attribute

Number of namespaces to skip before returning results.

InvalidNamespaceError

Bases: ValueError

Provided namespace is invalid.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class InvalidNamespaceError(ValueError):
    """Raised when a supplied namespace is not valid."""

BaseStore

Bases: ABC

Abstract base class for persistent key-value stores.

Stores enable persistence and memory that can be shared across threads, scoped to user IDs, assistant IDs, or other arbitrary namespaces.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
class BaseStore(ABC):
    """Abstract interface for persistent key-value stores.

    A store provides persistence and memory that can be shared across threads
    and scoped to user IDs, assistant IDs, or any other arbitrary namespace.

    Subclasses implement `batch` and `abatch`. Every other method on this
    class is a thin convenience wrapper that submits exactly one operation
    through one of those two and returns its (first) result.
    """

    __slots__ = ("__weakref__",)

    @abstractmethod
    def batch(self, ops: Iterable[Op]) -> list[Result]:
        """Run several operations synchronously as one batch.

        Args:
            ops: The operations to execute.

        Returns:
            One result per input operation, in the same order as `ops`.
        """

    @abstractmethod
    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        """Run several operations asynchronously as one batch.

        Args:
            ops: The operations to execute.

        Returns:
            One result per input operation, in the same order as `ops`.
        """

    def get(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
        """Fetch one item.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.

        Returns:
            The item, or None when it does not exist.
        """
        results = self.batch([GetOp(namespace=namespace, key=key)])
        return results[0]

    def search(
        self,
        namespace_prefix: tuple[str, ...],
        /,
        *,
        filter: Optional[dict[str, Any]] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Item]:
        """Find items stored under a namespace prefix.

        Args:
            namespace_prefix: Hierarchical path prefix to search within.
            filter: Key-value pairs to filter results.
            limit: Maximum number of items to return.
            offset: Number of items to skip before returning results.

        Returns:
            The items that match the search criteria.
        """
        op = SearchOp(
            namespace_prefix=namespace_prefix,
            filter=filter,
            limit=limit,
            offset=offset,
        )
        return self.batch([op])[0]

    def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
        """Create or overwrite an item.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.
            value: Dictionary containing the item's data.
        """
        # Validate before anything is handed to the backend.
        _validate_namespace(namespace)
        op = PutOp(namespace=namespace, key=key, value=value)
        self.batch([op])

    def delete(self, namespace: tuple[str, ...], key: str) -> None:
        """Remove an item.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.
        """
        # A PutOp whose value is None is the deletion request.
        op = PutOp(namespace=namespace, key=key, value=None)
        self.batch([op])

    def list_namespaces(
        self,
        *,
        prefix: Optional[NameSpacePath] = None,
        suffix: Optional[NameSpacePath] = None,
        max_depth: Optional[int] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[tuple[str, ...]]:
        """Enumerate the namespaces in the store, optionally filtered.

        Useful for exploring how data is organized, locating specific
        collections, or walking the namespace hierarchy.

        Args:
            prefix: Keep only namespaces that start with this path.
            suffix: Keep only namespaces that end with this path.
            max_depth: Return namespaces up to this depth in the hierarchy.
                Namespaces deeper than this level will be truncated to this depth.
            limit: Maximum number of namespaces to return (default 100).
            offset: Number of namespaces to skip for pagination (default 0).

        Returns:
            The matching namespace tuples. Each tuple represents a full
            namespace path up to `max_depth`.

        Examples:

            Setting max_depth=3. Given the namespaces:
                # ("a", "b", "c")
                # ("a", "b", "d", "e")
                # ("a", "b", "d", "i")
                # ("a", "b", "f")
                # ("a", "c", "f")
                store.list_namespaces(prefix=("a", "b"), max_depth=3)
                # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
        """
        # Only non-empty prefix/suffix values become conditions, prefix first.
        conditions = tuple(
            MatchCondition(match_type=kind, path=path)
            for kind, path in (("prefix", prefix), ("suffix", suffix))
            if path
        )
        op = ListNamespacesOp(
            match_conditions=conditions,
            max_depth=max_depth,
            limit=limit,
            offset=offset,
        )
        return self.batch([op])[0]

    async def aget(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
        """Fetch one item asynchronously.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.

        Returns:
            The item, or None when it does not exist.
        """
        results = await self.abatch([GetOp(namespace=namespace, key=key)])
        return results[0]

    async def asearch(
        self,
        namespace_prefix: tuple[str, ...],
        /,
        *,
        filter: Optional[dict[str, Any]] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> list[Item]:
        """Find items stored under a namespace prefix, asynchronously.

        Args:
            namespace_prefix: Hierarchical path prefix to search within.
            filter: Key-value pairs to filter results.
            limit: Maximum number of items to return.
            offset: Number of items to skip before returning results.

        Returns:
            The items that match the search criteria.
        """
        op = SearchOp(
            namespace_prefix=namespace_prefix,
            filter=filter,
            limit=limit,
            offset=offset,
        )
        results = await self.abatch([op])
        return results[0]

    async def aput(
        self, namespace: tuple[str, ...], key: str, value: dict[str, Any]
    ) -> None:
        """Create or overwrite an item asynchronously.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.
            value: Dictionary containing the item's data.
        """
        # Validate before anything is handed to the backend.
        _validate_namespace(namespace)
        op = PutOp(namespace=namespace, key=key, value=value)
        await self.abatch([op])

    async def adelete(self, namespace: tuple[str, ...], key: str) -> None:
        """Remove an item asynchronously.

        Args:
            namespace: Hierarchical path for the item.
            key: Unique identifier within the namespace.
        """
        # A PutOp whose value is None is the deletion request.
        op = PutOp(namespace=namespace, key=key, value=None)
        await self.abatch([op])

    async def alist_namespaces(
        self,
        *,
        prefix: Optional[NameSpacePath] = None,
        suffix: Optional[NameSpacePath] = None,
        max_depth: Optional[int] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[tuple[str, ...]]:
        """Enumerate the namespaces in the store asynchronously, optionally filtered.

        Useful for exploring how data is organized, locating specific
        collections, or walking the namespace hierarchy.

        Args:
            prefix: Keep only namespaces that start with this path.
            suffix: Keep only namespaces that end with this path.
            max_depth: Return namespaces up to this depth in the hierarchy.
                Namespaces deeper than this level will be truncated to this depth.
            limit: Maximum number of namespaces to return (default 100).
            offset: Number of namespaces to skip for pagination (default 0).

        Returns:
            The matching namespace tuples. Each tuple represents a full
            namespace path up to `max_depth`.

        Examples:

            Setting max_depth=3. Given the namespaces:
                # ("a", "b", "c")
                # ("a", "b", "d", "e")
                # ("a", "b", "d", "i")
                # ("a", "b", "f")
                # ("a", "c", "f")
                await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
                # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
        """
        # Only non-empty prefix/suffix values become conditions, prefix first.
        conditions = tuple(
            MatchCondition(match_type=kind, path=path)
            for kind, path in (("prefix", prefix), ("suffix", suffix))
            if path
        )
        op = ListNamespacesOp(
            match_conditions=conditions,
            max_depth=max_depth,
            limit=limit,
            offset=offset,
        )
        results = await self.abatch([op])
        return results[0]

batch(ops: Iterable[Op]) -> list[Result] abstractmethod

Execute multiple operations synchronously in a single batch.

Parameters:

  • ops (Iterable[Op]) –

    An iterable of operations to execute.

Returns:

  • list[Result]

    A list of results, where each result corresponds to an operation in the input. The order of results matches the order of input operations.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
@abstractmethod
def batch(self, ops: Iterable[Op]) -> list[Result]:
    """Run several operations synchronously as one batch.

    Args:
        ops: The operations to execute.

    Returns:
        One result per input operation, in the same order as `ops`.
    """

abatch(ops: Iterable[Op]) -> list[Result] abstractmethod async

Execute multiple operations asynchronously in a single batch.

Parameters:

  • ops (Iterable[Op]) –

    An iterable of operations to execute.

Returns:

  • list[Result]

    A list of results, where each result corresponds to an operation in the input. The order of results matches the order of input operations.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
@abstractmethod
async def abatch(self, ops: Iterable[Op]) -> list[Result]:
    """Run several operations asynchronously as one batch.

    Args:
        ops: The operations to execute.

    Returns:
        One result per input operation, in the same order as `ops`.
    """

get(namespace: tuple[str, ...], key: str) -> Optional[Item]

Retrieve a single item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Returns:

  • Optional[Item]

    The retrieved item or None if not found.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def get(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
    """Fetch one item.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.

    Returns:
        The item, or None when it does not exist.
    """
    results = self.batch([GetOp(namespace=namespace, key=key)])
    return results[0]

search(namespace_prefix: tuple[str, ...], /, *, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0) -> list[Item]

Search for items within a namespace prefix.

Parameters:

  • namespace_prefix (tuple[str, ...]) –

    Hierarchical path prefix to search within.

  • filter (Optional[dict[str, Any]], default: None ) –

    Key-value pairs to filter results.

  • limit (int, default: 10 ) –

    Maximum number of items to return.

  • offset (int, default: 0 ) –

    Number of items to skip before returning results.

Returns:

  • list[Item]

    List of items matching the search criteria.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def search(
    self,
    namespace_prefix: tuple[str, ...],
    /,
    *,
    filter: Optional[dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Find items stored under a namespace prefix.

    Args:
        namespace_prefix: Hierarchical path prefix to search within.
        filter: Key-value pairs to filter results.
        limit: Maximum number of items to return.
        offset: Number of items to skip before returning results.

    Returns:
        The items that match the search criteria.
    """
    op = SearchOp(
        namespace_prefix=namespace_prefix,
        filter=filter,
        limit=limit,
        offset=offset,
    )
    return self.batch([op])[0]

put(namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None

Store or update an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

  • value (dict[str, Any]) –

    Dictionary containing the item's data.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
    """Create or overwrite an item.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
        value: Dictionary containing the item's data.
    """
    # Validate before anything is handed to the backend.
    _validate_namespace(namespace)
    op = PutOp(namespace=namespace, key=key, value=value)
    self.batch([op])

delete(namespace: tuple[str, ...], key: str) -> None

Delete an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def delete(self, namespace: tuple[str, ...], key: str) -> None:
    """Remove an item.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
    """
    # A PutOp whose value is None is the deletion request.
    op = PutOp(namespace=namespace, key=key, value=None)
    self.batch([op])

list_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]]

List and filter namespaces in the store.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria. Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    store.list_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
def list_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Enumerate the namespaces in the store, optionally filtered.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy.

    Args:
        prefix: Keep only namespaces that start with this path.
        suffix: Keep only namespaces that end with this path.
        max_depth: Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit: Maximum number of namespaces to return (default 100).
        offset: Number of namespaces to skip for pagination (default 0).

    Returns:
        The matching namespace tuples. Each tuple represents a full
        namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            store.list_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # Only non-empty prefix/suffix values become conditions, prefix first.
    conditions = tuple(
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    )
    op = ListNamespacesOp(
        match_conditions=conditions,
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    return self.batch([op])[0]

aget(namespace: tuple[str, ...], key: str) -> Optional[Item] async

Asynchronously retrieve a single item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Returns:

  • Optional[Item]

    The retrieved item or None if not found.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def aget(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
    """Fetch one item asynchronously.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.

    Returns:
        The item, or None when it does not exist.
    """
    results = await self.abatch([GetOp(namespace=namespace, key=key)])
    return results[0]

asearch(namespace_prefix: tuple[str, ...], /, *, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0) -> list[Item] async

Asynchronously search for items within a namespace prefix.

Parameters:

  • namespace_prefix (tuple[str, ...]) –

    Hierarchical path prefix to search within.

  • filter (Optional[dict[str, Any]], default: None ) –

    Key-value pairs to filter results.

  • limit (int, default: 10 ) –

    Maximum number of items to return.

  • offset (int, default: 0 ) –

    Number of items to skip before returning results.

Returns:

  • list[Item]

    List of items matching the search criteria.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def asearch(
    self,
    namespace_prefix: tuple[str, ...],
    /,
    *,
    filter: Optional[dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Find items stored under a namespace prefix, asynchronously.

    Args:
        namespace_prefix: Hierarchical path prefix to search within.
        filter: Key-value pairs to filter results.
        limit: Maximum number of items to return.
        offset: Number of items to skip before returning results.

    Returns:
        The items that match the search criteria.
    """
    op = SearchOp(
        namespace_prefix=namespace_prefix,
        filter=filter,
        limit=limit,
        offset=offset,
    )
    results = await self.abatch([op])
    return results[0]

aput(namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None async

Asynchronously store or update an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

  • value (dict[str, Any]) –

    Dictionary containing the item's data.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def aput(
    self, namespace: tuple[str, ...], key: str, value: dict[str, Any]
) -> None:
    """Create or overwrite an item asynchronously.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
        value: Dictionary containing the item's data.
    """
    # Validate before anything is handed to the backend.
    _validate_namespace(namespace)
    op = PutOp(namespace=namespace, key=key, value=value)
    await self.abatch([op])

adelete(namespace: tuple[str, ...], key: str) -> None async

Asynchronously delete an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def adelete(self, namespace: tuple[str, ...], key: str) -> None:
    """Remove an item asynchronously.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
    """
    # A PutOp whose value is None is the deletion request.
    op = PutOp(namespace=namespace, key=key, value=None)
    await self.abatch([op])

alist_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]] async

List and filter namespaces in the store asynchronously.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria. Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def alist_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Enumerate the namespaces in the store asynchronously, optionally filtered.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy.

    Args:
        prefix: Keep only namespaces that start with this path.
        suffix: Keep only namespaces that end with this path.
        max_depth: Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit: Maximum number of namespaces to return (default 100).
        offset: Number of namespaces to skip for pagination (default 0).

    Returns:
        The matching namespace tuples. Each tuple represents a full
        namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # Only non-empty prefix/suffix values become conditions, prefix first.
    conditions = tuple(
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    )
    op = ListNamespacesOp(
        match_conditions=conditions,
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    results = await self.abatch([op])
    return results[0]

AsyncPostgresStore

Bases: AsyncBatchedBaseStore, BasePostgresStore[AsyncConnection]

Source code in libs/checkpoint-postgres/langgraph/store/postgres/aio.py
class AsyncPostgresStore(AsyncBatchedBaseStore, BasePostgresStore[AsyncConnection]):
    """Asynchronous Postgres-backed store.

    Operations are grouped by type and issued over a single psycopg
    ``AsyncConnection`` inside a pipeline. The event loop that is running when
    the store is constructed is remembered so that the synchronous ``batch``
    entry point can hand work to it from another thread.
    """

    __slots__ = ("_deserializer",)

    def __init__(
        self,
        conn: AsyncConnection[Any],
        *,
        deserializer: Optional[
            Callable[[Union[bytes, orjson.Fragment]], dict[str, Any]]
        ] = None,
    ) -> None:
        """Bind the store to a connection.

        Must be called while an event loop is running: the running loop is
        captured here and ``asyncio.get_running_loop`` raises ``RuntimeError``
        otherwise.

        Args:
            conn: Connection used for every query issued by this store.
            deserializer: Optional loader forwarded to ``_row_to_item`` when
                rows are converted into items.
        """
        super().__init__()
        self._deserializer = deserializer
        self.conn = conn
        self.loop = asyncio.get_running_loop()

    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        """Execute a batch of operations inside one connection pipeline.

        Args:
            ops: Operations to execute.

        Returns:
            One result slot per operation, as sized by ``_group_ops``. Slots
            are filled in by the get/search/list handlers through the index
            carried with each operation; put operations leave their slot as
            None.
        """
        grouped_ops, num_ops = _group_ops(ops)
        results: list[Result] = [None] * num_ops

        async with self.conn.pipeline():
            tasks = []

            if GetOp in grouped_ops:
                tasks.append(
                    self._batch_get_ops(
                        cast(Sequence[tuple[int, GetOp]], grouped_ops[GetOp]), results
                    )
                )

            if PutOp in grouped_ops:
                tasks.append(
                    self._batch_put_ops(
                        cast(Sequence[tuple[int, PutOp]], grouped_ops[PutOp])
                    )
                )

            if SearchOp in grouped_ops:
                tasks.append(
                    self._batch_search_ops(
                        cast(Sequence[tuple[int, SearchOp]], grouped_ops[SearchOp]),
                        results,
                    )
                )

            if ListNamespacesOp in grouped_ops:
                tasks.append(
                    self._batch_list_namespaces_ops(
                        cast(
                            Sequence[tuple[int, ListNamespacesOp]],
                            grouped_ops[ListNamespacesOp],
                        ),
                        results,
                    )
                )

            # Every handler writes into the shared ``results`` list.
            await asyncio.gather(*tasks)

        return results

    def batch(self, ops: Iterable[Op]) -> list[Result]:
        """Execute a batch of operations synchronously.

        The work is scheduled on the loop captured at construction time and
        this call blocks until it completes, so it may only be used from a
        thread other than the one running that loop.

        Args:
            ops: Operations to execute.

        Returns:
            The list produced by ``abatch``.

        Raises:
            asyncio.InvalidStateError: If called from the thread that is
                running the store's event loop. Blocking there would stop the
                loop from ever running the scheduled coroutine.
        """
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so blocking here is safe.
            running_loop = None
        if running_loop is self.loop:
            raise asyncio.InvalidStateError(
                "Synchronous calls to AsyncPostgresStore are only allowed from a "
                "different thread than the one running its event loop. "
                "Use the async interface (e.g. `await store.abatch(...)`) instead."
            )
        return asyncio.run_coroutine_threadsafe(self.abatch(ops), self.loop).result()

    async def _batch_get_ops(
        self,
        get_ops: Sequence[tuple[int, GetOp]],
        results: list[Result],
    ) -> None:
        """Run the get queries and store each item (or None) in ``results``.

        Args:
            get_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        cursors = []
        # Send every query first; rows are only fetched afterwards.
        for query, params, namespace, items in self._get_batch_GET_ops_queries(get_ops):
            cur = self.conn.cursor(binary=True)
            await cur.execute(query, params)
            cursors.append((cur, namespace, items))

        for cur, namespace, items in cursors:
            rows = cast(list[Row], await cur.fetchall())
            # Rows are looked up by their "key" column.
            key_to_row = {row["key"]: row for row in rows}
            for idx, key in items:
                row = key_to_row.get(key)
                if row:
                    results[idx] = _row_to_item(
                        namespace, row, loader=self._deserializer
                    )
                else:
                    results[idx] = None

    async def _batch_put_ops(
        self,
        put_ops: Sequence[tuple[int, PutOp]],
    ) -> None:
        """Run the statements generated for the put operations.

        Args:
            put_ops: ``(result index, operation)`` pairs. Nothing is written
                to the result list for puts.
        """
        queries = self._get_batch_PUT_queries(put_ops)
        for query, params in queries:
            cur = self.conn.cursor(binary=True)
            await cur.execute(query, params)

    async def _batch_search_ops(
        self,
        search_ops: Sequence[tuple[int, SearchOp]],
        results: list[Result],
    ) -> None:
        """Run the search queries and store a list of items per operation.

        Args:
            search_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        queries = self._get_batch_search_queries(search_ops)
        cursors: list[tuple[AsyncCursor[Any], int]] = []

        # Queries are paired with operations positionally via ``zip``.
        for (query, params), (idx, _) in zip(queries, search_ops):
            cur = self.conn.cursor(binary=True)
            await cur.execute(query, params)
            cursors.append((cur, idx))

        for cur, idx in cursors:
            rows = cast(list[Row], await cur.fetchall())
            items = [
                _row_to_item(
                    _decode_ns_bytes(row["prefix"]), row, loader=self._deserializer
                )
                for row in rows
            ]
            results[idx] = items

    async def _batch_list_namespaces_ops(
        self,
        list_ops: Sequence[tuple[int, ListNamespacesOp]],
        results: list[Result],
    ) -> None:
        """Run the namespace-listing queries and store the decoded namespaces.

        Args:
            list_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        queries = self._get_batch_list_namespaces_queries(list_ops)
        cursors: list[tuple[AsyncCursor[Any], int]] = []
        for (query, params), (idx, _) in zip(queries, list_ops):
            cur = self.conn.cursor(binary=True)
            await cur.execute(query, params)
            cursors.append((cur, idx))

        for cur, idx in cursors:
            rows = cast(list[dict], await cur.fetchall())
            namespaces = [_decode_ns_bytes(row["truncated_prefix"]) for row in rows]
            results[idx] = namespaces

    @classmethod
    @asynccontextmanager
    async def from_conn_string(
        cls,
        conn_string: str,
    ) -> AsyncIterator["AsyncPostgresStore"]:
        """Open a connection and yield an AsyncPostgresStore bound to it.

        This is an async context manager. The connection is opened with
        ``autocommit=True``, ``prepare_threshold=0`` and ``dict_row`` as the
        row factory, and is released when the ``async with`` block exits.

        Args:
            conn_string (str): The Postgres connection info string.

        Yields:
            AsyncPostgresStore: A new AsyncPostgresStore instance.
        """
        async with await AsyncConnection.connect(
            conn_string, autocommit=True, prepare_threshold=0, row_factory=dict_row
        ) as conn:
            yield cls(conn=conn)

    async def setup(self) -> None:
        """Set up the store database asynchronously.

        This method creates the necessary tables in the Postgres database if they don't
        already exist and runs database migrations. It MUST be called directly by the user
        the first time the store is used.

        The highest applied version is read from ``store_migrations``; every
        entry of ``self.MIGRATIONS`` after it is executed and recorded.
        """
        async with self.conn.cursor() as cur:
            try:
                await cur.execute(
                    "SELECT v FROM store_migrations ORDER BY v DESC LIMIT 1"
                )
                row = cast(dict, await cur.fetchone())
                if row is None:
                    version = -1
                else:
                    version = row["v"]
            except UndefinedTable:
                # Clear the failed transaction so the statements below can
                # run when the connection is not in autocommit mode (same as
                # the synchronous PostgresStore.setup).
                await self.conn.rollback()
                version = -1
                # Create store_migrations table if it doesn't exist
                await cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS store_migrations (
                        v INTEGER PRIMARY KEY
                    )
                    """
                )
            for v, migration in enumerate(
                self.MIGRATIONS[version + 1 :], start=version + 1
            ):
                await cur.execute(migration)
                await cur.execute("INSERT INTO store_migrations (v) VALUES (%s)", (v,))

get(namespace: tuple[str, ...], key: str) -> Optional[Item]

Retrieve a single item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Returns:

  • Optional[Item]

    The retrieved item or None if not found.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def get(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
    """Fetch one item by namespace and key.

    A single ``GetOp`` is submitted through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.

    Returns:
        The first entry of the batch result: the item, or None if not found.
    """
    lookup = GetOp(namespace, key)
    outcome = self.batch([lookup])
    return outcome[0]

search(namespace_prefix: tuple[str, ...], /, *, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0) -> list[Item]

Search for items within a namespace prefix.

Parameters:

  • namespace_prefix (tuple[str, ...]) –

    Hierarchical path prefix to search within.

  • filter (Optional[dict[str, Any]], default: None ) –

    Key-value pairs to filter results.

  • limit (int, default: 10 ) –

    Maximum number of items to return.

  • offset (int, default: 0 ) –

    Number of items to skip before returning results.

Returns:

  • list[Item]

    List of items matching the search criteria.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def search(
    self,
    namespace_prefix: tuple[str, ...],
    /,
    *,
    filter: Optional[dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Find items stored under a namespace prefix.

    A single ``SearchOp`` is submitted through ``self.batch``.

    Args:
        namespace_prefix: Hierarchical path prefix to search within.
        filter: Key-value pairs to filter results.
        limit: Maximum number of items to return.
        offset: Number of items to skip before returning results.

    Returns:
        The first entry of the batch result: the list of matching items.
    """
    query = SearchOp(namespace_prefix, filter, limit, offset)
    outcome = self.batch([query])
    return outcome[0]

put(namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None

Store or update an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

  • value (dict[str, Any]) –

    Dictionary containing the item's data.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
    """Create or overwrite an item.

    The namespace is checked with ``_validate_namespace`` before a single
    ``PutOp`` is submitted through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
        value: Dictionary containing the item's data.
    """
    _validate_namespace(namespace)
    write = PutOp(namespace, key, value)
    self.batch([write])

delete(namespace: tuple[str, ...], key: str) -> None

Delete an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def delete(self, namespace: tuple[str, ...], key: str) -> None:
    """Remove an item.

    Deletion is expressed as a ``PutOp`` whose value is None, submitted
    through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
    """
    removal = PutOp(namespace, key, None)
    self.batch([removal])

list_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]]

List and filter namespaces in the store.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria.

  • list[tuple[str, ...]]

    Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    store.list_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
def list_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Enumerate the namespaces in the store, optionally filtered.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy. A prefix and/or suffix
    condition is added only when the corresponding argument is non-empty.

    Args:
        prefix (Optional[Tuple[str, ...]]): Keep namespaces that start with this path.
        suffix (Optional[Tuple[str, ...]]): Keep namespaces that end with this path.
        max_depth (Optional[int]): Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit (int): Maximum number of namespaces to return (default 100).
        offset (int): Number of namespaces to skip for pagination (default 0).

    Returns:
        List[Tuple[str, ...]]: The namespace tuples that match the criteria.
        Each tuple represents a full namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            store.list_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # Prefix is considered before suffix so the condition order is stable.
    conditions = [
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    ]
    listing = ListNamespacesOp(
        match_conditions=tuple(conditions),
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    outcome = self.batch([listing])
    return outcome[0]

alist_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]] async

List and filter namespaces in the store asynchronously.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria.

  • list[tuple[str, ...]]

    Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def alist_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Asynchronously enumerate the namespaces in the store, optionally filtered.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy. A prefix and/or suffix
    condition is added only when the corresponding argument is non-empty.

    Args:
        prefix (Optional[Tuple[str, ...]]): Keep namespaces that start with this path.
        suffix (Optional[Tuple[str, ...]]): Keep namespaces that end with this path.
        max_depth (Optional[int]): Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit (int): Maximum number of namespaces to return (default 100).
        offset (int): Number of namespaces to skip for pagination (default 0).

    Returns:
        List[Tuple[str, ...]]: The namespace tuples that match the criteria.
        Each tuple represents a full namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # Prefix is considered before suffix so the condition order is stable.
    conditions = [
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    ]
    listing = ListNamespacesOp(
        match_conditions=tuple(conditions),
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    outcome = await self.abatch([listing])
    return outcome[0]

from_conn_string(conn_string: str) -> AsyncIterator[AsyncPostgresStore] async classmethod

Create a new AsyncPostgresStore instance from a connection string.

Parameters:

  • conn_string (str) –

    The Postgres connection info string.

Returns:

  • AsyncPostgresStore ( AsyncIterator[AsyncPostgresStore] ) –

    A new AsyncPostgresStore instance.

Source code in libs/checkpoint-postgres/langgraph/store/postgres/aio.py
@classmethod
@asynccontextmanager
async def from_conn_string(
    cls,
    conn_string: str,
) -> AsyncIterator["AsyncPostgresStore"]:
    """Open a connection and yield an AsyncPostgresStore bound to it.

    This is an async context manager. The connection is opened with
    ``autocommit=True``, ``prepare_threshold=0`` and ``dict_row`` as the row
    factory, and is released when the ``async with`` block exits.

    Args:
        conn_string (str): The Postgres connection info string.

    Yields:
        AsyncPostgresStore: A new AsyncPostgresStore instance.
    """
    connection = await AsyncConnection.connect(
        conn_string, autocommit=True, prepare_threshold=0, row_factory=dict_row
    )
    async with connection as conn:
        store = cls(conn=conn)
        yield store

setup() -> None async

Set up the store database asynchronously.

This method creates the necessary tables in the Postgres database if they don't already exist and runs database migrations. It MUST be called directly by the user the first time the store is used.

Source code in libs/checkpoint-postgres/langgraph/store/postgres/aio.py
async def setup(self) -> None:
    """Set up the store database asynchronously.

    This method creates the necessary tables in the Postgres database if they don't
    already exist and runs database migrations. It MUST be called directly by the user
    the first time the store is used.

    The highest applied version is read from ``store_migrations``; every
    entry of ``self.MIGRATIONS`` after it is executed and recorded.
    """
    async with self.conn.cursor() as cur:
        try:
            await cur.execute(
                "SELECT v FROM store_migrations ORDER BY v DESC LIMIT 1"
            )
            row = cast(dict, await cur.fetchone())
            if row is None:
                version = -1
            else:
                version = row["v"]
        except UndefinedTable:
            # Clear the failed transaction so the statements below can run
            # when the connection is not in autocommit mode (same as the
            # synchronous PostgresStore.setup).
            await self.conn.rollback()
            version = -1
            # Create store_migrations table if it doesn't exist
            await cur.execute(
                """
                CREATE TABLE IF NOT EXISTS store_migrations (
                    v INTEGER PRIMARY KEY
                )
                """
            )
        for v, migration in enumerate(
            self.MIGRATIONS[version + 1 :], start=version + 1
        ):
            await cur.execute(migration)
            await cur.execute("INSERT INTO store_migrations (v) VALUES (%s)", (v,))

PostgresStore

Bases: BaseStore, BasePostgresStore[Connection]

Source code in libs/checkpoint-postgres/langgraph/store/postgres/base.py
class PostgresStore(BaseStore, BasePostgresStore[Connection]):
    """Synchronous Postgres-backed store built on a psycopg ``Connection``.

    Operations are grouped by type and issued inside a connection pipeline.
    """

    __slots__ = ("_deserializer",)

    def __init__(
        self,
        conn: Connection[Any],
        *,
        deserializer: Optional[
            Callable[[Union[bytes, orjson.Fragment]], dict[str, Any]]
        ] = None,
    ) -> None:
        """Bind the store to a connection.

        Args:
            conn: Connection used for every query issued by this store.
            deserializer: Optional loader forwarded to ``_row_to_item`` when
                rows are converted into items.
        """
        super().__init__()
        self._deserializer = deserializer
        self.conn = conn

    def batch(self, ops: Iterable[Op]) -> list[Result]:
        """Execute a batch of operations inside one connection pipeline.

        Handlers run in a fixed order: gets, puts, searches, namespace
        listings.

        Args:
            ops: Operations to execute.

        Returns:
            One result slot per operation, as sized by ``_group_ops``. Put
            operations leave their slot as None.
        """
        by_type, total = _group_ops(ops)
        out: list[Result] = [None] * total

        with self.conn.pipeline():
            if GetOp in by_type:
                gets = cast(Sequence[tuple[int, GetOp]], by_type[GetOp])
                self._batch_get_ops(gets, out)

            if PutOp in by_type:
                puts = cast(Sequence[tuple[int, PutOp]], by_type[PutOp])
                self._batch_put_ops(puts)

            if SearchOp in by_type:
                searches = cast(Sequence[tuple[int, SearchOp]], by_type[SearchOp])
                self._batch_search_ops(searches, out)

            if ListNamespacesOp in by_type:
                listings = cast(
                    Sequence[tuple[int, ListNamespacesOp]],
                    by_type[ListNamespacesOp],
                )
                self._batch_list_namespaces_ops(listings, out)

        return out

    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        """Run ``batch`` in the running loop's default executor.

        Args:
            ops: Operations to execute.

        Returns:
            The list produced by ``batch``.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.batch, ops)

    def _batch_get_ops(
        self,
        get_ops: Sequence[tuple[int, GetOp]],
        results: list[Result],
    ) -> None:
        """Run the get queries and store each item (or None) in ``results``.

        Args:
            get_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        pending = []
        # Send every query first; rows are only fetched afterwards.
        for query, params, namespace, items in self._get_batch_GET_ops_queries(get_ops):
            cursor = self.conn.cursor(binary=True)
            cursor.execute(query, params)
            pending.append((cursor, namespace, items))

        for cursor, namespace, items in pending:
            fetched = cast(list[Row], cursor.fetchall())
            by_key = {record["key"]: record for record in fetched}
            for idx, key in items:
                record = by_key.get(key)
                results[idx] = (
                    _row_to_item(namespace, record, loader=self._deserializer)
                    if record
                    else None
                )

    def _batch_put_ops(
        self,
        put_ops: Sequence[tuple[int, PutOp]],
    ) -> None:
        """Run the statements generated for the put operations.

        Args:
            put_ops: ``(result index, operation)`` pairs. Nothing is written
                to the result list for puts.
        """
        for query, params in self._get_batch_PUT_queries(put_ops):
            self.conn.cursor(binary=True).execute(query, params)

    def _batch_search_ops(
        self,
        search_ops: Sequence[tuple[int, SearchOp]],
        results: list[Result],
    ) -> None:
        """Run the search queries and store a list of items per operation.

        Args:
            search_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        statements = self._get_batch_search_queries(search_ops)
        pending: list[tuple[Cursor[Any], int]] = []

        # Queries are paired with operations positionally via ``zip``.
        for (query, params), (idx, _) in zip(statements, search_ops):
            cursor = self.conn.cursor(binary=True)
            cursor.execute(query, params)
            pending.append((cursor, idx))

        for cursor, idx in pending:
            fetched = cast(list[Row], cursor.fetchall())
            results[idx] = [
                _row_to_item(
                    _decode_ns_bytes(record["prefix"]),
                    record,
                    loader=self._deserializer,
                )
                for record in fetched
            ]

    def _batch_list_namespaces_ops(
        self,
        list_ops: Sequence[tuple[int, ListNamespacesOp]],
        results: list[Result],
    ) -> None:
        """Run the namespace-listing queries and store the decoded namespaces.

        Args:
            list_ops: ``(result index, operation)`` pairs.
            results: Shared result list, written in place.
        """
        statements = self._get_batch_list_namespaces_queries(list_ops)
        pending: list[tuple[Cursor[Any], int]] = []
        for (query, params), (idx, _) in zip(statements, list_ops):
            cursor = self.conn.cursor(binary=True)
            cursor.execute(query, params)
            pending.append((cursor, idx))

        for cursor, idx in pending:
            fetched = cast(list[dict], cursor.fetchall())
            results[idx] = [
                _decode_ns_bytes(record["truncated_prefix"]) for record in fetched
            ]

    @classmethod
    @contextmanager
    def from_conn_string(
        cls,
        conn_string: str,
    ) -> Iterator["PostgresStore"]:
        """Open a connection and yield a PostgresStore bound to it.

        This is a context manager. The connection is opened with
        ``autocommit=True``, ``prepare_threshold=0`` and ``dict_row`` as the
        row factory, and is released when the ``with`` block exits.

        Args:
            conn_string (str): The Postgres connection info string.

        Yields:
            PostgresStore: A new PostgresStore instance.
        """
        connection = Connection.connect(
            conn_string, autocommit=True, prepare_threshold=0, row_factory=dict_row
        )
        with connection as conn:
            yield cls(conn=conn)

    def setup(self) -> None:
        """Set up the store database.

        Creates the required tables in the Postgres database if they are
        missing and applies any migrations in ``self.MIGRATIONS`` that have
        not been recorded yet. It MUST be called directly by the user the
        first time the store is used.
        """
        with self.conn.cursor(binary=True) as cur:
            try:
                cur.execute("SELECT v FROM store_migrations ORDER BY v DESC LIMIT 1")
                latest = cast(dict, cur.fetchone())
                version = -1 if latest is None else latest["v"]
            except UndefinedTable:
                # Clear the failed statement before issuing further SQL.
                self.conn.rollback()
                version = -1
                # The migrations table is missing: create it.
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS store_migrations (
                        v INTEGER PRIMARY KEY
                    )
                """
                )
            first_pending = version + 1
            for v, migration in enumerate(
                self.MIGRATIONS[first_pending:], start=first_pending
            ):
                cur.execute(migration)
                cur.execute("INSERT INTO store_migrations (v) VALUES (%s)", (v,))

get(namespace: tuple[str, ...], key: str) -> Optional[Item]

Retrieve a single item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Returns:

  • Optional[Item]

    The retrieved item or None if not found.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def get(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
    """Fetch one item by namespace and key.

    A single ``GetOp`` is submitted through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.

    Returns:
        The first entry of the batch result: the item, or None if not found.
    """
    lookup = GetOp(namespace, key)
    outcome = self.batch([lookup])
    return outcome[0]

search(namespace_prefix: tuple[str, ...], /, *, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0) -> list[Item]

Search for items within a namespace prefix.

Parameters:

  • namespace_prefix (tuple[str, ...]) –

    Hierarchical path prefix to search within.

  • filter (Optional[dict[str, Any]], default: None ) –

    Key-value pairs to filter results.

  • limit (int, default: 10 ) –

    Maximum number of items to return.

  • offset (int, default: 0 ) –

    Number of items to skip before returning results.

Returns:

  • list[Item]

    List of items matching the search criteria.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def search(
    self,
    namespace_prefix: tuple[str, ...],
    /,
    *,
    filter: Optional[dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Find items stored under a namespace prefix.

    A single ``SearchOp`` is submitted through ``self.batch``.

    Args:
        namespace_prefix: Hierarchical path prefix to search within.
        filter: Key-value pairs to filter results.
        limit: Maximum number of items to return.
        offset: Number of items to skip before returning results.

    Returns:
        The first entry of the batch result: the list of matching items.
    """
    query = SearchOp(namespace_prefix, filter, limit, offset)
    outcome = self.batch([query])
    return outcome[0]

put(namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None

Store or update an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

  • value (dict[str, Any]) –

    Dictionary containing the item's data.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
    """Create or overwrite an item.

    The namespace is checked with ``_validate_namespace`` before a single
    ``PutOp`` is submitted through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
        value: Dictionary containing the item's data.
    """
    _validate_namespace(namespace)
    write = PutOp(namespace, key, value)
    self.batch([write])

delete(namespace: tuple[str, ...], key: str) -> None

Delete an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
def delete(self, namespace: tuple[str, ...], key: str) -> None:
    """Remove an item.

    Deletion is expressed as a ``PutOp`` whose value is None, submitted
    through ``self.batch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
    """
    removal = PutOp(namespace, key, None)
    self.batch([removal])

list_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]]

List and filter namespaces in the store.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria.

  • list[tuple[str, ...]]

    Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    store.list_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
def list_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Enumerate the namespaces in the store, optionally filtered.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy. A prefix and/or suffix
    condition is added only when the corresponding argument is non-empty.

    Args:
        prefix (Optional[Tuple[str, ...]]): Keep namespaces that start with this path.
        suffix (Optional[Tuple[str, ...]]): Keep namespaces that end with this path.
        max_depth (Optional[int]): Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit (int): Maximum number of namespaces to return (default 100).
        offset (int): Number of namespaces to skip for pagination (default 0).

    Returns:
        List[Tuple[str, ...]]: The namespace tuples that match the criteria.
        Each tuple represents a full namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            store.list_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # Prefix is considered before suffix so the condition order is stable.
    conditions = [
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    ]
    listing = ListNamespacesOp(
        match_conditions=tuple(conditions),
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    outcome = self.batch([listing])
    return outcome[0]

aget(namespace: tuple[str, ...], key: str) -> Optional[Item] async

Asynchronously retrieve a single item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Returns:

  • Optional[Item]

    The retrieved item or None if not found.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def aget(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
    """Fetch one item from the store without blocking.

    Wraps the lookup in a single ``GetOp``, submits it through ``abatch``,
    and unwraps the first (only) result.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.

    Returns:
        The retrieved item or None if not found.
    """
    lookup = GetOp(namespace, key)
    results = await self.abatch([lookup])
    return results[0]

asearch(namespace_prefix: tuple[str, ...], /, *, filter: Optional[dict[str, Any]] = None, limit: int = 10, offset: int = 0) -> list[Item] async

Asynchronously search for items within a namespace prefix.

Parameters:

  • namespace_prefix (tuple[str, ...]) –

    Hierarchical path prefix to search within.

  • filter (Optional[dict[str, Any]], default: None ) –

    Key-value pairs to filter results.

  • limit (int, default: 10 ) –

    Maximum number of items to return.

  • offset (int, default: 0 ) –

    Number of items to skip before returning results.

Returns:

  • list[Item]

    List of items matching the search criteria.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def asearch(
    self,
    namespace_prefix: tuple[str, ...],
    /,
    *,
    filter: Optional[dict[str, Any]] = None,
    limit: int = 10,
    offset: int = 0,
) -> list[Item]:
    """Look up items under a namespace prefix without blocking.

    Builds one ``SearchOp`` from the arguments, submits it through
    ``abatch``, and returns the first (only) result.

    Args:
        namespace_prefix: Hierarchical path prefix to search within.
            Positional-only.
        filter: Key-value pairs to filter results.
        limit: Maximum number of items to return.
        offset: Number of items to skip before returning results.

    Returns:
        List of items matching the search criteria.
    """
    query = SearchOp(namespace_prefix, filter, limit, offset)
    results = await self.abatch([query])
    return results[0]

aput(namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None async

Asynchronously store or update an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

  • value (dict[str, Any]) –

    Dictionary containing the item's data.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def aput(
    self, namespace: tuple[str, ...], key: str, value: dict[str, Any]
) -> None:
    """Write an item to the store without blocking, creating or replacing it.

    The namespace is checked with ``_validate_namespace`` before anything is
    submitted; the write itself is a single ``PutOp`` sent through ``abatch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
        value: Dictionary containing the item's data.
    """
    _validate_namespace(namespace)
    write = PutOp(namespace, key, value)
    await self.abatch([write])

adelete(namespace: tuple[str, ...], key: str) -> None async

Asynchronously delete an item.

Parameters:

  • namespace (tuple[str, ...]) –

    Hierarchical path for the item.

  • key (str) –

    Unique identifier within the namespace.

Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def adelete(self, namespace: tuple[str, ...], key: str) -> None:
    """Remove an item from the store without blocking.

    Deletion is expressed as a ``PutOp`` whose value is ``None``, submitted
    through ``abatch``.

    Args:
        namespace: Hierarchical path for the item.
        key: Unique identifier within the namespace.
    """
    removal = PutOp(namespace, key, None)
    await self.abatch([removal])

alist_namespaces(*, prefix: Optional[NameSpacePath] = None, suffix: Optional[NameSpacePath] = None, max_depth: Optional[int] = None, limit: int = 100, offset: int = 0) -> list[tuple[str, ...]] async

List and filter namespaces in the store asynchronously.

Used to explore the organization of data, find specific collections, or navigate the namespace hierarchy.

Parameters:

  • prefix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that start with this path.

  • suffix (Optional[Tuple[str, ...]], default: None ) –

    Filter namespaces that end with this path.

  • max_depth (Optional[int], default: None ) –

    Return namespaces up to this depth in the hierarchy. Namespaces deeper than this level will be truncated to this depth.

  • limit (int, default: 100 ) –

    Maximum number of namespaces to return (default 100).

  • offset (int, default: 0 ) –

    Number of namespaces to skip for pagination (default 0).

Returns:

  • list[tuple[str, ...]]

    A list of namespace tuples that match the criteria. Each tuple represents a full namespace path up to max_depth.

Examples:

Setting max_depth=3. Given the namespaces:
    # ("a", "b", "c")
    # ("a", "b", "d", "e")
    # ("a", "b", "d", "i")
    # ("a", "b", "f")
    # ("a", "c", "f")
    await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
    # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
Source code in libs/checkpoint/langgraph/store/base/__init__.py
async def alist_namespaces(
    self,
    *,
    prefix: Optional[NameSpacePath] = None,
    suffix: Optional[NameSpacePath] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    """Asynchronously enumerate the namespaces present in the store.

    Useful for exploring how data is organized, locating specific
    collections, or walking the namespace hierarchy.

    Args:
        prefix: Only include namespaces that start with this path.
            An empty or ``None`` value adds no prefix condition.
        suffix: Only include namespaces that end with this path.
            An empty or ``None`` value adds no suffix condition.
        max_depth: Return namespaces up to this depth in the hierarchy.
            Namespaces deeper than this level will be truncated to this depth.
        limit: Maximum number of namespaces to return (default 100).
        offset: Number of namespaces to skip for pagination (default 0).

    Returns:
        A list of namespace tuples that match the criteria. Each tuple
        represents a full namespace path up to `max_depth`.

    Examples:

        Setting max_depth=3. Given the namespaces:
            # ("a", "b", "c")
            # ("a", "b", "d", "e")
            # ("a", "b", "d", "i")
            # ("a", "b", "f")
            # ("a", "c", "f")
            await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
            # [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
    """
    # One condition per filter that was actually supplied, prefix first.
    conditions = tuple(
        MatchCondition(match_type=kind, path=path)
        for kind, path in (("prefix", prefix), ("suffix", suffix))
        if path
    )
    listing = ListNamespacesOp(
        match_conditions=conditions,
        max_depth=max_depth,
        limit=limit,
        offset=offset,
    )
    results = await self.abatch([listing])
    return results[0]

from_conn_string(conn_string: str) -> Iterator[PostgresStore] classmethod

Create a new BasePostgresStore instance from a connection string.

Parameters:

  • conn_string (str) –

    The Postgres connection info string.

Returns:

  • Iterator[PostgresStore] –

    A context manager that yields a new store instance (of the class this method is called on) bound to the opened connection.

Source code in libs/checkpoint-postgres/langgraph/store/postgres/base.py
@classmethod
@contextmanager
def from_conn_string(
    cls,
    conn_string: str,
) -> Iterator["PostgresStore"]:
    """Open a Postgres connection and yield a store bound to it.

    This is a context manager: use it in a ``with`` statement. The
    connection is opened with autocommit enabled, ``prepare_threshold=0``
    and ``dict_row`` as the row factory, and is itself managed by a
    ``with`` block for the lifetime of the yielded store.

    Args:
        conn_string: The Postgres connection info string.

    Yields:
        An instance of this class constructed with the opened connection.
    """
    connection = Connection.connect(
        conn_string, autocommit=True, prepare_threshold=0, row_factory=dict_row
    )
    with connection as conn:
        store = cls(conn=conn)
        yield store

setup() -> None

Set up the store database.

This method creates the necessary tables in the Postgres database if they don't already exist and runs database migrations. It MUST be called directly by the user the first time the store is used.

Source code in libs/checkpoint-postgres/langgraph/store/postgres/base.py
def setup(self) -> None:
    """Set up the store database.

    This method creates the necessary tables in the Postgres database if they don't
    already exist and runs database migrations. It MUST be called directly by the user
    the first time the store is used.

    The highest applied migration index is read from the ``store_migrations``
    table; every entry of ``self.MIGRATIONS`` after that index is executed in
    order and its index is recorded in ``store_migrations``.
    """
    with self.conn.cursor(binary=True) as cur:
        try:
            # Probe for the most recently applied migration index.
            cur.execute("SELECT v FROM store_migrations ORDER BY v DESC LIMIT 1")
            # `cast` has no runtime effect: `row` is still None when the
            # table exists but is empty, hence the check below.
            row = cast(dict, cur.fetchone())
            if row is None:
                # Nothing applied yet; -1 makes the slice below start at 0.
                version = -1
            else:
                version = row["v"]
        except UndefinedTable:
            # The probe failed because the table is missing: roll back,
            # then create the table and treat the store as unmigrated.
            self.conn.rollback()
            version = -1
            # Create store_migrations table if it doesn't exist
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS store_migrations (
                    v INTEGER PRIMARY KEY
                )
            """
            )
        # Apply only the migrations after the recorded version; `start`
        # keeps `v` equal to each migration's index in self.MIGRATIONS.
        for v, migration in enumerate(
            self.MIGRATIONS[version + 1 :], start=version + 1
        ):
            cur.execute(migration)
            # Record the index so a later setup() call skips this migration.
            cur.execute("INSERT INTO store_migrations (v) VALUES (%s)", (v,))

Comments