Dify Chatflow 使用指南
Dify Chatflow 使用指南摘要 Dify Chatflow 是一个可视化对话流程构建工具,通过节点连接方式设计复杂对话应用。主要包含节点、连接线、变量和输入/输出组件。用户可快速创建基础聊天流程,并通过多种节点类型(如LLM调用、知识库检索、条件分支等)实现高级功能。系统支持变量系统和条件判断,可集成知识库检索实现智能问答。实战案例展示了如何构建智能客服机器人和数据分析助手,通过流程编排
·
目录
Dify Chatflow 使用指南
一、Chatflow 基础概念
1.1 什么是 Chatflow?
Chatflow 是 Dify 的核心功能之一,用于可视化构建复杂的对话流程。它通过节点连接的方式,让开发者可以设计多步骤、带逻辑判断的对话应用。
1.2 主要组件
| 组件 | 说明 |
|---|---|
| 节点 | 执行特定功能的单元(LLM调用、知识库检索等) |
| 连接线 | 定义数据流向和流程逻辑 |
| 变量 | 存储和传递数据的容器 |
| 输入/输出 | 流程的开始和结束点 |
二、快速开始:创建第一个 Chatflow
2.1 基础聊天流程
# 最简单的 Chatflow 示例
流程:
开始 → LLM节点 → 结束
操作步骤:
- 登录 Dify → 创建应用 → 选择"对话型应用"
- 切换到工作流模式:点击右上角"工作流"按钮
- 添加节点:从左侧拖拽节点到画布
- 配置节点:双击节点设置参数
- 连接节点:从一个节点的输出拖到下一个节点的输入
- 测试发布:点击测试按钮,然后发布
2.2 节点详解
常用节点类型
基础节点:
- Start: 流程起始点,定义输入变量
- LLM: 调用大语言模型
- Knowledge Retrieval: 知识库检索
- Question Classifier: 问题分类器
- If/Else: 条件分支
- End: 流程结束点
高级节点:
- Code: 执行Python代码
- HTTP Request: 调用外部API
- Template Transform: 模板转换
- Variable Assigner: 变量赋值
- Loop: 循环执行
- Join: 合并分支
三、核心功能详解
3.1 变量系统
# 变量类型和使用
variables:
# 系统变量(自动创建)
- name: query # 用户输入
- name: conversation_id # 对话ID
# 自定义变量
- name: user_name # 字符串
- name: user_age # 数字
- name: search_results # 数组
- name: user_info # 对象
在节点中使用变量:
# 在LLM节点提示词中使用
你好,{{user_name}}!你今天{{query}},有什么我可以帮助的?
# 在知识库检索中使用
{{query}} 的相关信息是什么?
# 在HTTP请求中使用
{
"question": "{{query}}",
"user_id": "{{user_id}}"
}
3.2 条件分支 (If/Else)
# 条件判断示例
流程:
开始 → Question Classifier → If/Else
↓ ↓
[简单问题] [复杂问题]
↓ ↓
LLM直接回答 知识库检索 → LLM处理
配置条件表达式:
# 基于问题分类结果
{{classification}} == "greeting"
# 基于知识库检索结果
{{knowledge_retrieval.output}} != ""
# 组合条件
{{query.length}} > 10 and "error" not in {{code_execution.result}}
3.3 知识库检索集成
# 知识库检索流程
节点配置:
知识库检索:
- 选择知识库: "产品手册"
- 查询文本: "{{query}}"
- 返回条数: 3
- 相似度阈值: 0.7
- 启用重排: true
输出结构:
- documents: 检索到的文档列表
- context: 合并的上下文文本
高级检索技巧:
# 多知识库检索
步骤:
1. HTTP Request → 调用自定义检索API
2. Template Transform → 格式化结果
3. Join → 合并多个检索结果
# 混合检索
- 向量检索: 语义相似度
- 关键词检索: BM25/Elasticsearch
- 元数据过滤: 按标签、时间过滤
四、实战案例
4.1 案例1:智能客服机器人
name: 智能客服流程
description: 处理用户咨询,自动分类并调用相应知识库
flow:
# 开始节点
- id: start
type: Start
outputs:
- name: question
type: string
description: 用户问题
# 问题分类
- id: classifier
type: QuestionClassifier
config:
categories:
- name: product_query
description: 产品咨询
- name: technical_support
description: 技术支持
- name: billing
description: 账单问题
- name: other
description: 其他问题
default_category: other
# 条件分支
- id: router
type: IfElse
conditions:
- expression: "{{classifier.output}} == 'product_query'"
target: product_kb
- expression: "{{classifier.output}} == 'technical_support'"
target: tech_kb
- expression: "{{classifier.output}} == 'billing'"
target: billing_kb
- expression: true # 默认
target: fallback_llm
# 知识库检索分支
- id: product_kb
type: KnowledgeRetrieval
config:
knowledge_base_id: "product_manual"
query: "{{question}}"
top_k: 5
- id: tech_kb
type: KnowledgeRetrieval
config:
knowledge_base_id: "technical_docs"
query: "{{question}}"
top_k: 3
- id: billing_kb
type: HTTPRequest
config:
url: "https://api.company.com/billing/query"
method: POST
headers:
Authorization: "Bearer {{api_token}}"
body:
question: "{{question}}"
user_id: "{{user_id}}"
# LLM处理
- id: llm_process
type: LLM
config:
model: gpt-4
prompt: |
你是专业客服助手,基于以下信息回答用户问题:
用户问题:{{question}}
相关信息:
{{context}}
回答要求:
1. 如果信息充分,直接回答问题
2. 如果信息不足,说明无法回答并提供帮助渠道
3. 保持专业友好的语气
回答:
variables:
context: |
{{#if product_kb.output}}
产品信息:{{product_kb.output}}
{{/if}}
{{#if tech_kb.output}}
技术文档:{{tech_kb.output}}
{{/if}}
{{#if billing_kb.output}}
账单信息:{{billing_kb.output}}
{{/if}}
# 备用回答
- id: fallback_llm
type: LLM
config:
model: gpt-3.5-turbo
prompt: |
用户问:{{question}}
这个问题超出了我的知识范围,请帮我生成一个礼貌的回复,
建议用户联系人工客服或访问帮助中心。
回复:
# 结束节点
- id: end
type: End
outputs:
- name: answer
value: "{{#if llm_process.output}}{{llm_process.output}}{{else}}{{fallback_llm.output}}{{/if}}"
4.2 案例2:数据分析助手
name: 数据分析流程
description: 分析用户问题,执行代码,返回分析结果
flow:
# 开始
- id: start
type: Start
outputs:
- name: user_query
type: string
# 解析用户意图
- id: intent_parser
type: LLM
config:
model: gpt-4
prompt: |
分析用户的数据分析需求:
用户查询:{{user_query}}
请输出JSON格式:
{
"data_source": "sales|users|products|custom",
"analysis_type": "summary|trend|comparison|prediction",
"metrics": ["metric1", "metric2"],
"time_range": "last_week|last_month|last_year|custom",
"filters": {"key": "value"}
}
# 数据获取
- id: data_fetcher
type: IfElse
conditions:
- expression: "{{intent_parser.output.data_source}} == 'sales'"
target: fetch_sales_data
- expression: "{{intent_parser.output.data_source}} == 'users'"
target: fetch_user_data
- expression: true
target: fetch_custom_data
- id: fetch_sales_data
type: HTTPRequest
config:
url: "{{sales_api}}/query"
method: POST
body:
metrics: "{{intent_parser.output.metrics}}"
time_range: "{{intent_parser.output.time_range}}"
filters: "{{intent_parser.output.filters}}"
# 数据分析
- id: data_analyzer
type: Code
config:
code: |
import pandas as pd
import json
# 获取数据
data = {{data_fetcher.output}}
df = pd.DataFrame(data)
# 根据分析类型处理
analysis_type = "{{intent_parser.output.analysis_type}}"
if analysis_type == "summary":
result = {
"row_count": len(df),
"columns": list(df.columns),
"summary_stats": df.describe().to_dict()
}
elif analysis_type == "trend":
# 时间序列分析
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
result = df.groupby(df['date'].dt.to_period('M')).sum().to_dict()
else:
result = {"error": "No date column for trend analysis"}
elif analysis_type == "comparison":
# 对比分析
if 'category' in df.columns:
result = df.groupby('category').mean().to_dict()
else:
result = {"error": "No category column for comparison"}
else:
result = {"error": f"Unsupported analysis type: {analysis_type}"}
return json.dumps(result, default=str)
# 生成报告
- id: report_generator
type: LLM
config:
model: gpt-4
prompt: |
根据数据分析结果生成中文报告:
用户问题:{{user_query}}
分析类型:{{intent_parser.output.analysis_type}}
数据源:{{intent_parser.output.data_source}}
分析结果(JSON格式):
{{data_analyzer.output}}
请生成结构化的分析报告,包括:
1. 主要发现
2. 关键指标
3. 趋势分析(如果有)
4. 建议
报告:
# 可视化(可选)
- id: visualization
type: Code
config:
code: |
import plotly.graph_objects as go
import json
data = json.loads('{{data_fetcher.output}}')
# 创建图表
if "{{intent_parser.output.analysis_type}}" == "trend":
fig = go.Figure(data=go.Scatter(
x=list(data.keys()),
y=list(data.values()),
mode='lines+markers'
))
else:
fig = go.Figure(data=go.Bar(
x=list(data.keys()),
y=list(data.values())
))
# 转换为HTML
chart_html = fig.to_html(include_plotlyjs='cdn')
return chart_html
# 结束
- id: end
type: End
outputs:
- name: analysis_report
value: "{{report_generator.output}}"
- name: visualization_html
value: "{{visualization.output}}"
4.3 案例3:多轮对话系统
name: 多轮对话管理
description: 处理带上下文的连续对话
flow:
# 开始(带历史上下文)
- id: start
type: Start
outputs:
- name: current_query
type: string
- name: conversation_history
type: array
default: []
# 对话状态管理
- id: state_manager
type: Code
config:
code: |
import json
history = {{conversation_history}}
current_query = "{{current_query}}"
# 分析对话状态
if not history:
state = "new_conversation"
context_summary = ""
else:
# 从历史中提取关键信息
last_3_turns = history[-6:] # 最近3轮对话
context_summary = " ".join([
f"用户:{turn['user']} 助手:{turn['assistant']}"
for turn in last_3_turns
])
# 判断是否在同一个话题
topics = ["价格", "功能", "技术支持", "购买"]
current_topic = next((t for t in topics if t in current_query), "其他")
state = f"continuing_{current_topic}_topic"
return json.dumps({
"state": state,
"context_summary": context_summary,
"turn_count": len(history) // 2
})
# 查询改写(考虑上下文)
- id: query_rewriter
type: LLM
config:
model: gpt-3.5-turbo
prompt: |
根据对话历史改写当前查询,使其更完整:
对话历史:
{{state_manager.output.context_summary}}
当前查询:{{current_query}}
改写后的查询:
# 知识库检索
- id: knowledge_retrieval
type: KnowledgeRetrieval
config:
query: "{{query_rewriter.output}}"
knowledge_base_id: "faq_kb"
top_k: 5
# 生成回答(带上下文感知)
- id: response_generator
type: LLM
config:
model: gpt-4
prompt: |
基于以下信息生成回答:
当前问题:{{current_query}}
上下文摘要:{{state_manager.output.context_summary}}
对话轮次:{{state_manager.output.turn_count}}
相关知识点:
{{knowledge_retrieval.output}}
生成要求:
1. 如果这是新对话(轮次=0),要热情欢迎
2. 如果对话超过5轮,可以主动询问是否需要其他帮助
3. 回答要连贯,保持对话的连续性
4. 必要时可以反问以澄清需求
回答:
# 更新对话历史
- id: history_updater
type: Code
config:
code: |
import json
history = {{conversation_history}}
# 添加当前轮次
history.append({
"user": "{{current_query}}",
"assistant": "{{response_generator.output}}",
"timestamp": "{{now}}"
})
# 限制历史长度(保留最近20轮)
if len(history) > 20:
history = history[-20:]
return json.dumps(history)
# 结束
- id: end
type: End
outputs:
- name: response
value: "{{response_generator.output}}"
- name: updated_history
value: "{{history_updater.output}}"
五、高级技巧
5.1 使用模板引擎
{# Handlebars 模板语法 #}
{# 条件判断 #}
{{#if variable}}
变量存在时的内容
{{else}}
变量不存在时的内容
{{/if}}
{# 循环 #}
{{#each array}}
索引:{{@index}},值:{{this}}
{{/each}}
{# 逻辑运算 #}
{{#and condition1 condition2}}
两者都为真
{{/and}}
{{#or condition1 condition2}}
任一为真
{{/or}}
{# 比较 #}
{{#eq var1 var2}}相等{{/eq}}
{{#gt var1 var2}}大于{{/gt}}
{{#lt var1 var2}}小于{{/lt}}
5.2 错误处理
# 错误处理流程示例
流程:
开始 → 尝试执行 → 成功? → 结束
↓
失败 → 错误处理 → 备用方案 → 结束
错误处理节点配置:
# Code 节点中的错误处理
try:
result = some_operation()
return {"success": True, "data": result}
except Exception as e:
return {
"success": False,
"error": str(e),
"fallback_data": get_fallback_data()
}
# 在后续节点中判断
{{#if previous_node.output.success}}
使用 {{previous_node.output.data}}
{{else}}
使用备用数据:{{previous_node.output.fallback_data}}
记录错误:{{previous_node.output.error}}
{{/if}}
5.3 性能优化
# 并行执行示例
流程:
开始 → 并行分支
├→ 分支1: 知识库检索
├→ 分支2: API调用1
├→ 分支3: API调用2
└→ 分支4: 本地计算
↓
合并结果 → LLM处理 → 结束
配置并行执行:
# 使用 Join 节点合并并行分支
join_config:
wait_for_all: true # 等待所有分支完成
timeout: 30000 # 超时时间(毫秒)
# 或者使用条件合并
conditions:
- expression: "{{branch1.output}} != null"
target: continue
- expression: "{{branch2.output}} != null"
target: continue
六、调试与测试
6.1 调试工具
# 在 Code 节点中添加调试输出
print(f"DEBUG: 当前变量 = {locals()}")
import logging
logging.info(f"数据: {data}")
# 使用 Dify 的调试面板
# 1. 点击流程中的"调试"按钮
# 2. 查看每个节点的输入输出
# 3. 查看执行时间线
# 4. 下载调试日志
6.2 测试用例
# 创建测试用例
test_cases:
- name: "简单问候"
inputs:
query: "你好"
expected_output_contains: ["你好", "欢迎"]
- name: "复杂问题"
inputs:
query: "如何安装产品并配置数据库?"
expected_output_contains: ["安装", "配置", "步骤"]
max_response_time: 5000 # 最大响应时间(毫秒)
- name: "错误处理测试"
inputs:
query: "故意错误的查询"
expected_output_contains: ["抱歉", "无法", "帮助"]
七、API 调用 Chatflow
7.1 基本调用
import requests
import json
class DifyChatflowClient:
def __init__(self, api_key, base_url="http://localhost:5001"):
self.api_key = api_key
self.base_url = base_url
def run_workflow(self, workflow_id, inputs, stream=False):
"""运行工作流"""
url = f"{self.base_url}/v1/workflows/run"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"inputs": inputs,
"response_mode": "streaming" if stream else "blocking",
"user": "user_123"
}
response = requests.post(
url,
headers=headers,
json=payload,
stream=stream
)
if stream:
return self._handle_stream_response(response)
else:
return response.json()
def _handle_stream_response(self, response):
"""处理流式响应"""
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:] # 去掉"data: "前缀
if data == '[DONE]':
break
try:
yield json.loads(data)
except:
yield {"data": data}
def get_workflow_status(self, task_id):
"""获取任务状态"""
url = f"{self.base_url}/v1/tasks/{task_id}"
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.get(url, headers=headers)
return response.json()
def batch_run(self, workflow_id, queries, max_concurrent=5):
"""批量运行"""
import asyncio
import aiohttp
async def run_one(session, query):
async with session.post(
f"{self.base_url}/v1/workflows/run",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"inputs": {"query": query}}
) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = []
for query in queries:
task = run_one(session, query)
tasks.append(task)
# 控制并发数
if len(tasks) >= max_concurrent:
results = await asyncio.gather(*tasks)
for result in results:
yield result
tasks = []
# 处理剩余任务
if tasks:
results = await asyncio.gather(*tasks)
for result in results:
yield result
return asyncio.run(main())
# 使用示例
client = DifyChatflowClient(api_key="your-api-key")
# 同步调用
result = client.run_workflow(
workflow_id="workflow_123",
inputs={"query": "今天天气怎么样?"},
stream=False
)
print(result["data"])
# 流式调用
for chunk in client.run_workflow(
workflow_id="workflow_123",
inputs={"query": "今天天气怎么样?"},
stream=True
):
print(chunk.get("answer", ""), end="", flush=True)
7.2 Webhook 集成
# 配置 Chatflow 输出到 Webhook
webhook_config:
url: "https://your-server.com/webhook/callback"
method: "POST"
headers:
Authorization: "Bearer your-webhook-token"
retry_times: 3
timeout: 10000
# 接收 Webhook 的服务器
from flask import Flask, request
import json
app = Flask(__name__)
@app.route('/webhook/callback', methods=['POST'])
def webhook_callback():
data = request.json
# 处理 Dify 返回的数据
workflow_id = data.get('workflow_id')
task_id = data.get('task_id')
outputs = data.get('outputs', {})
# 业务逻辑处理
process_result(outputs)
return {"status": "success"}, 200
def process_result(outputs):
"""处理工作流输出"""
answer = outputs.get('answer', '')
context = outputs.get('context', '')
metadata = outputs.get('metadata', {})
# 保存到数据库
save_to_database({
'answer': answer,
'context': context,
'metadata': metadata
})
# 发送通知
send_notification(f"新回答:{answer[:100]}...")
八、最佳实践
8.1 设计原则
- 模块化设计:每个节点功能单一,便于复用
- 错误处理:每个可能失败的节点都要有错误处理分支
- 性能考虑:并行执行独立任务,设置超时时间
- 可维护性:使用有意义的节点名称,添加注释
8.2 命名规范
节点命名示例:
# 好例子
- 用户输入解析
- 产品知识库检索
- GPT4_回答生成
- 错误处理_备用回答
# 坏例子
- 节点1
- 处理
- llm
变量命名:
- user_query: 用户输入
- kb_results: 知识库结果
- llm_response: LLM响应
- final_answer: 最终答案
8.3 版本控制
# 导出工作流配置
curl -X GET "http://localhost:5001/v1/workflows/{workflow_id}/export" \
-H "Authorization: Bearer your-api-key" \
-o workflow_config.yaml
# 导入工作流
curl -X POST "http://localhost:5001/v1/workflows/import" \
-H "Authorization: Bearer your-api-key" \
-F "file=@workflow_config.yaml"
九、常见问题解决
9.1 节点执行失败
排查步骤:
1. 检查节点连接是否正确
2. 查看节点配置参数
3. 检查变量引用是否正确
4. 查看执行日志
5. 测试单个节点功能
9.2 变量传递问题
# 调试变量传递
# 在 Code 节点中添加:
debug_info = {
"当前变量": locals(),
"输入变量": input_variables,
"上一个节点输出": previous_node_output
}
print(json.dumps(debug_info, indent=2, ensure_ascii=False))
9.3 性能问题优化
优化策略:
1. 减少不必要的节点
2. 使用并行执行
3. 设置合理的超时时间
4. 缓存重复查询结果
5. 分批处理大数据
总结
Dify Chatflow 提供了强大的可视化对话流程编排能力,通过:
- 节点化设计:将复杂逻辑分解为可管理的单元
- 可视化编排:直观地设计和调试流程
- 灵活集成:支持各种数据源和API
- 企业级功能:支持多租户、监控、版本控制
掌握 Chatflow 的使用,你可以构建从简单问答到复杂业务流程的各种 AI 应用。建议从简单的流程开始,逐步增加复杂度,并充分利用调试工具和测试功能确保质量。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)