Skip to content

Channels

BaseChannel

Bases: Generic[Value, Update, C], ABC

Source code in libs/langgraph/langgraph/channels/base.py
class BaseChannel(Generic[Value, Update, C], ABC):
    """Abstract interface implemented by every channel.

    The three type parameters are the type returned by `get` (Value), the
    type of the items passed to `update` (Update) and the type produced by
    `checkpoint` / accepted by `from_checkpoint` (C).
    """

    __slots__ = ("key", "typ")

    def __init__(self, typ: Type[Any], key: str = "") -> None:
        # The two attributes are independent of each other.
        self.key = key
        self.typ = typ

    @property
    @abstractmethod
    def ValueType(self) -> Any:
        """Type of the value held by the channel."""

    @property
    @abstractmethod
    def UpdateType(self) -> Any:
        """Type of the updates the channel accepts."""

    # checkpointing (serialize / deserialize)

    def checkpoint(self) -> Optional[C]:
        """Produce a serializable snapshot of the channel's current state.

        The default snapshot is whatever `get` returns.
        Raises EmptyChannelError if the channel is empty (never updated yet),
        or doesn't support checkpoints."""
        snapshot = self.get()
        return snapshot

    @abstractmethod
    def from_checkpoint(self, checkpoint: Optional[C]) -> Self:
        """Build a new, identical channel, optionally seeded from a checkpoint.

        Complex data structures found in the checkpoint should be copied."""

    # state handling

    @abstractmethod
    def update(self, values: Sequence[Update]) -> bool:
        """Apply a sequence of updates to the channel's value.

        The updates arrive in arbitrary order.
        Pregel calls this for all channels at the end of each step; when
        there are no updates it is called with an empty sequence.
        Raises InvalidUpdateError if the sequence of updates is invalid.
        Returns True if the channel was updated, False otherwise."""

    @abstractmethod
    def get(self) -> Value:
        """Return the channel's current value.

        Raises EmptyChannelError if the channel is empty (never updated yet)."""

    def consume(self) -> bool:
        """Flag the channel's current value as consumed.

        The base implementation does nothing and reports False.
        Pregel calls this before the start of the next step, for all
        channels that triggered a node. If the channel was updated, return True.
        """
        return False

ValueType: Any abstractmethod property

The type of the value stored in the channel.

UpdateType: Any abstractmethod property

The type of the update received by the channel.

checkpoint() -> Optional[C]

Return a serializable representation of the channel's current state. Raises EmptyChannelError if the channel is empty (never updated yet), or doesn't support checkpoints.

Source code in libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    The default snapshot is whatever get() returns.
    Raises EmptyChannelError if the channel is empty (never updated yet),
    or doesn't support checkpoints."""
    snapshot = self.get()
    return snapshot

from_checkpoint(checkpoint: Optional[C]) -> Self abstractmethod

Return a new identical channel, optionally initialized from a checkpoint. If the checkpoint contains complex data structures, they should be copied.

Source code in libs/langgraph/langgraph/channels/base.py
@abstractmethod
def from_checkpoint(self, checkpoint: Optional[C]) -> Self:
    """Return a new identical channel, optionally initialized from a checkpoint.

    Abstract: this declaration has no body of its own.
    If the checkpoint contains complex data structures, they should be copied."""

update(values: Sequence[Update]) -> bool abstractmethod

Update the channel's value with the given sequence of updates. The order of the updates in the sequence is arbitrary. This method is called by Pregel for all channels at the end of each step. If there are no updates, it is called with an empty sequence. Raises InvalidUpdateError if the sequence of updates is invalid. Returns True if the channel was updated, False otherwise.

Source code in libs/langgraph/langgraph/channels/base.py
@abstractmethod
def update(self, values: Sequence[Update]) -> bool:
    """Update the channel's value with the given sequence of updates.

    Abstract: this declaration has no body of its own.
    The order of the updates in the sequence is arbitrary.
    This method is called by Pregel for all channels at the end of each step.
    If there are no updates, it is called with an empty sequence.
    Raises InvalidUpdateError if the sequence of updates is invalid.
    Returns True if the channel was updated, False otherwise."""

get() -> Value abstractmethod

Return the current value of the channel.

Raises EmptyChannelError if the channel is empty (never updated yet).

Source code in libs/langgraph/langgraph/channels/base.py
@abstractmethod
def get(self) -> Value:
    """Return the current value of the channel.

    Abstract: this declaration has no body of its own.
    Raises EmptyChannelError if the channel is empty (never updated yet)."""

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

Topic

Bases: Generic[Value], BaseChannel[Sequence[Value], Union[Value, list[Value]], tuple[set[Value], list[Value]]]

A configurable PubSub Topic.

