项目背景与架构总览

在企业日常运营中,数据分析需求无处不在。运营团队想知道上个月的销售额同比增长了多少,产品经理想了解不同用户群体的留存曲线,财务部门需要从海量交易记录中提取特定维度的汇总报表。传统的工作流程是业务人员提出问题、数据分析师编写 SQL 查询并执行、再由数据分析师解读结果并制作可视化图表,整个过程可能需要数小时甚至数天。一个智能化的数据分析 Agent 能够将这个过程压缩到分钟级,业务人员用自然语言描述自己的分析需求,Agent 自动将需求翻译为 SQL、执行查询、生成图表,最终交回一份包含数据洞察和可视化成果的完整报告。

这个系统的核心架构遵循一条清晰的流水线,自然语言问题首先被送入 SQL 生成节点,该节点在数据库 Schema 的约束下将模糊的问题表述精确化为一条可执行的 SQL 语句。生成的 SQL 不会立即执行,而是先经过一个查询校验节点,该节点检查 SQL 的语法正确性和语义安全性,它会阻止任何包含 DROPDELETEUPDATE 等写操作的语句,也会拒绝访问超出了当前上下文授权范围的数据表。通过校验的 SQL 被送入查询执行节点,该节点连接数据库并返回结构化的查询结果。随后查询结果与用户的原始问题一起被传递给可视化节点,由 LLM 自主判断需要生成什么类型的图表以及如何组织图表的数据。最后报告生成节点将数据摘要、SQL 语句、图表路径和分析结论整合为一篇连贯的报告。

与第二天的代码审查项目不同,数据分析场景中存在一个关键的约束条件,数据库的 Schema 信息是模型编写正确 SQL 的必备前提。一个对数据库结构一无所知的模型不可能凭空写出能正确执行的查询语句,因此在 SQL 生成之前,系统必须先执行一个 Schema 探查步骤,自动列出数据库中所有可用的表名,然后根据问题的语义选择相关的表并获取其详细的列定义。这个"先探查、再生成"的模式是 LangGraph 官方 SQL Agent 教程的核心设计思想,也是我们这套流水线在生成查询之前设置列表节点和获取 Schema 节点的原因。

在运行环境方面,数据分析任务涉及执行第三方库(如 pandas 和 matplotlib)中的代码,如果直接在本地环境中运行模型生成的 Python 脚本,存在潜在的安全风险。LangChain 的 Deep Agents 框架为此提供了沙箱化的执行方案,代码在隔离的后端环境中运行,Agent 通过 FilesystemMiddleware 访问沙箱内的文件系统,通过 SubAgentMiddleware 将可视化和数据处理等子任务委托给专门的子 Agent。对于学习和本地开发场景,我们可以使用 LocalShellBackend 在可控的本地目录中执行代码,而在生产环境中则推荐使用 LangSmith Sandbox、E2B 或 Daytona 等云端沙箱服务。

开发环境与数据准备

在开始构建 Agent 之前,我们需要安装核心依赖并准备一份可供分析的示例数据。LangChain 的 langchain_community 包提供了 SQLDatabase 工具类,它封装了 SQLAlchemy 的连接管理细节,让 Agent 可以通过工具接口安全地访问数据库。langgraph 负责构建有状态的工作流,将列表、获取 Schema、生成查询、校验查询、执行查询和可视化这些步骤编排为确定的节点图。matplotlib 是 Python 生态中最成熟的绑图库之一,虽然在实际项目中可以使用 Seaborn、Plotly 等更高级的封装,但 matplotlib 的基础性使它成为教学环境中最合适的选择。

# 核心依赖安装
pip install langgraph langchain langchain-openai langchain-community sqlalchemy matplotlib python-dotenv

模型初始化沿用我们在前一天项目中建立的统一方式,使用 init_chat_model 工厂函数创建兼容 OpenAI 协议的 Chat 模型实例。数据分析任务对模型的 SQL 编写能力有较高要求,需要模型理解表结构、字段含义以及 SQL 标准语法,因此选择推理能力较强的模型至关重要。temperature=0 在 SQL 生成场景下尤为重要,同一份 Schema 和同一个问题必须产生一致的 SQL 语句,任何随机性都可能导致生成的查询结果不可复现。

from langchain.chat_models import init_chat_model
from dotenv import load_dotenv

load_dotenv()

