
How to stream data from within a tool

Prerequisites

This guide assumes familiarity with the following:

- Streaming
- Chat Models
- Tools

If your graph involves tools that invoke LLMs (or any other LangChain Runnable objects like other graphs, LCEL chains, or retrievers), you might want to surface partial results during the execution of the tool, especially if the tool takes a longer time to run.

A common scenario is streaming LLM tokens generated by a tool calling an LLM, though this applies to any use of Runnable objects.

This guide shows how to stream data from within a tool using the astream API with stream_mode="messages", as well as the more granular astream_events API. The astream API is sufficient for most use cases.

Setup

First, let's install the required packages and set our API keys:

%%capture --no-stderr
%pip install -U langgraph langchain-openai
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph; see the LangSmith documentation for how to get started.

Define the graph

We'll use a prebuilt ReAct agent for this guide.

ASYNC IN PYTHON <= 3.10

Any LangChain `RunnableLambda`, `RunnableGenerator`, or `Tool` that invokes other runnables and runs async in Python <= 3.10 has to propagate callbacks to child objects **manually**, because LangChain cannot automatically propagate callbacks to child objects in this case. This is a common reason why you may fail to see events emitted from custom runnables or tools.

from langchain_core.callbacks import Callbacks
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI


@tool
async def get_items(
    place: str,
    callbacks: Callbacks,  # <--- Manually accept callbacks (needed for Python <= 3.10)
) -> str:
    """Use this tool to look up which items are in the given place."""
    # Attention: when using async you must invoke the LLM with ainvoke!
    # If you don't, streaming will NOT work.
    response = await llm.ainvoke(
        [
            {
                "role": "user",
                "content": f"Can you tell me what kind of items I might find in the following place: '{place}'? "
                "List at least 3 such items, separated by commas, and include a brief description of each item.",
            }
        ],
        {"callbacks": callbacks},  # <-- Propagate callbacks to the child LLM run
    )
    return response.content


llm = ChatOpenAI(model="gpt-4o")
tools = [get_items]
agent = create_react_agent(llm, tools=tools)

API Reference: HumanMessage | tool | ChatOpenAI | create_react_agent
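Before adding streaming, you can optionally sanity-check the agent with a single non-streaming call (top-level await works in a notebook):

result = await agent.ainvoke({"messages": [("human", "what items are on the shelf?")]})
print(result["messages"][-1].content)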

Using stream_mode="messages"

Using stream_mode="messages" is a good option if you don't have complex LCEL logic inside your nodes (or don't need highly granular progress from within the LCEL chain).

final_message = ""
async for msg, metadata in agent.astream(
    {"messages": [("human", "what items are on the shelf?")]}, stream_mode="messages"
):
    # Stream all messages from the tool node
    if (
        msg.content
        and not isinstance(msg, HumanMessage)
        and metadata["langgraph_node"] == "tools"
        and not msg.name  # ToolMessages have a name set; skip them so only raw LLM tokens print
    ):
        print(msg.content, end="|", flush=True)
    # Final message should come from our agent
    if msg.content and metadata["langgraph_node"] == "agent":
        final_message += msg.content
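
Since the loop accumulated the agent's tokens in final_message, you can print the complete answer once the stream finishes:

print(final_message)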

Using the astream events API

For simplicity, the get_items tool doesn't use any complex LCEL logic inside it -- it only invokes an LLM.

However, if the tool were more complex (e.g., using a RAG chain inside it) and you wanted more granular events from within the chain, you could use the astream events API.

The example below only illustrates how to invoke the API.

Use async for the astream events API

You should generally use `async` code (e.g., invoking the LLM with `ainvoke`) to be able to leverage the astream events API properly.

from langchain_core.messages import HumanMessage

async for event in agent.astream_events(
    {"messages": [{"role": "user", "content": "what's in the bedroom."}]}, version="v2"
):
    if (
        event["event"] == "on_chat_model_stream"
        and event["metadata"].get("langgraph_node") == "tools"
    ):
        print(event["data"]["chunk"].content, end="|", flush=True)

API Reference: HumanMessage

|In| a| bedroom|,| you| might| find| the| following| items|:

|1|.| **|Bed|**|:| The| central| piece| of| furniture| in| a| bedroom|,| typically| consisting| of| a| mattress| on| a| frame|,| where| people| sleep|.| It| often| includes| bedding| such| as| sheets|,| blankets|,| and| pillows| for| comfort|.

|2|.| **|Ward|robe|**|:| A| large|,| tall| cupboard| or| fre|estanding| piece| of| furniture| used| for| storing| clothes|.| It| may| have| hanging| space|,| shelves|,| and| sometimes| drawers| for| organizing| garments| and| accessories|.

|3|.| **|Night|stand|**|:| A| small| table| or| cabinet| placed| beside| the| bed|,| used| for| holding| items| like| a| lamp|,| alarm| clock|,| books|,| or| personal| belongings| that| might| be| needed| during| the| night| or| early| morning|.||
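
If the tool did wrap a retriever as part of a RAG chain, the same event stream would also carry retriever events you could surface. A minimal sketch, assuming a hypothetical variant of get_items that calls a retriever internally (our actual tool does not, so the retriever branch would never fire here):

async for event in agent.astream_events(
    {"messages": [{"role": "user", "content": "what's in the bedroom."}]}, version="v2"
):
    kind = event["event"]
    # Only consider events emitted from within the tool node
    if event["metadata"].get("langgraph_node") != "tools":
        continue
    if kind == "on_chat_model_stream":
        # LLM tokens generated inside the tool
        print(event["data"]["chunk"].content, end="|", flush=True)
    elif kind == "on_retriever_end":
        # Documents returned by a retriever inside the tool (hypothetical here)
        docs = event["data"]["output"]
        print(f"\n[retrieved {len(docs)} documents]")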