Parameters:

  • typ (Type[Value]) –

    The type of the value stored in the channel.

  • accumulate (bool, default: False ) –

    Whether to accumulate values across steps. If False, the channel will be emptied after each step.

Source code in libs/langgraph/langgraph/channels/topic.py
class Topic(
    Generic[Value],
    BaseChannel[
        Sequence[Value], Union[Value, list[Value]], tuple[set[Value], list[Value]]
    ],
):
    """A configurable PubSub Topic.

    Args:
        typ: The type of the value stored in the channel.
        accumulate: Whether to accumulate values across steps. If False, the channel will be emptied after each step.
    """

    __slots__ = ("values", "accumulate")

    def __init__(self, typ: Type[Value], accumulate: bool = False) -> None:
        super().__init__(typ)
        # attrs
        self.accumulate = accumulate
        # state
        self.values = list[Value]()

    def __eq__(self, value: object) -> bool:
        """Compare by configuration only: same class family and same
        `accumulate` flag. The stored values are not compared."""
        return isinstance(value, Topic) and value.accumulate == self.accumulate

    @property
    def ValueType(self) -> Any:
        """The type of the value stored in the channel."""
        return Sequence[self.typ]  # type: ignore[name-defined]

    @property
    def UpdateType(self) -> Any:
        """The type of the update received by the channel."""
        return Union[self.typ, list[self.typ]]  # type: ignore[name-defined]

    def checkpoint(self) -> list[Value]:  # type: ignore[override]
        """Return a snapshot of the values currently held by the topic.

        This does not go through get(), so an empty topic yields an empty
        list instead of raising. The snapshot is a copy: in accumulate mode
        `update` extends the live list in place, which would otherwise
        silently alter a snapshot that was handed out earlier.
        """
        return list(self.values)

    def from_checkpoint(
        self,
        checkpoint: Optional[Union[list[Value], tuple[set[Value], list[Value]]]],
    ) -> Self:
        """Return a new topic with the same configuration and key.

        Args:
            checkpoint: None for an empty topic, a list of values, or a tuple
                whose second item is the list of values.

        The restored values are copied so that later in-place updates of the
        new channel cannot modify the checkpoint it was built from.
        """
        empty = self.__class__(self.typ, self.accumulate)
        empty.key = self.key
        if checkpoint is not None:
            if isinstance(checkpoint, tuple):
                # tuple form: the values are the second item
                empty.values = list(checkpoint[1])
            else:
                empty.values = list(checkpoint)
        return empty

    def update(self, values: Sequence[Union[Value, list[Value]]]) -> bool:
        """Add the given updates to the topic.

        When `accumulate` is False the previously held values are dropped
        first. Returns True if the held values differ from what they were
        before the call, False otherwise.
        """
        current = list(self.values)
        if not self.accumulate:
            self.values = list[Value]()
        # flatten() is defined elsewhere in the module
        if flat_values := flatten(values):
            self.values.extend(flat_values)
        return self.values != current

    def get(self) -> Sequence[Value]:
        """Return a copy of the held values.

        Raises EmptyChannelError if no values are held."""
        if self.values:
            return list(self.values)
        else:
            raise EmptyChannelError()

ValueType: Any property

The type of the value stored in the channel.

UpdateType: Any property

The type of the update received by the channel.

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

LastValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the last value received; can receive at most one value per step.

Source code in libs/langgraph/langgraph/channels/last_value.py
class LastValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Keeps the most recently received value; accepts at most one value per step."""

    __slots__ = ("value",)

    def __eq__(self, value: object) -> bool:
        # Only the class is compared; the stored value plays no part.
        return isinstance(value, LastValue)

    @property
    def ValueType(self) -> Type[Value]:
        """Type of the value held by the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """Type of the updates the channel accepts."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Build a new channel with the same type and key.

        A checkpoint of None leaves the new channel without a value.
        """
        restored = self.__class__(self.typ)
        restored.key = self.key
        if checkpoint is None:
            return restored
        restored.value = checkpoint
        return restored

    def update(self, values: Sequence[Value]) -> bool:
        """Store the single value received in this step.

        Returns False when nothing was received, True once the value has
        been stored. Raises InvalidUpdateError when more than one value is
        received.
        """
        received = len(values)
        if received == 0:
            return False
        if received == 1:
            self.value = values[-1]
            return True

        raise InvalidUpdateError(
            create_error_message(
                message=f"At key '{self.key}': Can receive only one value per step. Use an Annotated key to handle multiple values.",
                error_code=ErrorCode.INVALID_CONCURRENT_GRAPH_UPDATE,
            )
        )

    def get(self) -> Value:
        """Return the stored value; raises EmptyChannelError when none is set."""
        try:
            current = self.value
        except AttributeError:
            # the `value` slot has never been assigned
            raise EmptyChannelError()
        return current