llm = init_chat_model(
    "deepseek-ai/DeepSeek-V4-Pro",
    model_provider="openai",
    temperature=0,
    model_kwargs={"extra_body": {"thinking": {"type": "enabled"}}}
)

在数据准备环节,我们使用 SQLite 作为数据库引擎,因为它是 Python 标准库的一部分,无需额外的服务安装和配置。我们创建一个内存数据库并写入一组电商销售数据,这份数据包含订单表(orders)、商品表(products)和用户表(users),三者之间存在外键关联关系,能够支撑多表联查、聚合统计和分组分析等典型的业务分析场景。

import sqlite3

conn = sqlite3.connect(":memory:", check_same_thread=False)
cursor = conn.cursor()

# 创建用户表
cursor.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        city TEXT,
        vip_level TEXT DEFAULT '普通'
    )
""")

# 创建商品表
cursor.execute("""
    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        category TEXT,
        price REAL NOT NULL
    )
""")

# 创建订单表
cursor.execute("""
    CREATE TABLE orders (
        id INTEGER PRIMARY KEY,
        user_id INTEGER,
        product_id INTEGER,
        quantity INTEGER NOT NULL,
        order_date TEXT NOT NULL,
        FOREIGN KEY (user_id) REFERENCES users(id),
        FOREIGN KEY (product_id) REFERENCES products(id)
    )
""")

# 插入示例用户
users = [
    (1, "张三", "北京", "VIP"),
    (2, "李四", "上海", "普通"),
    (3, "王五", "广州", "VIP"),
    (4, "赵六", "深圳", "普通"),
    (5, "钱七", "北京", "普通"),
]
cursor.executemany("INSERT INTO users VALUES (?, ?, ?, ?)", users)

# 插入示例商品
products = [
    (1, "机械键盘", "电子产品", 399.0),
    (2, "无线鼠标", "电子产品", 129.0),
    (3, "Python编程书", "图书", 89.0),
    (4, "显示器支架", "办公用品", 259.0),
    (5, "降噪耳机", "电子产品", 699.0),
]
cursor.executemany("INSERT INTO products VALUES (?, ?, ?, ?)", products)

# 插入示例订单(2025年6月数据)
orders = [
    (1, 1, 1, 2, "2025-06-01"),
    (2, 1, 3, 1, "2025-06-02"),
    (3, 2, 2, 3, "2025-06-03"),
    (4, 3, 5, 1, "2025-06-05"),
    (5, 4, 1, 1, "2025-06-08"),
    (6, 5, 4, 2, "2025-06-10"),
    (7, 1, 2, 1, "2025-06-12"),
    (8, 3, 3, 2, "2025-06-15"),
    (9, 2, 5, 1, "2025-06-18"),
    (10, 4, 1, 3, "2025-06-20"),
]
cursor.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?)", orders)
conn.commit()

这份数据的设计有两个考量。首先三个表之间存在明确的外键关系,用户通过 user_id 关联到订单,商品通过 product_id 关联到订单,模型需要根据问题语义自行判断什么时候需要执行 JOIN 操作。其次,数据覆盖了多个分析维度,时间维度(order_date)、地域维度(city)、品类维度(category)和用户等级维度(vip_level),使得 LLM 有机会展示它在面对不同聚合粒度时的 SQL 生成能力。

构建 LangGraph SQL 工作流

与直接使用 create_agent 创建一个黑盒 Agent 不同,在数据分析场景中我们选择基于 LangGraph 的状态图 API 显式构建每一步的处理逻辑。这种选择的核心原因在于安全性和可控性,我们不希望 Agent 在执行过程中产生任何幻觉性的数据库操作,也不希望它将错误的 SQL 直接提交到数据库执行。通过 LangGraph 的显式图结构,我们可以在 SQL 生成和 SQL 执行之间插入一个强制性的校验节点,在数据库操作之前拦截一切不符合预期的行为。

LangGraph 中最基础的概念是状态(State)和节点(Node)。状态定义了在整个工作流中流转的数据结构,我们使用 MessagesState 作为基础状态类型,它内置了一个 messages 属性,类型为消息列表,通过 add_messages 归约器自动处理消息的追加和合并。节点是处理函数,每个节点接收当前状态并返回状态的部分更新。边(Edge)定义了节点之间的流转方向,而条件边(Conditional Edge)则允许根据状态中的信息动态选择下一个目标节点。

整个工作流包含六个节点,list_tables 负责获取数据库中所有的表名,get_schema 负责获取相关表的完整 DDL 语句和示例数据行,generate_query 调用 LLM 将自然语言问题和 Schema 信息一起转化为 SQL,check_query 对生成的 SQL 执行安全性和语法校验,run_query 执行通过校验的 SQL 并返回结果,generate_report 将查询结果与原始问题整合为最终的分析报告。这些节点按确定的顺序首尾相连,在 generate_querycheck_query 之间插入一条条件边,当 LLM 没有产生工具调用时直接结束,避免无效的反复循环。

from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage, SystemMessage


class State(TypedDict):
    messages: Annotated[list, add_messages]


def list_tables(state: State) -> dict:
    """列出数据库中的所有表名"""
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [row[0] for row in cursor.fetchall()]
    table_names = ", ".join(tables)
    return {"messages": [AIMessage(content=f"数据库中的表:{table_names}")]}


def get_schema(state: State) -> dict:
    """获取所有相关表的 DDL Schema"""
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [row[0] for row in cursor.fetchall()]

    schema_parts = []
    for table in tables:
        # 获取表结构
        cursor.execute(f"PRAGMA table_info('{table}')")
        columns = cursor.fetchall()
        col_defs = ", ".join(f"{col[1]} {col[2]}" for col in columns)
        schema_parts.append(f"CREATE TABLE {table} ({col_defs});")

        # 获取示例数据(最多 3 行)
        cursor.execute(f"SELECT * FROM '{table}' LIMIT 3")
        rows = cursor.fetchall()
        if rows:
            schema_parts.append(f"-- 示例数据: {rows}")

    schema_text = "\n\n".join(schema_parts)
    return {"messages": [AIMessage(content=f"数据库 Schema:\n{schema_text}")]}


def generate_query(state: State) -> dict:
    """让 LLM 基于 Schema 信息和用户问题生成 SQL 查询"""
    messages = state["messages"]
    # 提取用户原始问题
    user_question = ""
    for msg in messages:
        if hasattr(msg, "type") and msg.type == "human":
            user_question = msg.content

    system_msg = SystemMessage(content=f"""你是一个 SQL 专家。根据以下数据库 Schema 编写 SQLite 语法兼容的 SELECT 查询。
