前几篇我们做出了一个「铁矿石价格预测专家」智能体,并搬上了网页、加了流式输出。但有个硬伤:它记不住上一句话。这一篇彻底讲清楚 LangGraph 的 Checkpoint(检查点) 机制——它是什么、怎么用、数据到底存成了什么样、以及一个「会话」和 thread_id 到底是什么关系。


1. 为什么需要 Checkpoint

大模型本身是无状态的:每次调用都是一锤子买卖,它不会自动记得你上一轮说过什么。要实现「多轮对话」,本质上只有一个办法——每次请求都把历史消息重新喂给它

那历史消息存哪?谁来存?什么时候存、什么时候取?这一整套「对话状态的保存与恢复」机制,在 LangGraph 里就叫 Checkpoint,负责干这件事的组件叫 Checkpointer(检查点存储器)

它解决三件事:

能力 说明
多轮记忆 把每一轮的状态(消息列表等)存下来,下次同一会话进来时自动恢复,模型就能「接着聊」
故障恢复 / 续跑 智能体执行到一半崩了(断网、超时、进程挂掉),重启后能从最后一个检查点继续,而不是从头再来
时间旅行(time-travel) 每一步都有快照,可以回到任意历史步骤,查看甚至从那里分支重跑

一句话:Checkpointer 是智能体的「存档系统」,和游戏存档完全是一个意思——thread_id 就是「存档槽位」。


2. 核心概念:Checkpoint、Thread、Channel

先把几个名词理顺,后面看数据表就不懵了。

thread_id 标识

parent

parent

一次完整对话
(会话 / Thread)

Checkpoint #1
第1轮后的快照

Checkpoint #2
第2轮后的快照

Checkpoint #3
第3轮后的快照

channel_values
{messages: [...], ...}

  • Thread(会话):一条独立的对话时间线,用 thread_id 唯一标识。不同 thread_id 之间数据完全隔离、互不可见。
  • Checkpoint(检查点):某一时刻整个图状态的快照。智能体每推进一步(super-step)就生成一个新的 checkpoint,用 checkpoint_id 标识。它们通过 parent_checkpoint_id 串成一条链。
  • Channel(通道):图状态里的一个个「字段」。对我们的聊天智能体来说,最关键的通道就是 messages(消息列表)。Checkpoint 保存的就是所有通道在那一刻的值(channel_values)。

类比 Git:Thread 是一个仓库,Checkpoint 是一次次 commit(带 parent 指针连成链),channel_values 是这次提交时的工作区快照。


3. Checkpoint 功能使用了哪些组件

本项目要实现「多轮记忆 + 重启不丢」,并不是只加一个 checkpointer 就完事,而是存储层、智能体层、服务层、接口层、前端层几层组件配合。下面按层次说明每个组件是什么、干什么、落在哪个文件。

3.1 整体协作关系

启动与配置

存储层

智能体层

服务层

接口层

前端

thread_id

checkpointer

无传入时

set_agent

index.html
生成 thread_id

controller/api.py
/chat /chat/stream

entity/apireq.py
ChatRequest.thread_id

deepagent.py
achat / astream_chat

_build_config
组装 config

get_agent / set_agent

build_agent

create_deep_agent
CompiledStateGraph

AsyncSqliteSaver
正式持久化

InMemorySaver
回退/CLI

data/checkpoints.sqlite

lifespan
controller/__init__.py

get_checkpoint_db_path
modelConfig.py

3.2 组件清单(按层次)

