메인 콘텐츠로 건너뛰기
Alpha Notice: These docs cover the v1-alpha release. Content is incomplete and subject to change.For the latest stable version, see the current LangGraph Python or LangGraph JavaScript docs.
Functional API를 사용하면 기존 코드를 최소한으로 수정하면서 LangGraph의 핵심 기능인 영속성, 메모리, human-in-the-loop, 스트리밍을 애플리케이션에 추가할 수 있습니다. 이 API는 if 문, for 루프, 함수 호출과 같은 표준 언어 프리미티브를 사용하여 분기 및 제어 흐름을 처리하는 기존 코드에 이러한 기능을 통합하도록 설계되었습니다. 코드를 명시적인 파이프라인이나 DAG로 재구성해야 하는 많은 데이터 오케스트레이션 프레임워크와 달리, Functional API는 엄격한 실행 모델을 강제하지 않고도 이러한 기능을 통합할 수 있게 해줍니다. Functional API는 두 가지 핵심 구성 요소를 사용합니다:
  • @entrypoint – 함수를 워크플로의 시작점으로 표시하여 로직을 캡슐화하고 장시간 실행되는 작업 및 인터럽트 처리를 포함한 실행 흐름을 관리합니다.
  • @task – API 호출이나 데이터 처리 단계와 같은 개별 작업 단위를 나타내며, entrypoint 내에서 비동기적으로 실행될 수 있습니다. Task는 await하거나 동기적으로 해결할 수 있는 future와 유사한 객체를 반환합니다.
이를 통해 상태 관리 및 스트리밍을 갖춘 워크플로를 구축하기 위한 최소한의 추상화를 제공합니다.
Functional API 사용 방법에 대한 정보는 Functional API 사용하기를 참조하세요.

Functional API vs. Graph API

선언적 접근 방식을 선호하는 사용자를 위해 LangGraph의 Graph API는 Graph 패러다임을 사용하여 워크플로를 정의할 수 있게 해줍니다. 두 API는 동일한 기본 런타임을 공유하므로 동일한 애플리케이션에서 함께 사용할 수 있습니다. 주요 차이점은 다음과 같습니다:
  • 제어 흐름: Functional API는 그래프 구조에 대해 생각할 필요가 없습니다. 표준 Python 구조를 사용하여 워크플로를 정의할 수 있습니다. 이렇게 하면 일반적으로 작성해야 하는 코드의 양을 줄일 수 있습니다.
  • 단기 메모리: GraphAPIState를 선언해야 하며 그래프 상태 업데이트를 관리하기 위해 reducer를 정의해야 할 수 있습니다. @entrypoint@task는 상태가 함수 범위로 제한되고 함수 간에 공유되지 않으므로 명시적인 상태 관리가 필요하지 않습니다.
  • 체크포인팅: 두 API 모두 체크포인트를 생성하고 사용합니다. Graph API에서는 모든 superstep 이후에 새 체크포인트가 생성됩니다. Functional API에서는 task가 실행될 때 새 체크포인트를 생성하는 대신 해당 entrypoint와 연결된 기존 체크포인트에 결과가 저장됩니다.
  • 시각화: Graph API를 사용하면 워크플로를 그래프로 쉽게 시각화할 수 있어 디버깅, 워크플로 이해, 다른 사람과의 공유에 유용합니다. Functional API는 런타임에 그래프가 동적으로 생성되므로 시각화를 지원하지 않습니다.

예제

아래에서는 에세이를 작성하고 사람의 검토를 요청하기 위해 인터럽트하는 간단한 애플리케이션을 보여줍니다.
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