只返回 SQL 语句,不要包含任何解释或 markdown 标记。确保只生成只读的 SELECT 语句。
用户问题:{user_question}""")

    # 将 Schema 信息和用户问题一起发送给 LLM
    schema_messages = [msg for msg in messages if hasattr(msg, "type") and msg.type == "ai" and "Schema" in str(msg.content)]
    response = llm.invoke(schema_messages + [system_msg])

    return {"messages": [AIMessage(content=f"生成的 SQL:\n```sql\n{response.content}\n```")]}


def check_query(state: State) -> dict:
    """校验 SQL 的安全性——只允许 SELECT 语句,阻止任何写操作"""
    messages = state["messages"]
    # 提取最后一条 AI 消息中的 SQL
    last_ai = messages[-1]
    sql_content = last_ai.content if hasattr(last_ai, "content") else ""

    # 提取 SQL 语句
    sql = ""
    if "```sql" in sql_content:
        sql = sql_content.split("```sql")[1].split("```")[0].strip()
    else:
        sql = sql_content.strip()

    # 禁止的关键词列表
    forbidden_keywords = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "CREATE", "TRUNCATE", "REPLACE"]
    sql_upper = sql.upper()

    for keyword in forbidden_keywords:
        if keyword in sql_upper:
            return {"messages": [AIMessage(
                content=f"❌ SQL 安全校验失败:检测到危险操作 `{keyword}`。只允许执行 SELECT 查询。"
            )]}

    # 简单语法检查
    if not sql_upper.strip().startswith("SELECT"):
        return {"messages": [AIMessage(
            content="❌ SQL 校验失败:语句不是以 SELECT 开头。只允许只读查询。"
        )]}

    return {"messages": [AIMessage(content=f"✅ SQL 校验通过,准备执行:\n{sql}")]}


def run_query(state: State) -> dict:
    """执行通过校验的 SQL 查询"""
    messages = state["messages"]
    # 提取最后一条消息中的 SQL
    last_msg = messages[-1]
    content = last_msg.content if hasattr(last_msg, "content") else ""

    # 提取 SQL 语句
    sql = ""
    if "准备执行:" in content:
        sql = content.split("准备执行:")[1].strip()
    else:
        return {"messages": [AIMessage(content="❌ 无法提取 SQL 语句")]}

    try:
        cursor.execute(sql)
        results = cursor.fetchall()
        # 获取列名
        col_names = [desc[0] for desc in cursor.description] if cursor.description else []
        formatted = f"查询结果(共 {len(results)} 行):\n列名:{col_names}\n"
        for i, row in enumerate(results, 1):
            formatted += f"  {i}. {row}\n"
        return {"messages": [AIMessage(content=formatted)]}
    except Exception as e:
        return {"messages": [AIMessage(content=f"❌ 查询执行失败:{e}")]}


def generate_report(state: State) -> dict:
    """基于查询结果生成分析报告"""
    messages = state["messages"]
    user_question = ""
    for msg in messages:
        if hasattr(msg, "type") and msg.type == "human":
            user_question = msg.content

    query_results = [msg for msg in messages if hasattr(msg, "type") and msg.type == "ai"
                     and "查询结果" in str(msg.content)]
    sql_messages = [msg for msg in messages if hasattr(msg, "type") and msg.type == "ai"
                    and "生成的 SQL" in str(msg.content)]

    context = f"用户问题:{user_question}\n\n"
    if sql_messages:
        context += f"{sql_messages[-1].content}\n\n"
    if query_results:
        context += f"{query_results[-1].content}"

    report_prompt = f"""{context}

