메인 콘텐츠로 건너뛰기
Alpha Notice: These docs cover the v1-alpha release. Content is incomplete and subject to change.For the latest stable version, see the current LangGraph Python or LangGraph JavaScript docs.
함수형 API를 사용하면 기존 코드를 최소한으로 수정하면서도 LangGraph의 핵심 기능인 영속성, 메모리, human-in-the-loop, 스트리밍을 애플리케이션에 추가할 수 있습니다.
함수형 API에 대한 개념적인 정보는 함수형 API를 참조하세요.

간단한 워크플로우 만들기

entrypoint를 정의할 때 입력은 함수의 첫 번째 인자로 제한됩니다. 여러 입력을 전달하려면 딕셔너리를 사용할 수 있습니다.
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
import uuid
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 숫자가 짝수인지 확인하는 작업
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# 메시지 형식을 지정하는 작업
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# 영속성을 위한 체크포인터 생성
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """숫자를 분류하는 간단한 워크플로우입니다."""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# 고유한 스레드 ID로 워크플로우 실행
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
이 예제는 @task@entrypoint 데코레이터를 구문적으로 사용하는 방법을 보여줍니다. 체크포인터가 제공되면 워크플로우 결과가 체크포인터에 저장됩니다.
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

llm = init_chat_model('openai:gpt-3.5-turbo')

# 작업: LLM을 사용하여 에세이 생성
@task
def compose_essay(topic: str) -> str:
    """주어진 주제에 대한 에세이를 생성합니다."""
    return llm.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# 영속성을 위한 체크포인터 생성
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """LLM으로 에세이를 생성하는 간단한 워크플로우입니다."""
    return compose_essay(topic).result()

# 워크플로우 실행
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke("the history of flight", config=config)
print(result)

병렬 실행

작업을 동시에 호출하고 결과를 기다림으로써 병렬로 실행할 수 있습니다. 이는 IO 바운드 작업(예: LLM을 위한 API 호출)의 성능을 향상시키는 데 유용합니다.
@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
이 예제는 @task를 사용하여 여러 LLM 호출을 병렬로 실행하는 방법을 보여줍니다. 각 호출은 서로 다른 주제에 대한 단락을 생성하고, 결과는 하나의 텍스트 출력으로 결합됩니다.
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# LLM 모델 초기화
llm = init_chat_model("openai:gpt-3.5-turbo")

# 주어진 주제에 대한 단락을 생성하는 작업
@task
def generate_paragraph(topic: str) -> str:
    response = llm.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# 영속성을 위한 체크포인터 생성
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """여러 단락을 병렬로 생성하고 결합합니다."""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 워크플로우 실행
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
이 예제는 LangGraph의 동시성 모델을 사용하여 실행 시간을 개선합니다. 특히 작업이 LLM 완성과 같은 I/O를 포함할 때 효과적입니다.

그래프 호출하기

함수형 API그래프 API는 동일한 기본 런타임을 공유하므로 같은 애플리케이션에서 함께 사용할 수 있습니다.
from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> int:
    # 그래프 API로 정의된 그래프 호출
    result_1 = some_graph.invoke(...)
    # 그래프 API로 정의된 다른 그래프 호출
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# 공유 상태 타입 정의
class State(TypedDict):
    foo: int

# 간단한 변환 노드 정의
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# 그래프 API를 사용하여 그래프 구축
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# 함수형 API 워크플로우 정의
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# 워크플로우 실행
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(workflow.invoke(5, config=config))  # 출력: {'bar': 10}

다른 엔트리포인트 호출하기

entrypoint 또는 task 내에서 다른 entrypoint를 호출할 수 있습니다.
@entrypoint() # 부모 엔트리포인트의 체크포인터를 자동으로 사용합니다
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# 체크포인터 초기화
checkpointer = InMemorySaver()

# 숫자를 곱하는 재사용 가능한 서브 워크플로우
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# 서브 워크플로우를 호출하는 메인 워크플로우
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# 메인 워크플로우 실행
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # 출력: {'product': 42}

