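🎯 Series guide: Earlier chapters covered the Agent's perception, memory, and planning modules. This installment turns to the Agent's "hands and feet", the action module, and looks at how an Agent interacts with the real world through tool calls and embodied execution.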




1. Introduction: From Thinking to Acting


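Over the course of AI's development, language models have evolved from simple text generation to complex reasoning and planning. A truly intelligent Agent, however, needs to do more than "think": it also needs to "act", turning what it knows into real effects on the world.

💡 Question: Why is the ability to act the key to an Agent's leap from "intelligent assistant" to "intelligent agent"?

🤔 Answer: A traditional language model is like a well-read consultant who can give advice but cannot carry it out. An Agent that can act is more like a capable assistant: it understands the request, makes a plan, and then completes the task itself. This shift from "saying" to "doing" is at the core of the Agent revolution.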

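Here is a simple side-by-side comparison:

Traditional LLM interaction:
User: Check today's weather in Beijing for me
LLM: You can look it up on weather.com, or use the weather app on your phone...

Agent with the ability to act:
User: Check today's weather in Beijing for me
Agent: [calls the weather API] Beijing is sunny today, -2°C to 8°C,
       with good air quality, suitable for outdoor activities.

This simple example shows the essential value of the action module: it turns language understanding into concrete operations, and abstract intent into concrete results.

This article works through the design and implementation of the Agent action module, from the theoretical basis of tool use (Toolformer) to embodied-intelligence practice (SayCan), and from the engineering details of API calls to the protections offered by a security sandbox.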


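2. Action Module Overview

2.1 What Is the Action Module

The action module is the core component of an Agent system responsible for carrying out concrete operations. If we compare an Agent to a person, the perception module is the "eyes and ears", the memory module is the "brain's storage", the planning module is the "prefrontal cortex", and the action module is the "hands and feet": the executor that turns intent into behavior.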

┌────────────────────────────────────────────────────────────────┐
│                   Agent System Architecture                    │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│    ┌──────────┐    ┌──────────┐    ┌──────────┐                │
│    │Perception│───▶│ Planning │───▶│  Action  │                │
│    │  Module  │    │  Module  │    │  Module  │                │
│    └──────────┘    └────┬─────┘    └────┬─────┘                │
│                         │               │                      │
│                    ┌────▼────┐          │                      │
│                    │ Memory  │◀─────────┘                      │
│                    │ Module  │                                 │
│                    └─────────┘                                 │
│                                                                │
│    ┌─────────────────────────────────────────────────────┐     │
│    │               Action Module in Detail               │     │
│    ├─────────────────────────────────────────────────────┤     │
│    │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐ │     │
│    │  │  Tool   │  │   API   │  │  Code   │  │Embodied │ │     │
│    │  │  Calls  │  │Requests │  │Execution│  │ Control │ │     │
│    │  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘ │     │
│    │       │            │            │            │      │     │
│    │       └────────────┴─────┬──────┴────────────┘      │     │
│    │                          │                          │     │
│    │                   ┌──────▼──────┐                   │     │
│    │                   │  Security   │                   │     │
│    │                   │   Sandbox   │                   │     │
│    │                   └─────────────┘                   │     │
│    └─────────────────────────────────────────────────────┘     │
│                                                                │
└────────────────────────────────────────────────────────────────┘

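2.2 Core Capabilities of the Action Module

The action module needs the following core capabilities:

| Capability | Description | Typical use cases |
| --- | --- | --- |
| Tool calling | Use predefined tools to complete specific tasks | Calculator, search engine, calendar management |
| API interaction | Exchange data with external services | Weather lookup, map navigation, payment processing |
| Code execution | Dynamically generate and run code | Data analysis, chart generation, automation scripts |
| Embodied control | Operate physical devices or robots | Smart home, industrial robots, autonomous driving |
| Multimodal output | Generate images, audio, video, and similar content | Image generation, speech synthesis, video editing |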

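💡 Question: Is there a hierarchy among these capabilities?

🤔 Answer: Yes. By level of abstraction, they fall into three layers:

              Abstraction-Level Pyramid

                         /\
                        /  \
                       /    \
                      /      \
                     /Embodied\            Layer 3: physical-world interaction
                    / Control  \           needs a perceive-decide-act loop
                   /────────────\
                  /              \
                 / Code Execution \        Layer 2: dynamic capability extension
                /    (Sandbox)     \       needs a runtime environment
               /────────────────────\
              /                      \
             /  Tool Calling & API    \    Layer 1: base capability layer
            /       (Predefined)       \   needs interface definitions and permissions
           /────────────────────────────\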

2.3 Action Module Architecture

A production-grade action module has to account for the following architectural elements:

┌──────────────────────────────────────────────────────────────────────┐
│                 Action Module Architecture in Detail                 │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐                                                     │
│  │  Planning   │                                                     │
│  │   Module    │                                                     │
│  └──────┬──────┘                                                     │
│         │ Action Request                                             │
│         ▼                                                            │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Action Parser                          │  │
│  │  ┌──────────────────────────────────────────────────────────┐  │  │
│  │  │ Input:  "search_web(query='AI agents')"                  │  │  │
│  │  │ Output: {action: 'search_web', params: {query: '...'}}   │  │  │
│  │  └──────────────────────────────────────────────────────────┘  │  │
│  └──────────────────────────┬─────────────────────────────────────┘  │
│                             │                                        │
│         ┌───────────────────┼───────────────────┐                    │
│         ▼                   ▼                   ▼                    │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐             │
│  │ Tool Router │     │ API Gateway │     │Code Executor│             │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘             │
│         │                   │                   │                    │
│         ▼                   ▼                   ▼                    │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                     Security Sandbox Layer                     │  │
│  │ ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐ │  │
│  │ │ Permission │──│  Resource  │──│ Execution  │──│   Audit    │ │  │
│  │ │   Check    │  │ Isolation  │  │ Monitoring │  │  Logging   │ │  │
│  │ └────────────┘  └────────────┘  └────────────┘  └────────────┘ │  │
│  └──────────────────────────┬─────────────────────────────────────┘  │
│                             │                                        │
│         ┌───────────────────┼───────────────────┐                    │
│         ▼                   ▼                   ▼                    │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐             │
│  │ Local Tools │     │External APIs│     │   Runtime   │             │
│  └─────────────┘     └─────────────┘     └─────────────┘             │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                       Result Aggregator                        │  │
│  │  • Result formatting  • Error handling  • Retries  • Caching   │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Below is the core interface definition for the action module:

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class ActionType(Enum):
    """Enumeration of action types"""
    TOOL_CALL = "tool_call"
    API_REQUEST = "api_request"
    CODE_EXECUTION = "code_execution"
    EMBODIED_ACTION = "embodied_action"

@dataclass
class ActionRequest:
    """Data structure for an action request"""
    action_type: ActionType
    action_name: str
    parameters: Dict[str, Any]
    context: Optional[Dict[str, Any]] = None
    timeout: float = 30.0
    retry_count: int = 3

@dataclass
class ActionResult:
    """Data structure for an action result"""
    success: bool
    data: Any
    error: Optional[str] = None
    execution_time: float = 0.0
    metadata: Optional[Dict[str, Any]] = None

class ActionExecutor(ABC):
    """Abstract base class for action executors"""
    
    @abstractmethod
    async def execute(self, request: ActionRequest) -> ActionResult:
        """Execute the action"""
        pass
    
    @abstractmethod
    def validate(self, request: ActionRequest) -> bool:
        """Validate the action request"""
        pass
    
    @abstractmethod
    def get_capabilities(self) -> List[str]:
        """Return the list of supported capabilities"""
        pass

class ActionModule:
    """Main class of the action module"""
    
    def __init__(self):
        self.executors: Dict[ActionType, ActionExecutor] = {}
        # NOTE: SecuritySandbox and ActionLogger are not defined in this
        # snippet; they are assumed to be provided elsewhere.
        self.sandbox = SecuritySandbox()
        self.logger = ActionLogger()
    
    def register_executor(self, action_type: ActionType, 
                         executor: ActionExecutor):
        """Register an action executor"""
        self.executors[action_type] = executor
    
    async def execute_action(self, request: ActionRequest) -> ActionResult:
        """Main entry point for executing an action"""
        # 1. Security check
        if not self.sandbox.check_permission(request):
            return ActionResult(
                success=False, 
                data=None, 
                error="Permission denied"
            )
        
        # 2. Look up the executor
        executor = self.executors.get(request.action_type)
        if not executor:
            return ActionResult(
                success=False, 
                data=None, 
                error=f"No executor for {request.action_type}"
            )
        
        # 3. Validate the request
        if not executor.validate(request):
            return ActionResult(
                success=False, 
                data=None, 
                error="Invalid request"
            )
        
        # 4. Execute inside the sandbox
        result = await self.sandbox.run(
            executor.execute, 
            request
        )
        
        # 5. Write the log entry (only reached when execution was attempted)
        self.logger.log(request, result)
        
        return result
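
The interface above calls `SecuritySandbox` and `ActionLogger` without showing them. The following is a minimal sketch of what they might look like, not the article's implementation: the allowlist, the timeout handling via `asyncio.wait_for`, and the stand-in `Request`/`Result` dataclasses (reduced versions of `ActionRequest`/`ActionResult`) are all assumptions made to keep the example self-contained. It provides no real process or resource isolation.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional, Set

@dataclass
class Request:                      # hypothetical stand-in for ActionRequest
    action_name: str
    timeout: float = 30.0

@dataclass
class Result:                       # hypothetical stand-in for ActionResult
    success: bool
    data: Any = None
    error: Optional[str] = None
    execution_time: float = 0.0

class SecuritySandbox:
    """Assumed design: allowlist check plus a timeout. No real isolation."""

    def __init__(self, allowed: Optional[Set[str]] = None):
        # An empty allowlist denies everything by default.
        self.allowed = allowed or set()

    def check_permission(self, request: Request) -> bool:
        return request.action_name in self.allowed

    async def run(self, fn: Callable[[Request], Awaitable[Result]],
                  request: Request) -> Result:
        start = time.perf_counter()
        try:
            result = await asyncio.wait_for(fn(request), timeout=request.timeout)
        except asyncio.TimeoutError:
            result = Result(False, error="Timed out")
        except Exception as exc:    # convert executor failures into a result
            result = Result(False, error=str(exc))
        result.execution_time = time.perf_counter() - start
        return result

class ActionLogger:
    """Assumed design: keeps an in-memory audit trail."""

    def __init__(self):
        self.records: List[tuple] = []

    def log(self, request: Request, result: Result) -> None:
        self.records.append((request.action_name, result.success, result.error))

async def slow_action(request: Request) -> Result:
    await asyncio.sleep(1)
    return Result(True, data="done")

sandbox = SecuritySandbox({"echo"})
logger = ActionLogger()
request = Request("echo", timeout=0.01)
timed_out = asyncio.run(sandbox.run(slow_action, request))
logger.log(request, timed_out)
```

Here the slow action exceeds its 0.01 s budget, so the sandbox returns a failed result instead of letting the exception propagate to the caller.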

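3. Tool Use: Toolformer and Tool-Augmented Learning

3.1 Reading the Toolformer Paper

In 2023, Meta AI released the Toolformer paper, which introduced a new paradigm in which a language model teaches itself to use tools. Its core contribution is letting the language model learn on its own when and how to call external tools, without requiring large amounts of human-annotated data.

💡 Question: Why does Toolformer matter so much? What are the limits of traditional approaches to tool calling?

🤔 Answer: Traditional approaches usually rely on:

  1. Hard-coded rules: if "weather" in query then call weather_api(), which generalizes poorly
  2. Supervised learning: requires large amounts of human-annotated tool-call data, which is expensive
  3. Reinforcement learning: requires a carefully designed reward function, which is hard to debug

Toolformer's innovation is to use the language model's own abilities to generate the training data, which makes tool learning self-supervised.

The core idea of Toolformer: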

┌──────────────────────────────────────────────────────────────────────┐
│                     Toolformer Training Pipeline                     │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Step 1: Sample API calls                                            │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Input: "The Eiffel Tower is located in [MASK] and was built in"│  │
│  │                                                                │  │
│  │ Candidate calls generated by the LM:                           │  │
│  │   • [QA("Where is Eiffel Tower")] → Paris                      │  │
│  │   • [Calculator(1889-0)] → 1889                                │  │
│  │   • [Search("Eiffel Tower location")] → Paris, France          │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  Step 2: Execute the APIs and collect results                        │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ [QA("Where is Eiffel Tower")] → "Paris"                        │  │
│  │ [Calculator(1889-0)] → "1889"                                  │  │
│  │ [Search("Eiffel Tower")] → "Paris, France, 1887-1889"          │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  Step 3: Filter for useful API calls                                 │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Criterion: L(with API result) < L(without API) - threshold     │  │
│  │                                                                │  │
│  │ Keep the call if it lowers perplexity on the following tokens  │  │
│  │                                                                │  │
│  │ Keep: [Search("Eiffel Tower")] ✓                               │  │
│  │ Drop: [Calculator(1889-0)] ✗ (little help predicting the rest) │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  Step 4: Fine-tune the model                                         │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Original text:                                                 │  │
│  │ "The Eiffel Tower is located in Paris and was built..."        │  │
│  │                                                                │  │
│  │ Augmented text:                                                │  │
│  │ "The Eiffel Tower is located in [Search("Eiffel Tower")        │  │
│  │  → Paris, France] Paris and was built..."                      │  │
│  │                                                                │  │
│  │ Fine-tune the LM on the augmented text so that it learns to    │  │
│  │ insert API calls at appropriate positions                      │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Mathematical formulation of Toolformer:

Let $x = (x_1, \ldots, x_n)$ be the input sequence and $c = (a_i, r_i)$ the API call at position $i$ together with its result. Define:

$$L_i^+(c) = -\sum_{j=i}^{n} \log p(x_j \mid x_{1:i-1}, c, x_{i:j-1})$$

$$L_i^- = -\sum_{j=i}^{n} \log p(x_j \mid x_{1:j-1})$$

Filter condition: keep the API call when $L_i^- - L_i^+(c) \geq \tau$.
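
To make the filter condition concrete, here is a small worked example with made-up numbers. The per-token probabilities below are invented for illustration (they do not come from a real model); they stand for the probability the model assigns to each of the three tokens after position i, once without and once with the API call in the context.

```python
import math

def suffix_nll(token_probs):
    """Summed negative log-likelihood of the tokens after position i."""
    return -sum(math.log(p) for p in token_probs)

def should_keep(p_without, p_with, tau):
    """Apply the filter: keep the call when L_minus - L_plus >= tau."""
    l_minus = suffix_nll(p_without)   # L_i^-: no API call in context
    l_plus = suffix_nll(p_with)       # L_i^+(c): API call and result in context
    gain = l_minus - l_plus
    return gain >= tau, gain

# Hypothetical probabilities for three suffix tokens.
p_without = [0.05, 0.60, 0.70]   # the model struggles with the first token
p_with = [0.80, 0.65, 0.70]      # the API result makes the first token easy

keep, gain = should_keep(p_without, p_with, tau=0.5)
print(f"loss reduction = {gain:.3f}, keep = {keep}")
```

The gain here is about 2.85 nats, well above the threshold of 0.5, so this call would be kept. A call whose result leaves the probabilities unchanged has a gain of 0 and is dropped.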

import re
from typing import List, Optional, Tuple

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

class ToolformerTrainer:
    """Simplified implementation of a Toolformer trainer"""
    
    def __init__(self, model_name: str, tools: dict):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.tools = tools  # {tool_name: callable}
        self.threshold = 0.5  # filtering threshold
        
    def sample_api_calls(self, text: str, 
                         position: int) -> List[Tuple[str, str]]:
        """
        Sample candidate API calls at the given position
        Returns: [(api_call_string, result), ...]
        """
        candidates = []
        
        # Build a prompt that asks the model to propose an API call
        prompt = f"""Given the text: "{text[:position]}"
        What API call would be helpful here?
        Available APIs: {list(self.tools.keys())}
        Generate API call:"""
        
        inputs = self.tokenizer(prompt, return_tensors="pt")
        prompt_len = inputs["input_ids"].shape[1]
        
        # Generate several candidates
        outputs = self.model.generate(
            **inputs,
            num_return_sequences=5,
            max_new_tokens=50,
            do_sample=True,
            temperature=0.7
        )
        
        for output in outputs:
            # generate() returns prompt + continuation; decode only the new tokens
            api_call = self.tokenizer.decode(
                output[prompt_len:], skip_special_tokens=True
            ).strip()
            # Parse and execute the API call
            result = self._execute_api_call(api_call)
            if result is not None:
                candidates.append((api_call, result))
        
        return candidates
    
    def _execute_api_call(self, api_call: str) -> Optional[str]:
        """Parse and execute an API call; returns None if it cannot be run"""
        # Simplified parsing: look for a Name(args) pattern anywhere in the string
        match = re.search(r'(\w+)\((.*)\)', api_call)
        if match:
            tool_name, args = match.groups()
            if tool_name in self.tools:
                try:
                    # Strip surrounding quotes so Search("x") passes x to the tool
                    result = self.tools[tool_name](args.strip().strip("\"'"))
                except Exception:
                    return None
                return None if result is None else str(result)
        return None
    
    def _suffix_loss(self, prefix: str, suffix: str) -> float:
        """Mean negative log-likelihood of the suffix tokens given the prefix.

        Assumes both prefix and suffix are non-empty.
        """
        prefix_ids = self.tokenizer(prefix, return_tensors="pt")["input_ids"]
        suffix_ids = self.tokenizer(suffix, return_tensors="pt")["input_ids"]
        input_ids = torch.cat([prefix_ids, suffix_ids], dim=1)
        # Label -100 is ignored by the loss, so only suffix tokens are scored
        labels = input_ids.clone()
        labels[:, :prefix_ids.shape[1]] = -100
        with torch.no_grad():
            outputs = self.model(input_ids=input_ids, labels=labels)
        return outputs.loss.item()
    
    def compute_loss_with_api(self, text: str, position: int, 
                              api_call: str, result: str) -> float:
        """Loss on text[position:] with the API call and result inserted first"""
        prefix = text[:position] + f" [{api_call} → {result}] "
        return self._suffix_loss(prefix, text[position:])
    
    def compute_loss_without_api(self, text: str, position: int) -> float:
        """Loss on text[position:] without any API call"""
        return self._suffix_loss(text[:position], text[position:])
    
    def filter_useful_apis(self, text: str, 
                          candidates: List[Tuple[int, str, str]]) -> List[dict]:
        """
        Filter for useful API calls
        candidates: [(position, api_call, result), ...]
        """
        useful_apis = []
        
        for position, api_call, result in candidates:
            # Both losses are measured on the same suffix text[position:],
            # matching L_i^- and L_i^+ (as a per-token mean rather than a sum)
            base_loss = self.compute_loss_without_api(text, position)
            loss_with_api = self.compute_loss_with_api(
                text, position, api_call, result
            )
            
            # Keep the call if it lowers the loss by at least the threshold
            if base_loss - loss_with_api >= self.threshold:
                useful_apis.append({
                    'position': position,
                    'api_call': api_call,
                    'result': result,
                    'loss_reduction': base_loss - loss_with_api
                })
        
        return useful_apis
    
    def create_training_example(self, text: str, 
                                useful_apis: List[dict]) -> str:
        """Build a training example"""
        # Sort by position and insert from back to front so that
        # earlier positions stay valid
        sorted_apis = sorted(useful_apis, 
                           key=lambda x: x['position'], 
                           reverse=True)
        
        augmented = text
        for api in sorted_apis:
            pos = api['position']
            insertion = f" [{api['api_call']} → {api['result']}] "
            augmented = augmented[:pos] + insertion + augmented[pos:]
        
        return augmented


# Usage example
def calculator(expr: str) -> float:
    """Simple calculator tool"""
    try:
        return eval(expr)  # unsafe: use a restricted expression evaluator in production
    except Exception:
        return None

def search(query: str) -> str:
    """Mock search tool"""
    # A real implementation would call a search API
    mock_results = {
        "Eiffel Tower": "Paris, France, completed in 1889",
        "population of Tokyo": "13.96 million (2021)",
    }
    return mock_results.get(query, "No results found")

# Initialize the trainer
tools = {
    "Calculator": calculator,
    "Search": search
}
trainer = ToolformerTrainer("gpt2", tools)
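
The back-to-front insertion used when building a training example can be tried without loading a model. The sketch below is a standalone illustration, with a hypothetical helper `insert_api_calls` and hand-written API calls; it shows why insertions are applied from the last position to the first: each insertion only shifts text after it, so the earlier offsets remain valid.

```python
def insert_api_calls(text, calls):
    """Insert "[call → result] " markers at character offsets.

    calls: list of (position, api_call, result) tuples, in any order.
    Offsets refer to the ORIGINAL text, so we insert from back to front.
    """
    augmented = text
    for pos, call, result in sorted(calls, key=lambda c: c[0], reverse=True):
        augmented = augmented[:pos] + f"[{call} → {result}] " + augmented[pos:]
    return augmented

text = "The Eiffel Tower is located in Paris and was built in 1889."
calls = [
    (text.index("Paris"), 'Search("Eiffel Tower")', "Paris, France"),
    (text.index("1889"), 'QA("When was the Eiffel Tower built?")', "1889"),
]
augmented = insert_api_calls(text, calls)
print(augmented)
```

Inserting front to back instead would shift the second offset by the length of the first marker, and the second call would land in the wrong place.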

3.2 Implementing Tool Calls

In a real Agent system, tool calling needs a complete technical stack behind it:

┌──────────────────────────────────────────────────────────────────────┐
│                    Tool-Calling Technology Stack                     │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Tool Registry                          │  │
│  │  ┌──────────────────────────────────────────────────────────┐  │  │
│  │  │ {                                                        │  │  │
│  │  │   "calculator": {                                        │  │  │
│  │  │     "description": "Evaluate a math expression",         │  │  │
│  │  │     "parameters": {                                      │  │  │
│  │  │       "expression": {"type": "string", "required": true} │  │  │
│  │  │     },                                                   │  │  │
│  │  │     "returns": "number",                                 │  │  │
│  │  │     "examples": ["calculator('2+2')", ...]               │  │  │
│  │  │   },                                                     │  │  │
│  │  │   "web_search": {...},                                   │  │  │
│  │  │   "send_email": {...}                                    │  │  │
│  │  │ }                                                        │  │  │
│  │  └──────────────────────────────────────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Tool Selector                          │  │
│  │                                                                │  │
│  │  Input:  user intent + tool descriptions                       │  │
│  │  Output: best-matching tool and its arguments                  │  │
│  │                                                                │  │
│  │  Strategies:                                                   │  │
│  │  • Semantic matching: embedding-based similarity               │  │
│  │  • Few-shot: in-context learning from examples                 │  │
│  │  • Fine-tuned: a fine-tuned tool-selection model               │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                      Parameter Extractor                       │  │
│  │                                                                │  │
│  │  Extracts the arguments a tool needs from natural language     │  │
│  │                                                                │  │
│  │  Example:                                                      │  │
│  │  "Find me the latest AI news" →                                │  │
│  │  {tool: "web_search", params: {query: "latest AI news"}}       │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                   │                                  │
│                                   ▼                                  │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Tool Executor                          │  │
│  │                                                                │  │
│  │  ┌────────────┐───▶┌────────────┐───▶┌────────────┐            │  │
│  │  │ Parameter  │    │ Permission │    │    Safe    │            │  │
│  │  │ Validation │    │   Check    │    │ Execution  │            │  │
│  │  └────────────┘    └────────────┘    └──────┬─────┘            │  │
│  │                                             │                  │  │
│  │                                    ┌────────┴────────┐         │  │
│  │                                    ▼                 ▼         │  │
│  │                               ┌──────────┐      ┌──────────┐   │  │
│  │                               │ Success  │      │  Error   │   │  │
│  │                               │  Result  │      │ Handling │   │  │
│  │                               └──────────┘      └──────────┘   │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
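
The "semantic matching" strategy in the tool selector can be sketched in a few lines. This is an illustration under simplifying assumptions: the tool descriptions are made up, and a bag-of-words count vector stands in for a real embedding model. The selection logic (embed the query, embed each description, pick the highest cosine similarity) is the same shape either way.

```python
import math
import re
from collections import Counter

# Hypothetical tool descriptions; a real registry would supply these.
TOOLS = {
    "calculator": "evaluate a math expression with numbers and arithmetic",
    "web_search": "search the internet for news and information",
    "send_email": "send an email message to a recipient",
}

def embed(text):
    """Toy 'embedding': word counts. A stand-in for a real embedding model."""
    return Counter(re.findall(r"[a-z]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def select_tool(query):
    """Return the best-matching tool name and all similarity scores."""
    q = embed(query)
    scores = {name: cosine(q, embed(desc)) for name, desc in TOOLS.items()}
    return max(scores, key=scores.get), scores

best, scores = select_tool("search for the latest AI news")
print(best, scores)
```

Word overlap breaks down for paraphrases ("look something up online" shares no words with the search description), which is the gap a learned embedding model is meant to close.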

Below is a complete implementation of a tool system:

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
import asyncio

# ============== Tool definitions ==============

@dataclass
class ToolParameter:
    """Definition of a tool parameter"""
    name: str
    type: str  # string, number, boolean, array, object
    description: str
    required: bool = True
    default: Any = None
    enum: Optional[List[Any]] = None

@dataclass
class Tool:
    """Definition of a tool"""
    name: str
    description: str
    parameters: List[ToolParameter]
    function: Callable
    returns: str = "any"
    examples: List[str] = field(default_factory=list)
    category: str = "general"
    requires_confirmation: bool = False
    
    def to_schema(self) -> dict:
        """Convert to JSON Schema format (compatible with OpenAI function calling)"""
        properties = {}
        required = []
        
        for param in self.parameters:
            properties[param.name] = {
                "type": param.type,
                "description": param.description
            }
            if param.enum:
                properties[param.name]["enum"] = param.enum
            if param.required:
                required.append(param.name)
        
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }

# ============== Tool registry ==============

class ToolRegistry:
    """Tool registry"""
    
    def __init__(self):
        self._tools: Dict[str, Tool] = {}
        self._categories: Dict[str, List[str]] = {}
    
    def register(self, tool: Tool):
        """Register a tool"""
        self._tools[tool.name] = tool
        
        if tool.category not in self._categories:
            self._categories[tool.category] = []
        self._categories[tool.category].append(tool.name)
    
    def get(self, name: str) -> Optional[Tool]:
        """Look up a tool by name"""
        return self._tools.get(name)
    
    def list_all(self) -> List[Tool]:
        """List all tools"""
        return list(self._tools.values())
    
    def list_by_category(self, category: str) -> List[Tool]:
        """List tools in a category"""
        tool_names = self._categories.get(category, [])
        return [self._tools[name] for name in tool_names]
    
    def get_schemas(self) -> List[dict]:
        """Return the schemas of all tools"""
        return [tool.to_schema() for tool in self._tools.values()]
    
    def search(self, query: str, top_k: int = 5) -> List[Tool]:
        """
        Search for relevant tools
        A real application could use embeddings for semantic search
        """
        # Simplified implementation: keyword matching
        scores = []
        query_lower = query.lower()
        
        for tool in self._tools.values():
            score = 0
            # Name match
            if query_lower in tool.name.lower():
                score += 10
            # Description match
            for word in query_lower.split():
                if word in tool.description.lower():
                    score += 1
            scores.append((tool, score))
        
        # Sort by score, highest first
        scores.sort(key=lambda x: x[1], reverse=True)
        return [tool for tool, score in scores[:top_k] if score > 0]

# ============== Tool executor ==============

class ToolExecutionError(Exception):
    """Raised when a tool cannot be executed"""
    pass

class ToolExecutor:
    """Tool executor"""
    
    def __init__(self, registry: ToolRegistry):
        self.registry = registry
        self.execution_history: List[dict] = []
    
    def validate_parameters(self, tool: Tool, 
                           params: Dict[str, Any]) -> bool:
        """Validate parameters; raises ToolExecutionError on failure"""
        for param_def in tool.parameters:
            if param_def.required and param_def.name not in params:
                raise ToolExecutionError(
                    f"Missing required parameter: {param_def.name}"
                )
            
            if param_def.name in params:
                value = params[param_def.name]
                # Type check (simplified)
                type_map = {
                    'string': str,
                    'number': (int, float),
                    'boolean': bool,
                    'array': list,
                    'object': dict
                }
                expected_type = type_map.get(param_def.type)
                # bool is a subclass of int, so reject it explicitly for 'number'
                is_bool_as_number = (
                    param_def.type == 'number' and isinstance(value, bool)
                )
                if expected_type and (
                    not isinstance(value, expected_type) or is_bool_as_number
                ):
                    raise ToolExecutionError(
                        f"Parameter {param_def.name} should be {param_def.type}"
                    )
                
                # Enum check
                if param_def.enum and value not in param_def.enum:
                    raise ToolExecutionError(
                        f"Parameter {param_def.name} must be one of {param_def.enum}"
                    )
        
        return True
    
    async def execute(self, tool_name: str, 
                     params: Dict[str, Any]) -> Any:
        """Execute a tool"""
        tool = self.registry.get(tool_name)
        if not tool:
            raise ToolExecutionError(f"Tool not found: {tool_name}")
        
        # Validate parameters
        self.validate_parameters(tool, params)
        
        # Fill in defaults on a copy so the caller's dict is not mutated
        params = dict(params)
        for param_def in tool.parameters:
            if param_def.name not in params and param_def.default is not None:
                params[param_def.name] = param_def.default
        
        # Execute
        try:
            if asyncio.iscoroutinefunction(tool.function):
                result = await tool.function(**params)
            else:
                result = tool.function(**params)
            
            # Record history
            self.execution_history.append({
                'tool': tool_name,
                'params': params,
                'result': result,
                'success': True
            })
            
            return result
            
        except Exception as e:
            self.execution_history.append({
                'tool': tool_name,
                'params': params,
                'error': str(e),
                'success': False
            })
            raise ToolExecutionError(f"Execution failed: {str(e)}") from e


# ============== Example tool definitions ==============

def create_calculator_tool() -> Tool:
    """Create the calculator tool"""
    def calculate(expression: str) -> float:
        # Safe evaluation of a math expression via the AST (no eval)
        import ast
        import operator
        
        operators = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.Pow: operator.pow,
            ast.USub: operator.neg,
        }
        
        def eval_expr(node):
            # ast.Num was removed in Python 3.12; ast.Constant covers number literals
            if (isinstance(node, ast.Constant)
                    and isinstance(node.value, (int, float))
                    and not isinstance(node.value, bool)):
                return node.value
            elif isinstance(node, ast.BinOp) and type(node.op) in operators:
                return operators[type(node.op)](
                    eval_expr(node.left), 
                    eval_expr(node.right)
                )
            elif isinstance(node, ast.UnaryOp) and type(node.op) in operators:
                return operators[type(node.op)](eval_expr(node.operand))
            else:
                raise TypeError(f"Unsupported type: {type(node)}")
        
        tree = ast.parse(expression, mode='eval')
        return eval_expr(tree.body)
    
    return Tool(
        name="calculator",
        description="Evaluate math expressions: add, subtract, multiply, divide, power",
        parameters=[
            ToolParameter(
                name="expression",
                type="string",
                description="Math expression, e.g. '2 + 3 * 4'"
            )
        ],
        function=calculate,
        returns="number",
        examples=["calculator('2 + 2')", "calculator('3.14 * 10 ** 2')"],
        category="math"
    )

def create_web_search_tool() -> Tool:
    """Create the web search tool"""
    async def web_search(query: str, num_results: int = 5) -> List[dict]:
        # Mock search results
        # A real implementation would call a search API (Google, Bing, etc.)
        return [
            {
                "title": f"Search result {i} for: {query}",
                "url": f"https://example.com/result{i}",
                "snippet": f"This is a snippet about {query}..."
            }
            for i in range(num_results)
        ]
    
    return Tool(
        name="web_search",
        description="Search the internet for relevant information",
        parameters=[
            ToolParameter(
                name="query",
                type="string",
                description="Search keywords"
            ),
            ToolParameter(
                name="num_results",
                type="number",
                description="Number of results to return",
                required=False,
                default=5
            )
        ],
        function=web_search,
        returns="array",
        examples=["web_search('Python tutorial')", "web_search('today news', 10)"],
        category="information"
    )


# ============== Usage example ==============

async def main():
    # Create the registry
    registry = ToolRegistry()
    
    # Register the tools
    registry.register(create_calculator_tool())
    registry.register(create_web_search_tool())
    
    # Create the executor
    executor = ToolExecutor(registry)
    
    # Run a calculation
    result = await executor.execute("calculator", {"expression": "2 + 3 * 4"})
    print(f"Calculator result: {result}")  # 14
    
    # Run a search
    results = await executor.execute("web_search", {"query": "AI agents"})
    print(f"Search results: {len(results)} items")

# asyncio.run(main())

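3.3 Tool Selection and Orchestration Strategies

Complex tasks often require an Agent to combine several tools, which raises the questions of tool selection and orchestration.

💡 Question: How can an Agent learn to pick the right tool at the right moment?

🤔 Answer: The design involves three layers:

  1. Tool selection: match the task intent to the most relevant tool
  2. Parameter filling: extract the parameters the tool needs from the context
  3. Execution orchestration: handle dependencies between tools and their execution order
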
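
The first layer can be illustrated with a deliberately simple sketch. It assumes each tool is described by a set of trigger keywords and scores tools by keyword overlap with the task. `ToolSpec` and `select_tool` are hypothetical names introduced for illustration, not part of the article's `Tool` class, and a real Agent would more likely let the LLM or an embedding model do the matching.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass
class ToolSpec:
    """Hypothetical stand-in for a registered tool: a name plus trigger keywords."""
    name: str
    keywords: Set[str]


def select_tool(task: str, tools: List[ToolSpec]) -> Optional[ToolSpec]:
    """Return the tool whose keywords overlap most with the task, or None."""
    words = set(task.lower().split())
    best: Optional[ToolSpec] = None
    best_score = 0
    for tool in tools:
        score = len(words & tool.keywords)
        if score > best_score:  # on a tie, the earlier tool wins
            best, best_score = tool, score
    return best


tools = [
    ToolSpec("calculator", {"calculate", "sum", "multiply", "divide"}),
    ToolSpec("weather_search", {"weather", "forecast", "temperature"}),
]

chosen = select_tool("What is the weather forecast for Beijing", tools)
print(chosen.name if chosen else "no tool matched")
```

Whitespace tokenization is an assumption that only suits space-delimited languages; Chinese input would need a word segmenter or a semantic matcher instead.
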
┌──────────────────────────────────────────────────────────────────────┐
│                    Tool Orchestration Strategies                     │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Strategy 1: Sequential                                              │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │  Task: "Look up today's weather, then suggest an outfit"      │   │
│  │                                                               │   │
│  │  [Tool 1: weather_search] ──▶ [Tool 2: outfit_recommend]      │   │
│  │   "Beijing, sunny, 15°C"  ──▶  "Light jacket suggested"       │   │
│  └───────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  Strategy 2: Parallel                                                │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │  Task: "Check Beijing, Shanghai, Guangzhou weather at once"   │   │
│  │                                                               │   │
│  │    ┌── [weather_search(Beijing)]   ──┐                        │   │
│  │    │                                 │                        │   │
│  │  ──┼── [weather_search(Shanghai)]  ──┼──▶ [merge results]     │   │
│  │    │                                 │                        │   │
│  │    └── [weather_search(Guangzhou)] ──┘                        │   │
│  └───────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  Strategy 3: Conditional                                             │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │  Task: "Remind me to bring an umbrella if it rains tomorrow"  │   │
│  │                                                               │   │
│  │  [weather_forecast] ──┬── if rain ──▶ [set_reminder]          │   │
│  │                       │                                       │   │
│  │                       └── else    ──▶ [no_action]             │   │
│  └───────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  Strategy 4: Loop                                                    │
│  ┌───────────────────────────────────────────────────────────────┐   │
│  │  Task: "Alert me when the stock falls below 100 yuan"         │   │
│  │                                                               │   │
│  │  ┌──────────────────────────────────────┐                     │   │
│  │  │ while price >= 100:                  │                     │   │
│  │  │   [get_stock_price] ──▶ check        │                     │   │
│  │  │   wait(interval)                     │                     │   │
│  │  │ [send_alert]                         │                     │   │
│  │  └──────────────────────────────────────┘                     │   │
│  └───────────────────────────────────────────────────────────────┘   │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

The tool orchestrator can be implemented as follows:

from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import asyncio

class OrchestrationStrategy(Enum):
    """编排策略"""
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    CONDITIONAL = "conditional"
    LOOP = "loop"

@dataclass
class ToolCall:
    """工具调用定义"""
    tool_name: str
    parameters: Dict[str, Any]
    output_key: str = "result"  # 存储结果的键名
    depends_on: Optional[List[str]] = None  # 依赖的输出键

@dataclass
class OrchestrationPlan:
    """编排计划"""
    strategy: OrchestrationStrategy
    calls: List[ToolCall]
    condition: Optional[str] = None  # 用于条件分支
    max_iterations: int = 100  # 用于循环

class ToolOrchestrator:
    """工具编排器"""
    
    def __init__(self, executor: ToolExecutor):
        self.executor = executor
        self.context: Dict[str, Any] = {}
    
    def _resolve_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """解析参数中的引用"""
        resolved = {}
        for key, value in params.items():
            if isinstance(value, str) and value.startswith("$"):
                # 引用上下文中的值
                ref_key = value[1:]
                if ref_key in self.context:
                    resolved[key] = self.context[ref_key]
                else:
                    raise ValueError(f"Reference not found: {ref_key}")
            else:
                resolved[key] = value
        return resolved
    
    async def execute_sequential(self, calls: List[ToolCall]) -> Dict[str, Any]:
        """顺序执行"""
        results = {}
        
        for call in calls:
            # 检查依赖
            if call.depends_on:
                for dep in call.depends_on:
                    if dep not in self.context:
                        raise ValueError(f"Dependency not satisfied: {dep}")
            
            # 解析参数
            params = self._resolve_parameters(call.parameters)
            
            # 执行
            result = await self.executor.execute(call.tool_name, params)
            
            # 存储结果
            self.context[call.output_key] = result
            results[call.output_key] = result
        
        return results
    
    async def execute_parallel(self, calls: List[ToolCall]) -> Dict[str, Any]:
        """并行执行"""
        tasks = []
        
        for call in calls:
            params = self._resolve_parameters(call.parameters)
            task = self.executor.execute(call.tool_name, params)
            tasks.append((call.output_key, task))
        
        results = {}
        gathered = await asyncio.gather(*[t[1] for t in tasks], 
                                        return_exceptions=True)
        
        for (key, _), result in zip(tasks, gathered):
            if isinstance(result, Exception):
                results[key] = {"error": str(result)}
            else:
                results[key] = result
                self.context[key] = result
        
        return results
    
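    async def execute_conditional(self, plan: OrchestrationPlan) -> Dict[str, Any]:
        """Conditional execution"""
        if plan.condition is None:
            raise ValueError("A conditional plan requires a condition")
        
        # Evaluate the condition. eval() runs arbitrary Python, so conditions must
        # come from trusted code; emptying __builtins__ narrows the attack surface
        # but is not a sandbox.
        condition_result = eval(
            plan.condition, {"__builtins__": {}, "context": self.context}
        )
        
        if condition_result:
            # Run the true branch (by convention, the first call)
            return await self.execute_sequential([plan.calls[0]])
        elif len(plan.calls) > 1:
            # Run the false branch
            return await self.execute_sequential([plan.calls[1]])
        
        return {}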
    
    async def execute_loop(self, plan: OrchestrationPlan) -> Dict[str, Any]:
        """Loop execution"""
        results = []
        
        while len(results) < plan.max_iterations:
            # Run one iteration
            iter_results = await self.execute_sequential(plan.calls)
            results.append(iter_results)
            
            # Check the exit condition. eval() runs arbitrary Python, so only
            # evaluate condition strings that come from trusted code.
            if plan.condition:
                should_continue = eval(
                    plan.condition, {"__builtins__": {}, "context": self.context}
                )
                if not should_continue:
                    break
        
        # Count every iteration that ran, including the one that triggered the break
        return {"iterations": results, "count": len(results)}
    
    async def execute(self, plan: OrchestrationPlan) -> Dict[str, Any]:
        """执行编排计划"""
        self.context = {}  # 重置上下文
        
        if plan.strategy == OrchestrationStrategy.SEQUENTIAL:
            return await self.execute_sequential(plan.calls)
        elif plan.strategy == OrchestrationStrategy.PARALLEL:
            return await self.execute_parallel(plan.calls)
        elif plan.strategy == OrchestrationStrategy.CONDITIONAL:
            return await self.execute_conditional(plan)
        elif plan.strategy == OrchestrationStrategy.LOOP:
            return await self.execute_loop(plan)
        else:
            raise ValueError(f"Unknown strategy: {plan.strategy}")


# ============== 编排计划生成器 ==============

class PlanGenerator:
    """
    根据自然语言任务生成编排计划
    实际应用中可使用LLM来生成
    """
    
    def __init__(self, registry: ToolRegistry):
        self.registry = registry
    
    def generate(self, task: str) -> OrchestrationPlan:
        """
        生成编排计划
        这里是简化实现,实际应使用LLM
        """
        # 示例:硬编码一些常见模式
        if "同时" in task or "并行" in task:
            # 并行模式
            return self._generate_parallel_plan(task)
        elif "如果" in task or "当" in task:
            # 条件模式
            return self._generate_conditional_plan(task)
        elif "监控" in task or "持续" in task:
            # 循环模式
            return self._generate_loop_plan(task)
        else:
            # 默认顺序模式
            return self._generate_sequential_plan(task)
    
    def _generate_sequential_plan(self, task: str) -> OrchestrationPlan:
        """生成顺序计划(示例)"""
        # 实际应使用LLM分析任务并匹配工具
        return OrchestrationPlan(
            strategy=OrchestrationStrategy.SEQUENTIAL,
            calls=[
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": task},
                    output_key="search_result"
                )
            ]
        )
    
    def _generate_parallel_plan(self, task: str) -> OrchestrationPlan:
        """生成并行计划(示例)"""
        return OrchestrationPlan(
            strategy=OrchestrationStrategy.PARALLEL,
            calls=[
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": "part1"},
                    output_key="result1"
                ),
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": "part2"},
                    output_key="result2"
                )
            ]
        )
    
    def _generate_conditional_plan(self, task: str) -> OrchestrationPlan:
        """生成条件计划(示例)"""
        return OrchestrationPlan(
            strategy=OrchestrationStrategy.CONDITIONAL,
            condition="context.get('check_result', False)",
            calls=[
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": "true branch"},
                    output_key="true_result"
                ),
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": "false branch"},
                    output_key="false_result"
                )
            ]
        )
    
    def _generate_loop_plan(self, task: str) -> OrchestrationPlan:
        """生成循环计划(示例)"""
        return OrchestrationPlan(
            strategy=OrchestrationStrategy.LOOP,
            condition="context.get('should_continue', True)",
            max_iterations=10,
            calls=[
                ToolCall(
                    tool_name="web_search",
                    parameters={"query": "monitor"},
                    output_key="monitor_result"
                )
            ]
        )
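
To make the `$key` reference convention and the two most common strategies concrete, here is a self-contained sketch that does not depend on `ToolExecutor` or `ToolRegistry`. The two tools, the `TOOLS` table, and the `(tool_name, parameters, output_key)` tuple layout are assumptions made for this example only; they mirror the ideas in `ToolOrchestrator` rather than reproduce it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


# Hypothetical stand-ins for real tools
async def weather_search(city: str) -> str:
    if city == "Atlantis":
        raise ValueError(f"Unknown city: {city}")
    return f"{city}: sunny, 15°C"


async def outfit_recommend(weather: str) -> str:
    return f"Light jacket ({weather})"


TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {
    "weather_search": weather_search,
    "outfit_recommend": outfit_recommend,
}

Call = Tuple[str, Dict[str, Any], str]  # (tool_name, parameters, output_key)


def resolve(params: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Replace "$key" values with the earlier result stored under that key."""
    return {
        name: context[value[1:]]
        if isinstance(value, str) and value.startswith("$")
        else value
        for name, value in params.items()
    }


async def run_sequential(calls: List[Call]) -> Dict[str, Any]:
    """Run calls in order; later calls may reference earlier outputs."""
    context: Dict[str, Any] = {}
    for tool_name, params, output_key in calls:
        context[output_key] = await TOOLS[tool_name](**resolve(params, context))
    return context


async def run_parallel(calls: List[Call]) -> Dict[str, Any]:
    """Run independent calls concurrently; a failure becomes an error entry."""
    outcomes = await asyncio.gather(
        *(TOOLS[name](**params) for name, params, _ in calls),
        return_exceptions=True,
    )
    return {
        key: {"error": str(outcome)} if isinstance(outcome, Exception) else outcome
        for (_, _, key), outcome in zip(calls, outcomes)
    }


sequential = asyncio.run(run_sequential([
    ("weather_search", {"city": "Beijing"}, "weather"),
    ("outfit_recommend", {"weather": "$weather"}, "outfit"),
]))
parallel = asyncio.run(run_parallel([
    ("weather_search", {"city": "Shanghai"}, "shanghai"),
    ("weather_search", {"city": "Atlantis"}, "atlantis"),
]))
print(sequential["outfit"])
print(parallel)
```

The parallel branch keeps partial results: one failing city does not discard the others, which is usually what an Agent wants when it merges several lookups.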

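4. API Calls: Connecting to the Digital World

4.1 RESTful API Integration

An API (Application Programming Interface) is the bridge between an Agent and external services. Through APIs, an Agent can fetch real-time data, invoke remote services, and interact with other systems.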

┌─────────────────────────────────────────────────────────────────────┐
│                  Agent API Integration Architecture                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│                          ┌─────────────┐                            │
│                          │   Agent     │                            │
│                          │   Core      │                            │
│                          └──────┬──────┘                            │
│                                 │                                    │
│                                 ▼                                    │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                      API Gateway                              │  │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │  │
│  │  │ Auth        │ │ Rate        │ │ Request     │            │  │
│  │  │ Management  │ │ Limiting    │ │ Routing     │            │  │
│  │  └─────────────┘ └─────────────┘ └─────────────┘            │  │
│  └──────────────────────────┬───────────────────────────────────┘  │
│                             │                                       │
│         ┌───────────────────┼───────────────────┐                  │
│         │                   │                   │                  │
│         ▼                   ▼                   ▼                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐            │
│  │ Weather API │    │  Maps API   │    │ Payment API │            │
│  │             │    │             │    │             │            │
│  │ • Current   │    │ • Places    │    │ • Payments  │            │
│  │ • Forecast  │    │ • Directions│    │ • Orders    │            │
│  │ • History   │    │ • Geocoding │    │ • Refunds   │            │
│  └─────────────┘    └─────────────┘    └─────────────┘            │
│         │                   │                   │                  │
│         └───────────────────┼───────────────────┘                  │
│                             │                                       │
│                             ▼                                       │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    Response Handler                           │  │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │  │
│  │  │ Response    │ │ Error       │ │ Result      │            │  │
│  │  │ Parsing     │ │ Handling    │ │ Caching     │            │  │
│  │  └─────────────┘ └─────────────┘ └─────────────┘            │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Below is a general-purpose API client implementation:

import aiohttp
import asyncio
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import json
import time
import hashlib

class HTTPMethod(Enum):
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"

@dataclass
class APIConfig:
    """API配置"""
    base_url: str
    api_key: Optional[str] = None
    timeout: float = 30.0
    max_retries: int = 3
    rate_limit: Optional[int] = None  # 每分钟请求数

@dataclass
class APIResponse:
    """API响应"""
    status_code: int
    data: Any
    headers: Dict[str, str]
    elapsed_time: float

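class RateLimiter:
    """Sliding-window rate limiter"""
    
    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.requests: List[float] = []
    
    async def acquire(self):
        """Wait until a request slot is free, then claim it"""
        while True:
            now = time.time()
            # Drop records older than one minute
            self.requests = [t for t in self.requests if now - t < 60]
            
            if len(self.requests) < self.requests_per_minute:
                # No await between the check and the append, so concurrent
                # callers cannot claim the same slot
                self.requests.append(now)
                return
            
            # Window is full: sleep until the oldest record expires, then re-check
            await asyncio.sleep(60 - (now - self.requests[0]))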

class APIClient:
    """通用API客户端"""
    
    def __init__(self, config: APIConfig):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self.rate_limiter = (
            RateLimiter(config.rate_limit) 
            if config.rate_limit else None
        )
        self.cache: Dict[str, tuple] = {}  # {cache_key: (response, timestamp)}
        self.cache_ttl = 300  # 缓存过期时间(秒)
    
    async def _ensure_session(self):
        """确保session存在"""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
    
    def _build_headers(self, custom_headers: Optional[Dict] = None) -> Dict:
        """构建请求头"""
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "AgentAPIClient/1.0"
        }
        
        if self.config.api_key:
            headers["Authorization"] = f"Bearer {self.config.api_key}"
        
        if custom_headers:
            headers.update(custom_headers)
        
        return headers
    
    def _get_cache_key(self, method: HTTPMethod, url: str, 
                       params: Optional[Dict] = None) -> str:
        """生成缓存键"""
        key_data = f"{method.value}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def _get_from_cache(self, cache_key: str) -> Optional[APIResponse]:
        """从缓存获取"""
        if cache_key in self.cache:
            response, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                return response
            else:
                del self.cache[cache_key]
        return None
    
    def _set_cache(self, cache_key: str, response: APIResponse):
        """设置缓存"""
        self.cache[cache_key] = (response, time.time())
    
    async def request(
        self,
        method: HTTPMethod,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        use_cache: bool = True
    ) -> APIResponse:
        """发送API请求"""
        await self._ensure_session()
        
        url = f"{self.config.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        
        # 检查缓存(仅GET请求)
        if method == HTTPMethod.GET and use_cache:
            cache_key = self._get_cache_key(method, url, params)
            cached = self._get_from_cache(cache_key)
            if cached:
                return cached
        
        # Build the request headers
        request_headers = self._build_headers(headers)
        
        # Retry loop
        last_error = None
        for attempt in range(self.config.max_retries):
            # Rate-limit every attempt so that retries also count against the quota
            if self.rate_limiter:
                await self.rate_limiter.acquire()
            
            try:
                start_time = time.time()
                
                async with self.session.request(
                    method.value,
                    url,
                    params=params,
                    json=data,
                    headers=request_headers,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout)
                ) as response:
                    elapsed = time.time() - start_time
                    
                    response_data = await response.json()
                    
                    api_response = APIResponse(
                        status_code=response.status,
                        data=response_data,
                        headers=dict(response.headers),
                        elapsed_time=elapsed
                    )
                    
                    # 缓存成功响应
                    if method == HTTPMethod.GET and use_cache and response.status == 200:
                        self._set_cache(cache_key, api_response)
                    
                    return api_response
                    
            except asyncio.TimeoutError:
                last_error = "Request timeout"
            except aiohttp.ClientError as e:
                last_error = str(e)
            
            # 指数退避
            if attempt < self.config.max_retries - 1:
                await asyncio.sleep(2 ** attempt)
        
        raise Exception(f"API request failed after {self.config.max_retries} attempts: {last_error}")
    
    async def get(self, endpoint: str, params: Optional[Dict] = None, 
                 **kwargs) -> APIResponse:
        """GET请求"""
        return await self.request(HTTPMethod.GET, endpoint, params=params, **kwargs)
    
    async def post(self, endpoint: str, data: Optional[Dict] = None, 
                  **kwargs) -> APIResponse:
        """POST请求"""
        return await self.request(HTTPMethod.POST, endpoint, data=data, **kwargs)
    
    async def close(self):
        """关闭客户端"""
        if self.session:
            await self.session.close()


# ============== 具体API封装示例 ==============

class WeatherAPIClient(APIClient):
    """天气API客户端"""
    
    def __init__(self, api_key: str):
        super().__init__(APIConfig(
            base_url="https://api.openweathermap.org/data/2.5",
            api_key=api_key,
            rate_limit=60  # 每分钟60次
        ))
    
    async def get_current_weather(self, city: str) -> Dict:
        """获取当前天气"""
        response = await self.get("weather", params={
            "q": city,
            "appid": self.config.api_key,
            "units": "metric",
            "lang": "zh_cn"
        })
        
        if response.status_code == 200:
            data = response.data
            return {
                "city": data["name"],
                "temperature": data["main"]["temp"],
                "feels_like": data["main"]["feels_like"],
                "humidity": data["main"]["humidity"],
                "description": data["weather"][0]["description"],
                "wind_speed": data["wind"]["speed"]
            }
        else:
            raise Exception(f"Weather API error: {response.data}")
    
    async def get_forecast(self, city: str, days: int = 5) -> List[Dict]:
        """获取天气预报"""
        response = await self.get("forecast", params={
            "q": city,
            "appid": self.config.api_key,
            "units": "metric",
            "lang": "zh_cn",
            "cnt": days * 8  # 每天8个时间点
        })
        
        if response.status_code == 200:
            forecasts = []
            for item in response.data["list"]:
                forecasts.append({
                    "datetime": item["dt_txt"],
                    "temperature": item["main"]["temp"],
                    "description": item["weather"][0]["description"]
                })
            return forecasts
        else:
            raise Exception(f"Forecast API error: {response.data}")


# 使用示例
async def weather_example():
    client = WeatherAPIClient(api_key="your_api_key")
    
    try:
        weather = await client.get_current_weather("Beijing")
        print(f"北京天气: {weather['temperature']}°C, {weather['description']}")
        
        forecast = await client.get_forecast("Beijing", days=3)
        for f in forecast[:5]:
            print(f"  {f['datetime']}: {f['temperature']}°C")
    finally:
        await client.close()
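
The sliding-window idea behind `RateLimiter` is easier to check as a pure function with explicit timestamps. The sketch below is an illustration under that assumption; `seconds_until_allowed` is a hypothetical helper, not part of the client above, and it expects `limit` to be at least 1.

```python
from typing import List


def seconds_until_allowed(timestamps: List[float], now: float,
                          limit: int, window: float = 60.0) -> float:
    """Return how long to wait before one more request fits in the window."""
    # Only requests inside the window count against the limit
    recent = sorted(t for t in timestamps if now - t < window)
    if len(recent) < limit:
        return 0.0
    # Enough of the oldest requests must age out to leave limit - 1 in the window;
    # the last of those is the one that determines the wait
    blocking = recent[len(recent) - limit]
    return blocking + window - now


print(seconds_until_allowed([0.0, 10.0, 20.0], now=30.0, limit=3))
print(seconds_until_allowed([0.0, 10.0, 20.0], now=65.0, limit=3))
```

With three requests at t = 0, 10, and 20 and a limit of three per minute, a caller at t = 30 has to wait until the first request leaves the window; at t = 65 it already has, so no wait is needed.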

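4.2 The Function Calling Mechanism

The Function Calling mechanism that OpenAI introduced in 2023 greatly simplifies integrating LLMs with tools. It lets the model produce structured function calls rather than free-form text.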

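💡 Question: What advantages does Function Calling have over traditional prompt engineering?

🤔 Answer:

  1. Structured output: the call is returned as JSON, which is easy to parse and validate
  2. Type safety: parameter types are defined by a schema, which reduces errors
  3. Higher reliability: models are trained specifically for this, so calls tend to be more accurate
  4. Simpler development: no elaborate prompt design or output parsing is needed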
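
The first two points only pay off if the caller actually checks the arguments, since a model can still return malformed JSON or a value outside the schema. The following is a minimal sketch of such a check, assuming only the basic JSON Schema keywords used in this article (`type`, `enum`, `required`, `properties`). `validate_arguments` is a hypothetical helper; a production system would more likely use a full JSON Schema validator.

```python
import json
from typing import Any, Dict, List

# JSON Schema type names mapped to Python types (only the basic types are covered)
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def validate_arguments(raw_arguments: str, schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    try:
        args = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    if not isinstance(args, dict):
        return ["arguments must be a JSON object"]

    problems = [
        f"missing required parameter: {name}"
        for name in schema.get("required", [])
        if name not in args
    ]
    properties = schema.get("properties", {})
    for name, value in args.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"unexpected parameter: {name}")
            continue
        expected = JSON_TYPES.get(spec.get("type"))
        # bool is a subclass of int in Python, so reject it for numeric types
        bool_as_number = isinstance(value, bool) and spec.get("type") in ("integer", "number")
        if expected and (not isinstance(value, expected) or bool_as_number):
            problems.append(f"{name}: expected {spec['type']}")
        elif "enum" in spec and value not in spec["enum"]:
            problems.append(f"{name}: must be one of {spec['enum']}")
    return problems


weather_schema = {
    "type": "object",
    "properties": {
        "city": {"type": "string"},
        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
    },
    "required": ["city"],
}

print(validate_arguments('{"city": "Beijing", "unit": "celsius"}', weather_schema))
print(validate_arguments('{"unit": "kelvin"}', weather_schema))
```

Returning the problems as text, rather than raising, makes it easy to send them back to the model as the function result so it can correct the call.
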

import openai
from typing import Any, Callable, Dict, List, Optional
import json

class FunctionCallingAgent:
    """Agent built on Function Calling"""
    
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model
        self.functions = []
        self.function_handlers = {}
    
    def register_function(self, name: str, description: str,
                         parameters: Dict, handler: Callable[..., Any]):
        """注册函数"""
        self.functions.append({
            "name": name,
            "description": description,
            "parameters": parameters
        })
        self.function_handlers[name] = handler
    
    def _execute_function(self, function_name: str, 
                          arguments: Dict) -> Any:
        """执行函数"""
        if function_name not in self.function_handlers:
            raise ValueError(f"Unknown function: {function_name}")
        
        handler = self.function_handlers[function_name]
        return handler(**arguments)
    
    def chat(self, user_message: str, 
            conversation_history: Optional[List[Dict]] = None) -> str:
        """对话接口"""
        messages = conversation_history or []
        messages.append({"role": "user", "content": user_message})
        
        # 第一次调用:获取模型响应
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            functions=self.functions,
            function_call="auto"
        )
        
        assistant_message = response.choices[0].message
        
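        # Keep going while the model requests a function call
        while assistant_message.function_call:
            function_name = assistant_message.function_call.name
            raw_arguments = assistant_message.function_call.arguments
            
            print(f"[Calling function: {function_name}({raw_arguments})]")
            
            # Parse the arguments and run the function. The model may emit invalid
            # JSON, so parsing sits inside the try block and any failure is
            # reported back to the model instead of crashing the loop.
            try:
                arguments = json.loads(raw_arguments)
                result = self._execute_function(function_name, arguments)
                function_response = json.dumps(result, ensure_ascii=False)
            except Exception as e:
                function_response = json.dumps({"error": str(e)}, ensure_ascii=False)
            
            # Append the call and its result to the conversation history
            messages.append({
                "role": "assistant",
                "content": None,
                "function_call": {
                    "name": function_name,
                    "arguments": raw_arguments
                }
            })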
            messages.append({
                "role": "function",
                "name": function_name,
                "content": function_response
            })
            
            # 再次调用模型处理函数结果
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                functions=self.functions,
                function_call="auto"
            )
            
            assistant_message = response.choices[0].message
        
        return assistant_message.content


# ============== 使用示例 ==============

def get_weather(city: str, unit: str = "celsius") -> Dict:
    """获取天气(模拟)"""
    return {
        "city": city,
        "temperature": 22 if unit == "celsius" else 72,
        "unit": unit,
        "condition": "晴朗"
    }

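def calculate(expression: str) -> Dict:
    """Evaluate a math expression"""
    try:
        # WARNING: eval() executes arbitrary code. It is used here only to keep
        # the demo short; never pass model-generated strings to eval() in
        # production. Use a restricted evaluator (for example, one that walks
        # the AST and allows only arithmetic nodes) instead.
        result = eval(expression)
        return {"expression": expression, "result": result}
    except Exception as e:
        return {"error": str(e)}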

def search_web(query: str, num_results: int = 3) -> Dict:
    """搜索网页(模拟)"""
    return {
        "query": query,
        "results": [
            {"title": f"Result {i}", "url": f"https://example.com/{i}"}
            for i in range(num_results)
        ]
    }

# 创建Agent
agent = FunctionCallingAgent(api_key="your_key")

# 注册函数
agent.register_function(
    name="get_weather",
    description="获取指定城市的当前天气",
    parameters={
        "type": "object",
        "properties": {
            "city": {
                "type": "string",
                "description": "城市名称,如北京、上海"
            },
            "unit": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "温度单位"
            }
        },
        "required": ["city"]
    },
    handler=get_weather
)

agent.register_function(
    name="calculate",
    description="执行数学计算",
    parameters={
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "数学表达式,如 2+3*4"
            }
        },
        "required": ["expression"]
    },
    handler=calculate
)

agent.register_function(
    name="search_web",
    description="搜索互联网获取信息",
    parameters={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "搜索关键词"
            },
            "num_results": {
                "type": "integer",
                "description": "返回结果数量,默认3"
            }
        },
        "required": ["query"]
    },
    handler=search_web
)

# 对话
# response = agent.chat("北京今天天气怎么样?另外帮我算一下 123 * 456")
# print(response)
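
The agent above passes `functions` and `function_call`, the original parameter pair. OpenAI's Chat Completions API has since deprecated that pair in favor of `tools` and `tool_choice`, where each function definition is wrapped in an object of type `"function"` and results are returned in a `"tool"` message that echoes the call's id. The sketch below shows only the data-shape conversion. `to_tools` and `tool_result_message` are hypothetical helper names, and the exact fields should be checked against the current API reference before use.

```python
from typing import Any, Dict, List


def to_tools(functions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Wrap legacy function definitions in the shape used by the `tools` parameter."""
    return [{"type": "function", "function": definition} for definition in functions]


def tool_result_message(tool_call_id: str, content: str) -> Dict[str, str]:
    """Build the message that returns one tool call's output to the model."""
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}


legacy_functions = [
    {
        "name": "get_weather",
        "description": "Get the current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    }
]

tools = to_tools(legacy_functions)
print(tools[0]["type"], tools[0]["function"]["name"])
```

With the newer parameters, a single response can carry several tool calls, so the agent loop would iterate over the message's `tool_calls` list and append one tool message per call.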

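4.3 API Orchestration and Error Handling

In practice, API calls run into all kinds of problems: network timeouts, unavailable services, malformed data, and so on. A robust Agent needs thorough error handling.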

┌──────────────────────────────────────────────────────────────────────┐
│                    API Error Handling Strategies                     │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Error type             Handling               Fallback              │
│  ──────────────────────────────────────────────────────────────────  │
│                                                                      │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐    │
│  │ Network        │ ──▶ │ Exp. backoff   │ ──▶ │ Backup API     │    │
│  │ timeout        │     │ retry 3 times  │     │ or cached data │    │
│  └────────────────┘     └────────────────┘     └────────────────┘    │
│                                                                      │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐    │
│  │ Auth failed    │ ──▶ │ Refresh token  │ ──▶ │ Ask user to    │    │
│  │ 401/403        │     │ reauthenticate │     │ re-authorize   │    │
│  └────────────────┘     └────────────────┘     └────────────────┘    │
│                                                                      │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐    │
│  │ Rate limited   │ ──▶ │ Wait and retry │ ──▶ │ Degrade,       │    │
│  │ 429            │     │ per header     │     │ fewer requests │    │
│  └────────────────┘     └────────────────┘     └────────────────┘    │
│                                                                      │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐    │
│  │ Unavailable    │ ──▶ │ Fail over to   │ ──▶ │ Return default │    │
│  │ 500/503        │     │ backup endpoint│     │ or tell user   │    │
│  └────────────────┘     └────────────────┘     └────────────────┘    │
│                                                                      │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐    │
│  │ Malformed      │ ──▶ │ Repair parse,  │ ──▶ │ Log and        │    │
│  │ data           │     │ fuzzy matching │     │ return error   │    │
│  └────────────────┘     └────────────────┘     └────────────────┘    │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
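
The first column of the table amounts to a classification step that runs before any retry decision. As a rough sketch, the function below maps an HTTP status to a category string and, for 429 responses, reads the wait time from the standard `Retry-After` header. `classify_response` is a hypothetical name, the category strings are chosen to match the `ErrorType` values used in this section, and only the seconds form of `Retry-After` is handled.

```python
from typing import Mapping, Optional, Tuple


def classify_response(status: int,
                      headers: Mapping[str, str]) -> Tuple[str, Optional[float]]:
    """Map an HTTP status to (category, seconds to wait before retrying)."""
    if 200 <= status < 300:
        return "ok", None
    if status in (401, 403):
        return "auth_failed", None
    if status == 429:
        # Retry-After may also be an HTTP date; this sketch only parses seconds
        value = headers.get("Retry-After", "")
        wait = float(value) if value.isdigit() else None
        return "rate_limited", wait
    if 500 <= status < 600:
        return "server_error", None
    return "unknown", None


print(classify_response(429, {"Retry-After": "30"}))
print(classify_response(503, {}))
```

A plain `dict` lookup is case-sensitive, so this assumes the header name arrives exactly as `Retry-After`; HTTP client libraries typically expose a case-insensitive mapping that avoids the problem.
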
from enum import Enum
from typing import Any, Callable, Optional, TypeVar, Generic
from dataclasses import dataclass
import asyncio
import logging

T = TypeVar('T')

class ErrorType(Enum):
    """错误类型"""
    TIMEOUT = "timeout"
    AUTH_FAILED = "auth_failed"
    RATE_LIMITED = "rate_limited"
    SERVER_ERROR = "server_error"
    PARSE_ERROR = "parse_error"
    UNKNOWN = "unknown"

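@dataclass(eq=False)  # eq=False keeps the exception hashable
class APIError(Exception):
    """API error"""
    error_type: ErrorType
    message: str
    status_code: Optional[int] = None
    retry_after: Optional[int] = None
    
    def __post_init__(self):
        # Pass the message to Exception so that str(error) is readable
        super().__init__(self.message)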

class Result(Generic[T]):
    """结果包装器"""
    
    def __init__(self, value: Optional[T] = None, 
                 error: Optional[APIError] = None):
        self._value = value
        self._error = error
    
    @property
    def is_success(self) -> bool:
        return self._error is None
    
    @property
    def value(self) -> T:
        if self._error:
            raise self._error
        return self._value
    
    @property
    def error(self) -> Optional[APIError]:
        return self._error
    
    @staticmethod
    def success(value: T) -> 'Result[T]':
        return Result(value=value)
    
    @staticmethod
    def failure(error: APIError) -> 'Result[T]':
        return Result(error=error)

class RetryPolicy:
    """Retry policy with exponential backoff."""
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        retryable_errors: Optional[set] = None
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.retryable_errors = retryable_errors or {
            ErrorType.TIMEOUT,
            ErrorType.RATE_LIMITED,
            ErrorType.SERVER_ERROR
        }
    
    def should_retry(self, error: APIError, attempt: int) -> bool:
        """Decide whether another retry is allowed."""
        if attempt >= self.max_retries:
            return False
        return error.error_type in self.retryable_errors
    
    def get_delay(self, attempt: int, 
                  error: Optional[APIError] = None) -> float:
        """Compute the delay before the next retry."""
        # If the server specified a wait time, honor it
        if error and error.retry_after:
            return min(error.retry_after, self.max_delay)
        
        # Otherwise use exponential backoff
        delay = self.base_delay * (self.exponential_base ** attempt)
        return min(delay, self.max_delay)

class CircuitBreaker:
    """Circuit breaker."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 1
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.failures = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half-open
        self.half_open_successes = 0
    
    def record_success(self):
        """Record a successful call."""
        if self.state == "half-open":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0
    
    def record_failure(self):
        """Record a failed call."""
        self.failures += 1
        self.last_failure_time = asyncio.get_event_loop().time()
        
        if self.failures >= self.failure_threshold:
            self.state = "open"
    
    def can_execute(self) -> bool:
        """Return True if a call may proceed."""
        if self.state == "closed":
            return True
        
        if self.state == "open":
            # After the recovery timeout, move to half-open and allow a probe
            current_time = asyncio.get_event_loop().time()
            if current_time - self.last_failure_time >= self.recovery_timeout:
                self.state = "half-open"
                self.half_open_successes = 0
                return True
            return False
        
        # half-open: let probe requests through
        return True

class ResilientAPIClient:
    """API client wrapper with retries, a circuit breaker, and a fallback."""
    
    def __init__(
        self,
        client: APIClient,
        retry_policy: Optional[RetryPolicy] = None,
        circuit_breaker: Optional[CircuitBreaker] = None,
        fallback: Optional[Callable] = None
    ):
        self.client = client
        self.retry_policy = retry_policy or RetryPolicy()
        self.circuit_breaker = circuit_breaker or CircuitBreaker()
        self.fallback = fallback
        self.logger = logging.getLogger(__name__)
    
    def _classify_error(self, status_code: Optional[int], 
                        exception: Optional[Exception]) -> APIError:
        """Map an exception or HTTP status code to an APIError."""
        if exception:
            if isinstance(exception, asyncio.TimeoutError):
                return APIError(ErrorType.TIMEOUT, str(exception))
            return APIError(ErrorType.UNKNOWN, str(exception))
        
        if status_code:
            if status_code == 401 or status_code == 403:
                return APIError(ErrorType.AUTH_FAILED, "Authentication failed", 
                              status_code)
            elif status_code == 429:
                return APIError(ErrorType.RATE_LIMITED, "Rate limited", 
                              status_code)
            elif status_code >= 500:
                return APIError(ErrorType.SERVER_ERROR, "Server error", 
                              status_code)
        
        return APIError(ErrorType.UNKNOWN, "Unknown error", status_code)
    
    async def execute(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Result[Any]:
        """Run an operation with retries and circuit breaking."""
        
        # Check the circuit breaker first
        if not self.circuit_breaker.can_execute():
            self.logger.warning("Circuit breaker is open, using fallback")
            if self.fallback:
                return Result.success(self.fallback(*args, **kwargs))
            return Result.failure(
                APIError(ErrorType.SERVER_ERROR, "Circuit breaker open")
            )
        
        last_error = None
        
        for attempt in range(self.retry_policy.max_retries + 1):
            try:
                result = await operation(*args, **kwargs)
                
                # Check the response status
                if hasattr(result, 'status_code') and result.status_code >= 400:
                    error = self._classify_error(result.status_code, None)
                    last_error = error
                    
                    if self.retry_policy.should_retry(error, attempt):
                        delay = self.retry_policy.get_delay(attempt, error)
                        self.logger.info(
                            f"Retrying after {delay}s (attempt {attempt + 1})"
                        )
                        await asyncio.sleep(delay)
                        continue
                    
                    # Leave the loop and use the shared failure path below, so
                    # HTTP errors also update the breaker once and reach the fallback
                    break
                
                self.circuit_breaker.record_success()
                return Result.success(result)
                
            except Exception as e:
                error = self._classify_error(None, e)
                last_error = error
                
                if self.retry_policy.should_retry(error, attempt):
                    delay = self.retry_policy.get_delay(attempt, error)
                    self.logger.info(
                        f"Retrying after {delay}s due to {e} (attempt {attempt + 1})"
                    )
                    await asyncio.sleep(delay)
                    continue
                
                break
        
        # Retries exhausted, or the error is not retryable
        self.circuit_breaker.record_failure()
        
        # Try the fallback
        if self.fallback:
            try:
                fallback_result = self.fallback(*args, **kwargs)
                return Result.success(fallback_result)
            except Exception as e:
                self.logger.error(f"Fallback also failed: {e}")
        
        return Result.failure(last_error or APIError(ErrorType.UNKNOWN, "All retries failed"))


# ============== Usage example ==============

async def demo_resilient_api():
    """Demonstrate the resilient API client."""
    
    # Create the base client
    base_client = APIClient(APIConfig(
        base_url="https://api.example.com",
        api_key="your_key"
    ))
    
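    # Define the fallback; it is called with the same arguments as the
    # failed operation, so its signature must match base_client.get
    def weather_fallback(endpoint: str, params: Optional[dict] = None) -> dict:
        return {
            "city": (params or {}).get("city"),
            "temperature": "N/A",
            "source": "fallback"
        }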
    
    # Build the resilient client
    resilient_client = ResilientAPIClient(
        client=base_client,
        retry_policy=RetryPolicy(max_retries=3),
        circuit_breaker=CircuitBreaker(failure_threshold=5),
        fallback=weather_fallback
    )
    
    # Issue the request
    result = await resilient_client.execute(
        base_client.get,
        "weather",
        params={"city": "Beijing"}
    )
    
    if result.is_success:
        print(f"Success: {result.value}")
    else:
        print(f"Failed: {result.error}")
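
RetryPolicy.get_delay honors error.retry_after, but _classify_error never fills that field in, and the plain exponential schedule makes all clients that failed together retry together. The sketch below shows one way the two inputs could be produced: parsing an HTTP Retry-After header (seconds or an HTTP date) and adding "full jitter" to the backoff. The helper names parse_retry_after, backoff_delay, and next_delay are hypothetical and not part of the client above; full jitter is only one of several jitter strategies.

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str],
                      now: Optional[datetime] = None) -> Optional[int]:
    """Return the wait in whole seconds, or None if absent or malformed."""
    if not value:
        return None
    value = value.strip()
    if value.isdigit():                      # form 1: delay in seconds
        return int(value)
    try:
        when = parsedate_to_datetime(value)  # form 2: HTTP date
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, int((when - now).total_seconds()))


def backoff_delay(attempt: int, base_delay: float = 1.0,
                  exponential_base: float = 2.0,
                  max_delay: float = 60.0) -> float:
    """Same formula as RetryPolicy.get_delay: capped exponential growth."""
    return min(base_delay * (exponential_base ** attempt), max_delay)


def next_delay(attempt: int, retry_after: Optional[str] = None,
               rng: Optional[random.Random] = None,
               max_delay: float = 60.0) -> float:
    """A server hint wins; otherwise pick a random delay in [0, backoff]."""
    hinted = parse_retry_after(retry_after)
    if hinted is not None:
        return min(float(hinted), max_delay)
    rng = rng or random.Random()
    return rng.uniform(0.0, backoff_delay(attempt, max_delay=max_delay))
```

With the defaults, backoff_delay yields 1.0, 2.0, and 4.0 seconds for attempts 0 to 2 and is capped at 60.0 from attempt 6 onward. Assuming the response object exposes its headers, the parsed value could be passed as retry_after when the APIError is built.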

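5. Code Execution: Extending Capabilities Dynamically

5.1 Code Generation and Execution Flow

Code execution is one of an Agent's most powerful capabilities. By generating and running code on the fly, an Agent can take on almost any computational task, well beyond what a fixed set of predefined tools covers.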

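💡 Question: What advantages and risks does code execution have compared with predefined tools?

🤔 Answer:

Advantages:

  • Flexibility: arbitrary computation logic can be expressed, however complex
  • Dynamic adaptation: operations do not all have to be defined in advance
  • Composability: several libraries and tools can be combined in one script
  • Explainability: the code itself documents the logic that was executed

Risks:

  • Security: malicious code can damage the system
  • Resource consumption: badly written code can exhaust CPU, memory, or disk
  • Unpredictability: generated code may contain bugs
  • Dependency management: specific libraries and environments may be required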
┌─────────────────────────────────────────────────────────────────────┐
│                         Code Execution Flow                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐                                                    │
│  │ User request│  "Analyze this CSV and plot the sales trend"       │
│  └──────┬──────┘                                                    │
│         │                                                           │
│         ▼                                                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                     Code generation (LLM)                     │  │
│  │  ┌───────────────────────────────────────────────────────────┐│  │
│  │  │ import pandas as pd                                       ││  │
│  │  │ import matplotlib.pyplot as plt                           ││  │
│  │  │                                                           ││  │
│  │  │ df = pd.read_csv('sales.csv')                             ││  │
│  │  │ df['date'] = pd.to_datetime(df['date'])                   ││  │
│  │  │ monthly = df.groupby(df['date'].dt.month)['amount'].sum() ││  │
│  │  │ plt.plot(monthly.index, monthly.values)                   ││  │
│  │  │ plt.savefig('trend.png')                                  ││  │
│  │  └───────────────────────────────────────────────────────────┘│  │
│  └──────────────────────────┬────────────────────────────────────┘  │
│                             │                                       │
│                             ▼                                       │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                        Security checks                        │  │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │  │
│  │  │Syntax   │──│API      │──│Resource │──│Sandbox  │           │  │
│  │  │check    │  │blacklist│  │limits   │  │config   │           │  │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘           │  │
│  └──────────────────────────┬────────────────────────────────────┘  │
│                             │                                       │
│                             ▼                                       │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                      Sandboxed execution                      │  │
│  │  ┌───────────────────────────────────────────────────────────┐│  │
│  │  │  Docker Container / VM / Process Isolation                ││  │
│  │  │  ┌─────────────────────────────────────────────────────┐  ││  │
│  │  │  │  Python Runtime                                     │  ││  │
│  │  │  │  • CPU limit: 1 core                                │  ││  │
│  │  │  │  • Memory limit: 512MB                              │  ││  │
│  │  │  │  • Execution timeout: 30 s                          │  ││  │
│  │  │  │  • Network: allowlist only                          │  ││  │
│  │  │  └─────────────────────────────────────────────────────┘  ││  │
│  │  └───────────────────────────────────────────────────────────┘│  │
│  └──────────────────────────┬────────────────────────────────────┘  │
│                             │                                       │
│                             ▼                                       │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                        Result handling                        │  │
│  │  • Capture stdout/stderr                                      │  │
│  │  • Collect generated files                                    │  │
│  │  • Format the returned result                                 │  │
│  │  • Clean up temporary resources                               │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
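
The final stage of the flow (capture stdout/stderr, collect generated files, clean up temporary resources) can be sketched with the standard library alone. This is an illustration under simplifying assumptions: the code runs in a throwaway working directory with a timeout but without any real isolation, and run_and_collect is a hypothetical helper name.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Dict, Tuple


def run_and_collect(code: str,
                    timeout: float = 30.0) -> Tuple[int, str, str, Dict[str, bytes]]:
    """Run code in a temporary directory.

    Returns (exit_code, stdout, stderr, files_created).
    """
    # TemporaryDirectory deletes the working directory when the block exits
    with tempfile.TemporaryDirectory() as workdir:
        script = Path(workdir) / "main.py"
        script.write_text(code, encoding="utf-8")
        try:
            proc = subprocess.run(
                [sys.executable, str(script)],
                cwd=workdir,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return -1, "", f"Execution timeout ({timeout}s)", {}
        # Collect every file the script left behind, except the script itself
        files = {
            str(p.relative_to(workdir)): p.read_bytes()
            for p in Path(workdir).rglob("*")
            if p.is_file() and p != script
        }
        return proc.returncode, proc.stdout, proc.stderr, files
```

Reading the files into memory before the directory is removed keeps cleanup and result collection in one place; for large outputs a real implementation would more likely move the files to durable storage instead.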

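5.2 Multi-Language Runtime Support

Different tasks may call for different programming languages, so a complete code execution module should support several language runtimes.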

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import subprocess
import tempfile
import os
import asyncio
import ast
import json

@dataclass
class ExecutionResult:
    """Execution result."""
    success: bool
    output: str
    error: Optional[str] = None
    return_value: Any = None
    execution_time: float = 0.0
    files_created: Optional[List[str]] = None

class LanguageRuntime(ABC):
    """Abstract base class for language runtimes."""
    
    @abstractmethod
    def get_language(self) -> str:
        """Return the language name."""
        pass
    
    @abstractmethod
    async def execute(self, code: str, 
                     context: Optional[Dict] = None) -> ExecutionResult:
        """Run the code."""
        pass
    
    @abstractmethod
    def validate(self, code: str) -> tuple:
        """Validate the code; return (is_valid, error_message)."""
        pass

class PythonRuntime(LanguageRuntime):
    """Python runtime."""
    
    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.forbidden_imports = {
            'os.system', 'subprocess', 'eval', 'exec',
            'compile', '__import__', 'open',  # unless running inside a sandbox
        }
        self.allowed_imports = {
            'math', 'statistics', 'datetime', 'json',
            'collections', 'itertools', 'functools',
            'numpy', 'pandas', 'matplotlib'
        }
    
    def get_language(self) -> str:
        return "python"
    
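    def validate(self, code: str) -> tuple:
        """
        Validate Python code.
        Returns: (is_valid, error_message)
        """
        try:
            tree = ast.parse(code)
            
            # Screen for dangerous operations
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        # Compare the top-level package, so that
                        # `import matplotlib.pyplot` passes when matplotlib is allowed
                        if alias.name.split('.')[0] not in self.allowed_imports:
                            return False, f"Import not allowed: {alias.name}"
                
                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for `from . import x`; reject relative imports
                    module = node.module or ''
                    if node.level or module.split('.')[0] not in self.allowed_imports:
                        return False, f"Import not allowed: {node.module}"
                
                elif isinstance(node, ast.Call):
                    if isinstance(node.func, ast.Name):
                        # __import__ would otherwise sidestep the import allowlist
                        if node.func.id in {'eval', 'exec', 'compile', '__import__'}:
                            return False, f"Function not allowed: {node.func.id}"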
            
            return True, None
            
        except SyntaxError as e:
            return False, f"Syntax error: {e}"
    
    async def execute(self, code: str, 
                     context: Optional[Dict] = None) -> ExecutionResult:
        """Run Python code."""
        # Validate first
        is_valid, error = self.validate(code)
        if not is_valid:
            return ExecutionResult(
                success=False,
                output="",
                error=error
            )
        
        # Write the code to a temporary file
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False
        ) as f:
            # Inject context variables as literal assignments
            if context:
                for key, value in context.items():
                    f.write(f"{key} = {repr(value)}\n")
            f.write(code)
            temp_file = f.name
        
        try:
            import time
            start_time = time.time()
            
            # Run the code in a child interpreter
            process = await asyncio.create_subprocess_exec(
                'python', temp_file,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.timeout
                )
                
                execution_time = time.time() - start_time
                
                return ExecutionResult(
                    success=process.returncode == 0,
                    output=stdout.decode(),
                    error=stderr.decode() if stderr else None,
                    execution_time=execution_time
                )
                
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()  # reap the killed child process
                return ExecutionResult(
                    success=False,
                    output="",
                    error=f"Execution timeout ({self.timeout}s)"
                )
                
        finally:
            os.unlink(temp_file)

class JavaScriptRuntime(LanguageRuntime):
    """JavaScript runtime (Node.js)."""
    
    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
    
    def get_language(self) -> str:
        return "javascript"
    
    def validate(self, code: str) -> tuple:
        """Naive substring screening; easy to evade, so not a security boundary."""
        dangerous_patterns = [
            'require("child_process")',
            "require('child_process')",
            'require("fs")',
            "require('fs')",
            'eval(',
            'Function(',
        ]
        
        for pattern in dangerous_patterns:
            if pattern in code:
                return False, f"Dangerous pattern detected: {pattern}"
        
        return True, None
    
    async def execute(self, code: str, 
                     context: Optional[Dict] = None) -> ExecutionResult:
        """Run JavaScript code."""
        is_valid, error = self.validate(code)
        if not is_valid:
            return ExecutionResult(success=False, output="", error=error)
        
        # Prepend context variables as const declarations
        wrapped_code = ""
        if context:
            for key, value in context.items():
                wrapped_code += f"const {key} = {json.dumps(value)};\n"
        wrapped_code += code
        
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.js', delete=False
        ) as f:
            f.write(wrapped_code)
            temp_file = f.name
        
        try:
            import time
            start_time = time.time()
            
            process = await asyncio.create_subprocess_exec(
                'node', temp_file,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.timeout
                )
                
                return ExecutionResult(
                    success=process.returncode == 0,
                    output=stdout.decode(),
                    error=stderr.decode() if stderr else None,
                    execution_time=time.time() - start_time
                )
                
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()  # reap the killed child process
                return ExecutionResult(
                    success=False, 
                    output="",
                    error=f"Execution timeout ({self.timeout}s)"
                )
                
        finally:
            os.unlink(temp_file)

class CodeExecutor:
    """Code executor that dispatches to registered runtimes."""
    
    def __init__(self):
        self.runtimes: Dict[str, LanguageRuntime] = {}
    
    def register_runtime(self, runtime: LanguageRuntime):
        """Register a runtime."""
        self.runtimes[runtime.get_language()] = runtime
    
    async def execute(self, language: str, code: str,
                     context: Optional[Dict] = None) -> ExecutionResult:
        """Run code in the runtime registered for the language."""
        runtime = self.runtimes.get(language)
        if not runtime:
            return ExecutionResult(
                success=False,
                output="",
                error=f"Unsupported language: {language}"
            )
        
        return await runtime.execute(code, context)


# ============== Usage example ==============

async def code_execution_demo():
    """Code execution demo."""
    
    executor = CodeExecutor()
    executor.register_runtime(PythonRuntime(timeout=10.0))
    executor.register_runtime(JavaScriptRuntime(timeout=10.0))
    
    # Python example
    python_code = """
import math

def calculate_circle_area(radius):
    return math.pi * radius ** 2

areas = [calculate_circle_area(r) for r in range(1, 6)]
for i, area in enumerate(areas, 1):
    print(f"Radius {i}: Area = {area:.2f}")
"""
    
    result = await executor.execute("python", python_code)
    print("Python Result:")
    print(result.output)
    
    # JavaScript example
    js_code = """
const numbers = [1, 2, 3, 4, 5];
const doubled = numbers.map(n => n * 2);
console.log("Original:", numbers);
console.log("Doubled:", doubled);
"""
    
    result = await executor.execute("javascript", js_code)
    print("\nJavaScript Result:")
    print(result.output)

# asyncio.run(code_execution_demo())
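
The import check in PythonRuntime.validate is the heart of the static screening step, and it is easy to get subtly wrong for dotted names such as matplotlib.pyplot. The sketch below isolates the idea: walk the AST, reduce each import to its top-level package, and compare that against an allowlist. The names disallowed_imports and ALLOWED are hypothetical, chosen for illustration. A check like this is only a first filter, since dynamic tricks can evade static analysis; it does not replace a sandbox.

```python
import ast
from typing import List, Set

ALLOWED: Set[str] = {"math", "json", "numpy", "pandas", "matplotlib"}


def disallowed_imports(code: str, allowed: Set[str] = ALLOWED) -> List[str]:
    """Return imported modules whose top-level package is not allowlisted."""
    rejected: List[str] = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # Relative imports keep their leading dots, so they never match
            names = ["." * node.level + (node.module or "")]
        else:
            continue
        for name in names:
            # "matplotlib.pyplot" is judged by its top-level package
            if name.split(".")[0] not in allowed:
                rejected.append(name)
    return rejected
```

For example, `import matplotlib.pyplot as plt` passes because its top-level package is matplotlib, while `import os` and `from subprocess import run` are both reported.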

5.3 Challenges of Code Execution and Their Solutions

┌─────────────────────────────────────────────────────────────────────┐
│              Code Execution: Challenges and Solutions               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Challenge 1: Security                                              │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  Problem: code may delete files, steal data, drain resources  │  │
│  │                                                               │  │
│  │  Solutions:                                                   │  │
│  │  • Sandbox isolation (Docker / VM / process isolation)        │  │
│  │  • API allowlist (only safe functions permitted)              │  │
│  │  • Resource limits (CPU / memory / disk / network)            │  │
│  │  • Code review (AST analysis to detect dangerous patterns)    │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Challenge 2: Dependency management                                 │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  Problem: code may need specific library versions             │  │
│  │                                                               │  │
│  │  Solutions:                                                   │  │
│  │  • Base images with common libraries preinstalled             │  │
│  │  • Virtual environments created on demand                     │  │
│  │  • Package manager integration (pip / npm / cargo)            │  │
│  │  • Dependency caching for faster setup                        │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Challenge 3: State management                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  Problem: how is state kept across multiple executions?       │  │
│  │                                                               │  │
│  │  Solutions:                                                   │  │
│  │  • Session-level persistence (keep the interpreter alive)     │  │
│  │  • Mounted file system (persistent data files)                │  │
│  │  • Variable serialization (pickle / JSON)                     │  │
│  │  • Database connections (SQLite / Redis)                      │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Challenge 4: Error handling                                        │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  Problem: generated code may contain syntax or logic errors   │  │
│  │                                                               │  │
│  │  Solutions:                                                   │  │
│  │  • Syntax pre-check                                           │  │
│  │  • Detailed error messages returned to the LLM                │  │
│  │  • Automatic repair and retry                                 │  │
│  │  • Unit-test verification                                     │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
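
Challenge 4 pairs "return detailed errors to the LLM" with "automatic repair and retry". A minimal sketch of that loop follows. The repair callback stands in for an LLM call and is purely hypothetical, as are the function names; the code runs in a plain child interpreter here, so a real deployment would substitute a sandboxed executor.

```python
import subprocess
import sys
from typing import Callable, List, Tuple


def run_once(code: str, timeout: float = 10.0) -> Tuple[bool, str]:
    """Run code in a fresh interpreter; return (ok, stdout-or-stderr)."""
    try:
        proc = subprocess.run([sys.executable, "-c", code],
                              capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return False, f"Execution timeout ({timeout}s)"
    if proc.returncode == 0:
        return True, proc.stdout
    return False, proc.stderr


def run_with_repair(code: str, repair: Callable[[str, str], str],
                    max_attempts: int = 3) -> Tuple[bool, str, List[str]]:
    """Execute code; on failure pass (code, error) to `repair` and retry.

    Returns (ok, final output or last error, all errors seen).
    """
    errors: List[str] = []
    for _ in range(max_attempts):
        ok, output = run_once(code)
        if ok:
            return True, output, errors
        errors.append(output)
        code = repair(code, output)  # e.g. prompt an LLM with the traceback
    return False, errors[-1] if errors else "", errors


def fake_repair(code: str, error: str) -> str:
    """Toy stand-in for the LLM: fixes one known typo on a NameError."""
    return code.replace("pritn", "print") if "NameError" in error else code
```

Calling run_with_repair("pritn(1 + 1)", fake_repair) fails once with a NameError, applies the toy fix, and succeeds on the second attempt. Bounding the loop with max_attempts matters: a model that keeps producing the same broken code would otherwise retry forever.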

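6. Security Sandbox: The Boundary of Action

6.1 How Sandboxing Works

The security sandbox is the key safety component of an Agent's action module. It uses isolation to restrict the environment in which code runs, so that malicious or faulty code cannot damage the host system.

💡 Question: Why does an Agent need a sandbox? What are the risks of executing code directly?

🤔 Answer: Directly executing LLM-generated code carries several risks:

  1. System security: it may delete files, modify system configuration, or perform other dangerous operations
  2. Data security: it may read sensitive data and exfiltrate it
  3. Resource abuse: it may enter an infinite loop or consume large amounts of memory
  4. Network risk: it may issue malicious network requests

A sandbox establishes a security boundary so that code can "only move around inside its cage".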

┌─────────────────────────────────────────────────────────────────────┐
│                          Sandboxing Layers                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Level 4: Virtual machine isolation (strongest isolation)           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  ┌───────────────────────────────────────────────────────┐    │  │
│  │  │  Guest OS (Linux/Windows)                             │    │  │
│  │  │  ┌───────────────────────────────────────────────┐    │    │  │
│  │  │  │  Application                                  │    │    │  │
│  │  │  │  Fully isolated OS environment                │    │    │  │
│  │  │  │  Pros: strongest security                     │    │    │  │
│  │  │  │  Cons: slow startup, high resource overhead   │    │    │  │
│  │  │  └───────────────────────────────────────────────┘    │    │  │
│  │  └───────────────────────────────────────────────────────┘    │  │
│  │  Hypervisor (KVM/Xen/VMware)                                  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Level 3: Container isolation (balanced option)                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐                     │  │
│  │  │Container │  │Container │  │Container │                     │  │
│  │  │    A     │  │    B     │  │    C     │                     │  │
│  │  └──────────┘  └──────────┘  └──────────┘                     │  │
│  │  Docker Engine / containerd                                   │  │
│  │  Pros: fast startup, reasonably good isolation                │  │
│  │  Cons: shared kernel, weaker isolation than a VM              │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Level 2: Process isolation                                         │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  ┌───────────────────────────────────────────────────────┐    │  │
│  │  │  Sandboxed Process                                    │    │  │
│  │  │  • seccomp (system call filtering)                    │    │  │
│  │  │  • namespaces (namespace isolation)                   │    │  │
│  │  │  • cgroups (resource limits)                          │    │  │
│  │  │  Pros: lightweight and fast                           │    │  │
│  │  │  Cons: needs careful configuration                    │    │  │
│  │  └───────────────────────────────────────────────────────┘    │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Level 1: Language-level sandbox                                    │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  ┌───────────────────────────────────────────────────────┐    │  │
│  │  │  Restricted Execution Environment                     │    │  │
│  │  │  • RestrictedPython                                   │    │  │
│  │  │  • PyPy Sandbox                                       │    │  │
│  │  │  • Custom __builtins__                                │    │  │
│  │  │  Pros: lightest weight                                │    │  │
│  │  │  Cons: can be bypassed                                │    │  │
│  │  └───────────────────────────────────────────────────────┘    │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
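
As a small taste of Level 2, the sketch below applies per-process resource limits with the standard resource module before untrusted code starts in a child process. It is Unix-only and covers only the resource-limit aspect (no seccomp filtering, no namespaces). The helper name run_limited and the default limits are assumptions chosen to mirror the 512MB figure used earlier.

```python
import resource
import subprocess
import sys
from typing import Tuple


def run_limited(code: str, cpu_seconds: int = 5,
                memory_bytes: int = 512 * 1024 * 1024,
                timeout: float = 30.0) -> Tuple[int, str, str]:
    """Run code in a child interpreter with CPU-time and memory limits."""

    def cap(kind: int, value: int) -> None:
        # Never request more than the current hard limit permits
        _, hard = resource.getrlimit(kind)
        if hard != resource.RLIM_INFINITY:
            value = min(value, hard)
        # Setting soft and hard together stops the child raising it again
        resource.setrlimit(kind, (value, value))

    def apply_limits() -> None:
        # Runs in the child between fork and exec; the parent is unaffected
        cap(resource.RLIMIT_CPU, cpu_seconds)   # CPU seconds, not wall clock
        cap(resource.RLIMIT_AS, memory_bytes)   # total address space

    proc = subprocess.run(
        [sys.executable, "-I", "-c", code],     # -I: isolated mode
        capture_output=True, text=True,
        timeout=timeout, preexec_fn=apply_limits,
    )
    return proc.returncode, proc.stdout, proc.stderr
```

On Linux, an allocation beyond the address-space limit fails inside the child (Python reports a MemoryError), and a busy loop is terminated once it has used its CPU budget. The wall-clock timeout is still needed because a sleeping process consumes no CPU time.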

6.2 Container-Based Isolation

Docker is currently the most widely used container platform and a practical choice for sandboxing code execution.
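
It can help to see the restrictions a container sandbox relies on as plain docker run flags before wrapping them in an SDK. The sketch below only builds the argument list and does not start anything. CliSandboxConfig and docker_run_args are hypothetical names, and the specific values (64 processes, a 100m tmpfs) are illustrative assumptions, not recommendations.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CliSandboxConfig:
    """Hypothetical config holding the limits discussed in this section."""
    image: str = "python:3.10-slim"
    cpus: float = 1.0
    memory: str = "512m"
    network_disabled: bool = True
    read_only: bool = True
    working_dir: str = "/sandbox"
    user: str = "nobody"


def docker_run_args(cfg: CliSandboxConfig, command: List[str]) -> List[str]:
    """Build a `docker run` argument list that applies the configured limits."""
    args = [
        "docker", "run", "--rm",                 # remove the container afterwards
        "--cpus", str(cfg.cpus),                 # CPU quota in cores
        "--memory", cfg.memory,                  # hard memory limit
        "--user", cfg.user,                      # run as an unprivileged user
        "--workdir", cfg.working_dir,
        "--cap-drop", "ALL",                     # drop all Linux capabilities
        "--security-opt", "no-new-privileges",   # block setuid-style escalation
        "--pids-limit", "64",                    # bound the number of processes
        "--tmpfs", f"{cfg.working_dir}:size=100m",  # writable scratch space
    ]
    if cfg.network_disabled:
        args += ["--network", "none"]
    if cfg.read_only:
        args.append("--read-only")               # read-only root filesystem
    return args + [cfg.image] + command
```

The resulting list can be handed to subprocess.run as is. Keeping the flags in one function makes the sandbox policy easy to review in a single place.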

import docker
import asyncio
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
import tempfile
import os
import tarfile
import io

@dataclass
class SandboxConfig:
    """Sandbox configuration."""
    image: str = "python:3.10-slim"
    cpu_limit: float = 1.0  # number of CPU cores
    memory_limit: str = "512m"  # memory limit
    timeout: int = 30  # execution timeout (seconds)
    network_disabled: bool = True  # disable networking
    read_only: bool = True  # read-only root filesystem
    working_dir: str = "/sandbox"
    user: str = "nobody"  # unprivileged user

@dataclass
class SandboxResult:
    """Result of a sandboxed execution."""
    exit_code: int
    stdout: str
    stderr: str
    files: Dict[str, bytes]  # output files
    execution_time: float
    memory_used: Optional[int] = None
    cpu_time: Optional[float] = None

class DockerSandbox:
    """Docker-based sandbox."""
    
    def __init__(self, config: Optional[SandboxConfig] = None):
        self.config = config or SandboxConfig()
        self.client = docker.from_env()
        self._ensure_image()
    
    def _ensure_image(self):
        """Make sure the image is available locally."""
        try:
            self.client.images.get(self.config.image)
        except docker.errors.ImageNotFound:
            print(f"Pulling image: {self.config.image}")
            self.client.images.pull(self.config.image)
    
    def _create_tar(self, files: Dict[str, str]) -> bytes:
        """Pack the files into an in-memory tar archive."""
        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode='w') as tar:
            for name, content in files.items():
                data = content.encode('utf-8')
                info = tarfile.TarInfo(name=name)
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))
        tar_stream.seek(0)
        return tar_stream.read()
    
    def _extract_files(self, container, paths: List[str]) -> Dict[str, bytes]:
        """Extract files from the container."""
        files = {}
        for path in paths:
            try:
                bits, stat = container.get_archive(path)
                tar_stream = io.BytesIO()
                for chunk in bits:
                    tar_stream.write(chunk)
                tar_stream.seek(0)
                
                with tarfile.open(fileobj=tar_stream) as tar:
                    for member in tar.getmembers():
                        if member.isfile():
                            f = tar.extractfile(member)
                            files[member.name] = f.read()
            except docker.errors.NotFound:
                continue
        return files
    
    async def execute(
        self,
        code: str,
        language: str = "python",
        input_files: Optional[Dict[str, str]] = None,
        output_paths: Optional[List[str]] = None
    ) -> SandboxResult:
        """Run code inside the sandbox."""
        
        import time
        start_time = time.time()
        
        # Prepare the files
        files_to_copy = input_files or {}
        
        if language == "python":
            files_to_copy["main.py"] = code
            command = ["python", "main.py"]
        elif language == "javascript":
            files_to_copy["main.js"] = code
            command = ["node", "main.js"]
        else:
            raise ValueError(f"Unsupported language: {language}")
        
        # Create the container
        container = self.client.containers.create(
            image=self.config.image,
            command=command,
            working_dir=self.config.working_dir,
            user=self.config.user,
            cpu_period=100000,
            cpu_quota=int(100000 * self.config.cpu_limit),
            mem_limit=self.config.memory_limit,
            network_disabled=self.config.network_disabled,
            read_only=self.config.read_only,
            tmpfs={self.config.working_dir: "size=100M,mode=1777"},
            detach=True
        )
        
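        try:
            # Copy files into the container.
            # Caveat: archive uploads of the `docker cp` kind cannot write into
            # tmpfs mounts or a read-only root filesystem, so with the tmpfs and
            # read_only settings above this step may fail; bind-mounting a host
            # directory is a common alternative.
            tar_data = self._create_tar(files_to_copy)
            container.put_archive(self.config.working_dir, tar_data)
            
            # Start the container
            container.start()
            
            # Wait for completion in a worker thread so the blocking SDK call
            # does not stall the event loop (asyncio.to_thread needs Python 3.9+)
            try:
                result = await asyncio.to_thread(
                    container.wait, timeout=self.config.timeout
                )
                exit_code = result['StatusCode']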
            except Exception:
                container.kill()
                return SandboxResult(
                    exit_code=-1,
                    stdout="",
                    stderr=f"Execution timeout ({self.config.timeout}s)",
                    files={},
                    execution_time=time.time() - start_time
                )
            
            # 获取输出
            stdout = container.logs(stdout=True, stderr=False).decode('utf-8')
            stderr = container.logs(stdout=False, stderr=True).decode('utf-8')
            
            # 获取输出文件
            output_files = {}
            if output_paths:
                output_files = self._extract_files(container, output_paths)
            
            # 获取资源使用情况
            stats = container.stats(stream=False)
            
            return SandboxResult(
                exit_code=exit_code,
                stdout=stdout,
                stderr=stderr,
                files=output_files,
                execution_time=time.time() - start_time,
                memory_used=stats.get('memory_stats', {}).get('usage')
            )
            
        finally:
            # 清理容器
            container.remove(force=True)


class SecureSandbox:
    """Security wrapper around the sandbox."""
    
    def __init__(self, sandbox: DockerSandbox):
        self.sandbox = sandbox
        self.code_analyzers = []
    
    def add_analyzer(self, analyzer: callable):
        """Add a code analyzer."""
        self.code_analyzers.append(analyzer)
    
    def _analyze_code(self, code: str, language: str) -> tuple:
        """Run every analyzer; return (is_safe, reason)."""
        for analyzer in self.code_analyzers:
            is_safe, reason = analyzer(code, language)
            if not is_safe:
                return False, reason
        return True, None
    
    async def execute_safely(
        self,
        code: str,
        language: str = "python",
        **kwargs
    ) -> SandboxResult:
        """Execute code after it passes the security checks."""
        
        # Security check
        is_safe, reason = self._analyze_code(code, language)
        if not is_safe:
            return SandboxResult(
                exit_code=-1,
                stdout="",
                stderr=f"Security check failed: {reason}",
                files={},
                execution_time=0.0
            )
        
        # Execute in the sandbox
        return await self.sandbox.execute(code, language, **kwargs)


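# ============== Code analyzers ==============

def python_security_analyzer(code: str, language: str) -> tuple:
    """Python security analyzer.

    Plain substring matching: a coarse pre-filter that is easy to bypass
    (for example `from os import system`), not a security boundary.
    """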
    if language != "python":
        return True, None
    
    dangerous_patterns = [
        ("import os", "OS module is not allowed"),
        ("import subprocess", "Subprocess module is not allowed"),
        ("__import__", "Dynamic import is not allowed"),
        ("eval(", "eval() is not allowed"),
        ("exec(", "exec() is not allowed"),
        ("open(", "File operations require explicit permission"),
    ]
    
    for pattern, reason in dangerous_patterns:
        if pattern in code:
            return False, reason
    
    return True, None


# ============== Usage example ==============

async def sandbox_demo():
    """Sandbox demo."""
    
    config = SandboxConfig(
        image="python:3.10-slim",
        cpu_limit=0.5,
        memory_limit="256m",
        timeout=10,
        network_disabled=True
    )
    
    docker_sandbox = DockerSandbox(config)
    secure_sandbox = SecureSandbox(docker_sandbox)
    secure_sandbox.add_analyzer(python_security_analyzer)
    
    # Safe code
    safe_code = """
import math

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

for i in range(10):
    print(f"fib({i}) = {fibonacci(i)}")
"""
    
    result = await secure_sandbox.execute_safely(safe_code, "python")
    print("Safe code result:")
    print(f"Exit code: {result.exit_code}")
    print(f"Output: {result.stdout}")
    
    # Dangerous code
    dangerous_code = """
import os
os.system("rm -rf /")
"""
    
    result = await secure_sandbox.execute_safely(dangerous_code, "python")
    print("\nDangerous code result:")
    print(f"Exit code: {result.exit_code}")
    print(f"Error: {result.stderr}")

# asyncio.run(sandbox_demo())
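
The substring checks in `python_security_analyzer` miss simple variations such as `from os import system`, and they also reject harmless code that merely contains the text `open(` inside a string. A somewhat sturdier pre-filter can inspect the parsed syntax tree instead. The following is a minimal sketch, not a complete defense: `ast_security_analyzer`, `BLOCKED_MODULES`, and `BLOCKED_CALLS` are hypothetical names introduced here, and the deny-lists are illustrative. It returns the same `(is_safe, reason)` pair, so it could be registered through `add_analyzer`.

```python
import ast
from typing import Optional, Tuple

# Hypothetical deny-lists for illustration; tune them for your own policy.
BLOCKED_MODULES = {"os", "subprocess", "shutil", "socket"}
BLOCKED_CALLS = {"eval", "exec", "open", "compile", "__import__"}


def ast_security_analyzer(code: str, language: str) -> Tuple[bool, Optional[str]]:
    """Return (is_safe, reason), the same shape python_security_analyzer returns."""
    if language != "python":
        return True, None
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return False, f"Code does not parse: {exc.msg}"

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # Covers `import os` and `import os.path as p`
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # Covers `from os import system`
            modules = [node.module or ""]
        else:
            modules = []
        for module in modules:
            root = module.split(".")[0]
            if root in BLOCKED_MODULES:
                return False, f"Import of '{root}' is not allowed"

        # Direct calls by name, such as eval("...") or open("f")
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BLOCKED_CALLS:
                return False, f"Call to '{node.func.id}()' is not allowed"

    return True, None


print(ast_security_analyzer("from os import system", "python"))
print(ast_security_analyzer('print("open(")', "python"))
```

The first call is rejected because of the import, while the second passes because `open(` only appears inside a string literal. Static checks of this kind can still be evaded (for example through attribute lookups on builtins), so container isolation remains the actual security boundary.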

6.3 Access Control and Auditing

┌────────────────────────────────────────────────────────────────────────────┐
│                      Access control and audit system                       │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                       Permission model (RBAC)                        │  │
│  │                                                                      │  │
│  │  ┌─────────┐    ┌─────────┐    ┌────────────┐                        │  │
│  │  │  User   │───▶│  Role   │───▶│ Permission │                        │  │
│  │  │ Agent-1 │    │ Basic   │    │ read_web   │                        │  │
│  │  └─────────┘    │ Premium │    │ exec_code  │                        │  │
│  │                 │ Admin   │    │ send_email │                        │  │
│  │                 └─────────┘    └────────────┘                        │  │
│  │                                                                      │  │
│  │  Permission matrix:                                                  │  │
│  │  ┌────────────┬─────────┬─────────┬─────────┐                        │  │
│  │  │ Action     │ Basic   │ Premium │ Admin   │                        │  │
│  │  ├────────────┼─────────┼─────────┼─────────┤                        │  │
│  │  │ web_read   │    ✓    │    ✓    │    ✓    │                        │  │
│  │  │ web_write  │    ✗    │    ✓    │    ✓    │                        │  │
│  │  │ code_exec  │    ✗    │    ✓    │    ✓    │                        │  │
│  │  │ file_read  │    ✗    │    ✓    │    ✓    │                        │  │
│  │  │ file_write │    ✗    │    ✗    │    ✓    │                        │  │
│  │  │ sys_admin  │    ✗    │    ✗    │    ✓    │                        │  │
│  │  └────────────┴─────────┴─────────┴─────────┘                        │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                           Audit log system                           │  │
│  │                                                                      │  │
│  │  Recorded fields:                                                    │  │
│  │  • Timestamp                                                         │  │
│  │  • Actor identity                                                    │  │
│  │  • Action type                                                       │  │
│  │  • Action parameters                                                 │  │
│  │  • Action result                                                     │  │
│  │  • Resource usage                                                    │  │
│  │                                                                      │  │
│  │  Example log entry:                                                  │  │
│  │  {                                                                   │  │
│  │    "timestamp": "2024-01-15T10:30:00Z",                              │  │
│  │    "agent_id": "agent-001",                                          │  │
│  │    "action": "code_execution",                                       │  │
│  │    "params": {"language": "python", "code_hash": "abc123"},          │  │
│  │    "result": "success",                                              │  │
│  │    "duration_ms": 1520,                                              │  │
│  │    "memory_mb": 45                                                   │  │
│  │  }                                                                   │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

The following is an implementation of the access control and audit system:

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from enum import Enum
from datetime import datetime
import json
import hashlib
import logging

class Permission(Enum):
    """Permission enumeration."""
    WEB_READ = "web_read"
    WEB_WRITE = "web_write"
    CODE_EXECUTE = "code_execute"
    FILE_READ = "file_read"
    FILE_WRITE = "file_write"
    API_CALL = "api_call"
    EMAIL_SEND = "email_send"
    SYSTEM_ADMIN = "system_admin"

@dataclass
class Role:
    """Role definition."""
    name: str
    permissions: Set[Permission]
    description: str = ""

@dataclass
class Agent:
    """Agent identity."""
    agent_id: str
    name: str
    roles: List[str]
    metadata: Dict[str, Any] = field(default_factory=dict)

class PermissionManager:
    """Permission manager."""
    
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.agents: Dict[str, Agent] = {}
        self._init_default_roles()
    
    def _init_default_roles(self):
        """Initialize the default roles."""
        self.roles["basic"] = Role(
            name="basic",
            permissions={Permission.WEB_READ},
            description="Basic read-only access"
        )
        self.roles["standard"] = Role(
            name="standard",
            permissions={
                Permission.WEB_READ, 
                Permission.API_CALL,
                Permission.FILE_READ
            },
            description="Standard user access"
        )
        self.roles["premium"] = Role(
            name="premium",
            permissions={
                Permission.WEB_READ, 
                Permission.WEB_WRITE,
                Permission.API_CALL,
                Permission.CODE_EXECUTE,
                Permission.FILE_READ
                # FILE_WRITE stays admin-only, matching the permission matrix
            },
            description="Premium user access"
        )
        self.roles["admin"] = Role(
            name="admin",
            permissions=set(Permission),  # All permissions
            description="Full administrative access"
        )
    
    def register_agent(self, agent: Agent):
        """Register an Agent."""
        self.agents[agent.agent_id] = agent
    
    def get_permissions(self, agent_id: str) -> Set[Permission]:
        """Return the union of the permissions of all roles held by the Agent."""
        agent = self.agents.get(agent_id)
        if not agent:
            return set()
        
        permissions = set()
        for role_name in agent.roles:
            role = self.roles.get(role_name)
            if role:
                permissions.update(role.permissions)
        
        return permissions
    
    def check_permission(self, agent_id: str, 
                        permission: Permission) -> bool:
        """Check whether the Agent holds the permission."""
        permissions = self.get_permissions(agent_id)
        return permission in permissions
    
    def require_permission(self, agent_id: str, 
                          permission: Permission):
        """Require the permission (raise an exception if it is missing)."""
        if not self.check_permission(agent_id, permission):
            raise PermissionError(
                f"Agent {agent_id} lacks permission: {permission.value}"
            )


@dataclass
class AuditEntry:
    """Audit entry."""
    timestamp: datetime
    agent_id: str
    action: str
    parameters: Dict[str, Any]
    result: str  # success, failure, denied
    error: Optional[str] = None
    duration_ms: Optional[float] = None
    resource_usage: Optional[Dict[str, Any]] = None

class AuditLogger:
    """Audit logger."""
    
    def __init__(self, log_file: Optional[str] = None):
        self.entries: List[AuditEntry] = []
        self.log_file = log_file
        self.logger = logging.getLogger("audit")
        
        if log_file:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(message)s'
            ))
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
    
    def log(self, entry: AuditEntry):
        """Record an audit entry."""
        self.entries.append(entry)
        
        log_data = {
            "timestamp": entry.timestamp.isoformat(),
            "agent_id": entry.agent_id,
            "action": entry.action,
            "params": self._sanitize_params(entry.parameters),
            "result": entry.result,
            "error": entry.error,
            "duration_ms": entry.duration_ms,
            "resources": entry.resource_usage
        }
        
        self.logger.info(json.dumps(log_data))
    
    def _sanitize_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Redact sensitive parameters (top-level keys only)."""
        sanitized = {}
        sensitive_keys = {'password', 'api_key', 'secret', 'token'}
        
        for key, value in params.items():
            if key.lower() in sensitive_keys:
                sanitized[key] = "***REDACTED***"
            elif key == 'code':
                # Log a hash and the length of the code instead of the code itself
                sanitized['code_hash'] = hashlib.sha256(
                    str(value).encode()
                ).hexdigest()[:16]
                sanitized['code_length'] = len(str(value))
            else:
                sanitized[key] = value
        
        return sanitized
    
    def query(self, 
             agent_id: Optional[str] = None,
             action: Optional[str] = None,
             start_time: Optional[datetime] = None,
             end_time: Optional[datetime] = None,
             result: Optional[str] = None) -> List[AuditEntry]:
        """Query the audit log."""
        filtered = self.entries
        
        if agent_id:
            filtered = [e for e in filtered if e.agent_id == agent_id]
        if action:
            filtered = [e for e in filtered if e.action == action]
        if start_time:
            filtered = [e for e in filtered if e.timestamp >= start_time]
        if end_time:
            filtered = [e for e in filtered if e.timestamp <= end_time]
        if result:
            filtered = [e for e in filtered if e.result == result]
        
        return filtered
    
    def get_statistics(self, agent_id: Optional[str] = None) -> Dict:
        """Compute summary statistics."""
        entries = self.entries
        if agent_id:
            entries = [e for e in entries if e.agent_id == agent_id]
        
        stats = {
            "total_actions": len(entries),
            "success_count": len([e for e in entries if e.result == "success"]),
            "failure_count": len([e for e in entries if e.result == "failure"]),
            "denied_count": len([e for e in entries if e.result == "denied"]),
            "actions_by_type": {},
            "avg_duration_ms": 0
        }
        
        # Keep 0.0 durations; only skip entries without a measurement
        durations = [e.duration_ms for e in entries if e.duration_ms is not None]
        if durations:
            stats["avg_duration_ms"] = sum(durations) / len(durations)
        
        for entry in entries:
            action = entry.action
            if action not in stats["actions_by_type"]:
                stats["actions_by_type"][action] = 0
            stats["actions_by_type"][action] += 1
        
        return stats


class SecureActionExecutor:
    """Secure action executor (integrates access control and auditing)."""
    
    def __init__(self, 
                 permission_manager: PermissionManager,
                 audit_logger: AuditLogger):
        self.permission_manager = permission_manager
        self.audit_logger = audit_logger
        self.action_handlers: Dict[str, callable] = {}
        self.action_permissions: Dict[str, Permission] = {}
    
    def register_action(self, action_name: str, 
                       handler: callable,
                       required_permission: Permission):
        """Register an action together with the permission it requires."""
        self.action_handlers[action_name] = handler
        self.action_permissions[action_name] = required_permission
    
    async def execute(self, agent_id: str, action_name: str,
                     parameters: Dict[str, Any]) -> Any:
        """Execute an action; the handler must be an async callable."""
        import time
        start_time = time.time()
        
        # Check permissions
        required_permission = self.action_permissions.get(action_name)
        if not required_permission:
            self._log_audit(agent_id, action_name, parameters, 
                          "failure", "Unknown action")
            raise ValueError(f"Unknown action: {action_name}")
        
        if not self.permission_manager.check_permission(
            agent_id, required_permission
        ):
            self._log_audit(agent_id, action_name, parameters,
                          "denied", "Permission denied")
            raise PermissionError(
                f"Permission denied for action: {action_name}"
            )
        
        # Execute the action
        handler = self.action_handlers[action_name]
        try:
            result = await handler(**parameters)
            duration = (time.time() - start_time) * 1000
            self._log_audit(agent_id, action_name, parameters,
                          "success", duration_ms=duration)
            return result
        except Exception as e:
            duration = (time.time() - start_time) * 1000
            self._log_audit(agent_id, action_name, parameters,
                          "failure", str(e), duration_ms=duration)
            raise
    
    def _log_audit(self, agent_id: str, action: str,
                   params: Dict, result: str,
                   error: Optional[str] = None,
                   duration_ms: Optional[float] = None):
        """Write an audit record."""
        entry = AuditEntry(
            timestamp=datetime.now(),
            agent_id=agent_id,
            action=action,
            parameters=params,
            result=result,
            error=error,
            duration_ms=duration_ms
        )
        self.audit_logger.log(entry)
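
The control flow of `SecureActionExecutor.execute` (resolve the agent's permissions, deny or run the handler, and audit every outcome) can be illustrated in condensed form. The following is a self-contained sketch with simplified stand-ins rather than the classes above: `ROLE_PERMISSIONS`, `AGENT_ROLES`, `audit_log`, `permissions_of`, `run_action`, and `run_code` are hypothetical names, and permissions are plain strings for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Set, Tuple

# Hypothetical role table and agent registry, for illustration only.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "basic": {"web_read"},
    "premium": {"web_read", "web_write", "code_execute", "file_read"},
}
AGENT_ROLES: Dict[str, List[str]] = {
    "agent-001": ["basic"],
    "agent-002": ["basic", "premium"],
}
audit_log: List[Tuple[str, str, str]] = []  # (agent_id, action, result)


def permissions_of(agent_id: str) -> Set[str]:
    """Union of the permissions of every role the agent holds."""
    perms: Set[str] = set()
    for role in AGENT_ROLES.get(agent_id, []):
        perms |= ROLE_PERMISSIONS.get(role, set())
    return perms


async def run_action(agent_id: str, action: str, required: str,
                     handler: Callable[..., Awaitable[Any]], **params: Any) -> Any:
    """Check the permission, run the handler, and audit every outcome."""
    if required not in permissions_of(agent_id):
        audit_log.append((agent_id, action, "denied"))
        raise PermissionError(f"Permission denied for action: {action}")
    try:
        result = await handler(**params)
    except Exception:
        audit_log.append((agent_id, action, "failure"))
        raise
    audit_log.append((agent_id, action, "success"))
    return result


async def run_code(code: str) -> str:
    """Stand-in handler; a real one would call the sandbox."""
    return f"ran {len(code)} chars"


async def demo() -> None:
    print(await run_action("agent-002", "code_execution", "code_execute",
                           run_code, code="print(1)"))
    try:
        await run_action("agent-001", "code_execution", "code_execute",
                         run_code, code="print(1)")
    except PermissionError as exc:
        print(exc)
    print(audit_log)


asyncio.run(demo())
```

The denied call is audited before the exception is raised, so rejected attempts show up in the log alongside successful ones.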

7. Embodied Intelligence: From Digital to Physical

7.1 Basic Concepts of Embodied Intelligence

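Embodied intelligence is a frontier direction in AI research. It holds that an intelligent agent needs a "body" in order to interact with the physical world. Unlike an Agent that operates in a purely digital environment, an embodied Agent has to handle physical constraints, continuous action spaces, and the uncertainty of the real world.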

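💡 Question: Why is embodied intelligence often described as the next milestone for AI?

🤔 Answer:

  1. A complete intelligence loop: on this view, an AI system can only truly understand cause and effect by interacting with the physical world
  2. Practical value: applications such as robotics and autonomous driving require embodied intelligence
  3. Learning efficiency: learning through bodily interaction with the environment may be more efficient than training on data alone
  4. General intelligence: embodied interaction may be a key path toward AGI
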
┌────────────────────────────────────────────────────────────────────────────┐
│                 Embodied intelligence system architecture                  │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│                           ┌───────────────────┐                            │
│                           │  Language model   │                            │
│                           │    (LLM brain)    │                            │
│                           └─────────┬─────────┘                            │
│                                     │                                      │
│             ┌───────────────────────┼───────────────────────┐              │
│             │                       │                       │              │
│             ▼                       ▼                       ▼              │
│   ┌───────────────────┐   ┌───────────────────┐   ┌───────────────────┐    │
│   │     Language      │   │   Task planning   │   │    World model    │    │
│   │   understanding   │   │ 1. Locate object  │   │ Spatial reasoning │    │
│   │   "Pick up the    │   │ 2. Move arm       │   │ Physical common   │    │
│   │    red cup"       │   │ 3. Grasp          │   │ sense             │    │
│   └─────────┬─────────┘   │ 4. Move           │   └─────────┬─────────┘    │
│             │             └─────────┬─────────┘             │              │
│             └───────────────────────┼───────────────────────┘              │
│                                     │                                      │
│                                     ▼                                      │
│                       ┌───────────────────────────┐                        │
│                       │  Action value assessment  │                        │
│                       │    (Affordance Model)     │                        │
│                       │  • Feasibility check      │                        │
│                       │  • Success probability    │                        │
│                       │  • Safety assessment      │                        │
│                       └─────────────┬─────────────┘                        │
│                                     │                                      │
│                                     ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                         Robot control layer                          │  │
│  │ ┌────────────────────┐ ┌────────────────────┐ ┌────────────────────┐ │  │
│  │ │ Visual perception  │ │ Motion planning    │ │ Force feedback     │ │  │
│  │ │ RGB-D camera       │ │ Inverse kinematics │ │ Compliant control  │ │  │
│  │ └────────────────────┘ └────────────────────┘ └────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                     │                                      │
│                                     ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                            Physical world                            │  │
│  │    🤖 Robot ── 🔴 Target object ── 📦 Container ── 🏠 Environment    │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

7.2 SayCan: Language Models Meet Robots

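SayCan, published by Google in 2022, is an influential early work on combining large language models with robot control. Its core idea: use the language model to propose what to "say" (Say), and use a value function to assess what the robot "can" do (Can).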

┌────────────────────────────────────────────────────────────────────────────┐
│                           SayCan core principle                            │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  User instruction: "I spilled my drink, can you help?"                     │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                Step 1: LLM generates skill candidates                │  │
│  │                                                                      │  │
│  │  LLM output (language score):                                        │  │
│  │  • "find a sponge"      → P(say) = 0.35                              │  │
│  │  • "pick up the sponge" → P(say) = 0.25                              │  │
│  │  • "find a cup"         → P(say) = 0.05                              │  │
│  │  • "go to the table"    → P(say) = 0.15                              │  │
│  │  • ...                                                               │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                     │                                      │
│                                     ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │              Step 2: Value function scores feasibility               │  │
│  │                                                                      │  │
│  │  Affordance model (capability score):                                │  │
│  │  Current scene: robot in the kitchen, facing a sponge and a cup      │  │
│  │                                                                      │  │
│  │  • "find a sponge"      → P(can) = 0.90  (sponge is in view)         │  │
│  │  • "pick up the sponge" → P(can) = 0.85  (reachable and graspable)   │  │
│  │  • "find a cup"         → P(can) = 0.70  (cup is also in view)       │  │
│  │  • "go to the table"    → P(can) = 0.95  (path is clear)             │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                     │                                      │
│                                     ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │              Step 3: Combined score selects the action               │  │
│  │                                                                      │  │
│  │  Score = P(say) × P(can)                                             │  │
│  │                                                                      │  │
│  │  • "find a sponge"      → 0.35 × 0.90 = 0.315 ✓ highest              │  │
│  │  • "pick up the sponge" → 0.25 × 0.85 = 0.213                        │  │
│  │  • "go to the table"    → 0.15 × 0.95 = 0.143                        │  │
│  │  • "find a cup"         → 0.05 × 0.70 = 0.035                        │  │
│  │                                                                      │  │
│  │  Selected: "find a sponge" → execute this skill                      │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                     │                                      │
│                                     ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                     Step 4: Execute and iterate                      │  │
│  │                                                                      │  │
│  │  Execute "find a sponge" → success                                   │  │
│  │  Update context: "I found a sponge."                                 │  │
│  │  Continue planning the next step...                                  │  │
│  │                                                                      │  │
│  │  Full execution sequence:                                            │  │
│  │  1. find a sponge ✓                                                  │  │
│  │  2. pick up the sponge ✓                                             │  │
│  │  3. go to the spill ✓                                                │  │
│  │  4. clean the spill ✓                                                │  │
│  │  5. done ✓                                                           │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
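
The selection rule in Step 3, Score = P(say) × P(can), can be checked with a few lines of arithmetic. The sketch below reuses the illustrative probabilities from the diagram; `combine` and `best_skill` are hypothetical helper names, not part of SayCan itself.

```python
from typing import Dict, Tuple

# Illustrative values copied from the diagram above.
say: Dict[str, float] = {
    "find a sponge": 0.35,
    "pick up the sponge": 0.25,
    "find a cup": 0.05,
    "go to the table": 0.15,
}
can: Dict[str, float] = {
    "find a sponge": 0.90,
    "pick up the sponge": 0.85,
    "find a cup": 0.70,
    "go to the table": 0.95,
}


def combine(say: Dict[str, float], can: Dict[str, float]) -> Dict[str, float]:
    """Multiply the language score by the feasibility score for each skill."""
    return {skill: p_say * can.get(skill, 0.0) for skill, p_say in say.items()}


def best_skill(scores: Dict[str, float]) -> Tuple[str, float]:
    """Return the skill with the highest combined score."""
    name = max(scores, key=scores.get)
    return name, scores[name]


scores = combine(say, can)
for skill, score in sorted(scores.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{skill:<20} {score:.4f}")
print("selected:", best_skill(scores)[0])
```

A skill that the language model favors but the robot cannot currently perform gets a low product, which is how the feasibility term grounds the language model's proposals.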

The following code sketches the SayCan idea:

from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np

@dataclass
class Skill:
    """A robot skill."""
    name: str
    description: str
    execute: Callable  # Function that executes the skill
    affordance_model: Callable  # Function that scores feasibility
    preconditions: Optional[List[str]] = None  # Preconditions
    effects: Optional[List[str]] = None  # Effects of execution

@dataclass
class WorldState:
    """World state."""
    robot_position: Tuple[float, float, float]
    objects: Dict[str, Dict[str, Any]]  # {object_name: {position, graspable, ...}}
    robot_holding: Optional[str] = None
    completed_tasks: Optional[List[str]] = None

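class LanguageModel:
    """Language model (simulated)."""
    
    def __init__(self, model_name: str = "gpt-4"):
        self.model_name = model_name
    
    def score_skills(self, 
                    instruction: str,
                    context: str,
                    skills: List[str]) -> Dict[str, float]:
        """
        Score each skill: how much does it help complete the instruction?
        Returns: {skill_name: probability}
        
        A real implementation should call an LLM API.
        """
        # Simulation: build the prompt an LLM would use to rate each skill
        # (unused here because the scores below are hard-coded)
        prompt = f"""
        User instruction: {instruction}
        Current context: {context}
        
        Available skills: {skills}
        
        For each skill, estimate the probability that it should be 
        the next step to help complete the instruction.
        """
        
        # Simulated result (a real implementation would parse the LLM output),
        # assuming a plausible distribution
        scores = {}
        for skill in skills:
            if f"{skill} - success" in context:
                # Simulation shortcut: never re-propose a skill that already
                # succeeded, so the planning loop can make progress
                scores[skill] = 0.0
            elif "spill" in instruction.lower():
                if "sponge" in skill.lower():
                    scores[skill] = 0.35
                elif "clean" in skill.lower():
                    scores[skill] = 0.25
                else:
                    scores[skill] = 0.1
            else:
                scores[skill] = 1.0 / len(skills)
        
        # Normalize; when every score is zero there is nothing to normalize
        total = sum(scores.values())
        if total == 0:
            return scores
        return {k: v/total for k, v in scores.items()}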

class AffordanceModel:
    """Affordance model: scores whether the robot can perform a skill right now."""
    
    def __init__(self):
        self.skill_models: Dict[str, Callable] = {}
    
    def register_skill(self, skill_name: str, 
                      model: Callable[[WorldState], float]):
        """Register the affordance function of a skill."""
        self.skill_models[skill_name] = model
    
    def evaluate(self, skill_name: str, 
                world_state: WorldState) -> float:
        """Score the feasibility of a skill (0-1); unknown skills score 0."""
        if skill_name not in self.skill_models:
            return 0.0
        
        model = self.skill_models[skill_name]
        return model(world_state)

class SayCanAgent:
    """A SayCan-style Agent."""
    
    def __init__(self, 
                 language_model: LanguageModel,
                 affordance_model: AffordanceModel):
        self.lm = language_model
        self.affordance = affordance_model
        self.skills: Dict[str, Skill] = {}
        self.execution_history: List[str] = []
    
    def register_skill(self, skill: Skill):
        """Register a skill and its affordance function."""
        self.skills[skill.name] = skill
        self.affordance.register_skill(
            skill.name, 
            skill.affordance_model
        )
    
    def select_action(self, 
                     instruction: str,
                     world_state: WorldState) -> Optional[str]:
        """
        Select the next action to execute.
        Uses the SayCan formula: score = P(say) × P(can)
        """
        skill_names = list(self.skills.keys())
        
        # Build the context
        context = self._build_context()
        
        # Step 1: language model score P(say)
        say_scores = self.lm.score_skills(
            instruction, context, skill_names
        )
        
        # Step 2: feasibility score P(can)
        can_scores = {}
        for skill_name in skill_names:
            can_scores[skill_name] = self.affordance.evaluate(
                skill_name, world_state
            )
        
        # Step 3: combined score
        combined_scores = {}
        for skill_name in skill_names:
            combined_scores[skill_name] = (
                say_scores.get(skill_name, 0) * 
                can_scores.get(skill_name, 0)
            )
        
        # Pick the highest-scoring skill
        if not combined_scores:
            return None
        
        best_skill = max(combined_scores, key=combined_scores.get)
        best_score = combined_scores[best_skill]
        
        # A very low best score suggests the task is done or cannot continue
        if best_score < 0.01:
            return None
        
        return best_skill
    
    def _build_context(self) -> str:
        """Build the execution context from the action history."""
        if not self.execution_history:
            return "No actions taken yet."
        
        context = "Actions taken so far:\n"
        for i, action in enumerate(self.execution_history, 1):
            context += f"{i}. {action}\n"
        
        return context
    
    async def execute_plan(self, 
                          instruction: str,
                          world_state: WorldState,
                          max_steps: int = 10) -> List[str]:
        """
        Execute a complete plan.
        Repeatedly select and execute actions until the task is done
        or the maximum number of steps is reached.
        """
        self.execution_history = []
        
        for step in range(max_steps):
            # Select an action
            action = self.select_action(instruction, world_state)
            
            if action is None:
                print(f"Step {step + 1}: No valid action, task complete")
                break
            
            print(f"Step {step + 1}: Executing '{action}'")
            
            # Execute the action
            skill = self.skills[action]
            success = await skill.execute(world_state)
            
            if success:
                self.execution_history.append(f"{action} - success")
                # Update the world state (simplified here)
            else:
                self.execution_history.append(f"{action} - failed")
                print(f"  Action failed, retrying...")
        
        return self.execution_history


# ============== Example skill definitions ==============

def create_find_object_skill(object_name: str) -> Skill:
    """Create a skill that looks for an object."""
    
    async def execute(world_state: WorldState) -> bool:
        """Look for the object."""
        if object_name in world_state.objects:
            print(f"  Found {object_name} at position "
                  f"{world_state.objects[object_name]['position']}")
            return True
        return False
    
    def affordance(world_state: WorldState) -> float:
        """Score feasibility."""
        if object_name in world_state.objects:
            obj = world_state.objects[object_name]
            # Feasibility falls with distance and rises with visibility
            distance = np.linalg.norm(
                np.array(obj['position']) - 
                np.array(world_state.robot_position)
            )
            visibility = obj.get('visible', 1.0)
            return visibility * max(0, 1 - distance / 10)
        return 0.0
    
    return Skill(
        name=f"find {object_name}",
        description=f"Locate the {object_name} in the environment",
        execute=execute,
        affordance_model=affordance,
        effects=[f"located_{object_name}"]
    )

def create_pick_object_skill(object_name: str) -> Skill:
    """Create a skill that picks up the named object."""
    
    async def execute(world_state: WorldState) -> bool:
        """Pick up the object if it is graspable and the gripper is free."""
        if world_state.robot_holding is not None:
            # Consistent with affordance(): the gripper is already occupied
            return False
        if object_name in world_state.objects:
            obj = world_state.objects[object_name]
            if obj.get('graspable', False):
                world_state.robot_holding = object_name
                print(f"  Picked up {object_name}")
                return True
        return False
    
    def affordance(world_state: WorldState) -> float:
        """Estimate feasibility in [0, 1]."""
        if object_name not in world_state.objects:
            return 0.0
        
        obj = world_state.objects[object_name]
        if not obj.get('graspable', False):
            return 0.1  # Not graspable: kept near zero so it is rarely chosen
        
        if world_state.robot_holding is not None:
            return 0.0  # Already holding an object
        
        # Feasibility falls off linearly with distance (zero at 5 units)
        distance = np.linalg.norm(
            np.array(obj['position']) - 
            np.array(world_state.robot_position)
        )
        return max(0, 1 - distance / 5)
    
    return Skill(
        name=f"pick up {object_name}",
        description=f"Grasp and pick up the {object_name}",
        execute=execute,
        affordance_model=affordance,
        preconditions=[f"located_{object_name}"],
        effects=[f"holding_{object_name}"]
    )


# ============== Usage example ==============

async def saycan_demo():
    """SayCan demo."""
    
    # Initialize the models and the agent
    lm = LanguageModel()
    affordance = AffordanceModel()
    agent = SayCanAgent(lm, affordance)
    
    # Register skills
    agent.register_skill(create_find_object_skill("sponge"))
    agent.register_skill(create_find_object_skill("cup"))
    agent.register_skill(create_pick_object_skill("sponge"))
    agent.register_skill(create_pick_object_skill("cup"))
    
    # Initialize the world state
    world_state = WorldState(
        robot_position=(0, 0, 0),
        objects={
            "sponge": {
                "position": (1, 0, 0),
                "graspable": True,
                "visible": 1.0
            },
            "cup": {
                "position": (2, 1, 0),
                "graspable": True,
                "visible": 0.8
            }
        }
    )
    
    # Run the task
    instruction = "I spilled my drink, can you help?"
    history = await agent.execute_plan(instruction, world_state)
    
    print("\nExecution history:")
    for action in history:
        print(f"  - {action}")

# asyncio.run(saycan_demo())
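
The selection step that `execute_plan` depends on can be looked at in isolation. The sketch below is a minimal, self-contained version of the SayCan scoring rule: each candidate skill is scored by the product of a language-model usefulness score and an affordance value, and the highest product wins. The `llm_scores` values and the `min_score` threshold are hypothetical stand-ins, not outputs of a real model and not part of the `SayCanAgent` class above. The affordance values are the ones the example affordance functions would return for the demo world state.

```python
import math
from typing import Dict, Optional


def combine_scores(llm_scores: Dict[str, float],
                   affordances: Dict[str, float],
                   min_score: float = 0.01) -> Optional[str]:
    """Return the skill with the highest llm_score * affordance, or None."""
    best_skill, best_score = None, min_score
    for skill, p_useful in llm_scores.items():
        # A skill without an affordance estimate is treated as infeasible
        score = p_useful * affordances.get(skill, 0.0)
        if score > best_score:
            best_skill, best_score = skill, score
    return best_skill


# Hypothetical LLM scores for "I spilled my drink, can you help?"
llm_scores = {"find sponge": 0.6, "pick up sponge": 0.3, "find cup": 0.1}

# Affordances for the demo world state (robot at the origin):
#   find sponge:    1.0 * (1 - 1/10)        sponge 1 unit away, fully visible
#   pick up sponge: 1 - 1/5
#   find cup:       0.8 * (1 - sqrt(5)/10)  cup at (2, 1, 0), visibility 0.8
affordances = {
    "find sponge": 1.0 * (1 - 1 / 10),
    "pick up sponge": 1 - 1 / 5,
    "find cup": 0.8 * (1 - math.sqrt(5) / 10),
}

print(combine_scores(llm_scores, affordances))  # find sponge
```

Returning `None` when every product falls below the threshold gives the caller a natural stop condition, which matches how `execute_plan` treats a `None` action as "task complete".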

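7.3 Multimodal Perception and Action

An embodied agent has to process several sensing modalities and fuse them into a single, consistent understanding of the world.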

┌───────────────────────────────────────────────────────────────────────┐
│                Multimodal Perception and Action Fusion                │
├───────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                       Sensing Modalities                        │  │
│  │                                                                 │  │
│  │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │  │
│  │ │   Vision    │ │    Depth    │ │   Tactile   │ │   Speech    │ │  │
│  │ │     RGB     │ │    Depth    │ │    Force    │ │    Audio    │ │  │
│  │ │   Camera    │ │   Sensor    │ │   Sensor    │ │    Input    │ │  │
│  │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │  │
│  │        │               │               │               │        │  │
│  │        ▼               ▼               ▼               ▼        │  │
│  │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │  │
│  │ │  Detection  │ │  3D recon.  │ │Contact det. │ │Speech recog.│ │  │
│  │ │Segmentation │ │Point clouds │ │Force estim. │ │   Intent    │ │  │
│  │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │  │
│  │        │               │               │               │        │  │
│  │        └───────────────┴───────┬───────┴───────────────┘        │  │
│  │                                │                                │  │
│  │                                ▼                                │  │
│  │              ┌───────────────────────────────────┐              │  │
│  │              │      Multimodal Fusion (VLM)      │              │  │
│  │              │       Vision-Language Model       │              │  │
│  │              │   • Scene understanding           │              │  │
│  │              │   • Object-relation reasoning     │              │  │
│  │              │   • Spatial-semantic mapping      │              │  │
│  │              └─────────────────┬─────────────────┘              │  │
│  └────────────────────────────────┼────────────────────────────────┘  │
│                                   │                                   │
│                                   ▼                                   │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                  Action Planning and Execution                  │  │
│  │                                                                 │  │
│  │  Scene ───────▶ Task ─────────▶ Skill ────────▶ Execution      │  │
│  │  understanding   decomposition   selection                      │  │
│  │    │               │               │               │            │  │
│  │    ▼               ▼               ▼               ▼            │  │
│  │  "A red cup is   "1. Find cup    "pick up        Motor control  │  │
│  │   on the kitchen  2. Pick it up   red cup"       Traj. planning │  │
│  │   counter"        3. Put in sink"                               │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np

@dataclass
class VisualObservation:
    """Visual observation."""
    rgb_image: np.ndarray  # HxWx3
    depth_image: Optional[np.ndarray] = None  # HxW
    segmentation_mask: Optional[np.ndarray] = None  # HxW
    detected_objects: Optional[List[Dict]] = None

@dataclass
class ProprioceptiveState:
    """Proprioceptive (robot body) state."""
    joint_positions: np.ndarray
    joint_velocities: np.ndarray
    end_effector_position: np.ndarray
    end_effector_orientation: np.ndarray
    gripper_state: float  # 0-1, 0=closed, 1=open

@dataclass
class TactileReading:
    """Tactile sensor reading."""
    contact_force: np.ndarray  # 3D force vector
    contact_location: Optional[np.ndarray] = None
    is_contact: bool = False

@dataclass
class MultimodalObservation:
    """Combined observation across all modalities."""
    visual: VisualObservation
    proprioceptive: ProprioceptiveState
    tactile: Optional[TactileReading] = None
    audio: Optional[np.ndarray] = None
    language_instruction: Optional[str] = None

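class MultimodalEncoder:
    """Multimodal encoder."""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Per-modality encoders would be initialized here,
        # e.g. a ViT for vision and BERT for language
    
    def encode_visual(self, visual: VisualObservation) -> np.ndarray:
        """Encode the visual observation into a feature vector."""
        # Placeholder: plug in a pretrained vision model (e.g. CLIP or ViT).
        # Raising keeps fuse() from silently receiving None.
        raise NotImplementedError("encode_visual requires a vision model")
    
    def encode_proprioceptive(self, 
                             proprio: ProprioceptiveState) -> np.ndarray:
        """Encode the proprioceptive state."""
        # Plain concatenation; no normalization is applied here
        return np.concatenate([
            proprio.joint_positions,
            proprio.joint_velocities,
            proprio.end_effector_position,
            proprio.end_effector_orientation,
            [proprio.gripper_state]
        ])
    
    def encode_language(self, instruction: str) -> np.ndarray:
        """Encode the language instruction."""
        # Placeholder: plug in a language model encoder
        raise NotImplementedError("encode_language requires a language model")
    
    def fuse(self, 
            visual_emb: np.ndarray,
            proprio_emb: np.ndarray,
            language_emb: Optional[np.ndarray] = None) -> np.ndarray:
        """Fuse the per-modality features."""
        # Simple concatenation; cross-attention is a common alternative
        embeddings = [visual_emb, proprio_emb]
        if language_emb is not None:
            embeddings.append(language_emb)
        
        return np.concatenate(embeddings)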

class EmbodiedActionSpace:
    """具身行动空间"""
    
    def __init__(self, 
                 position_dim: int = 3,
                 rotation_dim: int = 3,
                 gripper_dim: int = 1):
        self.position_dim = position_dim
        self.rotation_dim = rotation_dim
        self.gripper_dim = gripper_dim
        self.total_dim = position_dim + rotation_dim + gripper_dim
    
    def clip_action(self, action: np.ndarray) -> np.ndarray:
        """Clip an action to its valid range."""
        clipped = action.copy()
        # Limit the position delta
        clipped[:self.position_dim] = np.clip(
            clipped[:self.position_dim], -0.1, 0.1
        )
        # Limit the rotation delta
        clipped[self.position_dim:self.position_dim+self.rotation_dim] = np.clip(
            clipped[self.position_dim:self.position_dim+self.rotation_dim], 
            -0.5, 0.5
        )
        # Gripper state stays in [0, 1]
        clipped[-1] = np.clip(clipped[-1], 0, 1)
        
        return clipped
    
    def action_to_command(self, 
                         action: np.ndarray,
                         current_state: ProprioceptiveState) -> Dict:
        """Convert an action into a robot command."""
        position_delta = action[:self.position_dim]
        rotation_delta = action[self.position_dim:self.position_dim+self.rotation_dim]
        gripper_cmd = action[-1]
        
        # Adding the rotation delta assumes the orientation is stored as
        # rotation_dim Euler angles; a quaternion would need composition instead
        return {
            "target_position": current_state.end_effector_position + position_delta,
            "target_orientation": current_state.end_effector_orientation + rotation_delta,
            "gripper_command": gripper_cmd
        }

class EmbodiedAgent:
    """Embodied agent."""
    
    def __init__(self,
                 encoder: MultimodalEncoder,
                 action_space: EmbodiedActionSpace,
                 policy_model: Any):  # Policy model
        self.encoder = encoder
        self.action_space = action_space
        self.policy = policy_model
        self.observation_history: List[MultimodalObservation] = []
    
    def observe(self, observation: MultimodalObservation):
        """Receive an observation."""
        self.observation_history.append(observation)
        if len(self.observation_history) > 10:  # Keep the 10 most recent frames
            self.observation_history.pop(0)
    
    def act(self) -> np.ndarray:
        """Generate an action from the latest observation."""
        if not self.observation_history:
            return np.zeros(self.action_space.total_dim)
        
        current_obs = self.observation_history[-1]
        
        # Encode the multimodal inputs
        visual_emb = self.encoder.encode_visual(current_obs.visual)
        proprio_emb = self.encoder.encode_proprioceptive(
            current_obs.proprioceptive
        )
        
        language_emb = None
        if current_obs.language_instruction:
            language_emb = self.encoder.encode_language(
                current_obs.language_instruction
            )
        
        # Fuse the features
        fused_features = self.encoder.fuse(
            visual_emb, proprio_emb, language_emb
        )
        
        # The policy network produces the action
        action = self.policy.forward(fused_features)
        
        # Clip the action to its valid range
        action = self.action_space.clip_action(action)
        
        return action
    
    def reset(self):
        """Reset the agent state."""
        self.observation_history = []
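
Plain concatenation, as used in `fuse`, lets whichever modality has the largest numeric scale dominate the fused vector. One common remedy is to scale each embedding to unit L2 norm before concatenating. The sketch below illustrates that idea with toy vectors; `l2_normalize` and `fuse_normalized` are hypothetical helper names, not part of the `MultimodalEncoder` class above.

```python
import numpy as np


def l2_normalize(x, eps: float = 1e-8) -> np.ndarray:
    """Scale a vector to (approximately) unit L2 norm."""
    x = np.asarray(x, dtype=float)
    # eps avoids division by zero for an all-zero embedding
    return x / (np.linalg.norm(x) + eps)


def fuse_normalized(*embeddings) -> np.ndarray:
    """Concatenate embeddings after normalizing each one separately."""
    return np.concatenate([l2_normalize(e) for e in embeddings])


# Toy embeddings with very different scales
visual_emb = np.array([10.0, 0.0, 0.0, 0.0])
proprio_emb = np.array([0.3, 0.4])

fused = fuse_normalized(visual_emb, proprio_emb)
print(fused.shape)         # (6,)
print(np.round(fused, 3))  # each segment now has norm close to 1
```

Normalizing per modality is only one option. Learned projection layers or cross-attention can weight modalities adaptively, at the cost of training.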

8. Hands-On: Building a Complete Action Module

8.1 System Architecture

We now combine the components discussed so far into a complete agent action module.

┌─────────────────────────────────────────────────────────────────────┐
│                 Complete Action Module Architecture                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │                        Agent Controller                         │ │
│ │  • Receives action requests from the planning module            │ │
│ │  • Coordinates execution across submodules                      │ │
│ │  • Returns execution results to the planning module             │ │
│ └────────────────────────────────┬────────────────────────────────┘ │
│                                  │                                  │
│           ┌──────────────────────┼──────────────────────┐           │
│           │                      │                      │           │
│           ▼                      ▼                      ▼           │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐  │
│  │  Tool           │    │  API            │    │  Code           │  │
│  │  Executor       │    │  Client         │    │  Executor       │  │
│  │                 │    │                 │    │                 │  │
│  │ • Calculator    │    │ • HTTP requests │    │ • Python        │  │
│  │ • Search        │    │ • Auth handling │    │ • JavaScript    │  │
│  │ • Calendar      │    │ • Retry logic   │    │ • SQL           │  │
│  └────────┬────────┘    └────────┬────────┘    └────────┬────────┘  │
│           │                      │                      │           │
│           └──────────────────────┼──────────────────────┘           │
│                                  │                                  │
│                                  ▼                                  │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │                         Security Layer                          │ │
│ │    ┌───────────────┐   ┌───────────────┐   ┌───────────────┐    │ │
│ │    │ Permission    │   │ Sandbox       │   │ Audit         │    │ │
│ │    │ Manager       │   │ Runtime       │   │ Logger        │    │ │
│ │    └───────────────┘   └───────────────┘   └───────────────┘    │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

8.2 Core Implementation

import asyncio
import json
import logging
import time
import uuid  # needed at module level by ActionRequest's default request_id
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union

# ============== Action type definitions ==============

class ActionCategory(Enum):
    """Action category."""
    TOOL = "tool"
    API = "api"
    CODE = "code"
    EMBODIED = "embodied"

@dataclass
class ActionDefinition:
    """Definition of a registered action."""
    name: str
    category: ActionCategory
    description: str
    parameters_schema: Dict[str, Any]
    handler: Callable
    permission_required: Optional[str] = None
    timeout: float = 30.0
    requires_confirmation: bool = False

@dataclass
class ActionRequest:
    """Request to execute an action."""
    action_name: str
    parameters: Dict[str, Any]
    agent_id: str
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    priority: int = 0
    context: Optional[Dict[str, Any]] = None

@dataclass
class ActionResponse:
    """Result of an action execution."""
    request_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

# ============== 行动模块主类 ==============

class ActionModule:
    """Agent行动模块"""
    
    def __init__(self, config: Optional[Dict] = None):
        self.config = config or {}
        self.actions: Dict[str, ActionDefinition] = {}
        self.permission_manager = PermissionManager()
        self.audit_logger = AuditLogger()
        self.executors: Dict[ActionCategory, Any] = {}
        self.middleware: List[Callable] = []
        self.logger = logging.getLogger(__name__)
        
        self._init_executors()
    
    def _init_executors(self):
        """初始化执行器"""
        # 工具执行器
        self.executors[ActionCategory.TOOL] = ToolExecutor(
            ToolRegistry()
        )
        
        # API执行器
        self.executors[ActionCategory.API] = ResilientAPIClient(
            APIClient(APIConfig(
                base_url=self.config.get('api_base_url', ''),
                timeout=self.config.get('api_timeout', 30.0)
            ))
        )
        
        # 代码执行器(沙箱)
        if self.config.get('enable_code_execution', False):
            sandbox_config = SandboxConfig(
                **self.config.get('sandbox', {})
            )
            self.executors[ActionCategory.CODE] = SecureSandbox(
                DockerSandbox(sandbox_config)
            )
    
    def register_action(self, action_def: ActionDefinition):
        """注册行动"""
        self.actions[action_def.name] = action_def
        self.logger.info(f"Registered action: {action_def.name}")
    
    def add_middleware(self, middleware: Callable):
        """添加中间件"""
        self.middleware.append(middleware)
    
    async def execute(self, request: ActionRequest) -> ActionResponse:
        """执行行动"""
        import time
        import uuid
        
        start_time = time.time()
        
        # 获取行动定义
        action_def = self.actions.get(request.action_name)
        if not action_def:
            return ActionResponse(
                request_id=request.request_id,
                success=False,
                error=f"Unknown action: {request.action_name}"
            )
        
        # 权限检查
        if action_def.permission_required:
            if not self.permission_manager.check_permission(
                request.agent_id,
                Permission(action_def.permission_required)
            ):
                self._log_audit(request, "denied", "Permission denied")
                return ActionResponse(
                    request_id=request.request_id,
                    success=False,
                    error="Permission denied"
                )
        
        # 执行中间件
        for middleware in self.middleware:
            try:
                request = await middleware(request)
            except Exception as e:
                return ActionResponse(
                    request_id=request.request_id,
                    success=False,
                    error=f"Middleware error: {str(e)}"
                )
        
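        # Confirmation check: fail closed until a real confirmation flow exists.
        # Assumption: the caller sets context["confirmed"] = True once the
        # user has approved the action.
        if action_def.requires_confirmation:
            if not (request.context or {}).get("confirmed", False):
                self._log_audit(request, "denied", "Confirmation required")
                return ActionResponse(
                    request_id=request.request_id,
                    success=False,
                    error="Confirmation required"
                )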
        
        # 执行行动
        try:
            result = await asyncio.wait_for(
                self._execute_action(action_def, request),
                timeout=action_def.timeout
            )
            
            execution_time = time.time() - start_time
            self._log_audit(request, "success", 
                          duration_ms=execution_time * 1000)
            
            return ActionResponse(
                request_id=request.request_id,
                success=True,
                result=result,
                execution_time=execution_time
            )
            
        except asyncio.TimeoutError:
            self._log_audit(request, "failure", "Timeout")
            return ActionResponse(
                request_id=request.request_id,
                success=False,
                error=f"Action timeout ({action_def.timeout}s)"
            )
        except Exception as e:
            self._log_audit(request, "failure", str(e))
            return ActionResponse(
                request_id=request.request_id,
                success=False,
                error=str(e)
            )
    
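    async def _execute_action(self, 
                             action_def: ActionDefinition,
                             request: ActionRequest) -> Any:
        """Dispatch the request to the executor for its category."""
        executor = self.executors.get(action_def.category)
        
        if action_def.category == ActionCategory.TOOL:
            # Note: register_action() only stores the definition in
            # self.actions; the tool presumably also has to be registered
            # in this executor's ToolRegistry to be found here
            return await executor.execute(
                action_def.name, 
                request.parameters
            )
        
        elif action_def.category == ActionCategory.API:
            return await executor.execute(
                action_def.handler,
                **request.parameters
            )
        
        elif action_def.category == ActionCategory.CODE:
            if executor is None:
                # The sandbox is only created when enable_code_execution is set
                raise RuntimeError("Code execution is disabled")
            code = request.parameters.get('code', '')
            language = request.parameters.get('language', 'python')
            return await executor.execute_safely(code, language)
        
        else:
            # Generic path: call the handler directly
            if asyncio.iscoroutinefunction(action_def.handler):
                return await action_def.handler(**request.parameters)
            else:
                return action_def.handler(**request.parameters)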
    
    def _log_audit(self, request: ActionRequest, result: str,
                   error: Optional[str] = None,
                   duration_ms: Optional[float] = None):
        """记录审计日志"""
        entry = AuditEntry(
            timestamp=datetime.now(),
            agent_id=request.agent_id,
            action=request.action_name,
            parameters=request.parameters,
            result=result,
            error=error,
            duration_ms=duration_ms
        )
        self.audit_logger.log(entry)
    
    def get_available_actions(self, 
                             agent_id: str) -> List[Dict[str, Any]]:
        """获取Agent可用的行动列表"""
        available = []
        agent_permissions = self.permission_manager.get_permissions(agent_id)
        
        for name, action_def in self.actions.items():
            if action_def.permission_required:
                required_perm = Permission(action_def.permission_required)
                if required_perm not in agent_permissions:
                    continue
            
            available.append({
                "name": name,
                "category": action_def.category.value,
                "description": action_def.description,
                "parameters": action_def.parameters_schema
            })
        
        return available
    
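    def to_function_schemas(self, 
                           agent_id: str) -> List[Dict[str, Any]]:
        """Convert the available actions to function-calling schemas."""
        actions = self.get_available_actions(agent_id)
        schemas = []
        
        for action in actions:
            params = action["parameters"]
            schemas.append({
                "name": action["name"],
                "description": action["description"],
                "parameters": {
                    "type": "object",
                    # JSON Schema lists required names on the object, so the
                    # per-parameter "required" flag is dropped from properties
                    "properties": {
                        name: {k: v for k, v in spec.items() if k != "required"}
                        for name, spec in params.items()
                    },
                    "required": [
                        name for name, spec in params.items()
                        if spec.get("required", False)
                    ]
                }
            })
        
        return schemas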


# ============== 便捷的装饰器 ==============

def action(name: str, 
          category: ActionCategory = ActionCategory.TOOL,
          description: str = "",
          permission: Optional[str] = None,
          timeout: float = 30.0,
          requires_confirmation: bool = False):
    """行动注册装饰器"""
    def decorator(func):
        # 从函数签名推断参数schema
        import inspect
        sig = inspect.signature(func)
        params_schema = {}
        
        for param_name, param in sig.parameters.items():
            if param_name in ('self', 'cls'):
                continue
            
            param_info = {"type": "string"}  # 默认类型
            if param.annotation != inspect.Parameter.empty:
                if param.annotation == int:
                    param_info["type"] = "integer"
                elif param.annotation == float:
                    param_info["type"] = "number"
                elif param.annotation == bool:
                    param_info["type"] = "boolean"
                elif param.annotation == list:
                    param_info["type"] = "array"
            
            if param.default == inspect.Parameter.empty:
                param_info["required"] = True
            else:
                param_info["default"] = param.default
            
            params_schema[param_name] = param_info
        
        func._action_definition = ActionDefinition(
            name=name,
            category=category,
            description=description or func.__doc__ or "",
            parameters_schema=params_schema,
            handler=func,
            permission_required=permission,
            timeout=timeout,
            requires_confirmation=requires_confirmation
        )
        
        return func
    return decorator


# ============== 完整使用示例 ==============

class MyAgentActions:
    """自定义Agent行动集合"""
    
    def __init__(self):
        self.action_module = ActionModule({
            'enable_code_execution': True,
            'sandbox': {
                'cpu_limit': 0.5,
                'memory_limit': '256m',
                'timeout': 10
            }
        })
        self._register_actions()
    
    def _register_actions(self):
        """注册所有行动"""
        
        # 搜索行动
        @action(
            name="web_search",
            category=ActionCategory.API,
            description="Search the web for information",
            permission="web_read"
        )
        async def web_search(query: str, num_results: int = 5):
            # 实际应调用搜索API
            return {"query": query, "results": []}
        
        self.action_module.register_action(web_search._action_definition)
        
        # 计算器行动
        @action(
            name="calculator",
            category=ActionCategory.TOOL,
            description="Perform mathematical calculations"
        )
        def calculator(expression: str):
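            import ast
            import operator
            
            # Whitelisted binary operators; anything else is rejected
            ops = {
                ast.Add: operator.add,
                ast.Sub: operator.sub,
                ast.Mult: operator.mul,
                ast.Div: operator.truediv,
            }
            
            def eval_node(node):
                # ast.Num was removed in Python 3.12; literals are ast.Constant
                if isinstance(node, ast.Constant):
                    value = node.value
                    if isinstance(value, (int, float)) and not isinstance(value, bool):
                        return value
                elif isinstance(node, ast.BinOp) and type(node.op) in ops:
                    return ops[type(node.op)](
                        eval_node(node.left),
                        eval_node(node.right)
                    )
                raise ValueError("Invalid expression")
            
            tree = ast.parse(expression, mode='eval')
            return eval_node(tree.body)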
        
        self.action_module.register_action(calculator._action_definition)
        
        # 代码执行行动
        @action(
            name="execute_code",
            category=ActionCategory.CODE,
            description="Execute Python code in sandbox",
            permission="code_execute",
            timeout=30.0,
            requires_confirmation=True
        )
        async def execute_code(code: str, language: str = "python"):
            pass  # 由沙箱执行器处理
        
        self.action_module.register_action(execute_code._action_definition)
    
    async def run_demo(self):
        """Run a short demo."""
        
        # Register the agent
        self.action_module.permission_manager.register_agent(
            Agent(
                agent_id="demo-agent",
                name="Demo Agent",
                roles=["premium"]
            )
        )
        
        # 执行计算
        request = ActionRequest(
            action_name="calculator",
            parameters={"expression": "2 + 3 * 4"},
            agent_id="demo-agent"
        )
        
        response = await self.action_module.execute(request)
        print(f"Calculator result: {response.result}")
        
        # 获取可用行动
        available = self.action_module.get_available_actions("demo-agent")
        print(f"Available actions: {[a['name'] for a in available]}")

# 运行
# agent_actions = MyAgentActions()
# asyncio.run(agent_actions.run_demo())
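
`ActionModule.add_middleware` is defined above but never exercised in the demo. As an illustration of the contract that `execute` expects (an async callable that takes a request and returns a request, or raises to abort), here is a minimal sketch of a middleware that rejects oversized string parameters. `DemoRequest` is a hypothetical stand-in for `ActionRequest` so the example runs on its own, and `make_length_limit_middleware` is an illustrative name, not part of the module above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class DemoRequest:
    """Hypothetical stand-in for ActionRequest (only the fields used here)."""
    action_name: str
    parameters: Dict[str, Any]
    agent_id: str


def make_length_limit_middleware(max_chars: int = 1000):
    """Build a middleware that rejects oversized string parameters."""
    async def middleware(request: DemoRequest) -> DemoRequest:
        for key, value in request.parameters.items():
            if isinstance(value, str) and len(value) > max_chars:
                # Raising aborts the pipeline; execute() turns it into an error response
                raise ValueError(f"Parameter '{key}' exceeds {max_chars} characters")
        return request  # hand the unchanged request to the next stage
    return middleware


async def demo() -> str:
    check = make_length_limit_middleware(max_chars=10)
    ok = await check(DemoRequest("calculator", {"expression": "2 + 3"}, "demo-agent"))
    try:
        await check(DemoRequest("calculator", {"expression": "1" * 50}, "demo-agent"))
    except ValueError as exc:
        return f"{ok.action_name} accepted; second request rejected: {exc}"
    return "unexpected: oversized request was accepted"


print(asyncio.run(demo()))
```

With the real module, a middleware built this way would be attached with `add_middleware(make_length_limit_middleware())`. Because middleware runs after the permission check in `execute`, it is a natural place for input validation or rate limiting rather than for authorization.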

8.3 Testing and Optimization

import pytest
import asyncio

class TestActionModule:
    """行动模块测试"""
    
    @pytest.fixture
    def action_module(self):
        """创建测试用的行动模块"""
        return ActionModule({
            'enable_code_execution': False
        })
    
    @pytest.fixture
    def sample_action(self):
        """创建示例行动"""
        async def handler(x: int, y: int) -> int:
            return x + y
        
        return ActionDefinition(
            name="add",
            category=ActionCategory.TOOL,
            description="Add two numbers",
            parameters_schema={
                "x": {"type": "integer", "required": True},
                "y": {"type": "integer", "required": True}
            },
            handler=handler
        )
    
    @pytest.mark.asyncio
    async def test_register_and_execute(self, action_module, sample_action):
        """测试注册和执行行动"""
        action_module.register_action(sample_action)
        
        # 注册Agent
        action_module.permission_manager.register_agent(
            Agent("test-agent", "Test", ["basic"])
        )
        
        request = ActionRequest(
            action_name="add",
            parameters={"x": 1, "y": 2},
            agent_id="test-agent"
        )
        
        response = await action_module.execute(request)
        
        assert response.success
        assert response.result == 3
    
    @pytest.mark.asyncio
    async def test_permission_denied(self, action_module):
        """测试权限拒绝"""
        action = ActionDefinition(
            name="admin_action",
            category=ActionCategory.TOOL,
            description="Admin only action",
            parameters_schema={},
            handler=lambda: None,
            permission_required="system_admin"
        )
        action_module.register_action(action)
        
        action_module.permission_manager.register_agent(
            Agent("basic-agent", "Basic", ["basic"])
        )
        
        request = ActionRequest(
            action_name="admin_action",
            parameters={},
            agent_id="basic-agent"
        )
        
        response = await action_module.execute(request)
        
        assert not response.success
        assert "Permission denied" in response.error
    
    @pytest.mark.asyncio
    async def test_timeout(self, action_module):
        """测试超时处理"""
        async def slow_handler():
            await asyncio.sleep(10)
            return "done"
        
        action = ActionDefinition(
            name="slow_action",
            category=ActionCategory.TOOL,
            description="Slow action",
            parameters_schema={},
            handler=slow_handler,
            timeout=0.1
        )
        action_module.register_action(action)
        
        action_module.permission_manager.register_agent(
            Agent("test-agent", "Test", ["basic"])
        )
        
        request = ActionRequest(
            action_name="slow_action",
            parameters={},
            agent_id="test-agent"
        )
        
        response = await action_module.execute(request)
        
        assert not response.success
        assert "timeout" in response.error.lower()
    
    @pytest.mark.asyncio
    async def test_audit_logging(self, action_module, sample_action):
        """测试审计日志"""
        action_module.register_action(sample_action)
        action_module.permission_manager.register_agent(
            Agent("test-agent", "Test", ["basic"])
        )
        
        request = ActionRequest(
            action_name="add",
            parameters={"x": 1, "y": 2},
            agent_id="test-agent"
        )
        
        await action_module.execute(request)
        
        # 检查审计日志
        logs = action_module.audit_logger.query(agent_id="test-agent")
        assert len(logs) == 1
        assert logs[0].action == "add"
        assert logs[0].result == "success"


class TestSandbox:
    """沙箱测试"""
    
    @pytest.fixture
    def secure_sandbox(self):
        """创建安全沙箱"""
        config = SandboxConfig(
            timeout=5,
            memory_limit="128m"
        )
        docker_sandbox = DockerSandbox(config)
        sandbox = SecureSandbox(docker_sandbox)
        sandbox.add_analyzer(python_security_analyzer)
        return sandbox
    
    @pytest.mark.asyncio
    async def test_safe_code_execution(self, secure_sandbox):
        """测试安全代码执行"""
        code = """
print("Hello, World!")
result = 1 + 1
print(f"Result: {result}")
"""
        result = await secure_sandbox.execute_safely(code, "python")
        
        assert result.exit_code == 0
        assert "Hello, World!" in result.stdout
    
    @pytest.mark.asyncio
    async def test_dangerous_code_blocked(self, secure_sandbox):
        """Test that dangerous code is blocked."""
        dangerous_code = """
import os
os.system("rm -rf /")
"""
        result = await secure_sandbox.execute_safely(dangerous_code, "python")
        
        assert result.exit_code == -1
        assert "not allowed" in result.stderr.lower()


# Performance tests
class TestPerformance:
    """Performance tests."""
    
    @pytest.mark.asyncio
    async def test_concurrent_execution(self):
        """Test concurrent execution performance."""
        action_module = ActionModule()
        
        async def quick_action():
            await asyncio.sleep(0.01)
            return "done"
        
        action = ActionDefinition(
            name="quick",
            category=ActionCategory.TOOL,
            description="Quick action",
            parameters_schema={},
            handler=quick_action
        )
        action_module.register_action(action)
        action_module.permission_manager.register_agent(
            Agent("test", "Test", ["basic"])
        )
        
        # Run 100 requests concurrently
        import time
        start = time.perf_counter()  # monotonic clock, suited to measuring elapsed time

        tasks = []
        for _ in range(100):
            request = ActionRequest(
                action_name="quick",
                parameters={},
                agent_id="test"
            )
            tasks.append(action_module.execute(request))

        results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start

        success_count = sum(1 for r in results if r.success)
        print(f"Completed {success_count}/100 in {elapsed:.2f}s")

        assert success_count == 100
        # Loose upper bound: even fully serial execution would need only about
        # 1 s (100 x 0.01 s), so this mainly guards against hangs.
        assert elapsed < 5.0

9. Frontier Progress and Future Outlook

🔬 Frontier Research Directions

┌──────────────────────────────────────────────────────────────────────┐
│              Frontier Research on Agent Action Modules               │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  1. Tool Learning                                                    │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  • Toolformer: self-supervised learning of tool use            │  │
│  │  • ToolBench: large-scale benchmark for tool calling           │  │
│  │  • TALM: tool-augmented language models                        │  │
│  │  • API-Bank: evaluating API-calling ability                    │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  2. Embodied AI                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  • PaLM-E: embodied multimodal language model                  │  │
│  │  • RT-2: vision-language-action model                          │  │
│  │  • SayCan: language models guiding robots                      │  │
│  │  • Open X-Embodiment: cross-robot transfer learning            │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  3. Code Generation and Execution                                    │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  • CodeAct: agent framework with code as actions               │  │
│  │  • OpenInterpreter: natural-language programming interface     │  │
│  │  • TaskWeaver: code-first agent framework                      │  │
│  │  • AutoGen: multi-agent code collaboration                     │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  4. Safety and Controllability                                       │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  • Constitutional AI: value alignment                          │  │
│  │  • Tool Use Safety: safety of tool use                         │  │
│  │  • Sandboxing Techniques: advances in sandboxing               │  │
│  │  • Interpretable Actions: explainable action decisions         │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

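🔮 Future Outlook

💡 Question: Where is Agent action capability headed?

🤔 Answer

Short term (1-2 years):

  • Function Calling keeps improving and supports more complex tool combinations
  • Code execution sandboxes mature and strike a better balance between security and performance
  • Multi-Agent collaboration on complex tasks becomes the norm

Medium term (3-5 years):

  • Embodied AI achieves breakthroughs, and LLM-driven robots enter the home
  • General tool use: Agents can learn new tools on their own
  • Action planning and execution become deeply integrated and are optimized end to end

Long term (5-10 years):

  • Truly general Agents that can handle arbitrary tasks in the open world
  • Autonomous action decisions: safe, effective choices in complex environments
  • A new paradigm of human-machine collaboration in which Agents become an extension of people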

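10. Summary

This article has systematically examined the design and implementation of the Agent action module. The main points are:

📝 Key Takeaways

  1. Action module overview

    • The action module is the Agent's "hands and feet" for interacting with the world
    • Core capabilities: tool calling, API interaction, code execution, and embodied control
    • The architecture has to account for security, extensibility, and observability
  2. Tool use and Toolformer

    • Toolformer introduced a new paradigm of self-supervised tool learning
    • Tool selection and orchestration are key to executing complex tasks
    • Orchestration strategies include sequential, parallel, conditional, and loop
  3. API calls

    • RESTful APIs are the main way to connect to external services
    • Function Calling simplifies integrating LLMs with tools
    • Error handling, retries, and circuit breaking are essential in production
  4. Code execution

    • Code execution offers nearly unlimited flexibility
    • Multi-language runtimes serve different scenarios
    • Security is the first consideration
  5. Security sandbox

    • A sandbox is a necessary safeguard for running untrusted code
    • Containerization (Docker) is the mainstream approach
    • Permission control and auditing support compliance
  6. Embodied AI

    • SayCan demonstrated that combining LLMs with robots is feasible
    • Multimodal perception is the foundation of embodied AI
    • Estimating the value of an action in context (affordance) is key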
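
The affordance point can be made concrete with a small sketch of SayCan-style skill selection: each candidate skill gets a score from the language model (how useful the skill is for the instruction) and a score from a value function (how likely the skill is to succeed in the current state), and the Agent picks the skill with the highest product. The `SkillScore` type, the `select_skill` function, and every number below are illustrative assumptions, not values from the SayCan paper or from this article's code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SkillScore:
    skill: str
    llm_prob: float    # assumed: LLM's estimate that the skill helps the instruction
    affordance: float  # assumed: value-function estimate that the skill can succeed now


def select_skill(scores: list[SkillScore]) -> SkillScore:
    """Pick the skill that maximizes llm_prob * affordance."""
    if not scores:
        raise ValueError("no candidate skills")
    return max(scores, key=lambda s: s.llm_prob * s.affordance)


# Made-up scores for the instruction "clean up the spill".
candidates = [
    SkillScore("pick up the sponge", llm_prob=0.50, affordance=0.10),  # sponge not in reach
    SkillScore("go to the table", llm_prob=0.30, affordance=0.90),
    SkillScore("open the drawer", llm_prob=0.20, affordance=0.50),
]

best = select_skill(candidates)
print(best.skill)
```

With these numbers the language model alone would prefer picking up the sponge, but the low affordance (the sponge is out of reach) lets the feasible navigation skill win, which is the grounding effect the summary refers to.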

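🎯 Design Principles

  1. Security first: every action should run inside a safety boundary
  2. Least privilege: grant only the permissions that are needed
  3. Auditable: every action should be recorded
  4. Recoverable: errors should be detectable and recoverable
  5. Extensible: new actions should be easy to add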
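
As a rough illustration of how several of these principles can meet in one place, the sketch below wraps an action handler with a deny-by-default permission check (least privilege), an audit record for every outcome (auditable), and a bounded retry with exponential backoff (recoverable). `GuardedExecutor`, `AuditRecord`, and their fields are hypothetical names made up for this example; they are not the `ActionModule` API used elsewhere in the article.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class AuditRecord:
    agent_id: str
    action: str
    result: str  # "success", "denied", or "error"


@dataclass
class GuardedExecutor:
    # Hypothetical grant table: agent id -> names of actions it may call.
    grants: dict[str, set[str]]
    audit_log: list[AuditRecord] = field(default_factory=list)

    def run(self, agent_id: str, action: str, handler: Callable[[], Any],
            max_attempts: int = 3, base_delay: float = 0.0) -> Any:
        # Least privilege: anything not explicitly granted is denied.
        if action not in self.grants.get(agent_id, set()):
            self.audit_log.append(AuditRecord(agent_id, action, "denied"))
            raise PermissionError(f"{agent_id} may not call {action}")

        last_error = None
        for attempt in range(max_attempts):
            try:
                result = handler()
            except Exception as exc:  # recoverable: retry with backoff
                last_error = exc
                if attempt < max_attempts - 1:
                    time.sleep(base_delay * (2 ** attempt))
            else:
                self.audit_log.append(AuditRecord(agent_id, action, "success"))
                return result

        self.audit_log.append(AuditRecord(agent_id, action, "error"))
        raise RuntimeError(f"{action} failed after {max_attempts} attempts") from last_error


# Usage: a handler that fails twice with a transient error, then succeeds.
calls = {"n": 0}

def flaky_add() -> int:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return 1 + 2

executor = GuardedExecutor(grants={"test-agent": {"add"}})
total = executor.run("test-agent", "add", flaky_add)
print(total, [r.result for r in executor.audit_log])
```

A production version would likely retry only on error types known to be transient and add a circuit breaker, but the control flow stays the same: check permission, attempt, record.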

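🚀 Practical Advice

  1. Start simple: implement basic tool calling first, then add more complex capabilities
  2. Take testing seriously: the action module needs thorough unit and integration tests
  3. Monitoring first: make sure solid monitoring is in place before deploying to production
  4. Security reviews: review permission settings and audit logs regularly
  5. Keep iterating: refine the action library based on how it is actually used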
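
For the "start simple" advice, a first version of tool calling can be as small as a name-to-function registry plus a dispatcher that takes a JSON call description. The sketch below assumes a call format of `{"name": ..., "arguments": {...}}`; that format and the `tool` / `dispatch` helpers are assumptions for this example rather than any particular vendor's Function Calling schema.

```python
import json
from typing import Any, Callable

# Registry of callable tools, keyed by function name.
TOOLS: dict[str, Callable[..., Any]] = {}


def tool(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that registers a function as a tool under its own name."""
    TOOLS[fn.__name__] = fn
    return fn


@tool
def add(x: float, y: float) -> float:
    return x + y


def dispatch(call_json: str) -> dict:
    """Parse a JSON tool call and run it, returning a result or an error."""
    call = json.loads(call_json)
    name = call.get("name")
    if name not in TOOLS:
        return {"ok": False, "error": f"unknown tool: {name}"}
    try:
        return {"ok": True, "result": TOOLS[name](**call.get("arguments", {}))}
    except TypeError as exc:  # e.g. missing or unexpected arguments
        return {"ok": False, "error": str(exc)}


print(dispatch('{"name": "add", "arguments": {"x": 1, "y": 2}}'))
# → {'ok': True, 'result': 3}
```

Permissions, timeouts, and auditing can then be layered around `dispatch` without changing the tools themselves.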

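After working through this article, readers should have a well-rounded understanding of the Agent action module. Action capability is the key step that turns an Agent from an "intelligent assistant" into an "intelligent agent", and it is an important milestone on AI's path to practical use.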


💬 Discussion: What challenges have you run into while building an Agent action module? Share your experience in the comments!



References

[1] Schick, T., Dwivedi-Yu, J., Dessì, R., Raileanu, R., Lomeli, M., Zettlemoyer, L., … & Scialom, T. (2023). Toolformer: Language Models Can Teach Themselves to Use Tools. arXiv preprint arXiv:2302.04761.

[2] Ahn, M., Brohan, A., Brown, N., Chebotar, Y., Cortes, O., David, B., … & Zeng, A. (2022). Do As I Can, Not As I Say: Grounding Language in Robotic Affordances. arXiv preprint arXiv:2204.01691.

[3] Driess, D., Xia, F., Sajjadi, M. S., Lynch, C., Chowdhery, A., Ichter, B., … & Florence, P. (2023). PaLM-E: An Embodied Multimodal Language Model. arXiv preprint arXiv:2303.03378.

[4] Brohan, A., Brown, N., Carbajal, J., Chebotar, Y., Chen, X., Choromanski, K., … & Zitkovich, B. (2023). RT-2: Vision-Language-Action Models Transfer Web Knowledge to Robotic Control. arXiv preprint arXiv:2307.15818.

[5] Qin, Y., Liang, S., Ye, Y., Zhu, K., Yan, L., Lu, Y., … & Sun, M. (2023). ToolLLM: Facilitating Large Language Models to Master 16000+ Real-world APIs. arXiv preprint arXiv:2307.16789.

[6] Wang, L., Ma, C., Feng, X., Zhang, Z., Yang, H., Zhang, J., … & Wang, J. (2023). A Survey on Large Language Model based Autonomous Agents. arXiv preprint arXiv:2308.11432.

[7] Patil, S. G., Zhang, T., Wang, X., & Gonzalez, J. E. (2023). Gorilla: Large Language Model Connected with Massive APIs. arXiv preprint arXiv:2305.15334.

[8] Wang, G., Xie, Y., Jiang, Y., Mandlekar, A., Xiao, C., Zhu, Y., … & Anandkumar, A. (2023). Voyager: An Open-Ended Embodied Agent with Large Language Models. arXiv preprint arXiv:2305.16291.

[9] Wu, Q., Bansal, G., Zhang, J., Wu, Y., Zhang, S., Zhu, E., … & Wang, C. (2023). AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation. arXiv preprint arXiv:2308.08155.

[10] Open X-Embodiment Collaboration. (2023). Open X-Embodiment: Robotic Learning Datasets and RT-X Models. arXiv preprint arXiv:2310.08864.

[11] Liu, Z., Yao, W., Zhang, J., Xue, L., Heinecke, S., Murthy, R., … & Savarese, S. (2023). BOLAA: Benchmarking and Orchestrating LLM-augmented Autonomous Agents. arXiv preprint arXiv:2308.05960.

[12] Tang, Q., Deng, Z., Lin, H., Han, X., Liang, Q., & Sun, L. (2023). ToolAlpaca: Generalized Tool Learning for Language Models with 3000 Simulated Cases. arXiv preprint arXiv:2306.05301.

[13] Shen, Y., Song, K., Tan, X., Li, D., Lu, W., & Zhuang, Y. (2023). HuggingGPT: Solving AI Tasks with ChatGPT and its Friends in Hugging Face. arXiv preprint arXiv:2303.17580.

[14] Li, M., Song, F., Yu, B., Yu, H., Li, Z., Huang, F., & Li, Y. (2023). API-Bank: A Benchmark for Tool-Augmented LLMs. arXiv preprint arXiv:2304.08244.

[15] Hao, S., Gu, Y., Ma, H., Hong, J., Wang, Z., Wang, D., & Hu, Z. (2023). Reasoning with Language Model is Planning with World Model. arXiv preprint arXiv:2305.14992.

[16] Nakano, R., Hilton, J., Balaji, S., Wu, J., Ouyang, L., Kim, C., … & Schulman, J. (2021). WebGPT: Browser-assisted Question-answering with Human Feedback. arXiv preprint arXiv:2112.09332.

[17] Yang, Z., Li, L., Wang, J., Lin, K., Azarnasab, E., Ahmed, F., … & Wang, L. (2023). MM-REACT: Prompting ChatGPT for Multimodal Reasoning and Action. arXiv preprint arXiv:2303.11381.

[18] Xie, T., Zhou, F., Cheng, Z., Shi, P., Weng, L., Liu, Y., … & Lou, J. (2023). OpenAgents: An Open Platform for Language Agents in the Wild. arXiv preprint arXiv:2310.10634.

[19] Reed, S., Zolna, K., Parisotto, E., Colmenarejo, S. G., Novikov, A., Barth-Maron, G., … & de Freitas, N. (2022). A Generalist Agent. arXiv preprint arXiv:2205.06175.

[20] Xi, Z., Chen, W., Guo, X., He, W., Ding, Y., Hong, B., … & Gui, T. (2023). The Rise and Potential of Large Language Model Based Agents: A Survey. arXiv preprint arXiv:2309.07864.
