메인 콘텐츠로 건너뛰기
Alpha Notice: These docs cover the v1-alpha release. Content is incomplete and subject to change.For the latest stable version, see the current LangGraph Python or LangGraph JavaScript docs.
LangGraph는 실시간 업데이트를 제공하기 위한 스트리밍 시스템을 구현합니다. 스트리밍은 LLM 기반 애플리케이션의 응답성을 향상시키는 데 매우 중요합니다. 완전한 응답이 준비되기 전에도 출력을 점진적으로 표시함으로써, 스트리밍은 특히 LLM의 지연 시간을 처리할 때 사용자 경험(UX)을 크게 개선합니다. LangGraph 스트리밍으로 가능한 것들:

지원되는 스트림 모드

다음 스트림 모드 중 하나 이상을 리스트로 stream() 또는 astream() 메서드에 전달합니다:
모드설명
values그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
updates그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다. 동일한 단계에서 여러 업데이트가 발생하면 (예: 여러 노드가 실행됨), 해당 업데이트는 개별적으로 스트리밍됩니다.
custom그래프 노드 내부에서 커스텀 데이터를 스트리밍합니다.
messagesLLM이 호출되는 모든 그래프 노드에서 2-튜플 (LLM 토큰, 메타데이터)을 스트리밍합니다.
debug그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다.

기본 사용 예제

LangGraph 그래프는 스트리밍 출력을 이터레이터로 생성하기 위해 .stream() (동기) 및 .astream() (비동기) 메서드를 제공합니다.
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# stream() 메서드는 스트리밍 출력을 생성하는 이터레이터를 반환합니다
for chunk in graph.stream(
    {"topic": "ice cream"},
    # 각 노드 후 그래프 상태에 대한 업데이트만 스트리밍하려면 stream_mode="updates"로 설정합니다
    # 다른 스트림 모드도 사용할 수 있습니다. 지원되는 스트림 모드를 참조하세요
    stream_mode="updates",
):
    print(chunk)
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

여러 모드 스트리밍

stream_mode 매개변수에 리스트를 전달하여 여러 모드를 동시에 스트리밍할 수 있습니다. 스트리밍되는 출력은 (mode, chunk) 튜플이 되며, 여기서 mode는 스트림 모드의 이름이고 chunk는 해당 모드에서 스트리밍된 데이터입니다.
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

그래프 상태 스트리밍

updatesvalues 스트림 모드를 사용하여 그래프 실행 시 상태를 스트리밍합니다.
  • updates는 그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다.
  • values는 그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
  topic: str
  joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
  • updates
  • values
각 단계 후 노드가 반환한 상태 업데이트만 스트리밍하려면 이 모드를 사용합니다. 스트리밍되는 출력에는 노드의 이름과 업데이트가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk)

서브그래프 출력 스트리밍

스트리밍되는 출력에 서브그래프의 출력을 포함하려면, 부모 그래프의 .stream() 메서드에서 subgraphs=True로 설정할 수 있습니다. 이렇게 하면 부모 그래프와 모든 서브그래프의 출력이 스트리밍됩니다. 출력은 (namespace, data) 튜플로 스트리밍되며, 여기서 namespace는 서브그래프가 호출되는 노드의 경로를 가진 튜플입니다. 예: ("parent_node:<task_id>", "child_node:<task_id>")
for chunk in graph.stream(
    {"foo": "foo"},
    # 서브그래프의 출력을 스트리밍하려면 subgraphs=True로 설정합니다
    subgraphs=True,
    stream_mode="updates",
):
    print(chunk)
from langgraph.graph import START, StateGraph
from typing import TypedDict

# 서브그래프 정의
class SubgraphState(TypedDict):
    foo: str  # 이 키는 부모 그래프 상태와 공유됩니다
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 부모 그래프 정의
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # 서브그래프의 출력을 스트리밍하려면 subgraphs=True로 설정합니다
    subgraphs=True,
):
    print(chunk)
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
참고 노드 업데이트뿐만 아니라 어떤 그래프(또는 서브그래프)에서 스트리밍하고 있는지 알려주는 네임스페이스도 받고 있습니다.

디버깅

debug 스트리밍 모드를 사용하여 그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다. 스트리밍되는 출력에는 노드의 이름과 전체 상태가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    print(chunk)

LLM 토큰

