Dify Chatflow 使用指南

一、Chatflow 基础概念

1.1 什么是 Chatflow?

Chatflow 是 Dify 的核心功能之一,用于可视化构建复杂的对话流程。它通过节点连接的方式,让开发者可以设计多步骤、带逻辑判断的对话应用。

1.2 主要组件

组件 说明
节点 执行特定功能的单元(LLM调用、知识库检索等)
连接线 定义数据流向和流程逻辑
变量 存储和传递数据的容器
输入/输出 流程的开始和结束点

二、快速开始:创建第一个 Chatflow

2.1 基础聊天流程

# 最简单的 Chatflow 示例
流程:
  开始 → LLM节点 → 结束

操作步骤:

  1. 登录 Dify → 创建应用 → 选择"对话型应用"
  2. 切换到工作流模式:点击右上角"工作流"按钮
  3. 添加节点:从左侧拖拽节点到画布
  4. 配置节点:双击节点设置参数
  5. 连接节点:从一个节点的输出拖到下一个节点的输入
  6. 测试发布:点击测试按钮,然后发布

2.2 节点详解

常用节点类型
基础节点:
  - Start: 流程起始点,定义输入变量
  - LLM: 调用大语言模型
  - Knowledge Retrieval: 知识库检索
  - Question Classifier: 问题分类器
  - If/Else: 条件分支
  - End: 流程结束点

高级节点:
  - Code: 执行Python代码
  - HTTP Request: 调用外部API
  - Template Transform: 模板转换
  - Variable Assigner: 变量赋值
  - Loop: 循环执行
  - Join: 合并分支

三、核心功能详解

3.1 变量系统

# 变量类型和使用
variables:
  # 系统变量(自动创建)
  - name: query       # 用户输入
  - name: conversation_id  # 对话ID
  
  # 自定义变量
  - name: user_name   # 字符串
  - name: user_age    # 数字
  - name: search_results  # 数组
  - name: user_info   # 对象

在节点中使用变量:

# 在LLM节点提示词中使用
你好,{{user_name}}!你今天{{query}},有什么我可以帮助的?

# 在知识库检索中使用
{{query}} 的相关信息是什么?

# 在HTTP请求中使用
{
  "question": "{{query}}",
  "user_id": "{{user_id}}"
}

3.2 条件分支 (If/Else)

# 条件判断示例
流程:
  开始 → Question Classifier → If/Else
              ↓                      ↓
        [简单问题]           [复杂问题]
              ↓                      ↓
        LLM直接回答       知识库检索 → LLM处理

配置条件表达式:

# 基于问题分类结果
{{classification}} == "greeting"

# 基于知识库检索结果
{{knowledge_retrieval.output}} != ""

# 组合条件
{{query.length}} > 10 and "error" not in {{code_execution.result}}

3.3 知识库检索集成

# 知识库检索流程
节点配置:
  知识库检索:
    - 选择知识库: "产品手册"
    - 查询文本: "{{query}}"
    - 返回条数: 3
    - 相似度阈值: 0.7
    - 启用重排: true
    
输出结构:
  - documents: 检索到的文档列表
  - context: 合并的上下文文本

高级检索技巧:

# 多知识库检索
步骤:
  1. HTTP Request → 调用自定义检索API
  2. Template Transform → 格式化结果
  3. Join → 合并多个检索结果
  
# 混合检索
- 向量检索: 语义相似度
- 关键词检索: BM25/Elasticsearch
- 元数据过滤: 按标签、时间过滤

四、实战案例

4.1 案例1:智能客服机器人

name: 智能客服流程
description: 处理用户咨询,自动分类并调用相应知识库