@task
def write_essay(topic: str) -> str:
    """주어진 주제에 대한 에세이를 작성합니다."""
    time.sleep(1) # 장시간 실행되는 작업의 플레이스홀더입니다.
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """에세이를 작성하고 검토를 요청하는 간단한 워크플로입니다."""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # interrupt에 인자로 제공되는 모든 json 직렬화 가능한 페이로드입니다.
        # 워크플로에서 데이터를 스트리밍할 때 클라이언트 측에서 Interrupt로 표시됩니다.
        "essay": essay, # 검토를 원하는 에세이입니다.
        # 필요한 추가 정보를 추가할 수 있습니다.
        # 예를 들어, 일부 지침과 함께 "action"이라는 키를 도입합니다.
        "action": "Please approve/reject the essay",
    })

    return {
        "essay": essay, # 생성된 에세이
        "is_approved": is_approved, # HIL의 응답
    }
이 워크플로는 “cat” 주제에 대한 에세이를 작성한 다음 사람의 검토를 받기 위해 일시 중지됩니다. 워크플로는 검토가 제공될 때까지 무기한 중단될 수 있습니다.워크플로가 재개되면 처음부터 실행되지만, writeEssay task의 결과가 이미 저장되었기 때문에 task 결과는 다시 계산되지 않고 체크포인트에서 로드됩니다.
import time
import uuid
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver


@task
def write_essay(topic: str) -> str:
    """주어진 주제에 대한 에세이를 작성합니다."""
    time.sleep(1)  # 장시간 실행되는 작업의 플레이스홀더입니다.
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """에세이를 작성하고 검토를 요청하는 간단한 워크플로입니다."""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            # interrupt에 인자로 제공되는 모든 json 직렬화 가능한 페이로드입니다.
            # 워크플로에서 데이터를 스트리밍할 때 클라이언트 측에서 Interrupt로 표시됩니다.
            "essay": essay,  # 검토를 원하는 에세이입니다.
            # 필요한 추가 정보를 추가할 수 있습니다.
            # 예를 들어, 일부 지침과 함께 "action"이라는 키를 도입합니다.
            "action": "Please approve/reject the essay",
        }
    )
    return {
        "essay": essay,  # 생성된 에세이
        "is_approved": is_approved,  # HIL의 응답
    }


thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': 'An essay about topic: cat'}
# > {
# >     '__interrupt__': (
# >        Interrupt(
# >            value={
# >                'essay': 'An essay about topic: cat',
# >                'action': 'Please approve/reject the essay'
# >            },
# >            id='b9b2b9d788f482663ced6dc755c9e981'
# >        ),
# >    )
# > }
에세이가 작성되어 검토 준비가 완료되었습니다. 검토가 제공되면 워크플로를 재개할 수 있습니다:
from langgraph.types import Command

# 사용자로부터 검토를 받습니다 (예: UI를 통해)
# 이 경우 bool을 사용하지만, json 직렬화 가능한 모든 값이 될 수 있습니다.
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': False}}
워크플로가 완료되고 검토가 에세이에 추가되었습니다.

Entrypoint

@entrypoint 데코레이터를 사용하여 함수로부터 워크플로를 생성할 수 있습니다. 이는 워크플로 로직을 캡슐화하고 장시간 실행되는 작업인터럽트 처리를 포함한 실행 흐름을 관리합니다.

정의

entrypoint@entrypoint 데코레이터로 함수를 장식하여 정의됩니다. 함수는 단일 위치 인자를 받아야 하며, 이는 워크플로 입력으로 사용됩니다. 여러 데이터를 전달해야 하는 경우 첫 번째 인자의 입력 타입으로 딕셔너리를 사용하세요. 함수를 entrypoint로 장식하면 Pregel 인스턴스가 생성되며, 이는 워크플로 실행을 관리하는 데 도움이 됩니다(예: 스트리밍, 재개, 체크포인팅 처리). 일반적으로 체크포인터@entrypoint 데코레이터에 전달하여 영속성을 활성화하고 human-in-the-loop과 같은 기능을 사용하고자 할 것입니다.
  • Sync
  • Async
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # API 호출과 같은 장시간 실행되는 작업을 포함할 수 있는 일부 로직이며,
    # human-in-the-loop을 위해 중단될 수 있습니다.
    ...
    return result
