W4 · 知识点12:流式输出与实时响应

学习目标:为设备维修系统实现流式输出,让维修人员实时看到AI的诊断分析过程。


一、为什么设备维修系统需要流式输出

非流式:维修人员提交故障描述 → 等待5秒 → 一大段诊断结果突然出现。
流式:诊断结果逐字显示,维修人员可以边看边思考,感知等待时间大幅降低。

在设备维修场景中,维修人员通常在车间用手机或平板操作,流式输出让他们更快开始阅读诊断建议。

二、基础流式调用

from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="deepseek-chat",
    messages=[
        {"role": "system", "content": "你是设备维修诊断助手,回答简洁专业。"},
        {"role": "user", "content": "空压机排气温度过高,冷却水正常,无异常声响,运行2000小时未保养。请分析原因和维修方案。"}
    ],
    stream=True,   # 开启流式输出
)

# 逐块接收,实时打印
print("诊断分析: ", end="", flush=True)
for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()

三、完整的流式诊断对话程序

from openai import OpenAI

client = OpenAI()

class StreamDiagnosisBot:
    """流式输出的设备故障诊断助手"""

    def __init__(self):
        self.history = [{
            "role": "system",
            "content": """你是设备维修养护系统的AI诊断助手。
通过对话帮助维修人员定位设备故障并给出维修建议。
回答简洁专业,每次回复不超过200字。必须包含安全提醒。"""
        }]
        self.total_tokens = 0

    def stream_diagnose(self, message):
        self.history.append({"role": "user", "content": message})

        stream = client.chat.completions.create(
            model="deepseek-chat",
            messages=self.history,
            temperature=0.3,
            stream=True,
            stream_options={"include_usage": True},
        )

        full_reply = []
        print("\n诊断助手: ", end="", flush=True)

        for chunk in stream:
            # 内容块
            if chunk.choices and chunk.choices[0].delta.content:
                text = chunk.choices[0].delta.content
                full_reply.append(text)
                print(text, end="", flush=True)
            # 用量信息(最后一个块)
            if chunk.usage:
                self.total_tokens += chunk.usage.total_tokens

        reply = "".join(full_reply)
        self.history.append({"role": "assistant", "content": reply})
        print(f"\n  [tokens: {self.total_tokens}]")
        return reply


def main():
    bot = StreamDiagnosisBot()
    print("=" * 50)
    print("  设备维修诊断助手(流式版)")
    print("  输入故障描述开始诊断 | /quit 退出")
    print("=" * 50)
    print()

    while True:
        try:
            user_input = input("故障描述> ").strip()
        except (KeyboardInterrupt, EOFError):
            break

        if user_input.lower() in ('/quit', '/q'):
            break
        if not user_input:
            continue

        bot.stream_diagnose(user_input)
        print()

    print(f"\n总Token消耗: {bot.total_tokens}")
    print("诊断结束,注意安全操作!")

if __name__ == "__main__":
    main()

四、流式输出 + JSON解析(高级技巧)

在设备维修系统中,你可能想同时实现:流式显示诊断过程 + 最终输出结构化JSON。

import json

def stream_then_parse(fault_description):
    """先流式显示诊断思考过程,最后输出结构化JSON"""

    stream = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": """你是设备维修诊断助手。
请按以下格式回答:

思考过程:
(你的分析推理,会实时显示给用户)

---JSON---
(诊断完成后,输出结构化数据)
{
    "root_cause": "根本原因",
    "confidence": "高|中|低",
    "repair_steps": ["步骤1", "步骤2"],
    "spare_parts": ["备件1"],
    "estimated_hours": 0,
    "safety_warnings": ["安全警告"]
}"""},
            {"role": "user", "content": fault_description}
        ],
        stream=True,
        temperature=0.3,
    )

    full_text = []
    in_json = False
    print("诊断分析: ", end="", flush=True)

    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            text = chunk.choices[0].delta.content
            full_text.append(text)
            # 在JSON标记之前的内容实时显示
            if not in_json:
                if "---JSON---" in text:
                    in_json = True
                    print("\n[生成诊断数据...]", flush=True)
                else:
                    print(text, end="", flush=True)

    # 提取JSON部分
    content = "".join(full_text)
    if "---JSON---" in content:
        json_str = content.split("---JSON---")[-1].strip()
        try:
            result = json.loads(json_str)
            print(f"\n\n诊断结论: {result['root_cause']}")
            print(f"维修步骤: {result['repair_steps']}")
            return result
        except json.JSONDecodeError:
            print("\n[JSON解析失败,返回原始文本]")

    return content

# 使用
result = stream_then_parse("液压系统压力无法建立,油泵运转正常,油箱油位正常")

五、动手练习

练习1:对比体验

分别用非流式和流式调用,记录:

  • 首字延迟(第一个字出现的时间)
  • 用户感知的等待体验

练习2:升级之前的诊断助手

把知识点10的MaintenanceDiagnosisBot升级为流式输出版本。

练习3:实现"中断诊断"功能

# 让用户在AI生成过程中按 Ctrl+C 中断
# 已生成的部分保留到对话历史中
# 提示用户可以选择继续追问或开始新的诊断

六、本知识点检验标准

  • 实现设备维修诊断的流式输出
  • 理解流式模式下Token统计的处理方式
  • 能实现"流式显示思考过程 + 结构化输出"的组合模式
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