{ "cells": [ { "cell_type": "markdown", "id": "b23ced4e-dc29-43be-9f94-0c36bb181b8a", "metadata": {}, "source": [ "# How to stream events from within a tool (without LangChain LLMs / tools)" ] }, { "cell_type": "markdown", "id": "7044eeb8-4074-4f9c-8a62-962488744557", "metadata": {}, "source": [ "In this example we will stream tokens from within tools that an agent is using. We'll also be using OpenAI client library directly, without using LangChain chat models. We will use a ReAct agent as an example." ] }, { "cell_type": "markdown", "id": "a37f60af-43ea-4aa6-847a-df8cc47065f5", "metadata": {}, "source": [ "## Setup\n", "\n", "First, let's install the required packages and set our API keys" ] }, { "cell_type": "code", "execution_count": 1, "id": "47f79af8-58d8-4a48-8d9a-88823d88701f", "metadata": {}, "outputs": [], "source": [ "%%capture --no-stderr\n", "%pip install -U langgraph openai" ] }, { "cell_type": "code", "execution_count": null, "id": "0cf6b41d-7fcb-40b6-9a72-229cdd00a094", "metadata": {}, "outputs": [], "source": [ "import getpass\n", "import os\n", "\n", "\n", "def _set_env(var: str):\n", " if not os.environ.get(var):\n", " os.environ[var] = getpass.getpass(f\"{var}: \")\n", "\n", "\n", "_set_env(\"OPENAI_API_KEY\")" ] }, { "cell_type": "markdown", "id": "d8df7b58", "metadata": {}, "source": [ "
\n", "

Set up LangSmith for LangGraph development

\n", "

\n", " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", "

\n", "
" ] }, { "cell_type": "markdown", "id": "e3d02ebb-c2e1-4ef7-b187-810d55139317", "metadata": {}, "source": [ "## Define the graph" ] }, { "cell_type": "markdown", "id": "3ba684f1-d46b-42e4-95cf-9685209a5992", "metadata": {}, "source": [ "### Define a node that will call OpenAI API" ] }, { "cell_type": "code", "execution_count": 1, "id": "d59234f9-173e-469d-a725-c13e0979663e", "metadata": {}, "outputs": [], "source": [ "from openai import AsyncOpenAI\n", "from langchain_core.language_models.chat_models import ChatGenerationChunk\n", "from langchain_core.messages import AIMessageChunk\n", "from langchain_core.runnables.config import (\n", " ensure_config,\n", " get_callback_manager_for_config,\n", ")\n", "\n", "openai_client = AsyncOpenAI()\n", "# define tool schema for openai tool calling\n", "\n", "tool = {\n", " \"type\": \"function\",\n", " \"function\": {\n", " \"name\": \"get_items\",\n", " \"description\": \"Use this tool to look up which items are in the given place.\",\n", " \"parameters\": {\n", " \"type\": \"object\",\n", " \"properties\": {\"place\": {\"type\": \"string\"}},\n", " \"required\": [\"place\"],\n", " },\n", " },\n", "}\n", "\n", "\n", "async def call_model(state, config=None):\n", " config = ensure_config(config | {\"tags\": [\"agent_llm\"]})\n", " callback_manager = get_callback_manager_for_config(config)\n", " messages = state[\"messages\"]\n", "\n", " llm_run_manager = callback_manager.on_chat_model_start({}, [messages])[0]\n", " response = await openai_client.chat.completions.create(\n", " messages=messages, model=\"gpt-3.5-turbo\", tools=[tool], stream=True\n", " )\n", "\n", " response_content = \"\"\n", " role = None\n", "\n", " tool_call_id = None\n", " tool_call_function_name = None\n", " tool_call_function_arguments = \"\"\n", " async for chunk in response:\n", " delta = chunk.choices[0].delta\n", " if delta.role is not None:\n", " role = delta.role\n", "\n", " if delta.content:\n", " response_content += delta.content\n", " llm_run_manager.on_llm_new_token(delta.content)\n", "\n", " if delta.tool_calls:\n", " # note: for simplicity we're only handling a single tool call here\n", " if delta.tool_calls[0].function.name is not None:\n", " tool_call_function_name = delta.tool_calls[0].function.name\n", " tool_call_id = delta.tool_calls[0].id\n", "\n", " # note: we're wrapping the tools calls in ChatGenerationChunk so that the events from .astream_events in the graph can render tool calls correctly\n", " tool_call_chunk = ChatGenerationChunk(\n", " message=AIMessageChunk(\n", " content=\"\",\n", " additional_kwargs={\"tool_calls\": [delta.tool_calls[0].dict()]},\n", " )\n", " )\n", " llm_run_manager.on_llm_new_token(\"\", chunk=tool_call_chunk)\n", " tool_call_function_arguments += delta.tool_calls[0].function.arguments\n", "\n", " if tool_call_function_name is not None:\n", " tool_calls = [\n", " {\n", " \"id\": tool_call_id,\n", " \"function\": {\n", " \"name\": tool_call_function_name,\n", " \"arguments\": tool_call_function_arguments,\n", " },\n", " \"type\": \"function\",\n", " }\n", " ]\n", " else:\n", " tool_calls = None\n", "\n", " response_message = {\n", " \"role\": role,\n", " \"content\": response_content,\n", " \"tool_calls\": tool_calls,\n", " }\n", " return {\"messages\": [response_message]}" ] }, { "cell_type": "markdown", "id": "3a3877e8-8ace-40d5-ad04-cbf21c6f3250", "metadata": {}, "source": [ "### Define our tools and a tool-calling node" ] }, { "cell_type": "code", "execution_count": 2, "id": "b90941d8-afe4-42ec-9262-9c3b87c3b1ec", "metadata": {}, "outputs": [], "source": [ "import json\n", "from langchain_core.callbacks import adispatch_custom_event\n", "\n", "\n", "async def get_items(place: str) -> str:\n", " \"\"\"Use this tool to look up which items are in the given place.\"\"\"\n", "\n", " # this can be replaced with any actual streaming logic that you might have\n", " def stream(place: str):\n", " if \"bed\" in place: # For under the bed\n", " yield from [\"socks\", \"shoes\", \"dust bunnies\"]\n", " elif \"shelf\" in place: # For 'shelf'\n", " yield from [\"books\", \"penciles\", \"pictures\"]\n", " else: # if the agent decides to ask about a different place\n", " yield \"cat snacks\"\n", "\n", " tokens = []\n", " for token in stream(place):\n", " await adispatch_custom_event(\n", " # this will allow you to filter events by name\n", " \"tool_call_token_stream\",\n", " {\n", " \"function_name\": \"get_items\",\n", " \"arguments\": {\"place\": place},\n", " \"tool_output_token\": token,\n", " },\n", " # this will allow you to filter events by tags\n", " config={\"tags\": [\"tool_call\"]},\n", " )\n", " tokens.append(token)\n", "\n", " return \", \".join(tokens)\n", "\n", "\n", "# define mapping to look up functions when running tools\n", "function_name_to_function = {\"get_items\": get_items}\n", "\n", "\n", "async def call_tools(state):\n", " messages = state[\"messages\"]\n", "\n", " tool_call = messages[-1][\"tool_calls\"][0]\n", " function_name = tool_call[\"function\"][\"name\"]\n", " function_arguments = tool_call[\"function\"][\"arguments\"]\n", " arguments = json.loads(function_arguments)\n", "\n", " function_response = await function_name_to_function[function_name](**arguments)\n", " tool_message = {\n", " \"tool_call_id\": tool_call[\"id\"],\n", " \"role\": \"tool\",\n", " \"name\": function_name,\n", " \"content\": function_response,\n", " }\n", " return {\"messages\": [tool_message]}" ] }, { "cell_type": "markdown", "id": "6685898c-9a1c-4803-a492-bd70574ebe38", "metadata": {}, "source": [ "### Define our graph" ] }, { "cell_type": "code", "execution_count": 3, "id": "228260be-1f9a-4195-80e0-9604f8a5dba6", "metadata": {}, "outputs": [], "source": [ "import operator\n", "from typing import Annotated, TypedDict, Literal\n", "\n", "from langgraph.graph import StateGraph, END, START\n", "\n", "\n", "class State(TypedDict):\n", " messages: Annotated[list, operator.add]\n", "\n", "\n", "def should_continue(state) -> Literal[\"tools\", END]:\n", " messages = state[\"messages\"]\n", " last_message = messages[-1]\n", " if last_message[\"tool_calls\"]:\n", " return \"tools\"\n", " return END\n", "\n", "\n", "workflow = StateGraph(State)\n", "workflow.add_edge(START, \"model\")\n", "workflow.add_node(\"model\", call_model) # i.e. our \"agent\"\n", "workflow.add_node(\"tools\", call_tools)\n", "workflow.add_conditional_edges(\"model\", should_continue)\n", "workflow.add_edge(\"tools\", \"model\")\n", "graph = workflow.compile()" ] }, { "cell_type": "markdown", "id": "d046e2ef-f208-4831-ab31-203b2e75a49a", "metadata": {}, "source": [ "## Stream tokens from within the tool" ] }, { "cell_type": "code", "execution_count": 4, "id": "45c96a79-4147-42e3-89fd-d942b2b49f6c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Tool token socks\n", "Tool token shoes\n", "Tool token dust bunnies\n" ] } ], "source": [ "async for event in graph.astream_events(\n", " {\"messages\": [{\"role\": \"user\", \"content\": \"what's in the bedroom\"}]}, version=\"v2\"\n", "):\n", " tags = event.get(\"tags\", [])\n", " if event[\"event\"] == \"on_custom_event\" and \"tool_call\" in tags:\n", " print(\"Tool token\", event[\"data\"][\"tool_output_token\"])" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 5 }