직렬화 체크포인팅을 지원하려면 entrypoint의 입력출력이 JSON 직렬화 가능해야 합니다. 자세한 내용은 직렬화 섹션을 참조하세요.

주입 가능한 매개변수

entrypoint를 선언할 때 런타임에 자동으로 주입될 추가 매개변수에 대한 액세스를 요청할 수 있습니다. 이러한 매개변수는 다음과 같습니다:
매개변수설명
previous주어진 스레드에 대한 이전 checkpoint와 연결된 상태에 액세스합니다. 단기 메모리를 참조하세요.
store[BaseStore][langgraph.store.base.BaseStore]의 인스턴스입니다. 장기 메모리에 유용합니다.
writerAsync Python < 3.11에서 작업할 때 StreamWriter에 액세스하는 데 사용합니다. 자세한 내용은 Functional API를 사용한 스트리밍을 참조하세요.
config런타임 구성에 액세스하기 위한 것입니다. 정보는 RunnableConfig를 참조하세요.
적절한 이름과 타입 어노테이션으로 매개변수를 선언하세요.
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore(...)  # 장기 메모리를 위한 InMemoryStore 인스턴스

@entrypoint(
    checkpointer=checkpointer,  # 체크포인터를 지정합니다.
    store=in_memory_store  # 스토어를 지정합니다.
)
def my_workflow(
    some_input: dict,  # 입력 (예: `invoke`를 통해 전달됨)
    *,
    previous: Any = None, # 단기 메모리용
    store: BaseStore,  # 장기 메모리용
    writer: StreamWriter,  # 사용자 정의 데이터 스트리밍용
    config: RunnableConfig  # entrypoint에 전달된 구성에 액세스하기 위한 것
) -> ...:

실행

@entrypoint를 사용하면 invoke, ainvoke, stream, astream 메서드를 사용하여 실행할 수 있는 Pregel 객체를 생성합니다.
  • Invoke
  • Async Invoke
  • Stream
  • Async Stream
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}
my_workflow.invoke(some_input, config)  # 결과를 동기적으로 대기합니다.

재개

interrupt 후 실행을 재개하려면 Command 프리미티브에 resume 값을 전달하면 됩니다.
  • Invoke
  • Async Invoke
  • Stream
  • Async Stream
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(Command(resume=some_resume_value), config)
오류 후 재개 오류 후 재개하려면 동일한 thread id(config)로 None을 사용하여 entrypoint를 실행합니다. 이는 기본 오류가 해결되었고 실행이 성공적으로 진행될 수 있다고 가정합니다.
  • Invoke
  • Async Invoke
  • Stream
  • Async Stream

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(None, config)

단기 메모리

entrypointcheckpointer와 함께 정의되면 체크포인트에서 동일한 thread id에 대한 연속적인 호출 사이의 정보를 저장합니다. 이를 통해 previous 매개변수를 사용하여 이전 호출의 상태에 액세스할 수 있습니다. 기본적으로 previous 매개변수는 이전 호출의 반환 값입니다.
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
    previous = previous or 0
    return number + previous

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(1, config)  # 1 (previous는 None이었습니다)
my_workflow.invoke(2, config)  # 3 (previous는 이전 호출의 1이었습니다)

entrypoint.final

entrypoint.final은 entrypoint에서 반환할 수 있는 특수 프리미티브로, 체크포인트에 저장되는 값entrypoint의 반환 값분리할 수 있게 해줍니다. 첫 번째 값은 entrypoint의 반환 값이고, 두 번째 값은 체크포인트에 저장될 값입니다. 타입 어노테이션은 entrypoint.final[return_type, save_type]입니다.
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # 호출자에게 previous 값을 반환하고,
    # 2 * number를 체크포인트에 저장합니다. 이는 다음 호출의
    # `previous` 매개변수에 사용됩니다.
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous는 None이었습니다)
my_workflow.invoke(1, config)  # 6 (previous는 이전 호출의 3 * 2였습니다)

Task

