使用 Qdrant 向量数据库
·
使用 Qdrant 向量数据库,需要进行以下修改:
-
安装 Qdrant 客户端:
- 安装 Qdrant 的 Python 客户端库:
pip install qdrant-client
- 安装 Qdrant 的 Python 客户端库:
-
修改代码:
- 替换 Weaviate 的相关代码为 Qdrant 的代码。
- 确保插入数据时包含
doc_id元数据。 - 查询时返回内容及其对应的
doc_id。
以下是修改后的代码:
修改后的代码
from langchain.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Qdrant
from langchain.embeddings.base import Embeddings
from langchain.chains import RetrievalQA
from langchain.llms import Ollama
from qdrant_client import QdrantClient
from qdrant_client.http import models
import os
import requests
# 使用 Ollama 的 nomic-embed-text 生成嵌入
class NomicEmbedText(Embeddings):
def __init__(self, base_url="http://127.0.0.1:11434"):
self.base_url = base_url
def embed_query(self, text):
response = requests.post(
f"{self.base_url}/api/embeddings",
json={"model": "nomic-embed-text:v1.5", "prompt": text}
)
if response.status_code != 200:
raise ValueError(f"嵌入生成失败: {response.text}")
return response.json()["embedding"]
def embed_documents(self, texts):
embeddings = []
for text in texts:
embedding = self.embed_query(text)
embeddings.append(embedding)
return embeddings
# 初始化 Qdrant 客户端
def init_qdrant_client(url="http://localhost:6333"):
client = QdrantClient(url)
return client
# 创建 Qdrant 向量存储
def create_qdrant_vector_store(chunks, collection_name):
embeddings = NomicEmbedText()
texts = [chunk.page_content for chunk in chunks]
metadatas = [{"doc_id": chunk.metadata.get("doc_id", "未知")} for chunk in chunks] # 确保元数据包含 doc_id
# 初始化 Qdrant 客户端
client = init_qdrant_client()
# 创建集合(如果不存在)
try:
client.get_collection(collection_name)
except:
client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(size=512, distance=models.Distance.COSINE)
)
# 插入数据到 Qdrant
points = []
for idx, (text, metadata) in enumerate(zip(texts, metadatas)):
points.append(
models.PointStruct(
id=idx + 1, # 点 ID
vector=embeddings.embed_query(text), # 向量
payload={"text": text, "metadata": metadata} # 元数据和文本
)
)
client.upsert(collection_name=collection_name, points=points)
# 创建 Qdrant 向量存储对象
vector_store = Qdrant(
client=client,
collection_name=collection_name,
embeddings=embeddings
)
return vector_store
# 设置 RAG 管道
def setup_rag_pipeline(vector_store):
llm = Ollama(model="deepseek-r1:1.5b", base_url="http://127.0.0.1:11434")
qa_pipeline = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vector_store.as_retriever(search_kwargs={"k": 3})
)
return qa_pipeline
# 查询 RAG 管道
def query_rag_pipeline(qa_pipeline, question):
# 获取检索结果
result = qa_pipeline({"query": question})
# 提取检索到的文档及其元数据
retrieved_docs = qa_pipeline.retriever.get_relevant_documents(question)
# 构建返回结果
response = {
"answer": result["result"], # 模型生成的答案
"sources": [] # 包含 doc_id 和内容的来源文档
}
for doc in retrieved_docs:
response["sources"].append({
"doc_id": doc.metadata.get("doc_id", "未知"), # 提取 doc_id
"content": doc.page_content # 提取内容
})
return response
# 加载并分割文档
def load_and_split_documents(file_path):
# 检查文件路径是否有效
if not file_path:
raise ValueError("文件路径不能为空")
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
print(f"加载文件: {file_path}")
if file_path.endswith('.pdf'):
print("使用 PyPDFLoader")
loader = PyPDFLoader(file_path)
elif file_path.endswith('.docx'):
print("使用 Docx2txtLoader")
loader = Docx2txtLoader(file_path)
elif file_path.endswith('.txt'):
print("使用 TextLoader")
loader = TextLoader(file_path)
else:
raise ValueError("Unsupported file format")
documents = loader.load()
print(f"加载了 {len(documents)} 个文档块")
# 文档分块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每块 500 字符
chunk_overlap=50 # 块之间重叠 50 字符
)
chunks = text_splitter.split_documents(documents)
print(f"分块后得到 {len(chunks)} 个块")
return chunks
# 主函数
def main():
file_path = r"C:\Users\leon\Desktop\断点续传\python_learn\2025\20250322\rag\pdf\company_law.pdf"
chunks = load_and_split_documents(file_path)
# 1. 创建或加载 Qdrant 向量存储
collection_name = "CompanyLaw"
vector_store = create_qdrant_vector_store(chunks, collection_name)
# 2. 设置 RAG 管道
qa_pipeline = setup_rag_pipeline(vector_store)
# 3. 查询
question = "关于公司法律条款的解释"
response = query_rag_pipeline(qa_pipeline, question)
# 打印结果
print("回答:", response["answer"])
print("来源文档:")
for source in response["sources"]:
print(f"- doc_id: {source['doc_id']}, 内容: {source['content']}")
if __name__ == "__main__":
main()
代码说明
-
Qdrant 客户端初始化:
- 使用
QdrantClient连接到 Qdrant 服务。 - 如果集合不存在,则创建集合。
- 使用
-
插入数据:
- 将文档的嵌入向量和元数据(包括
doc_id)插入到 Qdrant 集合中。
- 将文档的嵌入向量和元数据(包括
-
查询数据:
- 使用
RetrievalQA进行语义搜索。 - 返回模型生成的答案以及来源文档的
doc_id和内容。
- 使用
-
权限控制:
- 通过元数据中的
doc_id实现权限控制。
- 通过元数据中的
运行步骤
-
启动 Qdrant 服务:
docker run -d -p 6333:6333 --name qdrant qdrant/qdrant -
运行 Python 脚本:
python rag_with_qdrant.py
总结
- 代码已从 Weaviate 迁移到 Qdrant,支持高效的向量检索和元数据过滤。
- 插入数据时包含
doc_id,查询时返回内容及其对应的doc_id。 - 代码结构清晰,易于扩展和维护。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)