스트리밍

함수형 API그래프 API와 동일한 스트리밍 메커니즘을 사용합니다. 자세한 내용은 스트리밍 가이드 섹션을 참조하세요. 업데이트와 사용자 정의 데이터를 모두 스트리밍하는 스트리밍 API 사용 예제입니다.
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer # (1)!

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer() # (2)!
    writer("Started processing") # (3)!
    result = inputs["x"] * 2
    writer(f"Result is {result}") # (4)!
    return result

config = {"configurable": {"thread_id": "abc"}}

for mode, chunk in main.stream( # (5)!
    {"x": 5},
    stream_mode=["custom", "updates"], # (6)!
    config=config
):
    print(f"{mode}: {chunk}")
  1. langgraph.config에서 get_stream_writer를 가져옵니다.
  2. 엔트리포인트 내에서 스트림 라이터 인스턴스를 가져옵니다.
  3. 계산이 시작되기 전에 사용자 정의 데이터를 출력합니다.
  4. 결과를 계산한 후 다른 사용자 정의 메시지를 출력합니다.
  5. .stream()을 사용하여 스트리밍된 출력을 처리합니다.
  6. 사용할 스트리밍 모드를 지정합니다.
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Python < 3.11에서 비동기 사용 Python < 3.11을 사용하고 비동기 코드를 작성하는 경우 get_stream_writer()를 사용하면 작동하지 않습니다. 대신 StreamWriter 클래스를 직접 사용하세요. 자세한 내용은 Python < 3.11에서 비동기를 참조하세요.
from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
...

재시도 정책

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# 이 변수는 네트워크 장애를 시뮬레이션하기 위한 데모용입니다.
# 실제 코드에서는 사용하지 않습니다.
attempts = 0

# RetryPolicy를 ValueError에 대해 재시도하도록 구성합니다.
# 기본 RetryPolicy는 특정 네트워크 오류를 재시도하도록 최적화되어 있습니다.
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'

작업 캐싱

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy


@task(cache_policy=CachePolicy(ttl=120))  # (1)!
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2


@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}


for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
  1. ttl은 초 단위로 지정됩니다. 이 시간이 지나면 캐시가 무효화됩니다.

오류 후 재개하기

import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# 이 변수는 네트워크 장애를 시뮬레이션하기 위한 데모용입니다.
# 실제 코드에서는 사용하지 않습니다.
attempts = 0

@task()
def get_info():
    """
    성공하기 전에 한 번 실패하는 작업을 시뮬레이션합니다.
    첫 번째 시도에서는 예외를 발생시키고, 이후 시도에서는 "OK"를 반환합니다.
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # 첫 번째 시도에서 실패 시뮬레이션
    return "OK"

# 영속성을 위한 인메모리 체크포인터 초기화
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    1초 지연을 도입하여 느리게 실행되는 작업을 시뮬레이션합니다.
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    slow_task와 get_info 작업을 순차적으로 실행하는 메인 워크플로우 함수입니다.

    매개변수:
    - inputs: 워크플로우 입력 값을 포함하는 딕셔너리입니다.
    - writer: 사용자 정의 데이터를 스트리밍하기 위한 StreamWriter입니다.

    워크플로우는 먼저 `slow_task`를 실행한 다음 `get_info`를 실행하려고 시도하는데,
    첫 번째 호출에서는 실패합니다.
    """
    slow_task_result = slow_task().result()  # slow_task에 대한 블로킹 호출
    get_info().result()  # 첫 번째 시도에서 여기서 예외가 발생합니다
    return slow_task_result

# 고유한 스레드 식별자를 사용한 워크플로우 실행 구성
config = {
    "configurable": {
        "thread_id": "1"  # 워크플로우 실행을 추적하기 위한 고유 식별자
    }
}