task는 API 호출이나 데이터 처리 단계와 같은 개별 작업 단위를 나타냅니다. 두 가지 주요 특성이 있습니다:
  • 비동기 실행: Task는 비동기적으로 실행되도록 설계되어 여러 작업이 차단 없이 동시에 실행될 수 있습니다.
  • 체크포인팅: Task 결과는 체크포인트에 저장되어 마지막으로 저장된 상태에서 워크플로를 재개할 수 있습니다. (자세한 내용은 영속성을 참조하세요).

정의

Task는 일반 Python 함수를 래핑하는 @task 데코레이터를 사용하여 정의됩니다.
from langgraph.func import task

@task()
def slow_computation(input_value):
    # 장시간 실행되는 작업을 시뮬레이션합니다.
    ...
    return result
직렬화 체크포인팅을 지원하려면 task의 출력이 JSON 직렬화 가능해야 합니다.

실행

Taskentrypoint, 다른 task, 또는 state graph node 내에서만 호출할 수 있습니다. Task는 메인 애플리케이션 코드에서 직접 호출할 수 없습니다. task를 호출하면 future 객체와 함께 즉시 반환됩니다. Future는 나중에 사용할 수 있는 결과의 플레이스홀더입니다. task의 결과를 얻으려면 동기적으로 대기(result() 사용)하거나 비동기적으로 대기(await 사용)할 수 있습니다.
  • Synchronous Invocation
  • Asynchronous Invocation
@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: int) -> int:
    future = slow_computation(some_input)
    return future.result()  # 결과를 동기적으로 대기합니다.

task를 사용해야 하는 경우

Task는 다음 시나리오에서 유용합니다:
  • 체크포인팅: 장시간 실행되는 작업의 결과를 체크포인트에 저장하여 워크플로를 재개할 때 다시 계산할 필요가 없도록 하고자 할 때.
  • Human-in-the-loop: 사람의 개입이 필요한 워크플로를 구축하는 경우, 워크플로가 올바르게 재개될 수 있도록 모든 무작위성(예: API 호출)을 캡슐화하기 위해 task를 사용해야 합니다. 자세한 내용은 결정론 섹션을 참조하세요.
  • 병렬 실행: I/O 바운드 작업의 경우 task는 병렬 실행을 가능하게 하여 여러 작업이 차단 없이 동시에 실행될 수 있게 합니다(예: 여러 API 호출).
  • 관찰 가능성: 작업을 task로 래핑하면 워크플로의 진행 상황을 추적하고 LangSmith를 사용하여 개별 작업의 실행을 모니터링할 수 있는 방법을 제공합니다.
  • 재시도 가능한 작업: 실패나 불일치를 처리하기 위해 작업을 재시도해야 할 때, task는 재시도 로직을 캡슐화하고 관리하는 방법을 제공합니다.

직렬화

LangGraph에서 직렬화에는 두 가지 주요 측면이 있습니다:
  1. entrypoint 입력 및 출력은 JSON 직렬화 가능해야 합니다.
  2. task 출력은 JSON 직렬화 가능해야 합니다.
이러한 요구 사항은 체크포인팅 및 워크플로 재개를 가능하게 하기 위해 필요합니다. 입력 및 출력이 직렬화 가능하도록 딕셔너리, 리스트, 문자열, 숫자, 불리언과 같은 Python 프리미티브를 사용하세요. 직렬화는 task 결과 및 중간 값과 같은 워크플로 상태를 안정적으로 저장하고 복원할 수 있도록 보장합니다. 이는 human-in-the-loop 상호 작용, 내결함성 및 병렬 실행을 가능하게 하는 데 중요합니다. 워크플로가 체크포인터로 구성된 경우 직렬화할 수 없는 입력이나 출력을 제공하면 런타임 오류가 발생합니다.

결정론

