RemoteGraph¶
RemoteGraph
¶
Bases:
The RemoteGraph class is a client implementation for calling remote APIs that implement the LangGraph Server API specification.
For example, the RemoteGraph class can be used to call APIs from deployments on LangGraph Cloud.
RemoteGraph behaves the same way as a Graph and can be used directly as a node in another Graph.
InputType
property
¶
The type of input this Runnable accepts specified as a type annotation.
OutputType
property
¶
The type of output this Runnable produces specified as a type annotation.
input_schema
property
¶
The type of input this Runnable accepts specified as a pydantic model.
output_schema
property
¶
The type of output this Runnable produces specified as a pydantic model.
config_specs
property
¶
List configurable fields for this Runnable.
get_name
¶
Get the name of the Runnable.
get_input_schema
¶
Get a pydantic model that can be used to validate input to the Runnable.
Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic input schema that depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
Parameters:
- config (Optional[RunnableConfig], default: None) – A config to use when generating the schema.
Returns:
- type[BaseModel] – A pydantic model that can be used to validate input.
get_input_jsonschema
¶
Get a JSON schema that represents the input to the Runnable.
Parameters:
- config (Optional[RunnableConfig], default: None) – A config to use when generating the schema.
Returns:
- dict[str, Any] – A JSON schema that represents the input to the Runnable.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)
    print(runnable.get_input_jsonschema())

.. versionadded:: 0.3.0
get_output_schema
¶
Get a pydantic model that can be used to validate output to the Runnable.
Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
Parameters:
- config (Optional[RunnableConfig], default: None) – A config to use when generating the schema.
Returns:
- type[BaseModel] – A pydantic model that can be used to validate output.
get_output_jsonschema
¶
Get a JSON schema that represents the output of the Runnable.
Parameters:
- config (Optional[RunnableConfig], default: None) – A config to use when generating the schema.
Returns:
- dict[str, Any] – A JSON schema that represents the output of the Runnable.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(add_one)
    print(runnable.get_output_jsonschema())

.. versionadded:: 0.3.0
config_schema
¶
The type of config this Runnable accepts specified as a pydantic model.
To mark a field as configurable, see the configurable_fields and configurable_alternatives methods.
Parameters:
- include (Optional[Sequence[str]], default: None) – A list of fields to include in the config schema.
Returns:
- type[BaseModel] – A pydantic model that can be used to validate config.
get_config_jsonschema
¶
Get a JSON schema that represents the config of the Runnable.
Parameters:
- include (Optional[Sequence[str]], default: None) – A list of fields to include in the config schema.
Returns:
- dict[str, Any] – A JSON schema that represents the config of the Runnable.
.. versionadded:: 0.3.0
get_prompts
¶
Return a list of prompts used by this Runnable.
__or__
¶
__or__(
other: Union [
Runnable [Any , Other ],
Callable [[Any ], Other ],
Callable [[Iterator [Any ]], Iterator [Other ]],
Mapping [
str ,
Union [
Runnable [Any , Other ],
Callable [[Any ], Other ],
Any ,
],
],
],
) -> RunnableSerializable [Input , Other ]
Compose this Runnable with another object to create a RunnableSequence.
__ror__
¶
__ror__(
other: Union [
Runnable [Other , Any ],
Callable [[Other ], Any ],
Callable [[Iterator [Other ]], Iterator [Any ]],
Mapping [
str ,
Union [
Runnable [Other , Any ],
Callable [[Other ], Any ],
Any ,
],
],
],
) -> RunnableSerializable [Other , Output ]
Compose this Runnable with another object to create a RunnableSequence.
pipe
¶
pipe(
*others: Union [
Runnable [Any , Other ], Callable [[Any ], Other ]
],
name: Optional [str ] = None
) -> RunnableSerializable [Input , Other ]
Compose this Runnable with Runnable-like objects to make a RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def add_one(x: int) -> int:
        return x + 1

    def mul_two(x: int) -> int:
        return x * 2

    runnable_1 = RunnableLambda(add_one)
    runnable_2 = RunnableLambda(mul_two)
    sequence = runnable_1.pipe(runnable_2)
    # Or equivalently:
    # sequence = runnable_1 | runnable_2
    # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
    sequence.invoke(1)
    await sequence.ainvoke(1)
    # -> 4

    sequence.batch([1, 2, 3])
    await sequence.abatch([1, 2, 3])
    # -> [4, 6, 8]
pick
¶
Pick keys from the output dict of this Runnable.
Pick single key:
.. code-block:: python

    import json

    from langchain_core.runnables import RunnableLambda, RunnableMap

    as_str = RunnableLambda(str)
    as_json = RunnableLambda(json.loads)
    chain = RunnableMap(str=as_str, json=as_json)

    chain.invoke("[1, 2, 3]")
    # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

    json_only_chain = chain.pick("json")
    json_only_chain.invoke("[1, 2, 3]")
    # -> [1, 2, 3]
Pick list of keys:
.. code-block:: python

    from typing import Any

    import json

    from langchain_core.runnables import RunnableLambda, RunnableMap

    as_str = RunnableLambda(str)
    as_json = RunnableLambda(json.loads)

    def as_bytes(x: Any) -> bytes:
        return bytes(x, "utf-8")

    chain = RunnableMap(
        str=as_str,
        json=as_json,
        bytes=RunnableLambda(as_bytes)
    )

    chain.invoke("[1, 2, 3]")
    # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

    json_and_bytes_chain = chain.pick(["json", "bytes"])
    json_and_bytes_chain.invoke("[1, 2, 3]")
    # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
assign
¶
assign(
**kwargs: Union [
Runnable [dict [str , Any ], Any ],
Callable [[dict [str , Any ]], Any ],
Mapping [
str ,
Union [
Runnable [dict [str , Any ], Any ],
Callable [[dict [str , Any ]], Any ],
],
],
],
) -> RunnableSerializable [Any , Any ]
Assigns new fields to the dict output of this Runnable.
Returns a new Runnable.
.. code-block:: python

    from langchain_community.llms.fake import FakeStreamingListLLM
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import SystemMessagePromptTemplate
    from langchain_core.runnables import Runnable
    from operator import itemgetter

    prompt = (
        SystemMessagePromptTemplate.from_template("You are a nice assistant.")
        + "{question}"
    )
    llm = FakeStreamingListLLM(responses=["foo-lish"])

    chain: Runnable = prompt | llm | {"str": StrOutputParser()}

    chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

    print(chain_with_assign.input_schema.model_json_schema())
    # {'title': 'PromptInput', 'type': 'object', 'properties':
    # {'question': {'title': 'Question', 'type': 'string'}}}
    print(chain_with_assign.output_schema.model_json_schema())
    # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
    # {'str': {'title': 'Str',
    # 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
batch
¶
batch(
inputs: list [Input ],
config: Optional [
Union [RunnableConfig , list [RunnableConfig ]]
] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional [Any ]
) -> list [Output ]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
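The thread-pool strategy described above can be pictured with the standard library alone. The following is a minimal sketch, not the library's implementation: `batch_with_threads` and its `max_concurrency` argument are hypothetical names chosen to mirror the description, and `invoke` stands in for a Runnable's invoke method.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional


def batch_with_threads(
    invoke: Callable[[Any], Any],
    inputs: list,
    *,
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list:
    """Call `invoke` on every input in worker threads, preserving input order."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back as a regular result
            raise

    if not inputs:
        return []
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Executor.map yields results in input order, not completion order.
        return list(pool.map(call, inputs))


def add_one(x: int) -> int:
    return x + 1


results = batch_with_threads(add_one, [1, 2, 3], max_concurrency=2)
print(results)
```

Threads help here because IO-bound calls spend most of their time waiting, so several of them can overlap even under the GIL.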
batch_as_completed
¶
batch_as_completed(
inputs: Sequence [Input ],
config: Optional [
Union [RunnableConfig , Sequence [RunnableConfig ]]
] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional [Any ]
) -> Iterator [tuple [int , Union [Output , Exception ]]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
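To illustrate why each yielded item carries the index of its input, here is a small standard-library sketch of the "as completed" pattern. It is an assumption-laden stand-in rather than the library's code: `batch_as_completed_sketch` and `slow_echo` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[tuple]:
    """Yield (input_index, output) pairs in the order the calls finish."""
    if not inputs:
        return
    with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
        # Remember which input index each future belongs to.
        index_by_future = {
            pool.submit(invoke, item): i for i, item in enumerate(inputs)
        }
        for future in as_completed(index_by_future):
            yield index_by_future[future], future.result()


def slow_echo(delay: float) -> float:
    # Stand-in for an IO-bound call that takes `delay` seconds.
    time.sleep(delay)
    return delay


pairs = list(batch_as_completed_sketch(slow_echo, [0.3, 0.05]))
print(pairs)
```

Because the shorter call finishes first, its pair is expected to arrive before the pair for index 0; the index lets the caller match each result to its input regardless of arrival order.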
abatch
async
¶
abatch(
inputs: list [Input ],
config: Optional [
Union [RunnableConfig , list [RunnableConfig ]]
] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional [Any ]
) -> list [Output ]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
Parameters:
- inputs (list[Input]) – A list of inputs to the Runnable.
- config (Optional[Union[RunnableConfig, list[RunnableConfig]]], default: None) – A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
- return_exceptions (bool, default: False) – Whether to return exceptions instead of raising them. Defaults to False.
- kwargs (Optional[Any], default: {}) – Additional keyword arguments to pass to the Runnable.
Returns:
- list[Output] – A list of outputs from the Runnable.
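The asyncio.gather approach and the 'max_concurrency' key mentioned above can be sketched as follows. This is a simplified illustration under stated assumptions, not the library's code: `abatch_sketch` is a hypothetical helper, and the semaphore is one plausible way to cap parallel work.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: list,
    *,
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> list:
    """Run `ainvoke` over all inputs concurrently; results keep input order."""
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def call(item: Any) -> Any:
        if semaphore is None:
            return await ainvoke(item)
        async with semaphore:  # cap the number of in-flight calls
            return await ainvoke(item)

    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(
        *(call(item) for item in inputs), return_exceptions=return_exceptions
    )


async def add_one(x: int) -> int:
    await asyncio.sleep(0.01)  # simulate waiting on IO
    return x + 1


outputs = asyncio.run(abatch_sketch(add_one, [1, 2, 3], max_concurrency=2))
print(outputs)
```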
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence [Input ],
config: Optional [
Union [RunnableConfig , Sequence [RunnableConfig ]]
] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional [Any ]
) -> AsyncIterator [tuple [int , Union [Output , Exception ]]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
Parameters:
- inputs (Sequence[Input]) – A list of inputs to the Runnable.
- config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]], default: None) – A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
- return_exceptions (bool, default: False) – Whether to return exceptions instead of raising them. Defaults to False.
- kwargs (Optional[Any], default: {}) – Additional keyword arguments to pass to the Runnable.
Yields:
- AsyncIterator[tuple[int, Union[Output, Exception]]] – A tuple of the index of the input and the output from the Runnable.
astream_log
async
¶
astream_log(
input: Any ,
config: Optional [RunnableConfig ] = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Optional [Sequence [str ]] = None,
include_types: Optional [Sequence [str ]] = None,
include_tags: Optional [Sequence [str ]] = None,
exclude_names: Optional [Sequence [str ]] = None,
exclude_types: Optional [Sequence [str ]] = None,
exclude_tags: Optional [Sequence [str ]] = None,
**kwargs: Any
) -> Union [
AsyncIterator [RunLogPatch ], AsyncIterator [RunLog ]
]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
Parameters:
- input (Any) – The input to the Runnable.
- config (Optional[RunnableConfig], default: None) – The config to use for the Runnable.
- diff (bool, default: True) – Whether to yield diffs between each step or the current state.
- with_streamed_output_list (bool, default: True) – Whether to yield the streamed_output list.
- include_names (Optional[Sequence[str]], default: None) – Only include logs with these names.
- include_types (Optional[Sequence[str]], default: None) – Only include logs with these types.
- include_tags (Optional[Sequence[str]], default: None) – Only include logs with these tags.
- exclude_names (Optional[Sequence[str]], default: None) – Exclude logs with these names.
- exclude_types (Optional[Sequence[str]], default: None) – Exclude logs with these types.
- exclude_tags (Optional[Sequence[str]], default: None) – Exclude logs with these tags.
- kwargs (Any, default: {}) – Additional keyword arguments to pass to the Runnable.
Yields:
- Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]] – A RunLogPatch or RunLog object.
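The astream_log description notes that the Jsonpatch ops can be applied in order to construct state. The sketch below shows that idea with a deliberately tiny patch interpreter that handles only "add" and "replace". The `apply_ops` helper and the sample ops are hypothetical; a real client would more likely rely on a full JSON Patch implementation.

```python
from typing import Any


def apply_ops(state: Any, ops: list) -> Any:
    """Apply a small subset of JSON Patch ("add" and "replace") to `state`."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        # Split the JSON Pointer into keys; an empty path targets the whole document.
        keys = [
            k.replace("~1", "/").replace("~0", "~")
            for k in op["path"].split("/")[1:]
        ]
        if not keys:
            state = op["value"]
            continue
        parent = state
        for key in keys[:-1]:
            parent = parent[int(key)] if isinstance(parent, list) else parent[key]
        last = keys[-1]
        if isinstance(parent, list):
            if last == "-":
                parent.append(op["value"])  # "-" means "end of the array"
            elif op["op"] == "add":
                parent.insert(int(last), op["value"])
            else:
                parent[int(last)] = op["value"]
        else:
            parent[last] = op["value"]
    return state


# Hypothetical op stream, shaped like the patches described above.
patches = [
    [{"op": "replace", "path": "", "value": {"streamed_output": [], "final_output": None}}],
    [{"op": "add", "path": "/streamed_output/-", "value": "Hel"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "lo"}],
    [{"op": "replace", "path": "/final_output", "value": "Hello"}],
]

state = None
for ops in patches:
    state = apply_ops(state, ops)
print(state)
```

Replaying every patch in arrival order yields the same state that a non-diff stream would report at the end of the run.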
transform
¶
transform(
input: Iterator [Input ],
config: Optional [RunnableConfig ] = None,
**kwargs: Optional [Any ]
) -> Iterator [Output ]
Default implementation of transform, which buffers input and calls stream.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
- input (Iterator[Input]) – An iterator of inputs to the Runnable.
- config (Optional[RunnableConfig], default: None) – The config to use for the Runnable. Defaults to None.
- kwargs (Optional[Any], default: {}) – Additional keyword arguments to pass to the Runnable.
Yields:
- Output – The output of the Runnable.
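The buffering behaviour of the default transform can be pictured as "collect every input chunk, merge them, then stream once". The sketch below assumes chunks can be merged with `+`; `transform_sketch` and `upper_words` are hypothetical stand-ins, not library functions.

```python
from typing import Any, Callable, Iterator


def transform_sketch(
    chunks: Iterator[Any], stream: Callable[[Any], Iterator[Any]]
) -> Iterator[Any]:
    """Merge all input chunks with `+`, then stream the merged input once."""
    merged = None
    got_first = False
    for chunk in chunks:
        if not got_first:
            merged, got_first = chunk, True
        else:
            merged = merged + chunk  # assumes chunks support addition
    if got_first:
        # No output is produced until the input iterator is exhausted.
        yield from stream(merged)


def upper_words(text: str) -> Iterator[str]:
    # Stand-in for a Runnable's stream(): emits one output chunk per word.
    for word in text.split():
        yield word.upper()


out = list(transform_sketch(iter(["hello ", "wor", "ld"]), upper_words))
print(out)
```

This is why the docstring advises overriding the method when output can begin before the input is complete: the default waits for the whole input first.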
atransform
async
¶
atransform(
input: AsyncIterator [Input ],
config: Optional [RunnableConfig ] = None,
**kwargs: Optional [Any ]
) -> AsyncIterator [Output ]
Default implementation of atransform, which buffers input and calls astream.
Subclasses should override this method if they can start producing output while input is still being generated.
Parameters:
- input (AsyncIterator[Input]) – An async iterator of inputs to the Runnable.
- config (Optional[RunnableConfig], default: None) – The config to use for the Runnable. Defaults to None.
- kwargs (Optional[Any], default: {}) – Additional keyword arguments to pass to the Runnable.
Yields:
- AsyncIterator[Output] – The output of the Runnable.
bind
¶
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.
Parameters:
- kwargs (Any, default: {}) – The arguments to bind to the Runnable.
Returns:
- Runnable[Input, Output] – A new Runnable with the arguments bound.
Example:
.. code-block:: python

    from langchain_community.chat_models import ChatOllama
    from langchain_core.output_parsers import StrOutputParser

    llm = ChatOllama(model='llama2')

    # Without bind.
    chain = (
        llm
        | StrOutputParser()
    )

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two three four five.'

    # With bind.
    chain = (
        llm.bind(stop=["three"])
        | StrOutputParser()
    )

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two'
with_listeners
¶
with_listeners(
*,
on_start: Optional [
Union [
Callable [[Run ], None],
Callable [[Run , RunnableConfig ], None],
]
] = None,
on_end: Optional [
Union [
Callable [[Run ], None],
Callable [[Run , RunnableConfig ], None],
]
] = None,
on_error: Optional [
Union [
Callable [[Run ], None],
Callable [[Run , RunnableConfig ], None],
]
] = None
) -> Runnable [Input , Output ]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
on_start: Called before the Runnable starts running, with the Run object.
on_end: Called after the Runnable finishes running, with the Run object.
on_error: Called if the Runnable throws an error, with the Run object.
The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
Parameters:
- on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]], default: None) – Called before the Runnable starts running. Defaults to None.
- on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]], default: None) – Called after the Runnable finishes running. Defaults to None.
- on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]], default: None) – Called if the Runnable throws an error. Defaults to None.
Returns:
- Runnable[Input, Output] – A new Runnable with the listeners bound.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda
    from langchain_core.tracers.schemas import Run

    import time

    def test_runnable(time_to_sleep: int):
        time.sleep(time_to_sleep)

    def fn_start(run_obj: Run):
        print("start_time:", run_obj.start_time)

    def fn_end(run_obj: Run):
        print("end_time:", run_obj.end_time)

    chain = RunnableLambda(test_runnable).with_listeners(
        on_start=fn_start,
        on_end=fn_end
    )
    chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: Optional [AsyncListener ] = None,
on_end: Optional [AsyncListener ] = None,
on_error: Optional [AsyncListener ] = None
) -> Runnable [Input , Output ]
Bind async lifecycle listeners to a Runnable, returning a new Runnable.
on_start: Asynchronously called before the Runnable starts running.
on_end: Asynchronously called after the Runnable finishes running.
on_error: Asynchronously called if the Runnable throws an error.
The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
Parameters:
- on_start (Optional[AsyncListener], default: None) – Asynchronously called before the Runnable starts running. Defaults to None.
- on_end (Optional[AsyncListener], default: None) – Asynchronously called after the Runnable finishes running. Defaults to None.
- on_error (Optional[AsyncListener], default: None) – Asynchronously called if the Runnable throws an error. Defaults to None.
Returns:
- Runnable[Input, Output] – A new Runnable with the listeners bound.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda
    from langchain_core.tracers.schemas import Run
    from datetime import datetime, timezone
    import time
    import asyncio

    def format_t(timestamp: float) -> str:
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

    async def test_runnable(time_to_sleep: int):
        print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
        await asyncio.sleep(time_to_sleep)
        print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

    # Listeners receive the Run object for the invocation, not the Runnable.
    async def fn_start(run_obj: Run):
        print(f"on start callback starts at {format_t(time.time())}")
        await asyncio.sleep(3)
        print(f"on start callback ends at {format_t(time.time())}")

    async def fn_end(run_obj: Run):
        print(f"on end callback starts at {format_t(time.time())}")
        await asyncio.sleep(2)
        print(f"on end callback ends at {format_t(time.time())}")

    runnable = RunnableLambda(test_runnable).with_alisteners(
        on_start=fn_start,
        on_end=fn_end
    )

    async def concurrent_runs():
        await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

    asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*,
input_type: Optional [type [Input ]] = None,
output_type: Optional [type [Output ]] = None
) -> Runnable [Input , Output ]
Bind input and output types to a Runnable, returning a new Runnable.
Parameters:
- input_type (Optional[type[Input]], default: None) – The input type to bind to the Runnable. Defaults to None.
- output_type (Optional[type[Output]], default: None) – The output type to bind to the Runnable. Defaults to None.
Returns:
- Runnable[Input, Output] – A new Runnable with the types bound.
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple [
type [BaseException ], ...
] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: Optional [
ExponentialJitterParams
] = None,
stop_after_attempt: int = 3
) -> Runnable [Input , Output ]
Create a new Runnable that retries the original Runnable on exceptions.
Parameters:
- retry_if_exception_type (tuple[type[BaseException], ...], default: (Exception,)) – A tuple of exception types to retry on. Defaults to (Exception,).
- wait_exponential_jitter (bool, default: True) – Whether to add jitter to the wait time between retries. Defaults to True.
- stop_after_attempt (int, default: 3) – The maximum number of attempts to make before giving up. Defaults to 3.
- exponential_jitter_params (Optional[ExponentialJitterParams], default: None) – Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).
Returns:
- Runnable[Input, Output] – A new Runnable that retries the original Runnable on exceptions.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    count = 0

    def _lambda(x: int) -> None:
        global count
        count = count + 1
        if x == 1:
            raise ValueError("x is 1")
        else:
            pass

    runnable = RunnableLambda(_lambda)
    try:
        runnable.with_retry(
            stop_after_attempt=2,
            retry_if_exception_type=(ValueError,),
        ).invoke(1)
    except ValueError:
        pass

    assert (count == 2)
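The exponential_jitter_params names (initial, max, exp_base, jitter) suggest how the delay between attempts grows. The sketch below is an approximation of exponential backoff with jitter written from those parameter names, not a copy of tenacity's code. `wait_seconds` is a hypothetical helper, `max_wait` stands in for `max`, and the default values are illustrative only.

```python
import random


def wait_seconds(
    attempt: int,
    *,
    initial: float = 1.0,
    max_wait: float = 60.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
) -> float:
    """Delay before the retry that follows failed attempt `attempt` (1-based)."""
    backoff = initial * exp_base ** (attempt - 1)
    # Random jitter spreads out retries from clients that failed together.
    return min(backoff + random.uniform(0, jitter), max_wait)


random.seed(0)  # only to make the printed delays repeatable
delays = [wait_seconds(n) for n in range(1, 5)]
print([round(d, 2) for d in delays])
```

With these illustrative defaults the delays fall in the ranges 1 to 2, 2 to 3, 4 to 5, and 8 to 9 seconds, and never exceed `max_wait`.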
map
¶
Return a new Runnable that maps a list of inputs to a list of outputs.
Calls invoke() with each input.
Returns:
- Runnable[list[Input], list[Output]] – A new Runnable that maps a list of inputs to a list of outputs.
Example:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def _lambda(x: int) -> int:
        return x + 1

    runnable = RunnableLambda(_lambda)
    print(runnable.map().invoke([1, 2, 3]))  # [2, 3, 4]
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence [Runnable [Input , Output ]],
*,
exceptions_to_handle: tuple [
type [BaseException ], ...
] = (Exception,),
exception_key: Optional [str ] = None
) -> RunnableWithFallbacks [Input , Output ]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback in order, upon failures.
Parameters:
- fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.
- exceptions_to_handle (tuple[type[BaseException], ...], default: (Exception,)) – A tuple of exception types to handle. Defaults to (Exception,).
- exception_key (Optional[str], default: None) – If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key. If None, exceptions will not be passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. Defaults to None.
Returns:
- RunnableWithFallbacks[Input, Output] – A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.
Example:
.. code-block:: python

    from typing import Iterator

    from langchain_core.runnables import RunnableGenerator

    def _generate_immediate_error(input: Iterator) -> Iterator[str]:
        raise ValueError()
        yield ""

    def _generate(input: Iterator) -> Iterator[str]:
        yield from "foo bar"

    runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
        [RunnableGenerator(_generate)]
    )
    print(''.join(runnable.stream({})))  # foo bar
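The interaction between exceptions_to_handle and exception_key is easier to see in a plain-Python sketch. The code below is an illustration of the documented behaviour, not the library's implementation; `invoke_with_fallbacks`, `primary`, and `backup` are hypothetical names.

```python
from typing import Any, Callable, Optional, Sequence


def invoke_with_fallbacks(
    candidates: Sequence[Callable[[Any], Any]],
    value: Any,
    *,
    exceptions_to_handle: tuple = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    """Try each callable in order and return the first successful result."""
    if not candidates:
        raise ValueError("need at least one callable")
    last_error: Optional[BaseException] = None
    for candidate in candidates:
        if exception_key is not None and last_error is not None:
            # Pass the previous failure along; this requires dict input.
            value = {**value, exception_key: last_error}
        try:
            return candidate(value)
        except exceptions_to_handle as exc:
            last_error = exc
    # Every candidate failed with a handled exception type.
    raise last_error


def primary(data: dict) -> str:
    raise ValueError("primary is down")


def backup(data: dict) -> str:
    return f"{data['q']} (recovered from {type(data['error']).__name__})"


answer = invoke_with_fallbacks([primary, backup], {"q": "hi"}, exception_key="error")
print(answer)
```

Exceptions outside exceptions_to_handle are not caught, so they propagate immediately instead of triggering the next fallback.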
as_tool
¶
as_tool(
args_schema: Optional [type [BaseModel ]] = None,
*,
name: Optional [str ] = None,
description: Optional [str ] = None,
arg_types: Optional [dict [str , type ]] = None
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and args_schema from a Runnable. Where possible, schemas are inferred from runnable.get_input_schema. Alternatively (e.g., if the Runnable takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with args_schema. You can also pass arg_types to just specify the required arguments and their types.
Parameters:
- args_schema (Optional[type[BaseModel]], default: None) – The schema for the tool. Defaults to None.
- name (Optional[str], default: None) – The name of the tool. Defaults to None.
- description (Optional[str], default: None) – The description of the tool. Defaults to None.
- arg_types (Optional[dict[str, type]], default: None) – A dictionary of argument names to types. Defaults to None.
Returns:
- BaseTool – A BaseTool instance.
Typed dict input:
.. code-block:: python

    from typing_extensions import TypedDict
    from langchain_core.runnables import RunnableLambda

    class Args(TypedDict):
        a: int
        b: list[int]

    def f(x: Args) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool()
    as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
.. code-block:: python

    from typing import Any
    from pydantic import BaseModel, Field
    from langchain_core.runnables import RunnableLambda

    def f(x: dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    class FSchema(BaseModel):
        """Apply a function to an integer and list of integers."""

        a: int = Field(..., description="Integer")
        b: list[int] = Field(..., description="List of ints")

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(FSchema)
    as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
.. code-block:: python

    from typing import Any
    from langchain_core.runnables import RunnableLambda

    def f(x: dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
    as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
.. code-block:: python

    from langchain_core.runnables import RunnableLambda

    def f(x: str) -> str:
        return x + "a"

    def g(x: str) -> str:
        return x + "z"

    runnable = RunnableLambda(f) | g
    as_tool = runnable.as_tool()
    as_tool.invoke("b")

.. versionadded:: 0.2.14
__init__
¶
__init__(
name: str ,
/,
*,
url: Optional [str ] = None,
api_key: Optional [str ] = None,
headers: Optional [dict [str , str ]] = None,
client: Optional [LangGraphClient ] = None,
sync_client: Optional [SyncLangGraphClient ] = None,
config: Optional [RunnableConfig ] = None,
)
Specify url, api_key, and/or headers to create default sync and async clients.
If client or sync_client are provided, they will be used instead of the default clients.
See LangGraphClient and SyncLangGraphClient for details on the default clients. At least one of url, client, or sync_client must be provided.
Parameters:
- name (str) – The name of the graph.
- url (Optional[str], default: None) – The URL of the remote API.
- api_key (Optional[str], default: None) – The API key to use for authentication. If not provided, it will be read from the environment (LANGGRAPH_API_KEY, LANGSMITH_API_KEY, or LANGCHAIN_API_KEY).
- headers (Optional[dict[str, str]], default: None) – Additional headers to include in the requests.
- client (Optional[LangGraphClient], default: None) – A LangGraphClient instance to use instead of creating a default client.
- sync_client (Optional[SyncLangGraphClient], default: None) – A SyncLangGraphClient instance to use instead of creating a default client.
- config (Optional[RunnableConfig], default: None) – An optional RunnableConfig instance with additional configuration.
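The constructor rules above (an explicit api_key wins, otherwise the environment is consulted, and at least one of url, client, or sync_client is required) can be sketched in plain Python. This is an illustration only: `resolve_api_key` and `check_connection_args` are hypothetical helpers, and the lookup order of the environment variables is assumed from the order in which they are listed.

```python
import os
from typing import Mapping, Optional

# Order assumed from the order in which the variables are listed above.
API_KEY_ENV_VARS = ("LANGGRAPH_API_KEY", "LANGSMITH_API_KEY", "LANGCHAIN_API_KEY")


def resolve_api_key(
    api_key: Optional[str] = None, env: Optional[Mapping[str, str]] = None
) -> Optional[str]:
    """Return the explicit key if given, else the first one set in the environment."""
    if api_key:
        return api_key
    env = os.environ if env is None else env
    for name in API_KEY_ENV_VARS:
        if env.get(name):
            return env[name]
    return None


def check_connection_args(url=None, client=None, sync_client=None) -> None:
    """Mirror the rule that at least one way to reach the server is required."""
    if url is None and client is None and sync_client is None:
        raise ValueError("Provide at least one of url, client, or sync_client.")


# A fake environment keeps the example independent of the real one.
fake_env = {"LANGSMITH_API_KEY": "ls-key", "LANGCHAIN_API_KEY": "lc-key"}
print(resolve_api_key(env=fake_env))
check_connection_args(url="http://localhost:8123")  # hypothetical URL
```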
get_graph
¶
Get graph by graph name.
This method calls GET /assistants/{assistant_id}/graph.
Parameters:
- config (Optional[RunnableConfig], default: None) – This parameter is not used.
- xray (Union[int, bool], default: False) – Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.
Returns:
- Graph – The graph information for the assistant in JSON format.
aget_graph
async
¶
Get graph by graph name.
This method calls GET /assistants/{assistant_id}/graph.
Parameters:
- config (Optional[RunnableConfig], default: None) – This parameter is not used.
- xray (Union[int, bool], default: False) – Include graph representation of subgraphs. If an integer value is provided, only subgraphs with a depth less than or equal to the value will be included.
Returns:
- Graph – The graph information for the assistant in JSON format.
get_state
¶
Get the state of a thread.
This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config or GET /threads/{thread_id}/state if no checkpoint is specified.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- subgraphs (bool, default: False) – Include subgraphs in the state.
Returns:
- StateSnapshot – The latest state of the thread.
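The endpoint choice described for get_state can be sketched as a small routing function. This is a hedged illustration: `state_request` is a hypothetical helper, and treating a `checkpoint_id` key in the configurable field as "a checkpoint is specified" is an assumption, since the text above does not name the key.

```python
def state_request(config: dict) -> tuple:
    """Pick the HTTP method and path for a thread state lookup."""
    configurable = config.get("configurable", {})
    thread_id = configurable["thread_id"]  # required, as documented above
    # Assumption: a checkpoint counts as specified when `checkpoint_id` is present.
    if configurable.get("checkpoint_id") is not None:
        return "POST", f"/threads/{thread_id}/state/checkpoint"
    return "GET", f"/threads/{thread_id}/state"


latest = state_request({"configurable": {"thread_id": "t1"}})
pinned = state_request(
    {"configurable": {"thread_id": "t1", "checkpoint_id": "c42"}}
)
print(latest)
print(pinned)
```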
aget_state
async
¶
Get the state of a thread.
This method calls POST /threads/{thread_id}/state/checkpoint if a checkpoint is specified in the config or GET /threads/{thread_id}/state if no checkpoint is specified.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- subgraphs (bool, default: False) – Include subgraphs in the state.
Returns:
- StateSnapshot – The latest state of the thread.
get_state_history
¶
get_state_history(
config: RunnableConfig ,
*,
filter: Optional [dict [str , Any ]] = None,
before: Optional [RunnableConfig ] = None,
limit: Optional [int ] = None
) -> Iterator [StateSnapshot ]
Get the state history of a thread.
This method calls POST /threads/{thread_id}/history.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- filter (Optional[dict[str, Any]], default: None) – Metadata to filter on.
- before (Optional[RunnableConfig], default: None) – A RunnableConfig that includes checkpoint metadata.
- limit (Optional[int], default: None) – Max number of states to return.
Returns:
- Iterator[StateSnapshot] – States of the thread.
aget_state_history
async
¶
aget_state_history(
config: RunnableConfig ,
*,
filter: Optional [dict [str , Any ]] = None,
before: Optional [RunnableConfig ] = None,
limit: Optional [int ] = None
) -> AsyncIterator [StateSnapshot ]
Get the state history of a thread.
This method calls POST /threads/{thread_id}/history.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- filter (Optional[dict[str, Any]], default: None) – Metadata to filter on.
- before (Optional[RunnableConfig], default: None) – A RunnableConfig that includes checkpoint metadata.
- limit (Optional[int], default: None) – Max number of states to return.
Returns:
- AsyncIterator[StateSnapshot] – States of the thread.
update_state
¶
update_state(
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig
Update the state of a thread.
This method calls POST /threads/{thread_id}/state.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- values (Optional[Union[dict[str, Any], Any]]) – Values to update to the state.
- as_node (Optional[str], default: None) – Update the state as if this node had just executed.
Returns:
- RunnableConfig – RunnableConfig for the updated thread.
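A short sketch of writing values into a thread follows. The helper name patch_thread is hypothetical, and graph is assumed to be a RemoteGraph whose deployed graph defines the state keys and node names being passed.

```python
from typing import Any, Optional


def patch_thread(
    graph: Any,
    thread_id: str,
    values: dict[str, Any],
    as_node: Optional[str] = None,
) -> Any:
    """Write `values` into a thread's state and return the resulting config."""
    config = {"configurable": {"thread_id": thread_id}}
    # The returned RunnableConfig describes the updated thread.
    return graph.update_state(config, values, as_node=as_node)
```

Passing as_node attributes the update to that node, as described in the parameter list above; leaving it as None defers the choice to the server.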
aupdate_state
async
¶
aupdate_state(
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig
Update the state of a thread.
This method calls POST /threads/{thread_id}/state.
Parameters:
- config (RunnableConfig) – A RunnableConfig that includes thread_id in the configurable field.
- values (Optional[Union[dict[str, Any], Any]]) – Values to update to the state.
- as_node (Optional[str], default: None) – Update the state as if this node had just executed.
Returns:
- RunnableConfig – RunnableConfig for the updated thread.
stream
¶
stream(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> Iterator[Union[dict[str, Any], Any]]
Create a run and stream the results.
This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config, or POST /runs/stream otherwise.
Parameters:
- input (Union[dict[str, Any], Any]) – Input to the graph.
- config (Optional[RunnableConfig], default: None) – A RunnableConfig for graph invocation.
- stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None) – Stream mode(s) to use.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph before these nodes.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph after these nodes.
- subgraphs (bool, default: False) – Stream from subgraphs.
- **kwargs (Any, default: {}) – Additional params to pass to client.runs.stream.
Yields:
- Union[dict[str, Any], Any] – The output of the graph.
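The endpoint choice described above depends only on whether the config carries a thread_id. The sketch below restates that rule as a standalone function; stream_endpoint is a hypothetical name used for illustration and is not the library's implementation.

```python
from typing import Any, Optional


def stream_endpoint(config: Optional[dict[str, Any]] = None) -> str:
    """Return the endpoint path that a stream call would target.

    Illustrative only: mirrors the documented routing rule.
    """
    thread_id = ((config or {}).get("configurable") or {}).get("thread_id")
    if thread_id:
        # A thread_id is present, so the run is created on that thread.
        return f"/threads/{thread_id}/runs/stream"
    # No thread_id: the run is created without an existing thread.
    return "/runs/stream"
```

For instance, stream_endpoint({"configurable": {"thread_id": "abc"}}) gives "/threads/abc/runs/stream", while a missing or empty config gives "/runs/stream".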
astream
async
¶
astream(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    subgraphs: bool = False,
    **kwargs: Any
) -> AsyncIterator[Union[dict[str, Any], Any]]
Create a run and stream the results.
This method calls POST /threads/{thread_id}/runs/stream if a thread_id is specified in the configurable field of the config, or POST /runs/stream otherwise.
Parameters:
- input (Union[dict[str, Any], Any]) – Input to the graph.
- config (Optional[RunnableConfig], default: None) – A RunnableConfig for graph invocation.
- stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None) – Stream mode(s) to use.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph before these nodes.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph after these nodes.
- subgraphs (bool, default: False) – Stream from subgraphs.
- **kwargs (Any, default: {}) – Additional params to pass to client.runs.stream.
Yields:
- AsyncIterator[Union[dict[str, Any], Any]] – The output of the graph.
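A minimal sketch of consuming astream is shown below. It assumes graph is a RemoteGraph and that "updates" is a stream mode the deployment supports; print_chunks is a hypothetical helper name.

```python
import asyncio
from typing import Any, Optional


async def print_chunks(
    graph: Any,
    graph_input: dict[str, Any],
    thread_id: Optional[str] = None,
) -> int:
    """Stream a run, print each chunk, and return how many chunks arrived."""
    # Only attach a config when a thread should be used.
    config = {"configurable": {"thread_id": thread_id}} if thread_id else None
    count = 0
    async for chunk in graph.astream(graph_input, config, stream_mode="updates"):
        print(chunk)
        count += 1
    return count


# Example call from synchronous code (input shape is a placeholder):
# asyncio.run(print_chunks(graph, {"messages": []}))
```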
invoke
¶
invoke(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    **kwargs: Any
) -> Union[dict[str, Any], Any]
Create a run, wait until it finishes and return the final state.
Parameters:
- input (Union[dict[str, Any], Any]) – Input to the graph.
- config (Optional[RunnableConfig], default: None) – A RunnableConfig for graph invocation.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph before these nodes.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph after these nodes.
- **kwargs (Any, default: {}) – Additional params to pass to RemoteGraph.stream.
Returns:
- Union[dict[str, Any], Any] – The output of the graph.
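The following sketch shows one way to call invoke on a specific thread while asking the run to stop before certain nodes. The helper name invoke_with_pause is hypothetical, and the node names passed in pause_before are assumed to exist in the deployed graph.

```python
from typing import Any, Sequence


def invoke_with_pause(
    graph: Any,
    graph_input: dict[str, Any],
    thread_id: str,
    pause_before: Sequence[str],
) -> Any:
    """Run the graph on a thread, interrupting before the given nodes."""
    config = {"configurable": {"thread_id": thread_id}}
    # interrupt_before takes the node names to stop in front of.
    return graph.invoke(graph_input, config, interrupt_before=list(pause_before))
```

Since the run is tied to a thread_id, the state reached at the interruption can afterwards be inspected with get_state using the same config.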
ainvoke
async
¶
ainvoke(
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    **kwargs: Any
) -> Union[dict[str, Any], Any]
Create a run, wait until it finishes and return the final state.
Parameters:
- input (Union[dict[str, Any], Any]) – Input to the graph.
- config (Optional[RunnableConfig], default: None) – A RunnableConfig for graph invocation.
- interrupt_before (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph before these nodes.
- interrupt_after (Optional[Union[All, Sequence[str]]], default: None) – Interrupt the graph after these nodes.
- **kwargs (Any, default: {}) – Additional params to pass to RemoteGraph.astream.
Returns:
- Union[dict[str, Any], Any] – The output of the graph.
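Since ainvoke is a coroutine, several runs can be awaited concurrently. The sketch below uses asyncio.gather for this; invoke_many is a hypothetical helper, graph is assumed to be a RemoteGraph, and no config is passed, so the calls do not share a thread.

```python
import asyncio
from typing import Any


async def invoke_many(graph: Any, inputs: list[dict[str, Any]]) -> list[Any]:
    """Run several inputs concurrently and return outputs in input order."""
    # asyncio.gather preserves the order of the awaitables it is given.
    return await asyncio.gather(*(graph.ainvoke(item) for item in inputs))


# Example call from synchronous code (input shape is a placeholder):
# asyncio.run(invoke_many(graph, [{"x": 1}, {"x": 2}]))
```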