# 이 호출은 slow_task 실행으로 인해 약 1초가 소요됩니다
try:
    # get_info 작업이 실패하여 첫 번째 호출에서 예외가 발생합니다
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # 실패를 우아하게 처리
실행을 재개할 때 slow_task의 결과가 이미 체크포인트에 저장되어 있으므로 다시 실행할 필요가 없습니다.
main.invoke(None, config=config)
'Ran slow task.'

Human-in-the-loop

함수형 API는 interrupt 함수와 Command 프리미티브를 사용하여 human-in-the-loop 워크플로우를 지원합니다.

기본 human-in-the-loop 워크플로우

세 개의 작업을 생성하겠습니다:
  1. "bar" 추가하기.
  2. 사용자 입력을 위해 일시 정지. 재개할 때 사용자 입력 추가하기.
  3. "qux" 추가하기.
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """bar를 추가합니다."""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """사용자 입력을 추가합니다."""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"

@task
def step_3(input_query):
    """qux를 추가합니다."""
    return f"{input_query} qux"
이제 이러한 작업을 entrypoint에서 구성할 수 있습니다:
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3
interrupt()는 작업 내에서 호출되어 사람이 이전 작업의 출력을 검토하고 편집할 수 있게 합니다. 이 경우 step_1과 같은 이전 작업의 결과는 유지되므로 interrupt 이후 다시 실행되지 않습니다. 쿼리 문자열을 보내 보겠습니다:
config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
step_1 이후 interrupt로 일시 정지했습니다. interrupt는 실행을 재개하기 위한 지침을 제공합니다. 재개하려면 human_feedback 작업에서 예상하는 데이터를 포함하는 Command를 발행합니다.
# 실행 계속
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
재개한 후 실행은 나머지 단계를 거쳐 예상대로 종료됩니다.

도구 호출 검토하기

실행 전에 도구 호출을 검토하려면 interrupt를 호출하는 review_tool_call 함수를 추가합니다. 이 함수가 호출되면 재개 명령을 발행할 때까지 실행이 일시 정지됩니다. 도구 호출이 주어지면 함수는 사람의 검토를 위해 interrupt합니다. 이 시점에서 다음 중 하나를 수행할 수 있습니다:
  • 도구 호출 수락
  • 도구 호출 수정 및 계속
  • 사용자 정의 도구 메시지 생성(예: 도구 호출을 다시 포맷하도록 모델에 지시)
from typing import Union

def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
    """도구 호출을 검토하고 검증된 버전을 반환합니다."""
    human_review = interrupt(
        {
            "question": "Is this correct?",
            "tool_call": tool_call,
        }
    )
    review_action = human_review["action"]
    review_data = human_review.get("data")
    if review_action == "continue":
        return tool_call
    elif review_action == "update":
        updated_tool_call = {**tool_call, **{"args": review_data}}
        return updated_tool_call
    elif review_action == "feedback":
        return ToolMessage(
            content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
        )
이제 entrypoint를 업데이트하여 생성된 도구 호출을 검토할 수 있습니다. 도구 호출이 수락되거나 수정되면 이전과 같은 방식으로 실행합니다. 그렇지 않으면 사람이 제공한 ToolMessage를 추가하기만 하면 됩니다. 이전 작업의 결과(이 경우 초기 모델 호출)는 유지되므로 interrupt 이후 다시 실행되지 않습니다.
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt


checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    llm_response = call_model(messages).result()
    while True:
        if not llm_response.tool_calls:
            break

        # 도구 호출 검토
        tool_results = []
        tool_calls = []
        for i, tool_call in enumerate(llm_response.tool_calls):
            review = review_tool_call(tool_call)
            if isinstance(review, ToolMessage):
                tool_results.append(review)
            else:  # 검증된 도구 호출인 경우
                tool_calls.append(review)
                if review != tool_call:
                    llm_response.tool_calls[i] = review  # 메시지 업데이트

        # 나머지 도구 호출 실행
        tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
        remaining_tool_results = [fut.result() for fut in tool_result_futures]

        # 메시지 목록에 추가
        messages = add_messages(
            messages,
            [llm_response, *tool_results, *remaining_tool_results],
        )

        # 모델 다시 호출
        llm_response = call_model(messages).result()

    # 최종 응답 생성
    messages = add_messages(messages, llm_response)
    return entrypoint.final(value=llm_response, save=messages)