存储层:真正「存档 / 读档」
组件 来源包 作用 本项目中的位置
BaseCheckpointSaver langgraph-checkpoint Checkpointer 的抽象基类,定义 setup / aget_tuple / aput / adelete_thread 等统一接口;所有存储器都实现它 deepagent.pybuild_agent(checkpointer: BaseCheckpointSaver | None) 的类型约束
AsyncSqliteSaver langgraph-checkpoint-sqlite 异步 SQLite 存储器:把每个 thread_id 的检查点写入本地 .sqlite 文件;支持 ainvoke / astream;重启进程后仍可读回历史 Web 应用在 lifespan 里创建,传给 build_agent(checkpointer=saver)
InMemorySaver langgraph-checkpoint 内存存储器:进程内保存检查点,速度快、零文件依赖,但进程结束即清空 build_agent() 未传入 checkpointer 时的默认值:checkpointer or InMemorySaver()
aiosqlite langgraph-checkpoint-sqlite 的依赖 AsyncSqliteSaver 提供异步 SQLite 连接;必须在事件循环内创建并保持存活 AsyncSqliteSaver.from_conn_string() 内部使用,应用代码不直接调用

checkpointer or InMemorySaver() 的含义:调用方传了存储器就用传入的(如 AsyncSqliteSaver),没传(None)就自动兜底用内存版,保证 agent 始终能存档。

智能体层:把存储器挂到图上
组件 来源 作用 本项目中的位置
create_deep_agent deepagents 构建已编译的智能体图;checkpointer= 参数把存储器注入图,之后每次 ainvoke / astream 自动「读档 → 执行 → 存档」 deepagent.pybuild_agent()
CompiledStateGraph langgraph 编译后的状态图;内部 Pregel 循环在 super-step 前后调用 checkpointer 的 aget_tuple / aput create_deep_agent(...) 的返回值
build_agent 项目代码 封装模型、提示词、工具与 checkpointer,统一构建带记忆能力的 agent mystu/buildagent/agent/deepagent.py
get_agent / set_agent 项目代码 维护「当前生效的 agent」单例:CLI 用默认内存版;Web 启动后 lifespanset_agent 换成带 SQLite 的版本 deepagent.pyachat / astream_chat 通过 get_agent() 取 agent
服务层:把 thread_id 带进每次调用
组件 作用 本项目中的位置
_build_config(thread_id) thread_id 包装成 LangGraph 认识的 config = {"configurable": {"thread_id": ...}};未传时自动生成新 UUID(= 新会话) deepagent.py
achat 非流式问答:调用 get_agent().ainvoke(..., config),框架据此读/写对应 thread 的检查点 deepagent.pycontroller/api.py /chat
astream_chat 流式问答:同样带 config,逐 token 输出;检查点在每个 super-step 后照常写入 deepagent.py/chat/stream
aget_state / aget_state_history 图提供的便捷接口:读某 thread 当前状态或全部历史检查点(时间旅行、查历史用) 编译后的 agent 自带;运维/调试时直接调用
启动与配置层:Web 场景下何时打开 SQLite
组件 作用 本项目中的位置
lifespan FastAPI 应用生命周期:启动时 AsyncSqliteSaver.from_conn_stringsetup() 建表 → build_agent(checkpointer=saver)set_agent;关闭时释放连接并回退默认 agent mystu/controller/__init__.py
get_checkpoint_db_path() 返回 SQLite 文件路径,默认 data/checkpoints.sqlite;可用环境变量 CHECKPOINT_DB 覆盖;自动创建 data/ 目录 mystu/buildagent/agent/modelConfig.py
接口层与前端:会话 ID 从哪来、怎么传
组件 作用 本项目中的位置
ChatRequest.thread_id API 请求体字段:客户端传入会话 ID,后端原样交给 _build_config mystu/entity/apireq.py
/agent/api/chat/chat/stream 接收 thread_id,调用 achat / astream_chat mystu/controller/api.py
前端 threadId 页面加载时 crypto.randomUUID() 生成;每次请求带上;点「新对话」换一个新 ID(= 开新会话、清空 UI) mystu/static/index.html

3.3 依赖包(pyproject.toml)

Checkpoint 相关能力由以下包提供(节选):

"langgraph-checkpoint-sqlite>=3.1.0",   # AsyncSqliteSaver
"langgraph-checkpoint",                  # InMemorySaver、BaseCheckpointSaver(随 langgraph 传递依赖)