ValueType: Type[Value] property

The type of the value stored in the channel.

UpdateType: Type[Value] property

The type of the update received by the channel.

checkpoint() -> Optional[C]

Return a serializable representation of the channel's current state. Raises EmptyChannelError if the channel is empty (never updated yet), or doesn't support checkpoints.

Source code in libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    The default snapshot is whatever get() returns.
    Raises EmptyChannelError if the channel is empty (never updated yet),
    or doesn't support checkpoints."""
    snapshot = self.get()
    return snapshot

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

EphemeralValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the value received in the immediately preceding step, then clears it.

Source code in libs/langgraph/langgraph/channels/ephemeral_value.py
class EphemeralValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Holds the value received in the immediately preceding step, then drops it."""

    __slots__ = ("value", "guard")

    def __init__(self, typ: Any, guard: bool = True) -> None:
        super().__init__(typ)
        # When True, receiving more than one value in a step is an error.
        self.guard = guard

    def __eq__(self, value: object) -> bool:
        # Configuration-only comparison; the stored value plays no part.
        if not isinstance(value, EphemeralValue):
            return False
        return value.guard == self.guard

    @property
    def ValueType(self) -> Type[Value]:
        """Type of the value held by the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """Type of the updates the channel accepts."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Build a new channel with the same type, guard setting and key.

        A checkpoint of None leaves the new channel without a value.
        """
        restored = self.__class__(self.typ, self.guard)
        restored.key = self.key
        if checkpoint is None:
            return restored
        restored.value = checkpoint
        return restored

    def update(self, values: Sequence[Value]) -> bool:
        """Store this step's value, or clear the channel when none arrived.

        With no values, the stored value is deleted; returns True if there
        was one to delete and False otherwise. With values, the last one is
        stored and True is returned. Raises InvalidUpdateError when `guard`
        is set and the number of values is not exactly one.
        """
        received = len(values)
        if received == 0:
            try:
                del self.value
            except AttributeError:
                # nothing was stored, so nothing changed
                return False
            return True
        if self.guard and received != 1:
            raise InvalidUpdateError(
                f"At key '{self.key}': EphemeralValue(guard=True) can receive only one value per step. Use guard=False if you want to store any one of multiple values."
            )

        self.value = values[-1]
        return True

    def get(self) -> Value:
        """Return the stored value; raises EmptyChannelError when none is set."""
        try:
            current = self.value
        except AttributeError:
            # the `value` slot is currently unassigned
            raise EmptyChannelError()
        return current

ValueType: Type[Value] property

The type of the value stored in the channel.

UpdateType: Type[Value] property

The type of the update received by the channel.

checkpoint() -> Optional[C]

Return a serializable representation of the channel's current state. Raises EmptyChannelError if the channel is empty (never updated yet), or doesn't support checkpoints.

Source code in libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    The default snapshot is whatever get() returns.
    Raises EmptyChannelError if the channel is empty (never updated yet),
    or doesn't support checkpoints."""
    snapshot = self.get()
    return snapshot

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

BinaryOperatorAggregate

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the result of applying a binary operator to the current value and each new value.

import operator

