Day 8 我们实现了文档的批量入库。但在实测中发现,如果用户提问不够精准,RAG 经常检索不到正确内容。今天是 Day 9,我们将对检索链路进行重大升级。我们将引入 LangChain 的 MultiQueryRetriever,利用 LLM 的推理能力,将用户的一个模糊问题自动“裂变”为多个不同角度的精准提问,并行检索,极大提升 RAG 的命中率(Recall)。

一、 项目进度:Day 9 启动

根据项目路线图,今天是 Phase 3 的关键优化日。
我们要让检索系统变得更“聪明”。


二、 核心原理:多重查询检索 (Multi-Query) 的运作机制

在 Day 9,我们不对代码语法做过多纠缠,而是专注于解决 RAG 系统中最大的痛点:“用户怎么问,决定了能不能搜到”

1. 核心痛点:语义鸿沟与单点检索

在基础 RAG 中,检索是“一锤子买卖”。

  • 场景:数据库里存的是标准术语“螺蛳粉”。

  • 问题:用户问的是口语化的“臭面条”。

  • 结果:Embedding 模型将“臭面条”映射到的向量坐标,距离“螺蛳粉”太远,导致检索失败(Miss)。

这就是 语义鸿沟 (Semantic Gap)。用户的表达方式千变万化,我们不能指望用户每次都能精准命中关键词。

2. 解决方案:多重查询 (Multi-Query) 原理

Multi-Query Retriever 的核心思想是 “用算力换准确率”。它的工作原理类似于人类的“联想能力”:

  1. 语义扩张 (Expansion):利用 LLM 强大的推理能力,将用户的一个问题,发散成多个不同角度的“替身问题”。

    • 原始:“臭面条”

    • 发散:“螺蛳粉”、“臭豆腐面”、“重口味面食”。

  2. 空间覆盖 (Coverage):在向量空间中,一个问题是一个点。生成 3 个问题,就是在向量空间里打了 3 个点。这极大地扩大了搜索范围,增加了命中目标文档的概率。

  3. 结果融合 (Fusion):把这 3 次搜索捞回来的文档倒在一起,去掉重复的,形成最完整的资料包。

3. LangChain 是如何支持的?

LangChain 提供了一个强大的组件 MultiQueryRetriever,它封装了上述复杂的流程。

  • 自动化 Prompt:你不需要自己写“请帮我改写问题...”的 Prompt,LangChain 内置了专门用于 Query Generation 的提示词模板。

  • 自动化并发:它会自动并发执行多次向量检索,你不需要自己写多线程代码。

  • 自动化去重:它会自动处理多个结果集之间的并集(Union),去除重复文档。

下面这张图展示了Langchain在幕后做的工作:

如果不使用 LangChain,要实现同样的功能,你需要手动写几十行代码来处理多线程并发、列表去重和 Prompt 拼接。而 LangChain 将这 三个繁重的步骤 封装成了一个黑盒:

1. 🧠 自动裂变 (Query Expansion)
  • 幕后工作:LangChain 收到你的问题后,没有直接去查库,而是先偷梁换柱。它调用 LLM,利用内置的 Prompt 模板,强迫 LLM 把你的问题“翻译”成 3 个不同的版本。

  • 意义:把“听不懂的人话”变成了“机器能懂的搜索词”。

2. 🚀 自动并发 (Parallel Execution)
  • 幕后工作:图中中间部分展示了 1 变 3 的过程。LangChain 自动开启了线程池,同时向 ChromaDB 发起了 3 次独立的向量检索。

  • 意义:这就像影分身术。原本只找一次,现在同时在三个方向找,召回率 (Recall) 直接翻倍,且并没有显著增加等待时间(因为是并行的)。

3. ⚗️ 自动融合 (Fusion & Deduplication)
  • 幕后工作:3 次检索可能会带回重复的内容(比如 Q1 和 Q2 都找到了文档 A)。LangChain 会自动对比文档 ID 或内容,执行 Set Union (集合并集) 操作,剔除重复项。

  • 意义:保证喂给大模型的资料是纯净、不冗余的,节省了宝贵的 Context Window 空间。


三、 实战:代码实现

1.代码序列图

2. 升级知识库模块 (src/core/knowledge.py)

我们需要修改 KnowledgeBase 类,引入 LangChain 的 MultiQueryRetriever。
注意:为了让 Retriever 能改写问题,我们需要把 LLM 实例传给它。

修改文件:src/core/knowledge.py

import os
from typing import List

# 引入核心组件
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.runnables import RunnableLambda
from src.core.embedding import EmbeddingModel
from src.utils.logger import logger

PERSIST_DIR = "./data/chroma_db"

