使用 Qdrant 向量数据库,需要进行以下修改:

  1. 安装 Qdrant 客户端

    • 安装 Qdrant 的 Python 客户端库:
      pip install qdrant-client
      
  2. 修改代码

    • 替换 Weaviate 的相关代码为 Qdrant 的代码。
    • 确保插入数据时包含 doc_id 元数据。
    • 查询时返回内容及其对应的 doc_id

以下是修改后的代码:


修改后的代码

from langchain.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from langchain.embeddings.base import Embeddings
from langchain.chains import RetrievalQA
from langchain.llms import Ollama
from qdrant_client import QdrantClient
from qdrant_client.http import models
import os
import requests

# 使用 Ollama 的 nomic-embed-text 生成嵌入
class NomicEmbedText(Embeddings):
    def __init__(self, base_url="http://127.0.0.1:11434"):
        self.base_url = base_url

    def embed_query(self, text):
        response = requests.post(
            f"{self.base_url}/api/embeddings",
            json={"model": "nomic-embed-text:v1.5", "prompt": text}
        )
        if response.status_code != 200:
            raise ValueError(f"嵌入生成失败: {response.text}")
        return response.json()["embedding"]

    def embed_documents(self, texts):
        embeddings = []
        for text in texts:
            embedding = self.embed_query(text)
            embeddings.append(embedding)
        return embeddings


# 初始化 Qdrant 客户端
def init_qdrant_client(url="http://localhost:6333"):
    client = QdrantClient(url)
    return client


# 创建 Qdrant 向量存储
def create_qdrant_vector_store(chunks, collection_name):
    embeddings = NomicEmbedText()
    texts = [chunk.page_content for chunk in chunks]
    metadatas = [{"doc_id": chunk.metadata.get("doc_id", "未知")} for chunk in chunks]  # 确保元数据包含 doc_id

    # 初始化 Qdrant 客户端
    client = init_qdrant_client()

    # 创建集合(如果不存在)
    try:
        client.get_collection(collection_name)
    except:
        client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(size=512, distance=models.Distance.COSINE)
        )

    # 插入数据到 Qdrant
    points = []
    for idx, (text, metadata) in enumerate(zip(texts, metadatas)):
        points.append(
            models.PointStruct(
                id=idx + 1,  # 点 ID
                vector=embeddings.embed_query(text),  # 向量
                payload={"text": text, "metadata": metadata}  # 元数据和文本
            )
        )
    client.upsert(collection_name=collection_name, points=points)

    # 创建 Qdrant 向量存储对象
    vector_store = Qdrant(
        client=client,
        collection_name=collection_name,
        embeddings=embeddings
    )
    return vector_store


# 设置 RAG 管道
def setup_rag_pipeline(vector_store):
    llm = Ollama(model="deepseek-r1:1.5b", base_url="http://127.0.0.1:11434")
    qa_pipeline = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(search_kwargs={"k": 3})
    )
    return qa_pipeline


# 查询 RAG 管道
def query_rag_pipeline(qa_pipeline, question):
    # 获取检索结果
    result = qa_pipeline({"query": question})
    
    # 提取检索到的文档及其元数据
    retrieved_docs = qa_pipeline.retriever.get_relevant_documents(question)
    
    # 构建返回结果
    response = {
        "answer": result["result"],  # 模型生成的答案
        "sources": []  # 包含 doc_id 和内容的来源文档
    }
    
    for doc in retrieved_docs:
        response["sources"].append({
            "doc_id": doc.metadata.get("doc_id", "未知"),  # 提取 doc_id
            "content": doc.page_content  # 提取内容
        })
    
    return response


# 加载并分割文档
def load_and_split_documents(file_path):
    # 检查文件路径是否有效
    if not file_path:
        raise ValueError("文件路径不能为空")
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")

    print(f"加载文件: {file_path}")
    if file_path.endswith('.pdf'):
        print("使用 PyPDFLoader")
        loader = PyPDFLoader(file_path)
    elif file_path.endswith('.docx'):
        print("使用 Docx2txtLoader")
        loader = Docx2txtLoader(file_path)
    elif file_path.endswith('.txt'):
        print("使用 TextLoader")
        loader = TextLoader(file_path)
    else:
        raise ValueError("Unsupported file format")

    documents = loader.load()
    print(f"加载了 {len(documents)} 个文档块")

    # 文档分块
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,  # 每块 500 字符
        chunk_overlap=50  # 块之间重叠 50 字符
    )
    chunks = text_splitter.split_documents(documents)
    print(f"分块后得到 {len(chunks)} 个块")
    return chunks


# 主函数
def main():
    file_path = r"C:\Users\leon\Desktop\断点续传\python_learn\2025\20250322\rag\pdf\company_law.pdf"
    chunks = load_and_split_documents(file_path)

    # 1. 创建或加载 Qdrant 向量存储
    collection_name = "CompanyLaw"
    vector_store = create_qdrant_vector_store(chunks, collection_name)

    # 2. 设置 RAG 管道
    qa_pipeline = setup_rag_pipeline(vector_store)

    # 3. 查询
    question = "关于公司法律条款的解释"
    response = query_rag_pipeline(qa_pipeline, question)
    
    # 打印结果
    print("回答:", response["answer"])
    print("来源文档:")
    for source in response["sources"]:
        print(f"- doc_id: {source['doc_id']}, 内容: {source['content']}")


if __name__ == "__main__":
    main()

代码说明

  1. Qdrant 客户端初始化

    • 使用 QdrantClient 连接到 Qdrant 服务。
    • 如果集合不存在,则创建集合。
  2. 插入数据

    • 将文档的嵌入向量和元数据(包括 doc_id)插入到 Qdrant 集合中。
  3. 查询数据

    • 使用 RetrievalQA 进行语义搜索。
    • 返回模型生成的答案以及来源文档的 doc_id 和内容。
  4. 权限控制

    • 通过元数据中的 doc_id 实现权限控制。

运行步骤

  1. 启动 Qdrant 服务:

    docker run -d -p 6333:6333 --name qdrant qdrant/qdrant
    
  2. 运行 Python 脚本:

    python rag_with_qdrant.py
    

总结

  • 代码已从 Weaviate 迁移到 Qdrant,支持高效的向量检索和元数据过滤。
  • 插入数据时包含 doc_id,查询时返回内容及其对应的 doc_id
  • 代码结构清晰,易于扩展和维护。
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