3.4 各组件在「一轮对话」里的分工

以用户第 2 轮提问「我叫什么?」为例,各组件依次做什么:

checkpoints.sqlite AsyncSqliteSaver CompiledStateGraph achat + _build_config /chat 前端 threadId checkpoints.sqlite AsyncSqliteSaver CompiledStateGraph achat + _build_config /chat 前端 threadId {message, thread_id: "abc"} achat(message, "abc") config = {configurable: {thread_id: "abc"}} ainvoke(新消息, config) aget_tuple(thread_id=abc) SELECT 最新 checkpoint 历史 messages 恢复状态(含第1轮) 历史 + 新消息 → 调模型 aput(新 checkpoint) INSERT 新快照 最终回答 {status: ok, data} JSON 响应

要点:业务代码只负责传 thread_id 和调 ainvoke;具体什么时候读档、什么时候存档,由 LangGraph + checkpointer 在图执行循环里自动完成。


4. 在本项目里怎么用

4.1 两种存储器

LangGraph 自带几种 Checkpointer,我们项目里用到两种:

存储器 数据存哪 特点 用在哪
InMemorySaver 进程内存 快、零依赖,重启即丢 CLI 测试、未启用持久化时的回退
AsyncSqliteSaver 本地 SQLite 文件 重启不丢、跨进程可读、异步 Web 应用(正式)

注意:聊天接口用的是异步的 ainvoke / astream必须搭配异步存储器 AsyncSqliteSaver。如果误用同步的 SqliteSaver,会直接报 NotImplementedError: ... does not support async methods

4.2 构建 agent 时挂上存储器

def build_agent(
    checkpointer: BaseCheckpointSaver | None = None,
    extra_tools: Sequence[Any] | None = None,
):
    """构建使用 DeepSeek 的铁矿石价格预测 deep agent(带多轮对话记忆)。

    checkpointer:对话状态存储器。
      - 不传:使用 InMemorySaver(进程内记忆,重启即丢,适合 CLI / 测试)。
      - 传入 AsyncSqliteSaver 等:把各 thread_id 的对话持久化到磁盘,重启不丢。
    """
    system_prompt = load_iron_ore_forecast_prompt()
    tools = [*_TOOLS, *(extra_tools or [])]
    return create_deep_agent(
        model=build_model(),
        tools=tools,
        system_prompt=system_prompt,
        interrupt_on=_INTERRUPT_ON,
        checkpointer=checkpointer or InMemorySaver(),
    )

只要把 checkpointer 传进 create_deep_agent,记忆能力就自动接上了——后续所有 ainvoke / astream 都会自动「先读档、再执行、最后存档」。

4.3 在应用启动时打开 SQLite(lifespan)

异步 SQLite 存储器自带一个 aiosqlite 连接(后台线程),必须在事件循环里创建并全程保持存活,所以放进 FastAPI 的 lifespan