请基于以上查询结果,撰写一份简洁的数据分析报告(中文)。包括:
1. 数据摘要
2. 关键发现
3. 建议(如有)"""

    response = llm.invoke([HumanMessage(content=report_prompt)])
    return {"messages": [AIMessage(content=f"📊 **数据分析报告**\n\n{response.content}")]}


def should_continue(state: State) -> Literal[END, "check_query"]:
    """条件路由:如果有 SQL 生成则进入校验,否则结束"""
    messages = state["messages"]
    last_message = messages[-1]
    content = last_message.content if hasattr(last_message, "content") else ""
    if "SELECT" in content.upper() and "生成的 SQL" in content:
        return "check_query"
    return END


# 构建图
builder = StateGraph(State)

builder.add_node("list_tables", list_tables)
builder.add_node("get_schema", get_schema)
builder.add_node("generate_query", generate_query)
builder.add_node("check_query", check_query)
builder.add_node("run_query", run_query)
builder.add_node("generate_report", generate_report)

builder.add_edge(START, "list_tables")
builder.add_edge("list_tables", "get_schema")
builder.add_edge("get_schema", "generate_query")
builder.add_conditional_edges("generate_query", should_continue)
builder.add_edge("check_query", "run_query")
builder.add_edge("run_query", "generate_report")
builder.add_edge("generate_report", END)

graph = builder.compile()

这段图的编译结果是一个可执行的 LangGraph 应用。六步流水线的设计体现了 LangGraph 官方 SQL Agent 教程中推荐的模式,在执行任何数据库操作之前,先通过 list_tables 了解数据库的拓扑结构,再通过 get_schema 获取相关表的详细定义,这两个步骤为 LLM 提供了生成正确 SQL 所需的最小上下文。generate_query 节点在 SystemMessage 中明确指示 LLM 只生成 SELECT 语句并且只返回 SQL 文本本身,避免模型在 SQL 周围添加解释性文字。check_query 节点则是一个强制性的安全门,它检查 SQL 中是否包含 DROPDELETEUPDATE 等危险关键词,也验证语句是否以 SELECT 开头,任何不符合要求的 SQL 都会在这里被拦截而不是被送到数据库执行。这是构建生产级 SQL Agent 时必须坚守的红线,LangChain 官方文档也明确指出"确保数据库连接权限始终被尽可能地缩小范围"是降低模型驱动系统风险的关键措施。

数据可视化工具

SQL 查询返回的是数字和文本的表格结构,但人类的认知系统对视觉模式的敏感度远高于对原始数据的敏感度。一个柱状图中的高度差异比一组并列的数字更能让人瞬间理解销量排名,一条折线的走势比按时间排列的数值列表更能传达趋势信息。因此数据可视化是数据分析 Agent 不可或缺的下游能力。

在我们的系统中,可视化工具被设计为一个独立的 @tool 函数,它接受查询结果和图表类型作为输入,使用 matplotlib 生成图表并保存为 PNG 文件,最后将文件路径返回给 Agent。将可视化封装为独立工具有两个好处,一方面 Agent 可以根据数据特征自主选择合适的图表类型,另一方面绘图代码的执行可以被限制在隔离的环境中。在 LangChain 官方的 Deep Agents 数据分分析教程中,可视化甚至被提升为一个单独的 visualizer 子 Agent,通过 SubAgentMiddleware 在主 Agent 的上下文中隔离执行,避免大量中间输出污染主 Agent 的推理窗口。

import matplotlib
matplotlib.use("Agg")  # 使用非 GUI 后端,适合无头环境和服务器部署
import matplotlib.pyplot as plt
from langchain.tools import tool
import numpy as np


@tool(parse_docstring=True)
def generate_chart(query_result: str, chart_type: str, title: str = "Data Analysis") -> str:
    """根据查询结果生成可视化图表并保存为 PNG 文件。

    Args:
        query_result: 数据库查询返回的格式化结果文本
        chart_type: 图表类型,可选值:bar(柱状图)、line(折线图)、pie(饼图)
        title: 图表标题
    """
    # 解析查询结果中的数据行
    lines = query_result.strip().split("\n")
    data_lines = [l for l in lines if l.strip() and l.strip()[0].isdigit()]
    parsed = []
    for line in data_lines:
        # 提取形如 "1. (...)" 的行
        if ". (" in line:
            content = line.split(". (", 1)[1].rstrip(")")
            parts = content.split(",")
            # 尝试解析为 (类别, 数值) 的二元组
            if len(parts) >= 2:
                label = parts[0].strip().strip("'")
                try:
                    value = float(parts[1].strip())
                except ValueError:
                    # 如果第二列不是数字,尝试第三列
                    if len(parts) >= 3:
                        try:
                            value = float(parts[2].strip())
                            label = parts[1].strip().strip("'")
                        except ValueError:
                            continue
                    else:
                        continue
                parsed.append((label, value))

    if not parsed:
        return "❌ 无法从查询结果中解析出可视化数据,请确保结果包含可绘制的数值列。"

    labels = [p[0] for p in parsed]
    values = [p[1] for p in parsed]

    fig, ax = plt.subplots(figsize=(8, 5))

    if chart_type == "bar":
        colors = plt.cm.Set3(np.linspace(0, 1, len(labels)))
        bars = ax.bar(labels, values, color=colors)
        # 在柱顶标注数值
        for bar, val in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(values)*0.01,
                    f"{val:,.1f}", ha="center", va="bottom", fontsize=9)

    elif chart_type == "line":
        ax.plot(labels, values, marker="o", linewidth=2, markersize=8, color="#2196F3")
        ax.fill_between(range(len(labels)), values, alpha=0.1, color="#2196F3")
        for i, (x, y) in enumerate(zip(range(len(labels)), values)):
            ax.text(x, y + max(values)*0.02, f"{y:,.1f}", ha="center", fontsize=9)

    elif chart_type == "pie":
        wedges, texts, autotexts = ax.pie(values, labels=labels, autopct="%1.1f%%",
                                           colors=plt.cm.Set3(np.linspace(0, 1, len(labels))))
        for t in autotexts:
            t.set_fontsize(10)

    else:
        plt.close(fig)
        return f"❌ 不支持的图表类型:{chart_type},可选值:bar、line、pie"

    ax.set_title(title, fontsize=14, fontweight="bold")
    ax.set_ylabel("数值" if chart_type != "pie" else "")
    # 旋转长标签避免重叠
    if chart_type != "pie":
        plt.xticks(rotation=30, ha="right")

    plt.tight_layout()

    filepath = f"./chart_{chart_type}_{np.random.randint(1000, 9999)}.png"
    fig.savefig(filepath, dpi=150, bbox_inches="tight")
    plt.close(fig)

    return f"✅ 图表已生成:{filepath}(类型:{chart_type},数据点:{len(parsed)} 个)"

generate_chart 工具的设计中有一个值得注意的细节,即数据解析部分。数据库查询结果通过 cursor.fetchall() 返回的是一个元组列表,在 run_query 节点中被格式化为带有行号和列名的文本。图表工具需要反向解析这个文本格式,提取出标签和数值的配对。这段解析代码使用了简单的字符串分割和类型推断,虽然在实际项目中更好的做法是在节点间传递结构化的 JSON 数据而非文本,但文本格式的优点是 LLM 可以直接阅读和理解中间结果,便于调试时通过 LangSmith 或终端日志追踪数据流转的每一步。

图表样式方面,我们为三种图表类型分别设计了视觉效果。柱状图使用 Set3 色板为每个柱子分配不同的颜色,并在柱顶标注精确数值,帮助读者在视觉比较之外还能获取精确的数据。折线图在数据点下方添加了半透明的填充区域,这种设计源自数据可视化中的"面积图"变体,能让趋势的幅度变化更加直观。饼图使用 autopct="%1.1f%%" 自动计算并显示每个扇区的百分比。所有图表在保存时都使用 150 DPI 的分辨率,在文件大小和清晰度之间取得平衡。matplotlib.use("Agg") 的前置设置至关重要,Agg 是 matplotlib 的非交互式后端,它不依赖任何 GUI 框架,因此在服务器环境、Docker 容器或无桌面的 Linux 系统中都能正常工作。

整合为端到端 Agent

有了工作流图、数据可视化工具和 LLM 实例,下一步是将它们整合为一个完整的端到端数据分析 Agent。我们使用 create_agent 函数创建一个具备工具调用能力的高级 Agent,将 LangGraph 的 SQL 工作流图和可视化工具一起注册为 Agent 可用的工具集。系统提示词中明确规定 Agent 的行为规则,先通过 SQL 工作流获取数据,再根据数据特征选择合适的可视化方式,最后输出完整的分析报告。

from langchain.agents import create_agent
from langchain_core.messages import HumanMessage


# 将 LangGraph 工作流封装为一个可调用的工具
@tool(parse_docstring=True)
def sql_analysis_workflow(question: str) -> str:
    """对数据库执行自然语言数据分析。将用户的数据分析问题转化为 SQL 查询、
    执行查询并返回格式化的结果。

    Args:
        question: 用户的数据分析问题,如"各城市的订单总金额是多少"
    """
    result = graph.invoke({"messages": [HumanMessage(content=question)]})
    # 提取最终报告
    for msg in reversed(result["messages"]):
        if hasattr(msg, "type") and msg.type == "ai" and "📊" in str(msg.content):
            return msg.content
    # 如果没有报告则返回最后一条 AI 消息
    for msg in reversed(result["messages"]):
        if hasattr(msg, "type") and msg.type == "ai":
            return msg.content
    return "分析完成"


system_prompt = """你是一位资深的数据分析师。你的工作流程如下:
1. 当用户提出数据分析问题时,使用 `sql_analysis_workflow` 工具将问题转换为 SQL 查询并获取结果。
2. 根据查询结果的数据特征,判断适合的图表类型(数值对比用柱状图、趋势变化用折线图、占比分布用饼图)。
3. 使用 `generate_chart` 工具生成可视化图表。
4. 最后用自然语言总结分析发现,将图表文件路径一并呈现给用户。