단기 메모리

단기 메모리를 사용하면 동일한 스레드 ID의 서로 다른 호출 간에 정보를 저장할 수 있습니다. 자세한 내용은 단기 메모리를 참조하세요.

체크포인트 관리

체크포인터에 저장된 정보를 보고 삭제할 수 있습니다.

스레드 상태 보기

config = {
    "configurable": {
        "thread_id": "1",
        # 선택적으로 특정 체크포인트의 ID를 제공할 수 있습니다.
        # 그렇지 않으면 최신 체크포인트가 표시됩니다.
        # "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"

    }
}
graph.get_state(config)
StateSnapshot(
    values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]}, next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
    metadata={
        'source': 'loop',
        'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},
        'step': 4,
        'parents': {},
        'thread_id': '1'
    },
    created_at='2025-05-05T16:01:24.680462+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
    tasks=(),
    interrupts=()
)

스레드 히스토리 보기

config = {
    "configurable": {
        "thread_id": "1"
    }
}
list(graph.get_state_history(config))
[
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:24.680462+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},
        next=('call_model',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863421+00:00',
        parent_config={...}
        tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=('__start__',),
        config={...},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "what's my name?"}]}}, 'step': 2, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863173+00:00',
        parent_config={...}
        tasks=(PregelTask(id='24ba39d6-6db1-4c9b-f4c5-682aeaf38dcd', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "what's my name?"}]}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=(),
        config={...},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}}, 'step': 1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.862295+00:00',
        parent_config={...}
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob")]},
        next=('call_model',),
        config={...},
        metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.278960+00:00',
        parent_config={...}
        tasks=(PregelTask(id='8cbd75e0-3720-b056-04f7-71ac805140a0', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-0870-6ce2-bfff-1f3f14c3e565'}},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}}, 'step': -1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.277497+00:00',
        parent_config=None,
        tasks=(PregelTask(id='d458367b-8265-812c-18e2-33001d199ce6', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}),),
        interrupts=()
    )
]

반환 값과 저장 값 분리하기

entrypoint.final을 사용하면 호출자에게 반환되는 값과 체크포인트에 저장되는 값을 분리할 수 있습니다. 다음과 같은 경우에 유용합니다:
  • 계산된 결과(예: 요약 또는 상태)를 반환하지만 다음 호출에 사용할 다른 내부 값을 저장하려는 경우.
  • 다음 실행에서 previous 매개변수로 전달될 값을 제어해야 하는 경우.
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # 호출자에게는 *이전* 값을 반환하지만 체크포인트에는 *새* 합계를 저장합니다.
    return entrypoint.final(value=previous, save=total)

config = {"configurable": {"thread_id": "my-thread"}}

print(accumulate.invoke(1, config=config))  # 0
print(accumulate.invoke(2, config=config))  # 1
print(accumulate.invoke(3, config=config))  # 3

챗봇 예제

함수형 API와 InMemorySaver 체크포인터를 사용하는 간단한 챗봇 예제입니다. 봇은 이전 대화를 기억하고 중단한 부분부터 계속할 수 있습니다.
from langchain.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-5-sonnet-latest")

@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

장기 메모리

장기 메모리를 사용하면 서로 다른 스레드 ID 간에 정보를 저장할 수 있습니다. 이는 한 대화에서 특정 사용자에 대한 정보를 학습하고 다른 대화에서 사용하는 데 유용할 수 있습니다.

워크플로우

  • 함수형 API를 사용하여 워크플로우를 구축하는 방법에 대한 더 많은 예제는 워크플로우와 에이전트 가이드를 참조하세요.

다른 라이브러리와 통합하기


I