flow:
  # 开始节点
  - id: start
    type: Start
    outputs:
      - name: question
        type: string
        description: 用户问题
  
  # 问题分类
  - id: classifier
    type: QuestionClassifier
    config:
      categories:
        - name: product_query
          description: 产品咨询
        - name: technical_support
          description: 技术支持
        - name: billing
          description: 账单问题
        - name: other
          description: 其他问题
      default_category: other
  
  # 条件分支
  - id: router
    type: IfElse
    conditions:
      - expression: "{{classifier.output}} == 'product_query'"
        target: product_kb
      - expression: "{{classifier.output}} == 'technical_support'"
        target: tech_kb
      - expression: "{{classifier.output}} == 'billing'"
        target: billing_kb
      - expression: true  # 默认
        target: fallback_llm
  
  # 知识库检索分支
  - id: product_kb
    type: KnowledgeRetrieval
    config:
      knowledge_base_id: "product_manual"
      query: "{{question}}"
      top_k: 5
  
  - id: tech_kb
    type: KnowledgeRetrieval
    config:
      knowledge_base_id: "technical_docs"
      query: "{{question}}"
      top_k: 3
  
  - id: billing_kb
    type: HTTPRequest
    config:
      url: "https://api.company.com/billing/query"
      method: POST
      headers:
        Authorization: "Bearer {{api_token}}"
      body:
        question: "{{question}}"
        user_id: "{{user_id}}"
  
  # LLM处理
  - id: llm_process
    type: LLM
    config:
      model: gpt-4
      prompt: |
        你是专业客服助手,基于以下信息回答用户问题:
        
        用户问题:{{question}}
        
        相关信息:
        {{context}}
        
        回答要求:
        1. 如果信息充分,直接回答问题
        2. 如果信息不足,说明无法回答并提供帮助渠道
        3. 保持专业友好的语气
        
        回答:
    variables:
      context: |
        {{#if product_kb.output}}
          产品信息:{{product_kb.output}}
        {{/if}}
        {{#if tech_kb.output}}
          技术文档:{{tech_kb.output}}
        {{/if}}
        {{#if billing_kb.output}}
          账单信息:{{billing_kb.output}}
        {{/if}}
  
  # 备用回答
  - id: fallback_llm
    type: LLM
    config:
      model: gpt-3.5-turbo
      prompt: |
        用户问:{{question}}
        
        这个问题超出了我的知识范围,请帮我生成一个礼貌的回复,
        建议用户联系人工客服或访问帮助中心。
        
        回复:
  
  # 结束节点
  - id: end
    type: End
    outputs:
      - name: answer
        value: "{{#if llm_process.output}}{{llm_process.output}}{{else}}{{fallback_llm.output}}{{/if}}"

4.2 案例2:数据分析助手

name: 数据分析流程
description: 分析用户问题,执行代码,返回分析结果

flow:
  # 开始
  - id: start
    type: Start
    outputs:
      - name: user_query
        type: string
  
  # 解析用户意图
  - id: intent_parser
    type: LLM
    config:
      model: gpt-4
      prompt: |
        分析用户的数据分析需求:
        
        用户查询:{{user_query}}
        
        请输出JSON格式:
        {
          "data_source": "sales|users|products|custom",
          "analysis_type": "summary|trend|comparison|prediction",
          "metrics": ["metric1", "metric2"],
          "time_range": "last_week|last_month|last_year|custom",
          "filters": {"key": "value"}
        }
  
  # 数据获取
  - id: data_fetcher
    type: IfElse
    conditions:
      - expression: "{{intent_parser.output.data_source}} == 'sales'"
        target: fetch_sales_data
      - expression: "{{intent_parser.output.data_source}} == 'users'"
        target: fetch_user_data
      - expression: true
        target: fetch_custom_data
  
  - id: fetch_sales_data
    type: HTTPRequest
    config:
      url: "{{sales_api}}/query"
      method: POST
      body:
        metrics: "{{intent_parser.output.metrics}}"
        time_range: "{{intent_parser.output.time_range}}"
        filters: "{{intent_parser.output.filters}}"
  
  # 数据分析
  - id: data_analyzer
    type: Code
    config:
      code: |
        import pandas as pd
        import json
        
        # 获取数据
        data = {{data_fetcher.output}}
        df = pd.DataFrame(data)
        
        # 根据分析类型处理
        analysis_type = "{{intent_parser.output.analysis_type}}"
        
        if analysis_type == "summary":
            result = {
                "row_count": len(df),
                "columns": list(df.columns),
                "summary_stats": df.describe().to_dict()
            }
        elif analysis_type == "trend":
            # 时间序列分析
            if 'date' in df.columns:
                df['date'] = pd.to_datetime(df['date'])
                result = df.groupby(df['date'].dt.to_period('M')).sum().to_dict()
            else:
                result = {"error": "No date column for trend analysis"}
        elif analysis_type == "comparison":
            # 对比分析
            if 'category' in df.columns:
                result = df.groupby('category').mean().to_dict()
            else:
                result = {"error": "No category column for comparison"}
        else:
            result = {"error": f"Unsupported analysis type: {analysis_type}"}
        
        return json.dumps(result, default=str)
  
  # 生成报告
  - id: report_generator
    type: LLM
    config:
      model: gpt-4
      prompt: |
        根据数据分析结果生成中文报告:
        
        用户问题:{{user_query}}
        分析类型:{{intent_parser.output.analysis_type}}
        数据源:{{intent_parser.output.data_source}}
        
        分析结果(JSON格式):
        {{data_analyzer.output}}
        
        请生成结构化的分析报告,包括:
        1. 主要发现
        2. 关键指标
        3. 趋势分析(如果有)
        4. 建议
        
        报告:
  
  # 可视化(可选)
  - id: visualization
    type: Code
    config:
      code: |
        import plotly.graph_objects as go
        import json
        
        data = json.loads('{{data_fetcher.output}}')
        
        # 创建图表
        if "{{intent_parser.output.analysis_type}}" == "trend":
            fig = go.Figure(data=go.Scatter(
                x=list(data.keys()),
                y=list(data.values()),
                mode='lines+markers'
            ))
        else:
            fig = go.Figure(data=go.Bar(
                x=list(data.keys()),
                y=list(data.values())
            ))
        
        # 转换为HTML
        chart_html = fig.to_html(include_plotlyjs='cdn')
        return chart_html
  
  # 结束
  - id: end
    type: End
    outputs:
      - name: analysis_report
        value: "{{report_generator.output}}"
      - name: visualization_html
        value: "{{visualization.output}}"

4.3 案例3:多轮对话系统

name: 多轮对话管理
description: 处理带上下文的连续对话

flow:
  # 开始(带历史上下文)
  - id: start
    type: Start
    outputs:
      - name: current_query
        type: string
      - name: conversation_history
        type: array
        default: []
  
  # 对话状态管理
  - id: state_manager
    type: Code
    config:
      code: |
        import json
        
        history = {{conversation_history}}
        current_query = "{{current_query}}"
        
        # 分析对话状态
        if not history:
            state = "new_conversation"
            context_summary = ""
        else:
            # 从历史中提取关键信息
            last_3_turns = history[-6:]  # 最近3轮对话
            context_summary = " ".join([
                f"用户:{turn['user']} 助手:{turn['assistant']}"
                for turn in last_3_turns
            ])
            
            # 判断是否在同一个话题
            topics = ["价格", "功能", "技术支持", "购买"]
            current_topic = next((t for t in topics if t in current_query), "其他")
            
            state = f"continuing_{current_topic}_topic"
        
        return json.dumps({
            "state": state,
            "context_summary": context_summary,
            "turn_count": len(history) // 2
        })
  
  # 查询改写(考虑上下文)
  - id: query_rewriter
    type: LLM
    config:
      model: gpt-3.5-turbo
      prompt: |
        根据对话历史改写当前查询,使其更完整:
        
        对话历史:
        {{state_manager.output.context_summary}}
        
        当前查询:{{current_query}}
        
        改写后的查询:
  
  # 知识库检索
  - id: knowledge_retrieval
    type: KnowledgeRetrieval
    config:
      query: "{{query_rewriter.output}}"
      knowledge_base_id: "faq_kb"
      top_k: 5
  
  # 生成回答(带上下文感知)
  - id: response_generator
    type: LLM
    config:
      model: gpt-4
      prompt: |
        基于以下信息生成回答:
        
        当前问题:{{current_query}}
        上下文摘要:{{state_manager.output.context_summary}}
        对话轮次:{{state_manager.output.turn_count}}
        
        相关知识点:
        {{knowledge_retrieval.output}}
        
        生成要求:
        1. 如果这是新对话(轮次=0),要热情欢迎
        2. 如果对话超过5轮,可以主动询问是否需要其他帮助
        3. 回答要连贯,保持对话的连续性
        4. 必要时可以反问以澄清需求
        
        回答:
  
  # 更新对话历史
  - id: history_updater
    type: Code
    config:
      code: |
        import json
        
        history = {{conversation_history}}
        
        # 添加当前轮次
        history.append({
            "user": "{{current_query}}",
            "assistant": "{{response_generator.output}}",
            "timestamp": "{{now}}"
        })
        
        # 限制历史长度(保留最近20轮)
        if len(history) > 20:
            history = history[-20:]
        
        return json.dumps(history)
  
  # 结束
  - id: end
    type: End
    outputs:
      - name: response
        value: "{{response_generator.output}}"
      - name: updated_history
        value: "{{history_updater.output}}"

五、高级技巧

5.1 使用模板引擎

{# Handlebars 模板语法 #}

{# 条件判断 #}
{{#if variable}}
  变量存在时的内容
{{else}}
  变量不存在时的内容
{{/if}}

{# 循环 #}
{{#each array}}
  索引:{{@index}},值:{{this}}
{{/each}}

{# 逻辑运算 #}
{{#and condition1 condition2}}
  两者都为真
{{/and}}

{{#or condition1 condition2}}
  任一为真
{{/or}}

{# 比较 #}
{{#eq var1 var2}}相等{{/eq}}
{{#gt var1 var2}}大于{{/gt}}
{{#lt var1 var2}}小于{{/lt}}

5.2 错误处理

# 错误处理流程示例
流程:
  开始 → 尝试执行 → 成功? → 结束
                    ↓
                  失败 → 错误处理 → 备用方案 → 结束

错误处理节点配置:

# Code 节点中的错误处理
try:
    result = some_operation()
    return {"success": True, "data": result}
except Exception as e:
    return {
        "success": False,
        "error": str(e),
        "fallback_data": get_fallback_data()
    }

# 在后续节点中判断
{{#if previous_node.output.success}}
  使用 {{previous_node.output.data}}
{{else}}
  使用备用数据:{{previous_node.output.fallback_data}}
  记录错误:{{previous_node.output.error}}
{{/if}}

5.3 性能优化

# 并行执行示例
流程:
  开始 → 并行分支
           ├→ 分支1: 知识库检索
           ├→ 分支2: API调用1
           ├→ 分支3: API调用2
           └→ 分支4: 本地计算
        ↓
  合并结果 → LLM处理 → 结束

配置并行执行:

# 使用 Join 节点合并并行分支
join_config:
  wait_for_all: true  # 等待所有分支完成
  timeout: 30000      # 超时时间(毫秒)
  
# 或者使用条件合并
conditions:
  - expression: "{{branch1.output}} != null"
    target: continue
  - expression: "{{branch2.output}} != null"
    target: continue

六、调试与测试

6.1 调试工具

# 在 Code 节点中添加调试输出
print(f"DEBUG: 当前变量 = {locals()}")
import logging
logging.info(f"数据: {data}")

# 使用 Dify 的调试面板
# 1. 点击流程中的"调试"按钮
# 2. 查看每个节点的输入输出
# 3. 查看执行时间线
# 4. 下载调试日志

6.2 测试用例

# 创建测试用例
test_cases:
  - name: "简单问候"
    inputs:
      query: "你好"
    expected_output_contains: ["你好", "欢迎"]
    
  - name: "复杂问题"
    inputs:
      query: "如何安装产品并配置数据库?"
    expected_output_contains: ["安装", "配置", "步骤"]
    max_response_time: 5000  # 最大响应时间(毫秒)
    
  - name: "错误处理测试"
    inputs:
      query: "故意错误的查询"
    expected_output_contains: ["抱歉", "无法", "帮助"]

七、API 调用 Chatflow

7.1 基本调用

import requests
import json

class DifyChatflowClient:
    def __init__(self, api_key, base_url="http://localhost:5001"):
        self.api_key = api_key
        self.base_url = base_url
    
    def run_workflow(self, workflow_id, inputs, stream=False):
        """运行工作流"""
        url = f"{self.base_url}/v1/workflows/run"
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "inputs": inputs,
            "response_mode": "streaming" if stream else "blocking",
            "user": "user_123"
        }
        
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            stream=stream
        )
        
        if stream:
            return self._handle_stream_response(response)
        else:
            return response.json()
    
    def _handle_stream_response(self, response):
        """处理流式响应"""
        for line in response.iter_lines():
            if line:
                line = line.decode('utf-8')
                if line.startswith('data: '):
                    data = line[6:]  # 去掉"data: "前缀
                    if data == '[DONE]':
                        break
                    try:
                        yield json.loads(data)
                    except:
                        yield {"data": data}
    
    def get_workflow_status(self, task_id):
        """获取任务状态"""
        url = f"{self.base_url}/v1/tasks/{task_id}"
        headers = {"Authorization": f"Bearer {self.api_key}"}
        response = requests.get(url, headers=headers)
        return response.json()
    
    def batch_run(self, workflow_id, queries, max_concurrent=5):
        """批量运行"""
        import asyncio
        import aiohttp
        
        async def run_one(session, query):
            async with session.post(
                f"{self.base_url}/v1/workflows/run",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={"inputs": {"query": query}}
            ) as response:
                return await response.json()
        
        async def main():
            async with aiohttp.ClientSession() as session:
                tasks = []
                for query in queries:
                    task = run_one(session, query)
                    tasks.append(task)
                    
                    # 控制并发数
                    if len(tasks) >= max_concurrent:
                        results = await asyncio.gather(*tasks)
                        for result in results:
                            yield result
                        tasks = []
                
                # 处理剩余任务
                if tasks:
                    results = await asyncio.gather(*tasks)
                    for result in results:
                        yield result
        
        return asyncio.run(main())

# 使用示例
client = DifyChatflowClient(api_key="your-api-key")

# 同步调用
result = client.run_workflow(
    workflow_id="workflow_123",
    inputs={"query": "今天天气怎么样?"},
    stream=False
)
print(result["data"])

# 流式调用
for chunk in client.run_workflow(
    workflow_id="workflow_123",
    inputs={"query": "今天天气怎么样?"},
    stream=True
):
    print(chunk.get("answer", ""), end="", flush=True)

7.2 Webhook 集成

# 配置 Chatflow 输出到 Webhook
webhook_config:
  url: "https://your-server.com/webhook/callback"
  method: "POST"
  headers:
    Authorization: "Bearer your-webhook-token"
  retry_times: 3
  timeout: 10000

# 接收 Webhook 的服务器
from flask import Flask, request
import json

app = Flask(__name__)

@app.route('/webhook/callback', methods=['POST'])
def webhook_callback():
    data = request.json
    
    # 处理 Dify 返回的数据
    workflow_id = data.get('workflow_id')
    task_id = data.get('task_id')
    outputs = data.get('outputs', {})
    
    # 业务逻辑处理
    process_result(outputs)
    
    return {"status": "success"}, 200

def process_result(outputs):
    """处理工作流输出"""
    answer = outputs.get('answer', '')
    context = outputs.get('context', '')
    metadata = outputs.get('metadata', {})
    
    # 保存到数据库
    save_to_database({
        'answer': answer,
        'context': context,
        'metadata': metadata
    })
    
    # 发送通知
    send_notification(f"新回答:{answer[:100]}...")

八、最佳实践

8.1 设计原则

  1. 模块化设计:每个节点功能单一,便于复用
  2. 错误处理:每个可能失败的节点都要有错误处理分支
  3. 性能考虑:并行执行独立任务,设置超时时间
  4. 可维护性:使用有意义的节点名称,添加注释

8.2 命名规范

节点命名示例:
  # 好例子
  - 用户输入解析
  - 产品知识库检索
  - GPT4_回答生成
  - 错误处理_备用回答
  
  # 坏例子
  - 节点1
  - 处理
  - llm
  
变量命名:
  - user_query: 用户输入
  - kb_results: 知识库结果
  - llm_response: LLM响应
  - final_answer: 最终答案

8.3 版本控制

# 导出工作流配置
curl -X GET "http://localhost:5001/v1/workflows/{workflow_id}/export" \
  -H "Authorization: Bearer your-api-key" \
  -o workflow_config.yaml

# 导入工作流
curl -X POST "http://localhost:5001/v1/workflows/import" \
  -H "Authorization: Bearer your-api-key" \
  -F "file=@workflow_config.yaml"

九、常见问题解决

9.1 节点执行失败

排查步骤:
  1. 检查节点连接是否正确
  2. 查看节点配置参数
  3. 检查变量引用是否正确
  4. 查看执行日志
  5. 测试单个节点功能

9.2 变量传递问题

# 调试变量传递
# 在 Code 节点中添加:
debug_info = {
    "当前变量": locals(),
    "输入变量": input_variables,
    "上一个节点输出": previous_node_output
}
print(json.dumps(debug_info, indent=2, ensure_ascii=False))

9.3 性能问题优化

优化策略:
  1. 减少不必要的节点
  2. 使用并行执行
  3. 设置合理的超时时间
  4. 缓存重复查询结果
  5. 分批处理大数据

总结

Dify Chatflow 提供了强大的可视化对话流程编排能力,通过:

  1. 节点化设计:将复杂逻辑分解为可管理的单元
  2. 可视化编排:直观地设计和调试流程
  3. 灵活集成:支持各种数据源和API
  4. 企业级功能:支持多租户、监控、版本控制

掌握 Chatflow 的使用,你可以构建从简单问答到复杂业务流程的各种 AI 应用。建议从简单的流程开始,逐步增加复杂度,并充分利用调试工具和测试功能确保质量。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