Pregel

Bases: PregelProtocol

Pregel manages the runtime behavior for LangGraph applications.

Overview

Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.

Each step consists of three phases:

  • Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
  • Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
  • Update: Update the channels with the values written by the actors in this step.

Repeat until no actors are selected for execution, or a maximum number of steps is reached.
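The loop above can be sketched in plain Python. This is a toy model only, not LangGraph's implementation: `run_supersteps` and the `(source, function, target)` node tuples are hypothetical names, each node subscribes to exactly one channel, and nodes run sequentially rather than in parallel.

```python
from typing import Callable, Optional

# Hypothetical toy model: a node is (subscribed channel, function, target channel).
Node = tuple[str, Callable[[str], Optional[str]], str]


def run_supersteps(
    nodes: dict[str, Node],
    inputs: dict[str, str],
    max_steps: int = 25,
) -> dict[str, str]:
    """Run plan/execute/update rounds until no node is selected."""
    channels: dict[str, str] = dict(inputs)
    updated = set(inputs)  # channels written in the previous step
    for _ in range(max_steps):
        # Plan: select nodes subscribed to a channel updated in the last step.
        selected = [n for n in nodes.values() if n[0] in updated]
        if not selected:
            return channels
        # Execute: every node reads the same snapshot; writes are buffered,
        # so they stay invisible to other nodes during this step.
        snapshot = dict(channels)
        writes: dict[str, str] = {}
        for source, func, target in selected:
            result = func(snapshot[source])
            if result is not None:
                writes[target] = result
        # Update: apply the buffered writes all at once.
        channels.update(writes)
        updated = set(writes)
    raise RuntimeError("maximum number of steps reached")


nodes = {
    "node1": ("a", lambda x: x + x, "b"),
    "node2": ("b", lambda x: x + x, "c"),
}
result = run_supersteps(nodes, {"a": "foo"})
print(result)  # {'a': 'foo', 'b': 'foofoo', 'c': 'foofoofoofoo'}
```

In this sketch `node2` only runs in the second step, because the write to `b` becomes visible after the first step's update phase.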

Actors

An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them, playing the role of an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.

Channels

Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function, which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one actor to another, or to send data from an actor to itself in a future step. LangGraph provides a number of built-in channels:

Basic channels: LastValue and Topic
  • LastValue: The default channel. Stores the last value sent to the channel. Useful for input and output values, or for sending data from one step to the next.
  • Topic: A configurable PubSub Topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values, and/or to accumulate values over the course of multiple steps.
Advanced channels: Context and BinaryOperatorAggregate
  • Context: Exposes the value of a context manager, managing its lifecycle. Useful for accessing external resources that require setup and/or teardown, e.g. client = Context(httpx.Client)
  • BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g. total = BinaryOperatorAggregate(int, operator.add)
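To make the "update function" idea concrete, here is a minimal sketch of three update behaviours described above. The `Toy*` classes are hypothetical stand-ins written for illustration; the real LangGraph channel classes have more behaviour and validation than shown here.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Keeps only the most recent update."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, updates: Sequence[Any]) -> None:
        if updates:
            self.value = updates[-1]


class ToyTopic:
    """Collects updates into a list; optionally keeps them across steps."""

    def __init__(self, accumulate: bool = False) -> None:
        self.accumulate = accumulate
        self.value: list[Any] = []

    def update(self, updates: Sequence[Any]) -> None:
        if not self.accumulate:
            self.value = []  # start fresh each step
        self.value.extend(updates)


class ToyAggregate:
    """Folds each update into the stored value with a binary operator."""

    def __init__(self, initial: Any, op: Callable[[Any, Any], Any]) -> None:
        self.value = initial
        self.op = op

    def update(self, updates: Sequence[Any]) -> None:
        for item in updates:
            self.value = self.op(self.value, item)


last = ToyLastValue()
last.update(["x"])
last.update(["y"])

topic = ToyTopic(accumulate=True)
topic.update(["foofoo"])
topic.update(["foofoofoofoo"])

total = ToyAggregate(0, operator.add)
total.update([1, 2])
total.update([3])

print(last.value)   # y
print(topic.value)  # ['foofoo', 'foofoofoofoo']
print(total.value)  # 6
```

Each call to `update` here plays the role of one step's update phase: it receives all values written to the channel during that step.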

Examples

Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).

However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no – you should use the Graph API or Functional API instead. These are higher-level interfaces that will compile down to Pregel under the hood.

Here are some examples to give you a sense of how it works:

Single node application
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
# {'b': 'foofoo'}
Using multiple nodes and multiple output channels
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channel

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | Channel.write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
# {'b': 'foofoo', 'c': 'foofoofoofoo'}
Using a Topic channel
from langgraph.channels import LastValue, EphemeralValue, Topic
from langgraph.pregel import Pregel, Channel

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
# {'c': ['foofoo', 'foofoofoofoo']}
Using a BinaryOperatorAggregate channel
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channel


node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c")
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)


def reducer(current, update):
    # Join each new update onto the stored value; the first update is stored as-is.
    if current:
        return current + " | " + update
    else:
        return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"]
)

app.invoke({"a": "foo"})
# {'c': 'foofoo | foofoofoofoo'}
Introducing a cycle

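This example demonstrates how to introduce a cycle in the graph by having a node write to a channel it subscribes to. Execution continues until the node returns None: because the write uses skip_none=True, the None is not written, the channel is not updated, and no node is selected for the next step.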

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry

example_node = (
    Channel.subscribe_to("value")
    | (lambda x: x + x if len(x) < 10 else None)
    | ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"]
)

app.invoke({"value": "a"})
# {'value': 'aaaaaaaaaaaaaaaa'}
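The final value has 16 characters because the string doubles on every step until its length is no longer below 10. The following plain-Python trace reproduces that arithmetic without LangGraph; `step` is a hypothetical helper that copies the lambda from the example.

```python
from typing import Optional


def step(value: str) -> Optional[str]:
    # Same rule as the node's lambda: double while shorter than 10 characters.
    return value + value if len(value) < 10 else None


value = "a"
history = [value]
while (nxt := step(value)) is not None:
    value = nxt
    history.append(value)

print([len(v) for v in history])  # [1, 2, 4, 8, 16]
print(value)                      # aaaaaaaaaaaaaaaa
```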

Methods:

  • stream: Stream graph steps for a single input.
  • astream: Asynchronously stream graph steps for a single input.
  • invoke: Run the graph with a single input and config.
  • ainvoke: Asynchronously invoke the graph on a single input.
  • get_state: Get the current state of the graph.
  • aget_state: Asynchronously get the current state of the graph.
  • get_state_history: Get the history of the state of the graph.
  • aget_state_history: Asynchronously get the history of the state of the graph.
  • update_state: Update the state of the graph with the given values, as if they came from node as_node.
  • aupdate_state: Asynchronously update the state of the graph with the given values, as if they came from node as_node.
  • bulk_update_state: Apply updates to the graph state in bulk. Requires a checkpointer to be set.
  • abulk_update_state: Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
  • get_graph: Return a drawable representation of the computation graph.
  • aget_graph: Asynchronously return a drawable representation of the computation graph.
  • get_subgraphs: Get the subgraphs of the graph.
  • aget_subgraphs: Asynchronously get the subgraphs of the graph.
  • with_config: Create a copy of the Pregel object with an updated config.

stream

stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]

Stream graph steps for a single input.

Parameters:

Name Type Description Default
input dict[str, Any] | Any

The input to the graph.

required
config RunnableConfig | None

The configuration to use for the run.

None
stream_mode StreamMode | list[StreamMode] | None

The mode to stream output, defaults to self.stream_mode. Options are:

  • "values": Emit all values in the state after each step, including interrupts. When used with functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run) then those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token together with metadata for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
None
output_keys str | Sequence[str] | None

The keys to stream, defaults to all non-context channels.

None
interrupt_before All | Sequence[str] | None

Nodes to interrupt before, defaults to all nodes in the graph.

None
interrupt_after All | Sequence[str] | None

Nodes to interrupt after, defaults to all nodes in the graph.

None
checkpoint_during bool | None

Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved.

None
debug bool | None

Whether to print debug information during execution, defaults to False.

None
subgraphs bool

Whether to stream subgraphs, defaults to False.

False

Yields:

Type Description
dict[str, Any] | Any

The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream async

astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]

Asynchronously stream graph steps for a single input.

Parameters:

Name Type Description Default
input dict[str, Any] | Any

The input to the graph.

required
config RunnableConfig | None

The configuration to use for the run.

None
stream_mode StreamMode | list[StreamMode] | None

The mode to stream output, defaults to self.stream_mode. Options are:

  • "values": Emit all values in the state after each step, including interrupts. When used with functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run) then those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token together with metadata for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
None
output_keys str | Sequence[str] | None

The keys to stream, defaults to all non-context channels.

None
interrupt_before All | Sequence[str] | None

Nodes to interrupt before, defaults to all nodes in the graph.

None
interrupt_after All | Sequence[str] | None

Nodes to interrupt after, defaults to all nodes in the graph.

None
checkpoint_during bool | None

Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved.

None
debug bool | None

Whether to print debug information during execution, defaults to False.

None
subgraphs bool

Whether to stream subgraphs, defaults to False.

False

Yields:

Type Description
AsyncIterator[dict[str, Any] | Any]

The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

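# Note: `async for` must run inside an async context (a coroutine or an async-aware REPL/notebook).
async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):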
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

async def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke

invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Run the graph with a single input and config.

Parameters:

Name Type Description Default
input dict[str, Any] | Any

The input data for the graph. It can be a dictionary or any other type.

required
config RunnableConfig | None

Optional. The configuration for the graph run.

None
stream_mode StreamMode

Optional. The stream mode for the graph run. Default is "values".

'values'
output_keys str | Sequence[str] | None

Optional. The output keys to retrieve from the graph run.

None
interrupt_before All | Sequence[str] | None

Optional. The nodes to interrupt the graph run before.

None
interrupt_after All | Sequence[str] | None

Optional. The nodes to interrupt the graph run after.

None
debug bool | None

Optional. Enable debug mode for the graph run.

None
**kwargs Any

Additional keyword arguments to pass to the graph run.

{}

Returns:

Type Description
dict[str, Any] | Any

The output of the graph run. If stream_mode is "values", it returns the latest output.

dict[str, Any] | Any

If stream_mode is not "values", it returns a list of output chunks.
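The two return shapes can be illustrated with a small sketch. It mirrors the documented behaviour only and is not the library's code: `fake_stream` is a hypothetical stand-in that yields canned chunks, and `toy_invoke` is a hypothetical helper that collects them.

```python
from typing import Any, Iterator, Union


def fake_stream(stream_mode: str) -> Iterator[dict[str, Any]]:
    """Hypothetical stand-in for a graph's stream(); yields canned chunks."""
    if stream_mode == "values":
        yield {"alist": ["start"], "another_list": []}
        yield {"alist": ["start"], "another_list": ["hi"]}
        yield {"alist": ["start", "there"], "another_list": ["hi"]}
    else:
        yield {"a": {"another_list": ["hi"]}}
        yield {"b": {"alist": ["there"]}}


def toy_invoke(stream_mode: str = "values") -> Union[dict[str, Any], list, None]:
    """Return the latest chunk for "values", else the list of all chunks."""
    latest = None
    chunks = []
    for chunk in fake_stream(stream_mode):
        if stream_mode == "values":
            latest = chunk  # keep only the most recent full state
        else:
            chunks.append(chunk)  # keep every chunk in order
    return latest if stream_mode == "values" else chunks


print(toy_invoke("values"))
# {'alist': ['start', 'there'], 'another_list': ['hi']}
print(toy_invoke("updates"))
# [{'a': {'another_list': ['hi']}}, {'b': {'alist': ['there']}}]
```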

ainvoke async

ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Asynchronously invoke the graph on a single input.

Parameters:

Name Type Description Default
input dict[str, Any] | Any

The input data for the computation. It can be a dictionary or any other type.

required
config RunnableConfig | None

Optional. The configuration for the computation.

None
stream_mode StreamMode

Optional. The stream mode for the computation. Default is "values".

'values'
output_keys str | Sequence[str] | None

Optional. The output keys to include in the result. Default is None.

None
interrupt_before All | Sequence[str] | None

Optional. The nodes to interrupt before. Default is None.

None
interrupt_after All | Sequence[str] | None

Optional. The nodes to interrupt after. Default is None.

None
debug bool | None

Optional. Whether to enable debug mode. Default is None.

None
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
dict[str, Any] | Any

The result of the computation. If stream_mode is "values", it returns the latest value.

dict[str, Any] | Any

If stream_mode is not "values", it returns a list of output chunks.

get_state

get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Get the current state of the graph.

aget_state async

aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Asynchronously get the current state of the graph.

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]

Get the history of the state of the graph.

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]

Asynchronously get the history of the state of the graph.

update_state

update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
) -> RunnableConfig

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
) -> RunnableConfig

Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

bulk_update_state

bulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

Name Type Description Default
config RunnableConfig

The config to apply the updates to.

required
supersteps Sequence[Sequence[StateUpdate]]

A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

required

Raises:

Type Description
ValueError

If no checkpointer is set or no updates are provided.

InvalidUpdateError

If an invalid update is provided.

Returns:

Name Type Description
RunnableConfig RunnableConfig

The updated config.
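The nested shape of supersteps can be easier to see in a toy model. The sketch below is an illustration under stated assumptions, not LangGraph's implementation: `ToyUpdate` and `apply_supersteps` are hypothetical names, a plain dict merge stands in for channel updates, and no checkpointer is involved.

```python
from typing import Any, NamedTuple, Optional, Sequence


class ToyUpdate(NamedTuple):
    """Hypothetical stand-in for one (values, as_node) update."""

    values: Optional[dict[str, Any]]
    as_node: Optional[str] = None


def apply_supersteps(
    state: dict[str, Any],
    supersteps: Sequence[Sequence[ToyUpdate]],
) -> tuple[dict[str, Any], list[tuple[int, Optional[str]]]]:
    """Apply supersteps in order; return the new state and an audit log."""
    if not supersteps or any(len(step) == 0 for step in supersteps):
        raise ValueError("no updates provided")
    new_state = dict(state)
    log: list[tuple[int, Optional[str]]] = []
    for index, step in enumerate(supersteps):
        for update in step:
            # Assumption: a plain dict merge stands in for channel updates.
            new_state.update(update.values or {})
            log.append((index, update.as_node))
    return new_state, log


supersteps = [
    [ToyUpdate({"alist": ["hi"]}, "a")],
    [ToyUpdate({"another_list": ["there"]}, "b"), ToyUpdate({"count": 2}, "b")],
]
state, log = apply_supersteps({"count": 0}, supersteps)
print(state)  # {'count': 2, 'alist': ['hi'], 'another_list': ['there']}
print(log)    # [(0, 'a'), (1, 'b'), (1, 'b')]
```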

abulk_update_state async

abulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

Name Type Description Default
config RunnableConfig

The config to apply the updates to.

required
supersteps Sequence[Sequence[StateUpdate]]

A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

required

Raises:

Type Description
ValueError

If no checkpointer is set or no updates are provided.

InvalidUpdateError

If an invalid update is provided.

Returns:

Name Type Description
RunnableConfig RunnableConfig

The updated config.

get_graph

get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Return a drawable representation of the computation graph.

aget_graph async

aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Asynchronously return a drawable representation of the computation graph.

get_subgraphs

get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]

Get the subgraphs of the graph.

Parameters:

Name Type Description Default
namespace str | None

The namespace to filter the subgraphs by.

None
recurse bool

Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

False

Returns:

Type Description
Iterator[tuple[str, PregelProtocol]]

An iterator of (namespace, subgraph) pairs.

aget_subgraphs async

aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]

Asynchronously get the subgraphs of the graph.

Parameters:

Name Type Description Default
namespace str | None

The namespace to filter the subgraphs by.

None
recurse bool

Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

False

Returns:

Type Description
AsyncIterator[tuple[str, PregelProtocol]]

An async iterator of (namespace, subgraph) pairs.

with_config

with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Self

Create a copy of the Pregel object with an updated config.