Pass one or more of the following stream modes as a list to the stream() or astream() methods:
Mode
Description
values
Streams the full value of the state after each step of the graph.
updates
Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately.
custom
Streams custom data from inside your graph nodes.
messages
Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked.
debug
Streams as much information as possible throughout the execution of the graph.
To stream agent progress, use the stream() or astream() methods with stream_mode="updates". This emits an event after every agent step.For example, if you have an agent that calls a tool once, you should see the following updates:
LLM node: AI message with tool call requests
Tool node: Tool message with execution result
LLM node: Final AI response
Sync
Async
Copy
Ask AI
agent = create_react_agent( model="anthropic:claude-3-7-sonnet-latest", tools=[get_weather],)# highlight-next-linefor chunk in agent.stream( {"messages": [{"role": "user", "content": "what is the weather in sf"}]}, # highlight-next-line stream_mode="updates"): print(chunk) print("\n")
To stream updates from tools as they are executed, you can use get_stream_writer.
Sync
Async
Copy
Ask AI
# highlight-next-linefrom langgraph.config import get_stream_writerdef get_weather(city: str) -> str: """Get weather for a given city.""" # highlight-next-line writer = get_stream_writer() # stream any arbitrary data # highlight-next-line writer(f"Looking up data for city: {city}") return f"It's always sunny in {city}!"agent = create_react_agent( model="anthropic:claude-3-7-sonnet-latest", tools=[get_weather],)for chunk in agent.stream( {"messages": [{"role": "user", "content": "what is the weather in sf"}]}, # highlight-next-line stream_mode="custom"): print(chunk) print("\n")
If you add get_stream_writer inside your tool, you won’t be able to invoke the tool outside of a LangGraph execution context.
In some applications you might need to disable streaming of individual tokens for a given model. This is useful in multi-agent systems to control which agents stream their output.See the Models guide to learn how to disable streaming.
LangGraph graphs expose the .stream() (sync) and .astream() (async) methods to yield streamed outputs as iterators.
Sync
Async
Copy
Ask AI
for chunk in graph.stream(inputs, stream_mode="updates"): print(chunk)
Extended example: streaming updates
Copy
Ask AI
from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): topic: str joke: strdef refine_topic(state: State): return {"topic": state["topic"] + " and cats"}def generate_joke(state: State): return {"joke": f"This is a joke about {state['topic']}"}graph = ( StateGraph(State) .add_node(refine_topic) .add_node(generate_joke) .add_edge(START, "refine_topic") .add_edge("refine_topic", "generate_joke") .add_edge("generate_joke", END) .compile())# highlight-next-linefor chunk in graph.stream( # (1)! {"topic": "ice cream"}, # highlight-next-line stream_mode="updates", # (2)!): print(chunk)
The stream() method returns an iterator that yields streamed outputs.
Set stream_mode="updates" to stream only the updates to the graph state after each node. Other stream modes are also available. See supported stream modes for details.
Copy
Ask AI
{'refineTopic': {'topic': 'ice cream and cats'}}{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
You can pass a list as the stream_mode parameter to stream multiple modes at once.The streamed outputs will be tuples of (mode, chunk) where mode is the name of the stream mode and chunk is the data streamed by that mode.
Sync
Async
Copy
Ask AI
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]): print(chunk)
Use the stream modes updates and values to stream the state of the graph as it executes.
updates streams the updates to the state after each step of the graph.
values streams the full value of the state after each step of the graph.
Copy
Ask AI
from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): topic: str joke: strdef refine_topic(state: State): return {"topic": state["topic"] + " and cats"}def generate_joke(state: State): return {"joke": f"This is a joke about {state['topic']}"}graph = ( StateGraph(State) .add_node(refine_topic) .add_node(generate_joke) .add_edge(START, "refine_topic") .add_edge("refine_topic", "generate_joke") .add_edge("generate_joke", END) .compile())
updates
values
Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
Copy
Ask AI
for chunk in graph.stream( {"topic": "ice cream"}, # highlight-next-line stream_mode="updates",): print(chunk)
To include outputs from subgraphs in the streamed outputs, you can set subgraphs=True in the .stream() method of the parent graph. This will stream outputs from both the parent graph and any subgraphs.The outputs will be streamed as tuples (namespace, data), where namespace is a tuple with the path to the node where a subgraph is invoked, e.g. ("parent_node:<task_id>", "child_node:<task_id>").
Copy
Ask AI
for chunk in graph.stream( {"foo": "foo"}, # highlight-next-line subgraphs=True, # (1)! stream_mode="updates",): print(chunk)
Set subgraphs=True to stream outputs from subgraphs.
Extended example: streaming from subgraphs
Copy
Ask AI
from langgraph.graph import START, StateGraphfrom typing import TypedDict# Define subgraphclass SubgraphState(TypedDict): foo: str # note that this key is shared with the parent graph state bar: strdef subgraph_node_1(state: SubgraphState): return {"bar": "bar"}def subgraph_node_2(state: SubgraphState): return {"foo": state["foo"] + state["bar"]}subgraph_builder = StateGraph(SubgraphState)subgraph_builder.add_node(subgraph_node_1)subgraph_builder.add_node(subgraph_node_2)subgraph_builder.add_edge(START, "subgraph_node_1")subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")subgraph = subgraph_builder.compile()# Define parent graphclass ParentState(TypedDict): foo: strdef node_1(state: ParentState): return {"foo": "hi! " + state["foo"]}builder = StateGraph(ParentState)builder.add_node("node_1", node_1)builder.add_node("node_2", subgraph)builder.add_edge(START, "node_1")builder.add_edge("node_1", "node_2")graph = builder.compile()for chunk in graph.stream( {"foo": "foo"}, stream_mode="updates", # highlight-next-line subgraphs=True, # (1)!): print(chunk)
Set subgraphs=True to stream outputs from subgraphs.
Use the debug streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
Copy
Ask AI
for chunk in graph.stream( {"topic": "ice cream"}, # highlight-next-line stream_mode="debug",): print(chunk)
Use the messages streaming mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.The streamed output from messages mode is a tuple (message_chunk, metadata) where:
message_chunk: the token or message segment from the LLM.
metadata: a dictionary containing details about the graph node and LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using custom mode instead. See use with any LLM for details.
Manual config required for async in Python < 3.11
When using Python < 3.11 with async code, you must explicitly pass RunnableConfig to ainvoke() to enable proper streaming. See Async with Python < 3.11 for details or upgrade to Python 3.11+.
Copy
Ask AI
from dataclasses import dataclassfrom langchain.chat_models import init_chat_modelfrom langgraph.graph import StateGraph, START@dataclassclass MyState: topic: str joke: str = ""llm = init_chat_model(model="openai:gpt-4o-mini")def call_model(state: MyState): """Call the LLM to generate a joke about a topic""" # highlight-next-line llm_response = llm.invoke( # (1)! [ {"role": "user", "content": f"Generate a joke about {state.topic}"} ] ) return {"joke": llm_response.content}graph = ( StateGraph(MyState) .add_node(call_model) .add_edge(START, "call_model") .compile())for message_chunk, metadata in graph.stream( # (2)! {"topic": "ice cream"}, # highlight-next-line stream_mode="messages",): if message_chunk.content: print(message_chunk.content, end="|", flush=True)
Note that the message events are emitted even when the LLM is run using .invoke rather than .stream.
The “messages” stream mode returns an iterator of tuples (message_chunk, metadata) where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called and other information.
You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.
Copy
Ask AI
from langchain.chat_models import init_chat_modelllm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) # (1)!llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) # (2)!graph = ... # define a graph that uses these LLMsasync for msg, metadata in graph.astream( # (3)! {"topic": "cats"}, # highlight-next-line stream_mode="messages",): if metadata["tags"] == ["joke"]: # (4)! print(msg.content, end="|", flush=True)
llm_1 is tagged with “joke”.
llm_2 is tagged with “poem”.
The stream_mode is set to “messages” to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
Filter the streamed tokens by the tags field in the metadata to only include the tokens from the LLM invocation with the “joke” tag.
Extended example: filtering by tags
Copy
Ask AI
from typing import TypedDictfrom langchain.chat_models import init_chat_modelfrom langgraph.graph import START, StateGraphjoke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) # (1)!poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) # (2)!class State(TypedDict): topic: str joke: str poem: strasync def call_model(state, config): topic = state["topic"] print("Writing joke...") # Note: Passing the config through explicitly is required for python < 3.11 # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks joke_response = await joke_model.ainvoke( [{"role": "user", "content": f"Write a joke about {topic}"}], config, # (3)! ) print("\n\nWriting poem...") poem_response = await poem_model.ainvoke( [{"role": "user", "content": f"Write a short poem about {topic}"}], config, # (3)! ) return {"joke": joke_response.content, "poem": poem_response.content}graph = ( StateGraph(State) .add_node(call_model) .add_edge(START, "call_model") .compile())async for msg, metadata in graph.astream( {"topic": "cats"}, # highlight-next-line stream_mode="messages", # (4)!): if metadata["tags"] == ["joke"]: # (4)! print(msg.content, end="|", flush=True)
The joke_model is tagged with “joke”.
The poem_model is tagged with “poem”.
The config is passed through explicitly to ensure the context vars are propagated correctly. This is required for Python < 3.11 when using async code. Please see the async section for more details.
The stream_mode is set to “messages” to stream LLM tokens. The metadata contains information about the LLM invocation, including the tags.
To stream tokens only from specific nodes, use stream_mode="messages" and filter the outputs by the langgraph_node field in the streamed metadata:
Copy
Ask AI
for msg, metadata in graph.stream( # (1)! inputs, # highlight-next-line stream_mode="messages",): # highlight-next-line if msg.content and metadata["langgraph_node"] == "some_node_name": # (2)! ...
The “messages” stream mode returns a tuple of (message_chunk, metadata) where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called and other information.
Filter the streamed tokens by the langgraph_node field in the metadata to only include the tokens from the write_poem node.
Extended example: streaming LLM tokens from specific nodes
Copy
Ask AI
from typing import TypedDictfrom langgraph.graph import START, StateGraphfrom langchain_openai import ChatOpenAImodel = ChatOpenAI(model="gpt-4o-mini")class State(TypedDict): topic: str joke: str poem: strdef write_joke(state: State): topic = state["topic"] joke_response = model.invoke( [{"role": "user", "content": f"Write a joke about {topic}"}] ) return {"joke": joke_response.content}def write_poem(state: State): topic = state["topic"] poem_response = model.invoke( [{"role": "user", "content": f"Write a short poem about {topic}"}] ) return {"poem": poem_response.content}graph = ( StateGraph(State) .add_node(write_joke) .add_node(write_poem) # write both the joke and the poem concurrently .add_edge(START, "write_joke") .add_edge(START, "write_poem") .compile())# highlight-next-linefor msg, metadata in graph.stream( # (1)! {"topic": "cats"}, stream_mode="messages",): # highlight-next-line if msg.content and metadata["langgraph_node"] == "write_poem": # (2)! print(msg.content, end="|", flush=True)
The “messages” stream mode returns a tuple of (message_chunk, metadata) where message_chunk is the token streamed by the LLM and metadata is a dictionary with information about the graph node where the LLM was called and other information.
Filter the streamed tokens by the langgraph_node field in the metadata to only include the tokens from the write_poem node.
To send custom user-defined data from inside a LangGraph node or tool, follow these steps:
Use get_stream_writer() to access the stream writer and emit custom data.
Set stream_mode="custom" when calling .stream() or .astream() to get the custom data in the stream. You can combine multiple modes (e.g., ["updates", "custom"]), but at least one must be "custom".
No get_stream_writer() in async for Python < 3.11
In async code running on Python < 3.11, get_stream_writer() will not work.
Instead, add a writer parameter to your node or tool and pass it manually.
See Async with Python < 3.11 for usage examples.
You can use stream_mode="custom" to stream data from any LLM API — even if that API does not implement the LangChain chat model interface.This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
Copy
Ask AI
from langgraph.config import get_stream_writerdef call_arbitrary_model(state): """Example node that calls an arbitrary model and streams the output""" # highlight-next-line writer = get_stream_writer() # (1)! # Assume you have a streaming client that yields chunks for chunk in your_custom_streaming_client(state["topic"]): # (2)! # highlight-next-line writer({"custom_llm_chunk": chunk}) # (3)! return {"result": "completed"}graph = ( StateGraph(State) .add_node(call_arbitrary_model) # Add other nodes and edges as needed .compile())for chunk in graph.stream( {"topic": "cats"}, # highlight-next-line stream_mode="custom", # (4)!): # The chunk will contain the custom data streamed from the llm print(chunk)
Get the stream writer to send custom data.
Generate LLM tokens using your custom streaming client.
Use the writer to send custom data to the stream.
Set stream_mode="custom" to receive the custom data in the stream.
Extended example: streaming arbitrary chat model
Copy
Ask AI
import operatorimport jsonfrom typing import TypedDictfrom typing_extensions import Annotatedfrom langgraph.graph import StateGraph, STARTfrom openai import AsyncOpenAIopenai_client = AsyncOpenAI()model_name = "gpt-4o-mini"async def stream_tokens(model_name: str, messages: list[dict]): response = await openai_client.chat.completions.create( messages=messages, model=model_name, stream=True ) role = None async for chunk in response: delta = chunk.choices[0].delta if delta.role is not None: role = delta.role if delta.content: yield {"role": role, "content": delta.content}# this is our toolasync def get_items(place: str) -> str: """Use this tool to list items one might find in a place you're asked about.""" writer = get_stream_writer() response = "" async for msg_chunk in stream_tokens( model_name, [ { "role": "user", "content": ( "Can you tell me what kind of items " f"i might find in the following place: '{place}'. " "List at least 3 such items separating them by a comma. " "And include a brief description of each item." ), } ], ): response += msg_chunk["content"] writer(msg_chunk) return responseclass State(TypedDict): messages: Annotated[list[dict], operator.add]# this is the tool-calling graph nodeasync def call_tool(state: State): ai_message = state["messages"][-1] tool_call = ai_message["tool_calls"][-1] function_name = tool_call["function"]["name"] if function_name != "get_items": raise ValueError(f"Tool {function_name} not supported") function_arguments = tool_call["function"]["arguments"] arguments = json.loads(function_arguments) function_response = await get_items(**arguments) tool_message = { "tool_call_id": tool_call["id"], "role": "tool", "name": function_name, "content": function_response, } return {"messages": [tool_message]}graph = ( StateGraph(State) .add_node(call_tool) .add_edge(START, "call_tool") .compile())
Let’s invoke the graph with an AI message that includes a tool call:
If your application mixes models that support streaming with those that do not, you may need to explicitly disable streaming for
models that do not support it.Set disable_streaming=True when initializing the model.
In Python versions < 3.11, asyncio tasks do not support the context parameter.
This limits LangGraph ability to automatically propagate context, and affects LangGraph’s streaming mechanisms in two key ways:
You must explicitly pass RunnableConfig into async LLM calls (e.g., ainvoke()), as callbacks are not automatically propagated.
You cannot use get_stream_writer() in async nodes or tools — you must pass a writer argument directly.
Extended example: async LLM call with manual config