Pregel¶
Bases: PregelProtocol
Pregel manages the runtime behavior for LangGraph applications.
Overview¶
Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model.
Each step consists of three phases:
- Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
- Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
- Update: Update the channels with the values written by the actors in this step.
Repeat until no actors are selected for execution, or a maximum number of steps is reached.
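To make these phases concrete, here is a minimal, self-contained sketch of the loop. It is an illustration only: every channel is modeled as a plain last-value slot, execution is sequential rather than parallel, and none of the names below come from the actual implementation.

```python
def run_pregel(nodes, channels, input_writes, max_steps=25):
    """Toy model of the step loop. `nodes` maps a name to a
    (trigger_channel, fn) pair, where fn(value) returns a list of
    (channel, value) writes. Every channel is a plain last-value slot."""
    # Seed the input channels (step 0 counts as an update).
    updated = set()
    for ch, value in input_writes:
        channels[ch] = value
        updated.add(ch)
    for _ in range(max_steps):
        # Plan: select the actors subscribed to channels updated last step.
        tasks = [(trigger, fn) for trigger, fn in nodes.values() if trigger in updated]
        if not tasks:
            break
        # Execution: run all selected actors; buffer writes so that channel
        # updates stay invisible to actors until the next step.
        writes = []
        for trigger, fn in tasks:
            writes.extend(fn(channels[trigger]))
        # Update: apply the buffered writes to the channels.
        updated = set()
        for ch, value in writes:
            channels[ch] = value
            updated.add(ch)
    return channels


# Example: one actor that reads "a", doubles it, and writes "b".
nodes = {"node1": ("a", lambda x: [("b", x + x)])}
print(run_pregel(nodes, {}, input_writes=[("a", "foo")]))
# {'a': 'foo', 'b': 'foofoo'}
```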
Actors¶
An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain's Runnable interface.
Channels¶
Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function that takes a sequence of updates and modifies the stored value. Channels can be used to send data from one chain to another, or to send data from a chain to itself in a future step. LangGraph provides a number of built-in channels:
Basic channels: LastValue and Topic¶
- LastValue: The default channel. Stores the last value sent to the channel. Useful for input and output values, or for sending data from one step to the next.
- Topic: A configurable PubSub Topic. Useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values, and/or to accumulate values over the course of multiple steps.
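To see the difference, here is a small sketch that drives both channel types directly through the update()/get() methods of the channel interface; in normal use Pregel makes these calls for you, so treat this as an illustration of the semantics rather than a usage pattern.

```python
from langgraph.channels import LastValue, Topic

# LastValue keeps only the most recent value written.
last = LastValue(str)
last.update(["first"])
last.update(["second"])
print(last.get())  # "second"

# Topic with accumulate=True collects every value written across steps.
topic = Topic(str, accumulate=True)
topic.update(["first"])
topic.update(["second", "third"])
print(topic.get())  # ["first", "second", "third"]
```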
Advanced channels: Context and BinaryOperatorAggregate¶
- Context: Exposes the value of a context manager, managing its lifecycle. Useful for accessing external resources that require setup and/or teardown, e.g. client = Context(httpx.Client).
- BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g. total = BinaryOperatorAggregate(int, operator.add).
Examples¶
Most users will interact with Pregel via a StateGraph (Graph API) or via an entrypoint (Functional API).
However, for advanced use cases, Pregel can be used directly. If you're not sure whether you need to use Pregel directly, then the answer is probably no – you should use the Graph API or Functional API instead. These are higher-level interfaces that will compile down to Pregel under the hood.
Here are some examples to give you a sense of how it works:
Single node application
```python
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel

# A single node that reads from channel "a", doubles the value,
# and writes the result to channel "b".
node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})  # -> {'b': 'foofoo'}
```
Using multiple nodes and multiple output channels
```python
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channel

# node1 doubles "a" and writes to "b"; node2 doubles "b" and writes to "c".
node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | Channel.write_to("b")
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | Channel.write_to("c")
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})  # -> {'b': 'foofoo', 'c': 'foofoofoofoo'}
```
Using a Topic channel
```python
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, Channel

# node1 fans out its result to both "b" and "c"; node2 also writes to "c",
# so "c" (a Topic with accumulate=True) collects values from both nodes.
node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c"),
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})  # -> {'c': ['foofoo', 'foofoofoofoo']}
```
Using a BinaryOperatorAggregate channel
```python
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channel

node1 = (
    Channel.subscribe_to("a")
    | (lambda x: x + x)
    | {
        "b": Channel.write_to("b"),
        "c": Channel.write_to("c"),
    }
)

node2 = (
    Channel.subscribe_to("b")
    | (lambda x: x + x)
    | {
        "c": Channel.write_to("c"),
    }
)


# Combine the current value of "c" with each new update.
def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})  # -> {'c': 'foofoo | foofoofoofoo'}
```
Introducing a cycle
This example demonstrates how to introduce a cycle in the graph, by having a chain write to a channel it subscribes to. Execution will continue until a None value is written to the channel.
```python
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntry

# The node subscribes to "value" and writes back to it, doubling the string
# each step until it returns None; skip_none=True suppresses the write of
# None, so no channel is updated and the loop terminates.
example_node = (
    Channel.subscribe_to("value")
    | (lambda x: x + x if len(x) < 10 else None)
    | ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
```
Methods:

Name | Description
---|---
stream | Stream graph steps for a single input.
astream | Asynchronously stream graph steps for a single input.
invoke | Run the graph with a single input and config.
ainvoke | Asynchronously invoke the graph on a single input.
get_state | Get the current state of the graph.
aget_state | Asynchronously get the current state of the graph.
get_state_history | Get the history of the state of the graph.
aget_state_history | Asynchronously get the history of the state of the graph.
update_state | Update the state of the graph with the given values, as if they came from node as_node.
aupdate_state | Asynchronously update the state of the graph with the given values, as if they came from node as_node.
bulk_update_state | Apply updates to the graph state in bulk. Requires a checkpointer to be set.
abulk_update_state | Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
get_graph | Return a drawable representation of the computation graph.
aget_graph | Asynchronously return a drawable representation of the computation graph.
get_subgraphs | Get the subgraphs of the graph.
aget_subgraphs | Asynchronously get the subgraphs of the graph.
with_config | Create a copy of the Pregel object with an updated config.
stream¶

```python
stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]
```
Stream graph steps for a single input.
Parameters:

Name | Type | Description | Default
---|---|---|---
input | dict[str, Any] \| Any | The input to the graph. | required
config | RunnableConfig \| None | The configuration to use for the run. | None
stream_mode | StreamMode \| list[StreamMode] \| None | The mode to stream output, defaults to self.stream_mode. Options are "values", "updates", "custom", "messages", and "debug". | None
output_keys | str \| Sequence[str] \| None | The keys to stream, defaults to all non-context channels. | None
interrupt_before | All \| Sequence[str] \| None | Nodes to interrupt before, defaults to all nodes in the graph. | None
interrupt_after | All \| Sequence[str] \| None | Nodes to interrupt after, defaults to all nodes in the graph. | None
checkpoint_during | bool \| None | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | None
debug | bool \| None | Whether to print debug information during execution, defaults to False. | None
subgraphs | bool | Whether to stream subgraphs, defaults to False. | False
Yields:

Type | Description
---|---
dict[str, Any] \| Any | The output of each step in the graph. The output shape depends on the stream_mode.
Using stream_mode="values":
```python
import operator
from typing_extensions import Annotated, TypedDict

from langgraph.graph import StateGraph, START


class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]


builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)
# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
```
Using stream_mode="updates":
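In updates mode, each yielded event maps the name of the node that ran to the update that node returned (the expected output below follows from the graph defined in the values example):

```python
for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)
# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
```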
Using stream_mode="debug":
```python
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
```
Using stream_mode="custom":
```python
from langgraph.types import StreamWriter


def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)
# {'custom_data': 'foo'}
```
Using stream_mode="messages":
```python
from typing_extensions import Annotated, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START

llm = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
    question: str
    answer: str


def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)
# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
```
astream async¶

```python
astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]
```
Asynchronously stream graph steps for a single input.
Parameters:

Name | Type | Description | Default
---|---|---|---
input | dict[str, Any] \| Any | The input to the graph. | required
config | RunnableConfig \| None | The configuration to use for the run. | None
stream_mode | StreamMode \| list[StreamMode] \| None | The mode to stream output, defaults to self.stream_mode. Options are "values", "updates", "custom", "messages", and "debug". | None
output_keys | str \| Sequence[str] \| None | The keys to stream, defaults to all non-context channels. | None
interrupt_before | All \| Sequence[str] \| None | Nodes to interrupt before, defaults to all nodes in the graph. | None
interrupt_after | All \| Sequence[str] \| None | Nodes to interrupt after, defaults to all nodes in the graph. | None
checkpoint_during | bool \| None | Whether to checkpoint intermediate steps, defaults to True. If False, only the final checkpoint is saved. | None
debug | bool \| None | Whether to print debug information during execution, defaults to False. | None
subgraphs | bool | Whether to stream subgraphs, defaults to False. | False
Yields:

Type | Description
---|---
AsyncIterator[dict[str, Any] \| Any] | The output of each step in the graph. The output shape depends on the stream_mode.
Using stream_mode="values":
```python
import operator
from typing_extensions import Annotated, TypedDict

from langgraph.graph import StateGraph, START


class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]


builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)
# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
```
Using stream_mode="updates":
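In updates mode, each yielded event maps the name of the node that ran to the update that node returned (the expected output below follows from the graph defined in the values example):

```python
async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)
# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
```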
Using stream_mode="debug":
```python
async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
```
Using stream_mode="custom":
```python
from langgraph.types import StreamWriter


async def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)
# {'custom_data': 'foo'}
```
Using stream_mode="messages":
```python
from typing_extensions import Annotated, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START

llm = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
    question: str
    answer: str


async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}


builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)
# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})
```
invoke¶

```python
invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any
```
Run the graph with a single input and config.
Parameters:

Name | Type | Description | Default
---|---|---|---
input | dict[str, Any] \| Any | The input data for the graph. It can be a dictionary or any other type. | required
config | RunnableConfig \| None | Optional. The configuration for the graph run. | None
stream_mode | StreamMode | Optional. The stream mode for the graph run. Default is "values". | 'values'
output_keys | str \| Sequence[str] \| None | Optional. The output keys to retrieve from the graph run. | None
interrupt_before | All \| Sequence[str] \| None | Optional. The nodes to interrupt the graph run before. | None
interrupt_after | All \| Sequence[str] \| None | Optional. The nodes to interrupt the graph run after. | None
debug | bool \| None | Optional. Enable debug mode for the graph run. | None
**kwargs | Any | Additional keyword arguments to pass to the graph run. | {}
Returns:

Type | Description
---|---
dict[str, Any] \| Any | The output of the graph run. If stream_mode is "values", it returns the latest output. If stream_mode is not "values", it returns a list of output chunks.
ainvoke async¶

```python
ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any
```
Asynchronously invoke the graph on a single input.
Parameters:

Name | Type | Description | Default
---|---|---|---
input | dict[str, Any] \| Any | The input data for the computation. It can be a dictionary or any other type. | required
config | RunnableConfig \| None | Optional. The configuration for the computation. | None
stream_mode | StreamMode | Optional. The stream mode for the computation. Default is "values". | 'values'
output_keys | str \| Sequence[str] \| None | Optional. The output keys to include in the result. Default is None. | None
interrupt_before | All \| Sequence[str] \| None | Optional. The nodes to interrupt before. Default is None. | None
interrupt_after | All \| Sequence[str] \| None | Optional. The nodes to interrupt after. Default is None. | None
debug | bool \| None | Optional. Whether to enable debug mode. Default is None. | None
**kwargs | Any | Additional keyword arguments. | {}
Returns:

Type | Description
---|---
dict[str, Any] \| Any | The result of the computation. If stream_mode is "values", it returns the latest value. If stream_mode is not "values", it returns a list of output chunks.
get_state¶

```python
get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
```
Get the current state of the graph.
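For example, a minimal sketch: it assumes the `builder` StateGraph from the stream examples above, and the checkpointer and thread_id are illustrative choices, not requirements of this page.

```python
from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer so state is persisted per thread.
graph = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "1"}}

graph.invoke({"alist": ["hello"]}, config)
snapshot = graph.get_state(config)
print(snapshot.values)  # the current channel values
print(snapshot.next)    # nodes scheduled to run next, if any
```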
aget_state async¶

```python
aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot
```

Asynchronously get the current state of the graph.
get_state_history¶

```python
get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]
```
Get the history of the state of the graph.
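Snapshots are yielded newest first. A minimal sketch, reusing the checkpointer-backed `graph` and `config` from the get_state example above:

```python
# Walk the checkpoint history for this thread, most recent first.
for snapshot in graph.get_state_history(config):
    print(snapshot.config["configurable"]["checkpoint_id"], snapshot.values)
```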
aget_state_history async¶

```python
aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]
```
Asynchronously get the history of the state of the graph.
update_state¶

```python
update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
) -> RunnableConfig
```

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
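A minimal sketch, continuing from the checkpointer-backed `graph` and `config` in the get_state example; the values and the as_node choice are illustrative:

```python
# Apply a manual edit to the state, attributed to node "a".
graph.update_state(config, {"alist": ["manual edit"]}, as_node="a")
print(graph.get_state(config).values)
```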
aupdate_state async¶

```python
aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
) -> RunnableConfig
```

Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
bulk_update_state¶

```python
bulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
```
Apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | RunnableConfig | The config to apply the updates to. | required
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node). | required
Raises:

Type | Description
---|---
ValueError | If no checkpointer is set or no updates are provided.
InvalidUpdateError | If an invalid update is provided.

Returns:

Name | Type | Description
---|---|---
RunnableConfig | RunnableConfig | The updated config.
abulk_update_state async¶

```python
abulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig
```
Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | RunnableConfig | The config to apply the updates to. | required
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node). | required
Raises:

Type | Description
---|---
ValueError | If no checkpointer is set or no updates are provided.
InvalidUpdateError | If an invalid update is provided.

Returns:

Name | Type | Description
---|---|---
RunnableConfig | RunnableConfig | The updated config.
get_graph¶

```python
get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph
```
Return a drawable representation of the computation graph.
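For example, a minimal sketch that reuses the compiled `graph` from the stream examples; draw_mermaid() is one of the drawing methods on the returned Graph object:

```python
# Render the computation graph as Mermaid diagram source.
drawable = graph.get_graph()
print(drawable.draw_mermaid())
```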
aget_graph async¶

```python
aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph
```

Asynchronously return a drawable representation of the computation graph.
get_subgraphs¶

```python
get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]
```
Get the subgraphs of the graph.
Parameters:

Name | Type | Description | Default
---|---|---|---
namespace | str \| None | The namespace to filter the subgraphs by. | None
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False

Returns:

Type | Description
---|---
Iterator[tuple[str, PregelProtocol]] | An iterator of (namespace, subgraph) pairs.
aget_subgraphs async¶

```python
aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]
```

Asynchronously get the subgraphs of the graph.
Parameters:

Name | Type | Description | Default
---|---|---|---
namespace | str \| None | The namespace to filter the subgraphs by. | None
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False

Returns:

Type | Description
---|---
AsyncIterator[tuple[str, PregelProtocol]] | An async iterator of (namespace, subgraph) pairs.
with_config¶

```python
with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Self
```
Create a copy of the Pregel object with an updated config.
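For example, a minimal sketch; with_config follows the standard Runnable pattern, so keyword arguments such as tags are merged into the copy's default config (the tag value here is illustrative):

```python
# Bind a default config to a copy of the app; invocations on the copy reuse it.
configured = app.with_config(tags=["pregel-demo"])
configured.invoke({"a": "foo"})
```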