注意事项:
- 始终先查询再绘图,不要在没有数据的情况下猜测图表内容。
- 如果查询结果为空,诚实地告诉用户数据不足,不要编造分析结论。
- 图表标题应使用中文,清晰概括分析主题。
"""

agent = create_agent(
    model=llm,
    tools=[sql_analysis_workflow, generate_chart],
    system_prompt=system_prompt,
)

这里有一个关键的设计决策,LangGraph 的 SQL 工作流本身是一个多步骤的状态图,但我们通过 @tool 装饰器将其封装为一个工具对外暴露。这个封装的好处是让高级 Agent 不必关心 SQL 工作流的内部细节——表名列表、Schema 获取、查询校验这些步骤全部在工具内部完成,高级 Agent 只需要发起一个自然语言问题并接收分析结果。这种分层设计体现了 LangChain 生态中"组合优于继承"的理念,也呼应了官方 Deep Agents 教程中将可视化作为子 Agent 隔离执行的思路。

高级 Agent 的系统提示词中明确写明了一个决策链条,先 SQL 分析,再判断图表类型,再绘图,最后总结。这个顺序约束确保了 Agent 不会在数据尚未获取时就尝试绘图,也不会在绘图完成后遗漏了总结分析环节。在实际运行中,create_agent 内置的 ReAct 推理循环会让模型在每一步工具调用之后重新评估当前状态,如果它发现查询结果适合用饼图展示而饼图尚未生成,它会发出 generate_charttool_call,在图表生成完成后再向用户输出最终回复。

以下是使用 Agent 进行完整数据分析的端到端示例:

# 示例一:分析各城市订单数量
question1 = "统计每个城市的订单总数,用柱状图展示"
result1 = agent.invoke({"messages": [HumanMessage(content=question1)]})
print(result1["messages"][-1].content)

# 示例二:分析各品类销售额占比
question2 = "统计每个产品品类的销售总额,用饼图展示占比"
result2 = agent.invoke({"messages": [HumanMessage(content=question2)]})
print(result2["messages"][-1].content)

# 示例三:趋势分析
question3 = "VIP 用户和普通用户各消费了多少,用柱状图对比"
result3 = agent.invoke({"messages": [HumanMessage(content=question3)]})
print(result3["messages"][-1].content)

三个示例问题覆盖了不同的分析维度。第一个问题涉及订单表和用户表之间的 JOIN 操作,需要按城市分组并计数。第二个问题需要联查订单表和商品表,按品类分组后对金额求和并计算百分比。第三个问题则需要先通过用户表的 vip_level 字段筛选出不同等级的用户,再与订单表联查求和。这三个问题的 SQL 复杂度递增,从简单的单表聚合到多表关联加条件筛选,构成了一个自然的难度梯度,正好对应考核要求中"用 3 个不同复杂度的自然语言问题测试"这一项。

数据安全深度分析

数据安全是数据分析 Agent 生产化过程中不可回避的话题。任何一个接受自然语言输入并自动生成 SQL 的系统都面临两个核心安全威胁:SQL 注入攻击和敏感数据泄露。SQL 注入是注入攻击在数据库领域的体现——如果攻击者在自然语言问题中嵌入了恶意的 SQL 片段,而系统不加过滤地将模型生成的 SQL 直接拼接执行,数据库就可能遭遇破坏或数据窃取。更隐蔽但同样危险的威胁来自敏感数据泄露,例如当用户询问"显示所有用户的密码"时,一个缺乏权限控制的 Agent 可能忠实地执行这个查询并返回隐私数据。

在我们的系统中,安全防护是一个多层纵深体系。第一层防护位于 check_query 节点,它充当一个语法级的防火墙,阻截一切非 SELECT 语句和包含危险关键词的查询。这个节点的关键词黑名单覆盖了 SQLite 中所有可能修改数据库状态的操作,任何包含 DROPDELETEUPDATEINSERTALTER 等关键词的 SQL 都会被当场拒绝。第二层防护在于数据库权限的窄化,在生产环境中,数据分析 Agent 应该使用一个只具有 SELECT 权限的数据库用户连接,即使校验节点被意外绕过,数据库本身的权限机制也会阻止写操作。第三层防护是在工具设计中刻意不暴露数据库凭证和连接字符串给 LLM——在我们的代码中,数据库连接由 query_database 工具在服务端持有,LLM 只能通过工具接口发起查询请求但永远无法获取底层的连接对象或连接字符串。

更高级的安全实践还包括 SQL 查询的脱敏化处理。在 run_query 节点返回查询结果之前,可以对结果集执行一轮数据脱敏检查,例如识别和遮蔽手机号、身份证号、邮箱地址等个人信息模式。此外,LangSmith 的追踪功能也应该在安全审计中发挥作用,通过回放 Agent 执行的每一步操作,安全团队可以事后审查是否有任何未经授权的数据访问行为发生。LangChain 官方在 SQL Agent 教程中特别强调:构建 SQL 数据库的问答系统时执行模型生成的 SQL 查询存在固有风险,确保数据库连接权限始终被尽可能地缩小范围是降低风险的关键措施,但并不能完全消除风险。

练习任务

  • 构建端到端数据分析 Agent
  • 实现自然语言到 SQL 的转换
  • 集成 Matplotlib 图表生成

考核点 ✅

  1. 端到端流程:提交"提问→SQL→查询→图表→报告"的完整运行日志
  2. NL2SQL 验证:用 3 个不同复杂度的自然语言问题测试 SQL 生成准确率
  3. 可视化产出:提交至少 2 种图表类型(柱状图/折线图/饼图)的生成截图
  4. 数据安全:口头解释如何防止 SQL 注入和敏感数据泄露
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