messages 스트리밍 모드를 사용하여 노드, 도구, 서브그래프 또는 태스크를 포함한 그래프의 모든 부분에서 대규모 언어 모델(LLM) 출력을 토큰 단위로 스트리밍합니다. messages 모드에서 스트리밍되는 출력은 (message_chunk, metadata) 튜플입니다:
  • message_chunk: LLM에서 나온 토큰 또는 메시지 세그먼트입니다.
  • metadata: 그래프 노드 및 LLM 호출에 대한 세부 정보가 포함된 딕셔너리입니다.
LLM이 LangChain 통합으로 제공되지 않는 경우, custom 모드를 사용하여 출력을 스트리밍할 수 있습니다. 자세한 내용은 모든 LLM과 함께 사용을 참조하세요.
Python < 3.11에서 async 사용 시 수동 구성 필요 Python < 3.11에서 async 코드를 사용할 때는 적절한 스트리밍을 활성화하기 위해 명시적으로 RunnableConfigainvoke()에 전달해야 합니다. 자세한 내용은 Python < 3.11에서 Async를 참조하거나 Python 3.11+로 업그레이드하세요.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


llm = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """주제에 대한 농담을 생성하기 위해 LLM을 호출합니다"""
    # LLM이 .stream이 아닌 .invoke를 사용하여 실행되더라도 메시지 이벤트가 발생합니다
    llm_response = llm.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": llm_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# "messages" 스트림 모드는 (message_chunk, metadata) 튜플의 이터레이터를 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)

LLM 호출별 필터링

LLM 호출에 tags를 연결하여 LLM 호출별로 스트리밍된 토큰을 필터링할 수 있습니다.
from langchain.chat_models import init_chat_model

# llm_1은 "joke" 태그가 지정됩니다
llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
# llm_2는 "poem" 태그가 지정됩니다
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])

graph = ... # 이러한 LLM을 사용하는 그래프를 정의합니다

# stream_mode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
# metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    # metadata의 tags 필드로 스트리밍된 토큰을 필터링하여
    # "joke" 태그가 있는 LLM 호출의 토큰만 포함합니다
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# joke_model은 "joke" 태그가 지정됩니다
joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"])
# poem_model은 "poem" 태그가 지정됩니다
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"])


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # 참고: Python < 3.11에서는 config를 명시적으로 전달해야 합니다
      # 그 이전에는 컨텍스트 변수 지원이 추가되지 않았기 때문입니다: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      # 컨텍스트 변수가 올바르게 전파되도록 config를 명시적으로 전달합니다
      # Python < 3.11에서 async 코드를 사용할 때 이것이 필요합니다. 자세한 내용은 async 섹션을 참조하세요
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config,
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config,
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

# stream_mode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
# metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
async for msg, metadata in graph.astream(
      {"topic": "cats"},
      stream_mode="messages",
):
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)

노드별 필터링

특정 노드에서만 토큰을 스트리밍하려면, stream_mode="messages"를 사용하고 스트리밍된 메타데이터의 langgraph_node 필드로 출력을 필터링합니다:
# "messages" 스트림 모드는 (message_chunk, metadata) 튜플을 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",
):
    # metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
    # 지정된 노드의 토큰만 포함합니다
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        ...
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # 농담과 시를 동시에 작성합니다
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

# "messages" 스트림 모드는 (message_chunk, metadata) 튜플을 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for msg, metadata in graph.stream(
    {"topic": "cats"},
    stream_mode="messages",
):
    # metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
    # write_poem 노드의 토큰만 포함합니다
    if msg.content and metadata["langgraph_node"] == "write_poem":
        print(msg.content, end="|", flush=True)

커스텀 데이터 스트리밍

LangGraph 노드 또는 도구 내부에서 커스텀 사용자 정의 데이터를 전송하려면 다음 단계를 따르세요:
  1. get_stream_writer()를 사용하여 스트림 라이터에 액세스하고 커스텀 데이터를 발생시킵니다.
  2. .stream() 또는 .astream()을 호출할 때 stream_mode="custom"으로 설정하여 스트림에서 커스텀 데이터를 가져옵니다. 여러 모드를 결합할 수 있지만 (예: ["updates", "custom"]), 적어도 하나는 "custom"이어야 합니다.
