让智能体「记住」对话:Checkpoint 功能、持久化数据接口与 thread_id 详解
前几篇我们做出了一个「铁矿石价格预测专家」智能体,并搬上了网页、加了流式输出。但有个硬伤:它记不住上一句话。这一篇彻底讲清楚 LangGraph 的 Checkpoint(检查点) 机制——它是什么、怎么用、数据到底存成了什么样、以及一个「会话」和
thread_id到底是什么关系。
1. 为什么需要 Checkpoint
大模型本身是无状态的:每次调用都是一锤子买卖,它不会自动记得你上一轮说过什么。要实现「多轮对话」,本质上只有一个办法——每次请求都把历史消息重新喂给它。
那历史消息存哪?谁来存?什么时候存、什么时候取?这一整套「对话状态的保存与恢复」机制,在 LangGraph 里就叫 Checkpoint,负责干这件事的组件叫 Checkpointer(检查点存储器)。
它解决三件事:
| 能力 | 说明 |
|---|---|
| 多轮记忆 | 把每一轮的状态(消息列表等)存下来,下次同一会话进来时自动恢复,模型就能「接着聊」 |
| 故障恢复 / 续跑 | 智能体执行到一半崩了(断网、超时、进程挂掉),重启后能从最后一个检查点继续,而不是从头再来 |
| 时间旅行(time-travel) | 每一步都有快照,可以回到任意历史步骤,查看甚至从那里分支重跑 |
一句话:Checkpointer 是智能体的「存档系统」,和游戏存档完全是一个意思——thread_id 就是「存档槽位」。
2. 核心概念:Checkpoint、Thread、Channel
先把几个名词理顺,后面看数据表就不懵了。
- Thread(会话):一条独立的对话时间线,用
thread_id唯一标识。不同thread_id之间数据完全隔离、互不可见。 - Checkpoint(检查点):某一时刻整个图状态的快照。智能体每推进一步(super-step)就生成一个新的 checkpoint,用
checkpoint_id标识。它们通过parent_checkpoint_id串成一条链。 - Channel(通道):图状态里的一个个「字段」。对我们的聊天智能体来说,最关键的通道就是
messages(消息列表)。Checkpoint 保存的就是所有通道在那一刻的值(channel_values)。
类比 Git:Thread 是一个仓库,Checkpoint 是一次次 commit(带 parent 指针连成链),channel_values 是这次提交时的工作区快照。
3. Checkpoint 功能使用了哪些组件
本项目要实现「多轮记忆 + 重启不丢」,并不是只加一个 checkpointer 就完事,而是存储层、智能体层、服务层、接口层、前端层几层组件配合。下面按层次说明每个组件是什么、干什么、落在哪个文件。
3.1 整体协作关系
3.2 组件清单(按层次)
存储层:真正「存档 / 读档」
| 组件 | 来源包 | 作用 | 本项目中的位置 |
|---|---|---|---|
BaseCheckpointSaver |
langgraph-checkpoint |
Checkpointer 的抽象基类,定义 setup / aget_tuple / aput / adelete_thread 等统一接口;所有存储器都实现它 |
deepagent.py 中 build_agent(checkpointer: BaseCheckpointSaver | None) 的类型约束 |
AsyncSqliteSaver |
langgraph-checkpoint-sqlite |
异步 SQLite 存储器:把每个 thread_id 的检查点写入本地 .sqlite 文件;支持 ainvoke / astream;重启进程后仍可读回历史 |
Web 应用在 lifespan 里创建,传给 build_agent(checkpointer=saver) |
InMemorySaver |
langgraph-checkpoint |
内存存储器:进程内保存检查点,速度快、零文件依赖,但进程结束即清空 | build_agent() 未传入 checkpointer 时的默认值:checkpointer or InMemorySaver() |
aiosqlite |
langgraph-checkpoint-sqlite 的依赖 |
为 AsyncSqliteSaver 提供异步 SQLite 连接;必须在事件循环内创建并保持存活 |
由 AsyncSqliteSaver.from_conn_string() 内部使用,应用代码不直接调用 |
checkpointer or InMemorySaver()的含义:调用方传了存储器就用传入的(如AsyncSqliteSaver),没传(None)就自动兜底用内存版,保证 agent 始终能存档。
智能体层:把存储器挂到图上
| 组件 | 来源 | 作用 | 本项目中的位置 |
|---|---|---|---|
create_deep_agent |
deepagents |
构建已编译的智能体图;checkpointer= 参数把存储器注入图,之后每次 ainvoke / astream 自动「读档 → 执行 → 存档」 |
deepagent.py → build_agent() |
CompiledStateGraph |
langgraph |
编译后的状态图;内部 Pregel 循环在 super-step 前后调用 checkpointer 的 aget_tuple / aput |
create_deep_agent(...) 的返回值 |
build_agent |
项目代码 | 封装模型、提示词、工具与 checkpointer,统一构建带记忆能力的 agent | mystu/buildagent/agent/deepagent.py |
get_agent / set_agent |
项目代码 | 维护「当前生效的 agent」单例:CLI 用默认内存版;Web 启动后 lifespan 用 set_agent 换成带 SQLite 的版本 |
deepagent.py;achat / astream_chat 通过 get_agent() 取 agent |
服务层:把 thread_id 带进每次调用
| 组件 | 作用 | 本项目中的位置 |
|---|---|---|
_build_config(thread_id) |
把 thread_id 包装成 LangGraph 认识的 config = {"configurable": {"thread_id": ...}};未传时自动生成新 UUID(= 新会话) |
deepagent.py |
achat |
非流式问答:调用 get_agent().ainvoke(..., config),框架据此读/写对应 thread 的检查点 |
deepagent.py → controller/api.py /chat |
astream_chat |
流式问答:同样带 config,逐 token 输出;检查点在每个 super-step 后照常写入 |
deepagent.py → /chat/stream |
aget_state / aget_state_history |
图提供的便捷接口:读某 thread 当前状态或全部历史检查点(时间旅行、查历史用) | 编译后的 agent 自带;运维/调试时直接调用 |
启动与配置层:Web 场景下何时打开 SQLite
| 组件 | 作用 | 本项目中的位置 |
|---|---|---|
lifespan |
FastAPI 应用生命周期:启动时 AsyncSqliteSaver.from_conn_string → setup() 建表 → build_agent(checkpointer=saver) → set_agent;关闭时释放连接并回退默认 agent |
mystu/controller/__init__.py |
get_checkpoint_db_path() |
返回 SQLite 文件路径,默认 data/checkpoints.sqlite;可用环境变量 CHECKPOINT_DB 覆盖;自动创建 data/ 目录 |
mystu/buildagent/agent/modelConfig.py |
接口层与前端:会话 ID 从哪来、怎么传
| 组件 | 作用 | 本项目中的位置 |
|---|---|---|
ChatRequest.thread_id |
API 请求体字段:客户端传入会话 ID,后端原样交给 _build_config |
mystu/entity/apireq.py |
/agent/api/chat、/chat/stream |
接收 thread_id,调用 achat / astream_chat |
mystu/controller/api.py |
前端 threadId |
页面加载时 crypto.randomUUID() 生成;每次请求带上;点「新对话」换一个新 ID(= 开新会话、清空 UI) |
mystu/static/index.html |
3.3 依赖包(pyproject.toml)
Checkpoint 相关能力由以下包提供(节选):
"langgraph-checkpoint-sqlite>=3.1.0", # AsyncSqliteSaver
"langgraph-checkpoint", # InMemorySaver、BaseCheckpointSaver(随 langgraph 传递依赖)
3.4 各组件在「一轮对话」里的分工
以用户第 2 轮提问「我叫什么?」为例,各组件依次做什么:
要点:业务代码只负责传 thread_id 和调 ainvoke;具体什么时候读档、什么时候存档,由 LangGraph + checkpointer 在图执行循环里自动完成。
4. 在本项目里怎么用
4.1 两种存储器
LangGraph 自带几种 Checkpointer,我们项目里用到两种:
| 存储器 | 数据存哪 | 特点 | 用在哪 |
|---|---|---|---|
InMemorySaver |
进程内存 | 快、零依赖,重启即丢 | CLI 测试、未启用持久化时的回退 |
AsyncSqliteSaver |
本地 SQLite 文件 | 重启不丢、跨进程可读、异步 | Web 应用(正式) |
注意:聊天接口用的是异步的
ainvoke / astream,必须搭配异步存储器AsyncSqliteSaver。如果误用同步的SqliteSaver,会直接报NotImplementedError: ... does not support async methods。
4.2 构建 agent 时挂上存储器
def build_agent(
checkpointer: BaseCheckpointSaver | None = None,
extra_tools: Sequence[Any] | None = None,
):
"""构建使用 DeepSeek 的铁矿石价格预测 deep agent(带多轮对话记忆)。
checkpointer:对话状态存储器。
- 不传:使用 InMemorySaver(进程内记忆,重启即丢,适合 CLI / 测试)。
- 传入 AsyncSqliteSaver 等:把各 thread_id 的对话持久化到磁盘,重启不丢。
"""
system_prompt = load_iron_ore_forecast_prompt()
tools = [*_TOOLS, *(extra_tools or [])]
return create_deep_agent(
model=build_model(),
tools=tools,
system_prompt=system_prompt,
interrupt_on=_INTERRUPT_ON,
checkpointer=checkpointer or InMemorySaver(),
)
只要把 checkpointer 传进 create_deep_agent,记忆能力就自动接上了——后续所有 ainvoke / astream 都会自动「先读档、再执行、最后存档」。
4.3 在应用启动时打开 SQLite(lifespan)
异步 SQLite 存储器自带一个 aiosqlite 连接(后台线程),必须在事件循环里创建并全程保持存活,所以放进 FastAPI 的 lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期:启动时打开 SQLite 持久化记忆,关闭时释放。
AsyncSqliteSaver 自带异步连接(aiosqlite),必须在事件循环内创建并保持存活,
因此放在 lifespan 里:进入时建表 + 构建带持久化的 agent,退出时回退默认 agent。
"""
db_path = get_checkpoint_db_path()
mcp_tools = await load_mcp_tools()
async with AsyncSqliteSaver.from_conn_string(db_path) as saver:
await saver.setup() # 首次运行自动建表(checkpoints / writes)
set_agent(build_agent(checkpointer=saver, extra_tools=mcp_tools))
try:
yield
finally:
set_agent(default_agent)
4.4 发起一次「带记忆」的对话
记忆的开关只有一个:在 config 里传 thread_id。
config = {"configurable": {"thread_id": "user-123"}}
# 第 1 轮
await agent.ainvoke({"messages": [{"role": "user", "content": "我叫赵六"}]}, config)
# 第 2 轮:同一个 thread_id,模型能记得
await agent.ainvoke({"messages": [{"role": "user", "content": "我叫什么?"}]}, config)
# → "赵六"
注意第 2 轮只传了新的一句话,并没有手动拼接历史。历史是 Checkpointer 根据 thread_id 自动读出来、补在前面的。
5. 持久化的数据接口
这部分分两层看:底层数据库表结构(数据落地的样子)和上层编程接口(代码怎么读写它)。
5.1 数据库表结构(SQLite 实测 DDL)
await saver.setup() 会自动建两张表:checkpoints 和 writes。
表一:checkpoints —— 每一步的状态快照
CREATE TABLE checkpoints (
thread_id TEXT NOT NULL, -- 会话 ID(存档槽位)
checkpoint_ns TEXT NOT NULL DEFAULT '', -- 命名空间(主图为 '',子图/子智能体非空)
checkpoint_id TEXT NOT NULL, -- 本次检查点 ID(按时间单调递增的 UUID)
parent_checkpoint_id TEXT, -- 父检查点 ID(串成链,第一个为 NULL)
type TEXT, -- 序列化类型标记(如 msgpack)
checkpoint BLOB, -- 状态快照本体(序列化后的二进制)
metadata BLOB, -- 元数据(来源、步数等,序列化后的二进制)
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
逐字段解释:
| 字段 | 类型 | 含义 |
|---|---|---|
thread_id |
TEXT | 会话标识。同一对话的所有快照共享同一个值,是数据隔离的最小单位 |
checkpoint_ns |
TEXT | 命名空间。主图固定为空串 '';当智能体内部调用子图 / 子智能体时,子图的状态会用非空 ns 区分(deep agent 里会用到) |
checkpoint_id |
TEXT | 本检查点 ID,随时间单调递增,因此「同一 thread 下最大的 id 就是最新状态」 |
parent_checkpoint_id |
TEXT | 指向上一个检查点,形成单向链;用于「时间旅行」和回溯 |
type |
TEXT | 序列化器类型标记,配合反序列化用 |
checkpoint |
BLOB | 核心。整个图状态的快照,反序列化后就是下面的 Checkpoint 结构(含 channel_values.messages) |
metadata |
BLOB | 元数据,反序列化后含 source / step / parents 等 |
主键是 (thread_id, checkpoint_ns, checkpoint_id) 三元组——这正说明「定位一个快照」需要:哪个会话、哪个(子)图、第几步。
checkpoint 这个 BLOB 反序列化后的结构:
| 键 | 含义 |
|---|---|
v |
结构版本号 |
id |
检查点 id(与列同值) |
ts |
时间戳(ISO 字符串) |
channel_values |
各通道当前值,聊天场景下 messages 就在这里 |
channel_versions |
每个通道的版本号(增量更新用) |
versions_seen |
各节点已处理到的通道版本(决定下一步该跑谁) |
updated_channels |
本步更新了哪些通道 |
metadata BLOB 反序列化后:
| 键 | 含义 |
|---|---|
source |
这步状态的来源(input 用户输入 / loop 图循环 / update 手动更新 等) |
step |
第几步(-1 表示初始) |
parents |
各命名空间的父检查点映射 |
表二:writes —— 检查点之间的「待写入/中间写」
CREATE TABLE writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL, -- 产生这次写入的任务(节点执行)ID
idx INTEGER NOT NULL,-- 同一任务内多次写入的序号
channel TEXT NOT NULL, -- 写到哪个通道(如 messages)
type TEXT, -- 序列化类型
value BLOB, -- 写入的值(序列化)
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
writes 记录的是「某个节点产生、但还未合并成正式 checkpoint 的中间写入」。它的作用是:如果执行到一半崩溃,重启时这些 pending writes 不会丢,可以接着把它们应用上去——这就是「故障续跑」的关键。普通聊天里你一般不用直接碰它。
两张表的关系
为什么是 BLOB 而不是 JSON 字段?因为状态里可能有各种 Python 对象(消息对象、工具调用等),LangGraph 用专门的序列化器(msgpack / JsonPlus)打包成二进制,存取更快、能还原对象类型。代价是你没法直接用 SQL 读懂 BLOB 内容,要读得通过下面的编程接口反序列化。
5.2 编程接口(Checkpointer API)
所有存储器都实现同一套接口(BaseCheckpointSaver),同步方法和 a 开头的异步方法成对出现。常用的几个:
| 方法 | 作用 | 谁调用 |
|---|---|---|
setup() |
建表/迁移(首次运行) | 应用启动时 |
aget_tuple(config) |
按 config(含 thread_id)读最新或指定检查点 |
框架在每次执行前自动调用 |
alist(config, ...) |
列出某会话的全部历史检查点(可分页、可过滤) | 时间旅行 / 查历史 |
aput(config, checkpoint, metadata, versions) |
写入一个新检查点 | 框架在每步之后自动调用 |
aput_writes(config, writes, task_id) |
写入中间 pending writes | 框架内部 |
adelete_thread(thread_id) |
删除某会话的所有数据 | 「删除会话」功能 |
aget_tuple 返回的 CheckpointTuple 结构(5 个字段):
CheckpointTuple(
config, # 定位信息:{"configurable": {"thread_id", "checkpoint_ns", "checkpoint_id"}}
checkpoint, # 状态快照(含 channel_values.messages)
metadata, # 元数据(source / step / parents)
parent_config, # 父检查点的定位信息(没有则 None)
pending_writes, # 尚未合并的中间写入
)
95% 的情况下你不需要直接调这些方法——把 checkpointer 交给
create_deep_agent,框架就帮你「自动读档/存档」了。只有要做「查历史、列会话、删会话、时间旅行」这类管理功能时,才直接用它们。
5.3 上层的便捷读取:get_state / get_state_history
比起裸用 aget_tuple,编译后的图还提供了更顺手的封装:
config = {"configurable": {"thread_id": "user-123"}}
# 读当前状态(最新检查点)
snapshot = await agent.aget_state(config)
messages = snapshot.values["messages"] # 这条会话的完整消息历史
# 遍历这条会话的所有历史检查点(时间旅行)
async for snap in agent.aget_state_history(config):
print(snap.config["configurable"]["checkpoint_id"], snap.values.get("messages"))
我们当初验证持久化是否生效,用的就是 aget_state 直接把 6 条历史消息读了出来。
6. 「会话」和 thread_id 到底什么关系
这是最容易混淆、也最关键的一点,单独讲透。
6.1 一句话定义
thread_id就是「会话」的唯一身份证。一个thread_id= 一条独立的对话时间线。
它们是一一对应的:
- 想延续某段对话 → 用同一个
thread_id; - 想开一段全新对话 → 换一个新的
thread_id; - 不同
thread_id之间完全隔离,A 会话看不到 B 会话的任何消息。
6.2 它如何驱动「记忆」
整条链路里,thread_id 是唯一的钥匙:它决定了「读哪条会话的档、把新档存到哪条会话」。没有它(或每次都换新的),就退化成「每次都是新对话」,模型自然失忆。
6.3 在本项目里 thread_id 从哪来
- 前端:每开一个新会话,生成一个 UUID 作为
thread_id,之后这一整段对话的每次请求都带着它;点「新对话」按钮就换一个新 UUID(= 开新会话)。 - 后端:从请求里拿到
thread_id,塞进config:
def _build_config(thread_id: str | None) -> dict[str, Any]:
"""构造带 thread_id 的运行配置;未提供时自动生成一个新会话。"""
tid = thread_id or uuid.uuid4().hex
return {"configurable": {"thread_id": tid}}
细节:如果前端没传
thread_id,后端会临时生成一个。但这种临时 id 不会回传给前端,下次请求又是新的——所以「不传 thread_id」等价于「每次都是一次性、无记忆对话」。要多轮记忆,前端必须固定带同一个thread_id。
6.4 thread_id 该怎么设计
| 场景 | 建议的 thread_id 取值 |
|---|---|
| 一个用户一段连续对话 | 前端生成的 UUID(点「新对话」就换) |
| 按用户隔离、每人一条主线 | 直接用 user_id |
| 一个用户多个独立会话(像 ChatGPT 左侧列表) | user_id + 会话序号,或每个会话一个 UUID |
| 一次性问答、不需要记忆 | 不传,或每次用随机值 |
6.5 与 checkpoint_ns 的区别(进阶)
别把 thread_id 和 checkpoint_ns 搞混:
thread_id:横向区分——这是哪一段对话;checkpoint_ns:纵向区分——同一段对话里,主图状态('')和子智能体/子图的状态。deep agent 内部派生子任务时会产生非空 ns,但对使用者透明,平时不用管。
7. 常用运维操作速查
提示:
checkpoint/metadata/value都是序列化 BLOB,直接 SQL 读不出文本内容。统计、定位用 SQL,读消息内容请用aget_state。
-- 一共有多少条会话(去重 thread_id)
SELECT COUNT(DISTINCT thread_id) FROM checkpoints;
-- 列出所有会话及其检查点数量
SELECT thread_id, COUNT(*) AS steps
FROM checkpoints GROUP BY thread_id;
-- 某会话的检查点链(按时间)
SELECT checkpoint_id, parent_checkpoint_id
FROM checkpoints
WHERE thread_id = 'user-123' AND checkpoint_ns = ''
ORDER BY checkpoint_id;
删除某条会话(推荐用接口,连 writes 一起清干净):
await saver.adelete_thread("user-123")
读取某会话完整消息历史(反序列化):
snap = await agent.aget_state({"configurable": {"thread_id": "user-123"}})
for m in snap.values["messages"]:
print(type(m).__name__, getattr(m, "content", ""))
8. 小结
- Checkpoint = 智能体的存档系统,解决多轮记忆、故障续跑、时间旅行三件事。
- 本项目由 存储层(AsyncSqliteSaver / InMemorySaver)→ 智能体层(create_deep_agent)→ 服务层(achat + _build_config)→ 接口/前端(thread_id) 多层组件协作完成,详见 §3。
- 本项目用
AsyncSqliteSaver把对话落到本地 SQLite,配合 FastAPIlifespan在启动时打开、关闭时释放。 - 数据落在两张表:
checkpoints(每步状态快照)和writes(中间待写入);核心内容是序列化 BLOB,靠编程接口读写。 - 编程接口核心是
aget_tuple / alist / aput / adelete_thread,但日常只需把 checkpointer 交给框架,再用aget_state读历史即可。 thread_id是「会话」的唯一身份证:同 id 续聊、换 id 开新会话、不同 id 完全隔离——它是整个记忆机制的钥匙。
下一步如果要上多实例 / 生产,把 AsyncSqliteSaver 换成 AIOMySQLSaver 或官方的 PostgresSaver 即可,表结构和接口是同一套,业务代码几乎不用动。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)