human-in-the-loop과 같은 기능을 활용하려면 모든 무작위성을 task 내부에 캡슐화해야 합니다. 이렇게 하면 실행이 중지되고(예: human-in-the-loop의 경우) 재개될 때 task 결과가 비결정적이더라도 동일한 _단계 시퀀스_를 따르게 됩니다. LangGraph는 실행되는 대로 tasksubgraph 결과를 유지하여 이러한 동작을 달성합니다. 잘 설계된 워크플로는 실행을 재개할 때 동일한 _단계 시퀀스_를 따르도록 보장하여 이전에 계산된 결과를 다시 실행하지 않고 올바르게 검색할 수 있게 합니다. 이는 장시간 실행되는 task 또는 비결정적 결과를 가진 task에 특히 유용합니다. 이전에 수행한 작업을 반복하지 않고 본질적으로 동일한 위치에서 재개할 수 있기 때문입니다. 워크플로의 다른 실행은 다른 결과를 생성할 수 있지만, 특정 실행을 재개하면 항상 동일한 기록된 단계 시퀀스를 따라야 합니다. 이를 통해 LangGraph는 그래프가 중단되기 전에 실행된 tasksubgraph 결과를 효율적으로 조회하고 다시 계산하지 않을 수 있습니다.

멱등성

멱등성은 동일한 작업을 여러 번 실행해도 동일한 결과를 생성하도록 보장합니다. 이는 실패로 인해 단계가 재실행되는 경우 중복 API 호출 및 중복 처리를 방지하는 데 도움이 됩니다. 체크포인팅을 위해 항상 API 호출을 task 함수 내부에 배치하고, 재실행 시 멱등적이 되도록 설계하세요. task가 시작되었지만 성공적으로 완료되지 않으면 재실행이 발생할 수 있습니다. 그런 다음 워크플로가 재개되면 task가 다시 실행됩니다. 중복을 방지하기 위해 멱등성 키를 사용하거나 기존 결과를 확인하세요.

일반적인 함정

부작용 처리

워크플로를 재개할 때 여러 번 실행되지 않도록 부작용(예: 파일 쓰기, 이메일 전송)을 task로 캡슐화하세요.
  • Incorrect
  • Correct
이 예제에서는 부작용(파일 쓰기)이 워크플로에 직접 포함되어 있으므로 워크플로를 재개할 때 두 번째로 실행됩니다.
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # 이 코드는 워크플로를 재개할 때 두 번째로 실행됩니다.
    # 이는 원하는 것이 아닐 가능성이 높습니다.
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value

비결정적 제어 흐름

매번 다른 결과를 제공할 수 있는 작업(현재 시간 가져오기 또는 난수 생성 등)은 task로 캡슐화하여 재개 시 동일한 결과가 반환되도록 해야 합니다.
  • task 내: 난수 가져오기 (5) → 인터럽트 → 재개 → (다시 5 반환) → …
  • task 외부: 난수 가져오기 (5) → 인터럽트 → 재개 → 새 난수 가져오기 (7) → …
이는 여러 interrupt 호출이 있는 human-in-the-loop 워크플로를 사용할 때 특히 중요합니다. LangGraph는 각 task/entrypoint에 대한 resume 값의 목록을 유지합니다. interrupt가 발생하면 해당 resume 값과 일치합니다. 이 매칭은 엄격하게 인덱스 기반이므로 resume 값의 순서는 interrupt의 순서와 일치해야 합니다. 재개 시 실행 순서가 유지되지 않으면 하나의 interrupt 호출이 잘못된 resume 값과 일치하여 잘못된 결과를 초래할 수 있습니다. 자세한 내용은 결정론 섹션을 참조하세요.
  • Incorrect
  • Correct
이 예제에서 워크플로는 현재 시간을 사용하여 실행할 task를 결정합니다. 이는 워크플로의 결과가 실행되는 시간에 따라 달라지기 때문에 비결정적입니다.
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    t0 = inputs["t0"]
    t1 = time.time()

    delta_t = t1 - t0

    if delta_t > 1:
        result = slow_task(1).result()
        value = interrupt("question")
    else:
        result = slow_task(2).result()
        value = interrupt("question")

    return {
        "result": result,
        "value": value
    }

I