Python < 3.11에서 async에서 get_stream_writer() 사용 불가 Python < 3.11에서 실행되는 async 코드에서는 get_stream_writer()가 작동하지 않습니다. 대신, 노드 또는 도구에 writer 매개변수를 추가하고 수동으로 전달하세요. 사용 예제는 Python < 3.11에서 Async를 참조하세요.
  • node
  • tool
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # 커스텀 데이터를 전송하기 위해 스트림 라이터를 가져옵니다
    writer = get_stream_writer()
    # 커스텀 키-값 쌍을 발생시킵니다 (예: 진행 상황 업데이트)
    writer({"custom_key": "Generating custom data inside node"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)

모든 LLM과 함께 사용

stream_mode="custom"을 사용하여 모든 LLM API에서 데이터를 스트리밍할 수 있습니다 — 해당 API가 LangChain 채팅 모델 인터페이스를 구현하지 않더라도 말입니다. 이를 통해 자체 스트리밍 인터페이스를 제공하는 원시 LLM 클라이언트 또는 외부 서비스를 통합할 수 있어, LangGraph를 커스텀 설정에 매우 유연하게 사용할 수 있습니다.
from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """임의의 모델을 호출하고 출력을 스트리밍하는 예제 노드"""
    # 커스텀 데이터를 전송하기 위해 스트림 라이터를 가져옵니다
    writer = get_stream_writer()
    # 청크를 생성하는 스트리밍 클라이언트가 있다고 가정합니다
    # 커스텀 스트리밍 클라이언트를 사용하여 LLM 토큰을 생성합니다
    for chunk in your_custom_streaming_client(state["topic"]):
        # writer를 사용하여 스트림에 커스텀 데이터를 전송합니다
        writer({"custom_llm_chunk": chunk})
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # 필요에 따라 다른 노드와 엣지를 추가합니다
    .compile()
)

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
):
    # 청크에는 LLM에서 스트리밍된 커스텀 데이터가 포함됩니다
    print(chunk)
import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# 이것은 우리의 도구입니다
async def get_items(place: str) -> str:
    """질문한 장소에서 찾을 수 있는 항목을 나열하기 위해 이 도구를 사용합니다."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# 이것은 도구 호출 그래프 노드입니다
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
도구 호출을 포함하는 AI 메시지로 그래프를 호출해 봅시다:
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)

특정 채팅 모델에 대한 스트리밍 비활성화

애플리케이션에서 스트리밍을 지원하는 모델과 지원하지 않는 모델을 혼합하여 사용하는 경우, 스트리밍을 지원하지 않는 모델에 대해 명시적으로 스트리밍을 비활성화해야 할 수 있습니다. 모델을 초기화할 때 disable_streaming=True로 설정합니다.
  • init_chat_model
  • chat model interface
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    # 채팅 모델에 대한 스트리밍을 비활성화하려면 disable_streaming=True로 설정합니다
    disable_streaming=True
)

Python < 3.11에서 Async

Python 버전 < 3.11에서는 asyncio 태스크context 매개변수를 지원하지 않습니다. 이는 LangGraph가 컨텍스트를 자동으로 전파하는 기능을 제한하며, LangGraph의 스트리밍 메커니즘에 두 가지 주요 방식으로 영향을 미칩니다:
  1. 콜백이 자동으로 전파되지 않으므로, async LLM 호출(예: ainvoke())에 명시적으로 RunnableConfig를 전달해야 합니다.
  2. async 노드 또는 도구에서 get_stream_writer()를 사용할 수 없습니다writer 인수를 직접 전달해야 합니다.
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

llm = init_chat_model(model="openai:gpt-4o-mini")

class State(TypedDict):
    topic: str
    joke: str

# async 노드 함수에서 config를 인수로 받습니다
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # 적절한 컨텍스트 전파를 보장하기 위해 llm.ainvoke()에 config를 전달합니다
    joke_response = await llm.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# LLM 토큰을 스트리밍하려면 stream_mode="messages"로 설정합니다
async for chunk, metadata in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if chunk.content:
        print(chunk.content, end="|", flush=True)
from typing import TypedDict
from langgraph.types import StreamWriter

class State(TypedDict):
      topic: str
      joke: str

# async 노드 또는 도구의 함수 시그니처에 writer를 인수로 추가합니다
# LangGraph는 자동으로 스트림 라이터를 함수에 전달합니다
async def generate_joke(state: State, writer: StreamWriter):
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
)

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
async for chunk in graph.astream(
      {"topic": "ice cream"},
      stream_mode="custom",
):
      print(chunk)

I