Agent AsyncSqliteSaver lifespan 启动 Agent AsyncSqliteSaver lifespan 启动 应用运行期间,连接一直开着 应用启动 from_conn_string(db_path) await setup() 建表 build_agent(checkpointer=saver) set_agent(带持久化的 agent) 应用关闭 自动释放连接
@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期:启动时打开 SQLite 持久化记忆,关闭时释放。

    AsyncSqliteSaver 自带异步连接(aiosqlite),必须在事件循环内创建并保持存活,
    因此放在 lifespan 里:进入时建表 + 构建带持久化的 agent,退出时回退默认 agent。
    """
    db_path = get_checkpoint_db_path()
    mcp_tools = await load_mcp_tools()
    async with AsyncSqliteSaver.from_conn_string(db_path) as saver:
        await saver.setup()  # 首次运行自动建表(checkpoints / writes)
        set_agent(build_agent(checkpointer=saver, extra_tools=mcp_tools))
        try:
            yield
        finally:
            set_agent(default_agent)

4.4 发起一次「带记忆」的对话

记忆的开关只有一个:在 config 里传 thread_id

config = {"configurable": {"thread_id": "user-123"}}

# 第 1 轮
await agent.ainvoke({"messages": [{"role": "user", "content": "我叫赵六"}]}, config)
# 第 2 轮:同一个 thread_id,模型能记得
await agent.ainvoke({"messages": [{"role": "user", "content": "我叫什么?"}]}, config)
# → "赵六"

注意第 2 轮只传了新的一句话,并没有手动拼接历史。历史是 Checkpointer 根据 thread_id 自动读出来、补在前面的。


5. 持久化的数据接口

这部分分两层看:底层数据库表结构(数据落地的样子)和上层编程接口(代码怎么读写它)。

5.1 数据库表结构(SQLite 实测 DDL)

await saver.setup() 会自动建两张表:checkpointswrites

表一:checkpoints —— 每一步的状态快照
CREATE TABLE checkpoints (
    thread_id            TEXT NOT NULL,            -- 会话 ID(存档槽位)
    checkpoint_ns        TEXT NOT NULL DEFAULT '', -- 命名空间(主图为 '',子图/子智能体非空)
    checkpoint_id        TEXT NOT NULL,            -- 本次检查点 ID(按时间单调递增的 UUID)
    parent_checkpoint_id TEXT,                     -- 父检查点 ID(串成链,第一个为 NULL)
    type                 TEXT,                     -- 序列化类型标记(如 msgpack)
    checkpoint           BLOB,                     -- 状态快照本体(序列化后的二进制)
    metadata             BLOB,                     -- 元数据(来源、步数等,序列化后的二进制)
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

逐字段解释:

字段 类型 含义
thread_id TEXT 会话标识。同一对话的所有快照共享同一个值,是数据隔离的最小单位
checkpoint_ns TEXT 命名空间。主图固定为空串 '';当智能体内部调用子图 / 子智能体时,子图的状态会用非空 ns 区分(deep agent 里会用到)
checkpoint_id TEXT 本检查点 ID,随时间单调递增,因此「同一 thread 下最大的 id 就是最新状态」
parent_checkpoint_id TEXT 指向上一个检查点,形成单向链;用于「时间旅行」和回溯
type TEXT 序列化器类型标记,配合反序列化用
checkpoint BLOB 核心。整个图状态的快照,反序列化后就是下面的 Checkpoint 结构(含 channel_values.messages
metadata BLOB 元数据,反序列化后含 source / step / parents

主键是 (thread_id, checkpoint_ns, checkpoint_id) 三元组——这正说明「定位一个快照」需要:哪个会话、哪个(子)图、第几步。

checkpoint 这个 BLOB 反序列化后的结构:

含义
v 结构版本号
id 检查点 id(与列同值)
ts 时间戳(ISO 字符串)
channel_values 各通道当前值,聊天场景下 messages 就在这里
channel_versions 每个通道的版本号(增量更新用)
versions_seen 各节点已处理到的通道版本(决定下一步该跑谁)
updated_channels 本步更新了哪些通道

metadata BLOB 反序列化后:

含义
source 这步状态的来源(input 用户输入 / loop 图循环 / update 手动更新 等)
step 第几步(-1 表示初始)
parents 各命名空间的父检查点映射
表二:writes —— 检查点之间的「待写入/中间写」
CREATE TABLE writes (
    thread_id     TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id       TEXT NOT NULL,   -- 产生这次写入的任务(节点执行)ID
    idx           INTEGER NOT NULL,-- 同一任务内多次写入的序号
    channel       TEXT NOT NULL,   -- 写到哪个通道(如 messages)
    type          TEXT,            -- 序列化类型
    value         BLOB,            -- 写入的值(序列化)
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);

writes 记录的是「某个节点产生、但还未合并成正式 checkpoint 的中间写入」。它的作用是:如果执行到一半崩溃,重启时这些 pending writes 不会丢,可以接着把它们应用上去——这就是「故障续跑」的关键。普通聊天里你一般不用直接碰它。

两张表的关系

thread_id + checkpoint_ns + checkpoint_id

checkpoints

TEXT

thread_id

PK

TEXT

checkpoint_ns

PK

TEXT

checkpoint_id

PK

TEXT

parent_checkpoint_id

BLOB

checkpoint

BLOB

metadata

writes

TEXT

thread_id

PK

TEXT

checkpoint_ns

PK

TEXT

checkpoint_id

PK

TEXT

task_id

PK

INTEGER

idx

PK

TEXT

channel

BLOB

value

为什么是 BLOB 而不是 JSON 字段?因为状态里可能有各种 Python 对象(消息对象、工具调用等),LangGraph 用专门的序列化器(msgpack / JsonPlus)打包成二进制,存取更快、能还原对象类型。代价是你没法直接用 SQL 读懂 BLOB 内容,要读得通过下面的编程接口反序列化。

5.2 编程接口(Checkpointer API)

所有存储器都实现同一套接口(BaseCheckpointSaver),同步方法和 a 开头的异步方法成对出现。常用的几个:

方法 作用 谁调用
setup() 建表/迁移(首次运行) 应用启动时
aget_tuple(config) config(含 thread_id)读最新或指定检查点 框架在每次执行前自动调用
alist(config, ...) 列出某会话的全部历史检查点(可分页、可过滤) 时间旅行 / 查历史
aput(config, checkpoint, metadata, versions) 写入一个新检查点 框架在每步之后自动调用
aput_writes(config, writes, task_id) 写入中间 pending writes 框架内部
adelete_thread(thread_id) 删除某会话的所有数据 「删除会话」功能

aget_tuple 返回的 CheckpointTuple 结构(5 个字段):

CheckpointTuple(
    config,          # 定位信息:{"configurable": {"thread_id", "checkpoint_ns", "checkpoint_id"}}
    checkpoint,      # 状态快照(含 channel_values.messages)
    metadata,        # 元数据(source / step / parents)
    parent_config,   # 父检查点的定位信息(没有则 None)
    pending_writes,  # 尚未合并的中间写入
)

95% 的情况下你不需要直接调这些方法——把 checkpointer 交给 create_deep_agent,框架就帮你「自动读档/存档」了。只有要做「查历史、列会话、删会话、时间旅行」这类管理功能时,才直接用它们。

5.3 上层的便捷读取:get_state / get_state_history

比起裸用 aget_tuple,编译后的图还提供了更顺手的封装:

config = {"configurable": {"thread_id": "user-123"}}

# 读当前状态(最新检查点)
snapshot = await agent.aget_state(config)
messages = snapshot.values["messages"]      # 这条会话的完整消息历史

# 遍历这条会话的所有历史检查点(时间旅行)
async for snap in agent.aget_state_history(config):
    print(snap.config["configurable"]["checkpoint_id"], snap.values.get("messages"))

我们当初验证持久化是否生效,用的就是 aget_state 直接把 6 条历史消息读了出来。


6. 「会话」和 thread_id 到底什么关系

这是最容易混淆、也最关键的一点,单独讲透。

6.1 一句话定义

thread_id 就是「会话」的唯一身份证。一个 thread_id = 一条独立的对话时间线。

它们是一一对应的:

  • 延续某段对话 → 用同一个 thread_id
  • 开一段全新对话 → 换一个新的 thread_id
  • 不同 thread_id 之间完全隔离,A 会话看不到 B 会话的任何消息。

6.2 它如何驱动「记忆」

Checkpointer(SQLite) Agent 前端 Checkpointer(SQLite) Agent 前端 第2轮:只发"我叫什么?" 带上 thread_id=user-123 把历史 + 新消息一起喂给模型 ainvoke(新消息, config{thread_id}) aget_tuple(thread_id=user-123) 历史快照(含第1轮消息) 调用大模型推理 aput(新检查点=历史+本轮) "赵六"

整条链路里,thread_id唯一的钥匙:它决定了「读哪条会话的档、把新档存到哪条会话」。没有它(或每次都换新的),就退化成「每次都是新对话」,模型自然失忆。

6.3 在本项目里 thread_id 从哪来

  • 前端:每开一个新会话,生成一个 UUID 作为 thread_id,之后这一整段对话的每次请求都带着它;点「新对话」按钮就换一个新 UUID(= 开新会话)。
  • 后端:从请求里拿到 thread_id,塞进 config
def _build_config(thread_id: str | None) -> dict[str, Any]:
    """构造带 thread_id 的运行配置;未提供时自动生成一个新会话。"""
    tid = thread_id or uuid.uuid4().hex
    return {"configurable": {"thread_id": tid}}

细节:如果前端没传 thread_id,后端会临时生成一个。但这种临时 id 不会回传给前端,下次请求又是新的——所以「不传 thread_id」等价于「每次都是一次性、无记忆对话」。要多轮记忆,前端必须固定带同一个 thread_id

6.4 thread_id 该怎么设计

场景 建议的 thread_id 取值
一个用户一段连续对话 前端生成的 UUID(点「新对话」就换)
按用户隔离、每人一条主线 直接用 user_id
一个用户多个独立会话(像 ChatGPT 左侧列表) user_id + 会话序号,或每个会话一个 UUID
一次性问答、不需要记忆 不传,或每次用随机值

6.5 与 checkpoint_ns 的区别(进阶)

别把 thread_idcheckpoint_ns 搞混:

  • thread_id横向区分——这是哪一段对话;
  • checkpoint_ns纵向区分——同一段对话里,主图状态('')和子智能体/子图的状态。deep agent 内部派生子任务时会产生非空 ns,但对使用者透明,平时不用管。

7. 常用运维操作速查

提示:checkpoint / metadata / value 都是序列化 BLOB,直接 SQL 读不出文本内容。统计、定位用 SQL,读消息内容请用 aget_state

-- 一共有多少条会话(去重 thread_id)
SELECT COUNT(DISTINCT thread_id) FROM checkpoints;

-- 列出所有会话及其检查点数量
SELECT thread_id, COUNT(*) AS steps
FROM checkpoints GROUP BY thread_id;

-- 某会话的检查点链(按时间)
SELECT checkpoint_id, parent_checkpoint_id
FROM checkpoints
WHERE thread_id = 'user-123' AND checkpoint_ns = ''
ORDER BY checkpoint_id;

删除某条会话(推荐用接口,连 writes 一起清干净):

await saver.adelete_thread("user-123")

读取某会话完整消息历史(反序列化):

snap = await agent.aget_state({"configurable": {"thread_id": "user-123"}})
for m in snap.values["messages"]:
    print(type(m).__name__, getattr(m, "content", ""))

8. 小结

  • Checkpoint = 智能体的存档系统,解决多轮记忆、故障续跑、时间旅行三件事。
  • 本项目由 存储层(AsyncSqliteSaver / InMemorySaver)→ 智能体层(create_deep_agent)→ 服务层(achat + _build_config)→ 接口/前端(thread_id) 多层组件协作完成,详见 §3。
  • 本项目用 AsyncSqliteSaver 把对话落到本地 SQLite,配合 FastAPI lifespan 在启动时打开、关闭时释放。
  • 数据落在两张表:checkpoints(每步状态快照)和 writes(中间待写入);核心内容是序列化 BLOB,靠编程接口读写。
  • 编程接口核心是 aget_tuple / alist / aput / adelete_thread,但日常只需把 checkpointer 交给框架,再用 aget_state 读历史即可。
  • thread_id 是「会话」的唯一身份证:同 id 续聊、换 id 开新会话、不同 id 完全隔离——它是整个记忆机制的钥匙。

下一步如果要上多实例 / 生产,把 AsyncSqliteSaver 换成 AIOMySQLSaver 或官方的 PostgresSaver 即可,表结构和接口是同一套,业务代码几乎不用动。

在这里插入图片描述

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