# How to stream LLM tokens (without LangChain models)
In this guide, we will stream tokens from the language model powering an agent without using LangChain chat models. We'll be using the OpenAI client library directly in a ReAct agent as an example.
First, initialize the OpenAI SDK and define a tool schema for the model to populate using OpenAI's format:
```typescript
import OpenAI from "openai";

const openaiClient = new OpenAI({});

const toolSchema: OpenAI.ChatCompletionTool = {
  type: "function",
  function: {
    name: "get_items",
    description: "Use this tool to look up which items are in the given place.",
    parameters: {
      type: "object",
      properties: {
        place: {
          type: "string",
        },
      },
      required: ["place"],
    },
  },
};
```
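When the model decides to use this tool, it streams back tool call chunks that, once reassembled, describe a call in OpenAI's tool call format. As a rough illustration (the id and arguments below are made-up placeholders, not real model output), an assembled call looks like this:

```typescript
// Hypothetical example of a fully assembled tool call; the id and
// arguments are illustrative placeholders, not actual model output.
const exampleToolCall = {
  id: "call_abc123",
  type: "function" as const,
  function: {
    name: "get_items",
    arguments: `{"place":"bedroom"}`,
  },
};
```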
Now, define a method for a LangGraph node that will call the model. It will handle formatting tool calls to and from the model, as well as streaming via custom callback events.
If you are using LangSmith, you can also wrap the OpenAI client for the same nice tracing you'd get with a LangChain chat model.
Here's what that looks like:
```typescript
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";
import { wrapOpenAI } from "langsmith/wrappers/openai";
import { Annotation } from "@langchain/langgraph";

const StateAnnotation = Annotation.Root({
  messages: Annotation<OpenAI.ChatCompletionMessageParam[]>({
    reducer: (x, y) => x.concat(y),
  }),
});

// If using LangSmith, use "wrapOpenAI" on the whole client or
// "traceable" to wrap a single method for nicer tracing:
// https://docs.smith.langchain.com/how_to_guides/tracing/annotate_code
const wrappedClient = wrapOpenAI(openaiClient);

const callModel = async (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const stream = await wrappedClient.chat.completions.create({
    messages,
    model: "gpt-4o-mini",
    tools: [toolSchema],
    stream: true,
  });
  let responseContent = "";
  let role: string = "assistant";
  let toolCallId: string | undefined;
  let toolCallName: string | undefined;
  let toolCallArgs = "";
  for await (const chunk of stream) {
    const delta = chunk.choices[0].delta;
    if (delta.role !== undefined) {
      role = delta.role;
    }
    if (delta.content) {
      responseContent += delta.content;
      await dispatchCustomEvent("streamed_token", {
        content: delta.content,
      });
    }
    if (delta.tool_calls !== undefined && delta.tool_calls.length > 0) {
      // note: for simplicity we're only handling a single tool call here
      const toolCall = delta.tool_calls[0];
      if (toolCall.function?.name !== undefined) {
        toolCallName = toolCall.function.name;
      }
      if (toolCall.id !== undefined) {
        toolCallId = toolCall.id;
      }
      await dispatchCustomEvent("streamed_tool_call_chunk", toolCall);
      toolCallArgs += toolCall.function?.arguments ?? "";
    }
  }
  let finalToolCalls;
  if (toolCallName !== undefined && toolCallId !== undefined) {
    finalToolCalls = [
      {
        id: toolCallId,
        function: {
          name: toolCallName,
          arguments: toolCallArgs,
        },
        type: "function" as const,
      },
    ];
  }

  const responseMessage = {
    role: role as any,
    content: responseContent,
    tool_calls: finalToolCalls,
  };

  return { messages: [responseMessage] };
};
```
Note that you can't call this method outside of a LangGraph node, since `dispatchCustomEvent` will fail if it is called outside the proper context.
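For example, a minimal sketch like this (calling the `callModel` function defined above directly, outside of any graph run) would typically fail once it reaches `dispatchCustomEvent`:

```typescript
// Sketch only: invoking the node function outside of a graph run.
// There is no runnable context here, so expect `dispatchCustomEvent` to throw.
try {
  await callModel({ messages: [{ role: "user", content: "hi" }] });
} catch (e) {
  console.log("Failed outside of a LangGraph run:", e);
}
```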
Next, set up the actual tool function and the node that will call it when the model populates a tool call:
```typescript
const getItems = async ({ place }: { place: string }) => {
  if (place.toLowerCase().includes("bed")) {
    // For under the bed
    return "socks, shoes and dust bunnies";
  } else if (place.toLowerCase().includes("shelf")) {
    // For 'shelf'
    return "books, pencils and pictures";
  } else {
    // if the agent decides to ask about a different place
    return "cat snacks";
  }
};

const callTools = async (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const mostRecentMessage = messages[messages.length - 1];
  const toolCalls = (
    mostRecentMessage as OpenAI.ChatCompletionAssistantMessageParam
  ).tool_calls;
  if (toolCalls === undefined || toolCalls.length === 0) {
    throw new Error("No tool calls passed to node.");
  }
  const toolNameMap = {
    get_items: getItems,
  };
  const functionName = toolCalls[0].function.name;
  const functionArguments = JSON.parse(toolCalls[0].function.arguments);
  const response = await toolNameMap[functionName](functionArguments);
  const toolMessage = {
    tool_call_id: toolCalls[0].id,
    role: "tool" as const,
    name: functionName,
    content: response,
  };
  return { messages: [toolMessage] };
};
```
Finally, wire the nodes together into a graph:

```typescript
import { StateGraph } from "@langchain/langgraph";
import OpenAI from "openai";

// We can reuse the same `StateAnnotation` from above as it has not changed.
const shouldContinue = (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const lastMessage =
    messages[messages.length - 1] as OpenAI.ChatCompletionAssistantMessageParam;
  if (lastMessage?.tool_calls !== undefined && lastMessage?.tool_calls.length > 0) {
    return "tools";
  }
  return "__end__";
};

const graph = new StateGraph(StateAnnotation)
  .addNode("model", callModel)
  .addNode("tools", callTools)
  .addEdge("__start__", "model")
  .addConditionalEdges("model", shouldContinue, {
    tools: "tools",
    __end__: "__end__",
  })
  .addEdge("tools", "model")
  .compile();
```
And now we can use the `streamEvents` method to get the streamed tokens and tool calls from the OpenAI model:
```typescript
const eventStream = await graph.streamEvents(
  { messages: [{ role: "user", content: "what's in the bedroom?" }] },
  { version: "v2" },
);

for await (const { event, name, data } of eventStream) {
  if (event === "on_custom_event") {
    console.log(name, data);
  }
}
```
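If you want to do more than log the raw events, here's a minimal sketch (assuming the `streamed_token` and `streamed_tool_call_chunk` event names dispatched in `callModel` above) that accumulates the streamed text and tool call arguments as they arrive:

```typescript
let streamedText = "";
let streamedToolCallArgs = "";

const aggregatingStream = await graph.streamEvents(
  { messages: [{ role: "user", content: "what's on the shelf?" }] },
  { version: "v2" },
);

for await (const { event, name, data } of aggregatingStream) {
  if (event !== "on_custom_event") continue;
  if (name === "streamed_token") {
    // Each event carries the latest content delta from the model.
    streamedText += data.content;
  } else if (name === "streamed_tool_call_chunk") {
    // Tool call arguments arrive as partial JSON strings.
    streamedToolCallArgs += data.function?.arguments ?? "";
  }
}

console.log("Streamed text:", streamedText);
console.log("Streamed tool call arguments:", streamedToolCallArgs);
```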