# --- 1. 定义自定义解析器 ---
# 作用:把 LLM 返回的字符串(一行一个问题)变成 Python List
class LineListOutputParser(BaseOutputParser[List[str]]):
    def parse(self, text: str) -> List[str]:
        lines = text.strip().split("\n")
        # 过滤掉空行,并去掉可能的序号 (如 "1. xxx")
        cleaned_lines = []
        for line in lines:
            line = line.strip()
            if line:
                # 简单去掉开头的 "1. " 或 "- "
                if line[0].isdigit() or line.startswith("-"):
                    line = line.split(" ", 1)[-1]
                cleaned_lines.append(line)
        return cleaned_lines

# --- 2. 定义去重函数 ---
# 作用:把 [[Doc1, Doc2], [Doc2, Doc3]] 展平并去重为 [Doc1, Doc2, Doc3]
def unique_documents(list_of_lists: List[List[Document]]) -> List[Document]:
    flattened = [doc for sublist in list_of_lists for doc in sublist]
    unique_docs = {}
    for doc in flattened:
        # 以内容为 Key 进行去重
        unique_docs[doc.page_content] = doc
    return list(unique_docs.values())

class KnowledgeBase:
    def __init__(self):
        self.embedding_model = EmbeddingModel().get_model()
        self.vector_store = Chroma(
            collection_name="echo_knowledge",
            embedding_function=self.embedding_model,
            persist_directory=PERSIST_DIR
        )

    def add_documents(self, documents: list):
        """
        直接存入 LangChain 的 Document 对象列表
        (Day 8 新增)
        """
        if not documents:
            return
        
        logger.info(f"📥 正在向量化并存入 {len(documents)} 个知识片段...")
        # Chroma 的 add_documents 会自动处理向量化
        self.vector_store.add_documents(documents)
        logger.info("✅ 入库完成!")

    def get_multiquery_retriever(self, llm):
        """
        【纯 LCEL 实现】手搓多重查询检索器
        """
        logger.info("🚀 构建纯 LCEL 多重检索链...")

        # Step A: 定义“问题裂变”的 Prompt
        # ❌ 原来的写法: ... 原始问题: {question}
        # ✅ 修正后的写法: ... 原始问题: {input}
        QUERY_PROMPT = ChatPromptTemplate.from_template(
            """你是一个AI语言模型助手。你的任务是为用户的问题生成 3 个不同版本的搜索查询,
            以便从向量数据库中检索相关文档。通过从多个角度生成用户问题,以此帮助用户克服基于距离的相似性搜索的一些局限性。
            请直接每一行输出一个备选问题,不要包含序号、解释或其他无关文字。

            原始问题: {input}"""
        )

        # Step B: 组装生成链
        generate_queries = (
            QUERY_PROMPT 
            | llm 
            | LineListOutputParser()
        )

        # Step C: 组装检索链
        retriever_chain = (
            generate_queries
            | self.vector_store.as_retriever(search_kwargs={"k": 2}).map() 
            | RunnableLambda(unique_documents)
        )
        
        return retriever_chain

3. 升级主程序 (main.py)

我们需要彻底重构 main.py 的 RAG 部分,使用 LCEL (LangChain Expression Language) 将 Retriever 融入链条。

修改文件:main.py

# ==============================================================================
# Project Echo Day 9: Multi-Query RAG Integration
# 集成特性: Redis记忆 + 情绪识别 + Multi-Query知识库检索
# ==============================================================================

# LangChain LCEL 核心组件
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables import RunnablePassthrough
from operator import itemgetter  # 【修复】用于从字典中提取字段

# 项目核心模块
from src.core.llm import LLMClient
from src.core.prompts import PROMPTS
from src.utils.logger import logger
from src.config.settings import settings
from src.core.emotion import EmotionEngine   # Day 5: 情绪
from src.core.knowledge import KnowledgeBase # Day 7-9: 知识库

# --- Redis 历史记录工厂 (Day 6) ---
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    return RedisChatMessageHistory(
        session_id=session_id,
        url=settings.REDIS_URL,
        ttl=3600 * 24 * 7 # 记忆保留 7 天
    )

# --- 辅助函数:格式化文档 ---
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