total = Channels.BinaryOperatorAggregate(int, operator.add)
Source code in libs/langgraph/langgraph/channels/binop.py
class BinaryOperatorAggregate(Generic[Value], BaseChannel[Value, Value, Value]):
    """Stores the result of applying a binary operator to the current value and each new value.

    ```python
    import operator

    total = Channels.BinaryOperatorAggregate(int, operator.add)
    ```
    """

    __slots__ = ("value", "operator")

    def __init__(
        self, typ: Type[Value], operator: Callable[[Value, Value], Value]
    ) -> None:
        super().__init__(typ)
        self.operator = operator
        # special forms from typing or collections.abc are not instantiable
        # so we need to replace them with their concrete counterparts
        typ = _strip_extras(typ)
        if typ in (collections.abc.Sequence, collections.abc.MutableSequence):
            typ = list
        if typ in (collections.abc.Set, collections.abc.MutableSet):
            typ = set
        if typ in (collections.abc.Mapping, collections.abc.MutableMapping):
            typ = dict
        try:
            # Seed the channel with the type's zero-argument default.
            self.value = typ()
        except Exception:
            # Deliberate best-effort: if the type cannot be built without
            # arguments the channel simply starts empty, and `update` seeds
            # it from the first received value.
            pass

    def __eq__(self, value: object) -> bool:
        """Compare by class and operator.

        Operators are compared by identity, except that when either one is
        a lambda the channels are considered equal.
        """
        if not isinstance(value, BinaryOperatorAggregate):
            return False
        # Not every callable carries __name__ (e.g. functools.partial
        # objects), so read it defensively instead of raising AttributeError.
        names = (
            getattr(value.operator, "__name__", None),
            getattr(self.operator, "__name__", None),
        )
        if "<lambda>" in names:
            return True
        return value.operator is self.operator

    @property
    def ValueType(self) -> Type[Value]:
        """The type of the value stored in the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """The type of the update received by the channel."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Return a new channel with the same type, operator and key.

        A checkpoint of None keeps whatever initial value the constructor
        produced; any other checkpoint replaces it.
        """
        empty = self.__class__(self.typ, self.operator)
        empty.key = self.key
        if checkpoint is not None:
            empty.value = checkpoint
        return empty

    def update(self, values: Sequence[Value]) -> bool:
        """Fold the received values into the stored value with the operator.

        Returns False when no values were received, True otherwise. If the
        channel holds no value yet, the first received value becomes the
        starting point and the remaining ones are folded into it.
        """
        if not values:
            return False
        if not hasattr(self, "value"):
            self.value = values[0]
            values = values[1:]
        for value in values:
            self.value = self.operator(self.value, value)
        return True

    def get(self) -> Value:
        """Return the stored value; raises EmptyChannelError when none is set."""
        try:
            return self.value
        except AttributeError:
            raise EmptyChannelError()

ValueType: Type[Value] property

The type of the value stored in the channel.

UpdateType: Type[Value] property

The type of the update received by the channel.

checkpoint() -> Optional[C]

Return a serializable representation of the channel's current state. Raises EmptyChannelError if the channel is empty (never updated yet), or doesn't support checkpoints.

Source code in libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    The default snapshot is whatever get() returns.
    Raises EmptyChannelError if the channel is empty (never updated yet),
    or doesn't support checkpoints."""
    snapshot = self.get()
    return snapshot

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

AnyValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the last value received; assumes that, if multiple values are received, they are all equal.

Source code in libs/langgraph/langgraph/channels/any_value.py
class AnyValue(Generic[Value], BaseChannel[Value, Value, Value]):
    """Stores the last value received, assumes that if multiple values are
    received, they are all equal."""

    # "typ" is already a slot on BaseChannel. Declaring it again here would
    # shadow the inherited slot and reserve a second, unused one per instance.
    __slots__ = ("value",)

    def __eq__(self, value: object) -> bool:
        """Compare by class only; the stored value plays no part."""
        return isinstance(value, AnyValue)

    @property
    def ValueType(self) -> Type[Value]:
        """The type of the value stored in the channel."""
        return self.typ

    @property
    def UpdateType(self) -> Type[Value]:
        """The type of the update received by the channel."""
        return self.typ

    def from_checkpoint(self, checkpoint: Optional[Value]) -> Self:
        """Return a new channel with the same type and key.

        A checkpoint of None leaves the new channel without a value.
        """
        empty = self.__class__(self.typ)
        empty.key = self.key
        if checkpoint is not None:
            empty.value = checkpoint
        return empty

    def update(self, values: Sequence[Value]) -> bool:
        """Store the last received value, or clear the channel when none arrived.

        With no values, the stored value is deleted; returns True if there
        was one to delete and False otherwise. With values, the last one is
        kept (the others are not inspected) and True is returned.
        """
        if len(values) == 0:
            try:
                del self.value
                return True
            except AttributeError:
                # nothing was stored, so nothing changed
                return False

        self.value = values[-1]
        return True

    def get(self) -> Value:
        """Return the stored value; raises EmptyChannelError when none is set."""
        try:
            return self.value
        except AttributeError:
            raise EmptyChannelError()

ValueType: Type[Value] property

The type of the value stored in the channel.

UpdateType: Type[Value] property

The type of the update received by the channel.

checkpoint() -> Optional[C]

Return a serializable representation of the channel's current state. Raises EmptyChannelError if the channel is empty (never updated yet), or doesn't support checkpoints.

Source code in libs/langgraph/langgraph/channels/base.py
def checkpoint(self) -> Optional[C]:
    """Produce a serializable snapshot of the channel's current state.

    The default snapshot is whatever get() returns.
    Raises EmptyChannelError if the channel is empty (never updated yet),
    or doesn't support checkpoints."""
    snapshot = self.get()
    return snapshot

consume() -> bool

Mark the current value of the channel as consumed. By default, no-op. This is called by Pregel before the start of the next step, for all channels that triggered a node. If the channel was updated, return True.

Source code in libs/langgraph/langgraph/channels/base.py
def consume(self) -> bool:
    """Flag the channel's current value as consumed.

    This default implementation does nothing and reports False.
    Pregel calls it before the start of the next step, for all channels
    that triggered a node. If the channel was updated, return True.
    """
    return False

Comments