def main():
    logger.info("🚀 --- Project Echo: Day 9 集成版启动 ---")
    
    # ==========================================
    # 1. 初始化组件
    # ==========================================
    # 1.1 大模型 (Brain)
    client = LLMClient()
    llm = client.get_client()
    
    # 1.2 情绪引擎 (Heart)
    emotion_engine = EmotionEngine()
    
    # 1.3 知识库 (Book)
    kb = KnowledgeBase()
    
    # 【Day 9 核心】获取多重查询检索器
    # 这里我们将 LLM 传进去,让检索器具备"思考"能力
    retriever = kb.get_multiquery_retriever(llm)

    # ==========================================
    # 2. 构建 Prompt 模板
    # ==========================================
    sys_prompt_base = PROMPTS["tsundere"]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", sys_prompt_base),                   # 1. 基础人设 (傲娇)
        ("system", "{emotion_context}"),               # 2. 情绪指令 (动态注入)
        ("system", "【参考资料(必须基于此回答)】:\n{context}"), # 3. 知识库资料 (RAG)
        MessagesPlaceholder(variable_name="history"),  # 4. 历史记忆 (Redis)
        ("human", "{input}")                           # 5. 用户输入
    ])

    # ==========================================
    # 3. 组装 LCEL 流水线
    # ==========================================
    rag_chain = (
        {
            # 分支 A: 智能检索 (用户输入 -> 裂变3个问题 -> 并行检索 -> 汇总 -> 格式化)
            "context": itemgetter("input") | retriever | format_docs,  # 【修复】使用 itemgetter 提取 input
                
            # 分支 B: 透传参数 (直接传递给 Prompt)
            "input": itemgetter("input"),              # 【修复】提取 input 字段
            "emotion_context": itemgetter("emotion_context"),  # 【修复】提取 emotion_context 字段
            "history": itemgetter("history")           # 【修复】提取 history 字段
        }
        | prompt  # 填入模板
        | llm     # 大模型推理
    )

    # ==========================================
    # 4. 挂载持久化记忆
    # ==========================================
    final_chain = RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="history",
    )

    print("\n✨ 系统就绪!试试问得模糊一点,比如“那个写代码的人爱吃啥?”\n")
    session_id = "user_day9_demo"

    # ==========================================
    # 5. 对话循环
    # ==========================================
    while True:
        user_input = input("You: ")
        if user_input.lower() in ["quit", "exit"]:
            break
            
        if user_input.strip():
            # --- Phase A: 情绪侦探 ---
            current_emotion = emotion_engine.analyze(user_input)
            
            emotion_instruction = "用户情绪平稳。"
            if "[愤怒]" in current_emotion:
                emotion_instruction = "⚠️ 警告:用户很生气!请示弱道歉。"
            elif "[悲伤]" in current_emotion:
                emotion_instruction = "⚠️ 提示:用户很难过。请温柔安慰。"

            try:
                # --- Phase B: 执行 RAG 主链 ---
                # 这一步会自动触发 Multi-Query 检索
                logger.info("🔍 正在进行多重检索与思考...")
                
                response = final_chain.invoke(
                    {
                        "input": user_input, 
                        "emotion_context": emotion_instruction
                    },
                    config={"configurable": {"session_id": session_id}}
                )
                
                print(f"Bot ({current_emotion}): {response.content}\n")
                
            except Exception as e:
                logger.error(f"❌ 调用失败: {e}")

if __name__ == "__main__":
    main()

四、 验证效果 (对比测试)

为了验证优化效果,我们进行一次对比测试。
(前提:你已经在 Day 8 运行了 ingest.py,数据库里有“阿强爱吃螺蛳粉”的数据)

用户提问:“那个写代码的人,平时喜欢吃那种很臭的面条吗?”

  • Day 7 (普通检索)

    • 检索词:“那个写代码的人,平时喜欢吃那种很臭的面条吗?”

    • 结果:可能搜不到。因为“臭面条”和“螺蛳粉”字面差距大,且 Embedding 模型可能不够强。

    • AI 回答:我不知道你在说什么。

  • Day 9 (Multi-Query)

    • 后台日志:LangChain 自动生成了 3 个问题:

      1. “阿强喜欢吃什么?”

      2. “关于阿强饮食偏好的资料。”

      3. “阿强是否喜欢吃螺蛳粉?”

    • 结果:第 3 个问题精准命中了数据库里的“螺蛳粉”记录。

    • AI 回答:哼,你是说阿强吧?那个笨蛋确实最爱吃螺蛳粉!虽然味道很臭,但他就是喜欢,真是没办法!


五、 总结与预告

今天我们通过引入 Multi-Query Retriever,解决了“用户提问不准”的难题。
现在的 AI 能够揣摩你的意图,把它翻译成机器能听懂的查询语言,从而大幅提高了知识库的命中率。

明日预告 (Day 10)
目前的 RAG 还有一个缺陷:如果检索到的资料太多(比如搜到了 10 页文档),全部塞给大模型会很乱。
明天 Day 10,我们将引入 Re-ranking (重排序) 技术。就像搜索引擎一样,把最相关的结果排在第一位,剔除干扰项,让 RAG 的准确率达到工业级水平。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