【Python】使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理
本文介绍了如何使用 Python 构建一个 WeaviateHelper 工具类,用于管理 Weaviate 向量数据库中的数据。通过结合本地向量模型(如 Ollama 部署的 nomic-embed-text),该工具类支持数据插入、语义搜索、混合检索以及集合管理等功能。文章详细讲解了工具类的核心功能,包括创建集合、批量插入数据、混合搜索与语义搜索的实现,并提供了代码示例。该工具类适用于私有部署
使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理
随着 AI 与大模型的广泛应用,向量数据库成为文本、图像、音频等非结构化数据管理的核心组件。本文将通过封装一个基于 Python 的 WeaviateHelper
类,演示如何结合本地向量模型(如 Ollama 部署的 nomic-embed-text
)实现数据管理、插入、语义检索与混合搜索。
目录
技术栈概览
技术 | 说明 |
---|---|
Weaviate | 向量数据库,支持多模态数据管理与语义搜索 |
Ollama | 本地大模型部署平台,用于运行 nomic-embed-text 向量模型 |
Loguru | 更友好的日志库,替代 Python 原生 logging |
Python | 封装 Weaviate 功能逻辑,主导语言 |
核心功能介绍
WeaviateHelper
封装了常见的操作接口,包括但不限于:
- 创建集合并配置向量化器;
- 插入单条或多条数据;
- 语义搜索(near_text);
- 混合搜索(Hybrid Search);
- 按属性删除集合中的数据;
- 获取集合中的所有数据;
- 删除集合、列出集合等管理操作。
完整代码讲解
以下是完整工具类代码(为保证可运行性,部分配置需根据实际环境调整):
pip install weaviate-client
1. 初始化连接
self.client = weaviate.connect_to_custom(
http_host=http_host,
http_port=http_port,
grpc_host=grpc_host,
grpc_port=grpc_port,
skip_init_checks=True,
http_secure=False,
grpc_secure=False,
)
✅ 支持通过 IP + 端口连接 Weaviate 服务,同时允许使用本地部署的 Embedding 模型。
2. 创建集合并指定文本向量化模型
self.client.collections.create(
weibo_collection,
vectorizer_config=Configure.Vectorizer.text2vec_ollama(
model="nomic-embed-text",
api_endpoint=self.ollama_endpoint
),
properties=[
Property(name=name, data_type=data_type)
for name, data_type in properties
],
)
✅ 通过
text2vec_ollama
接入 Ollama 本地模型;避免依赖 OpenAI/Replicate API,有利于私有部署与成本控制。
3. 插入数据支持批量导入
collection = self.client.collections.get(weibo_collection)
collection.data.insert_many(data)
✅ 推荐使用
insert_many
批量导入,提升性能。
4. 混合搜索与语义搜索
# 混合搜索:结合 BM25 和语义向量
collection.query.hybrid(
query=query,
alpha=alpha,
limit=limit,
return_metadata=MetadataQuery(distance=True, score=True)
)
# 语义搜索:向量相似度搜索
collection.query.near_text(
query=query,
limit=limit,
return_metadata=MetadataQuery(distance=True)
)
✅
hybrid
模式可调整alpha
参数平衡关键词权重与向量语义。
5. 数据删除与集合管理
# 删除属性匹配的数据
where_filter = Filter.by_property(property_name).like(property_value)
collection.data.delete_many(where=where_filter)
# 删除集合
self.client.collections.delete(weibo_collection)
✅ 精细化控制数据生命周期,适用于定期清理历史数据或临时集合。
使用示例与效果
示例代码片段:
helper = WeaviateHelper(...)
helper.create_collection("weibo_collection", [
("platform", DataType.TEXT),
("username", DataType.TEXT),
("content", DataType.TEXT),
])
helper.insert_data("weibo_collection", [{
'platform': 'weibo',
'username': 'user01',
'content': '测试内容测试内容'
}])
results = helper.semantic_search("weibo_collection", "内容")
for result in results:
print(result.properties)
✅ 示例操作展示了集合创建 → 数据导入 → 语义搜索的完整流程。
完整代码
import traceback
import weaviate
from weaviate.classes.config import Configure, Property
from weaviate.classes.query import MetadataQuery, Filter
from loguru import logger
class WeaviateHelper:
def __init__(self, http_host, http_port, grpc_host, grpc_port, ollama_endpoint):
self.client = weaviate.connect_to_custom(
http_host=http_host,
http_port=http_port,
http_secure=False,
grpc_host=grpc_host,
grpc_port=grpc_port,
grpc_secure=False,
skip_init_checks=True,
)
self.ollama_endpoint = ollama_endpoint
def create_collection(self, weibo_collection, properties):
"""创建集合
Args:
weibo_collection (str): 集合名称
properties (list): 属性列表,例如 [("platform", DataType.TEXT), ("username", DataType.TEXT)]
"""
try:
self.client.collections.create(
weibo_collection,
vectorizer_config=Configure.Vectorizer.text2vec_ollama(
model="nomic-embed-text", api_endpoint=self.ollama_endpoint
),
properties=[
Property(name=name, data_type=data_type)
for name, data_type in properties
],
)
except Exception as e:
logger.warning(
f"Collection '{weibo_collection}' already exists or error: {e}"
)
def insert_data(self, weibo_collection, data):
"""插入数据到集合
Args:
weibo_collection (str): 集合名称
data (list): 要插入的数据列表
"""
collection = self.client.collections.get(weibo_collection)
result = collection.data.insert_many(data)
logger.info(f"Insertion response: {result}")
return result
def hybrid_search(self, weibo_collection, query, alpha=0.5, limit=1):
"""混合搜索(结合向量搜索和关键词搜索)
Args:
weibo_collection (str): 集合名称
query (str): 搜索查询
alpha (float): 向量搜索和关键词搜索的权重比例
limit (int): 返回结果数量
"""
collection = self.client.collections.get(weibo_collection)
response = collection.query.hybrid(
query=query,
alpha=alpha,
limit=limit,
return_metadata=MetadataQuery(distance=True, score=True),
)
return response.objects
def semantic_search(self, weibo_collection, query, limit=1):
"""语义搜索
Args:
weibo_collection (str): 集合名称
query (str): 搜索查询
limit (int): 返回结果数量
"""
collection = self.client.collections.get(weibo_collection)
response = collection.query.near_text(
query=query, limit=limit, return_metadata=MetadataQuery(distance=True)
)
return response.objects
def close(self):
"""关闭客户端连接"""
self.client.close()
def get_all_data(self, weibo_collection):
"""获取集合中的所有数据
Args:
weibo_collection (str): 集合名称
Returns:
list: 集合中的所有对象
"""
collection = self.client.collections.get(weibo_collection)
response = collection.query.fetch_objects(
limit=10000
) # 设置较大的限制以获取所有数据
return response.objects
def delete_collection(self, weibo_collection):
"""删除指定的集合
Args:
weibo_collection (str): 要删除的集合名称
"""
try:
self.client.collections.delete(weibo_collection)
logger.info(f"Collection '{weibo_collection}' has been deleted successfully")
except Exception as e:
logger.error(f"Error deleting collection '{weibo_collection}': {e}")
def delete_collection_by_property_name(
self, weibo_collection, property_name, property_value
):
"""根据属性名称删除集合中的数据
Args:
weibo_collection (str): 集合名称
property_name (str): 属性名称
property_value (str): 属性值
"""
try:
collection = self.client.collections.get(weibo_collection)
where_filter = Filter.by_property(property_name).like(property_value)
collection.data.delete_many(where=where_filter)
logger.info(
f"Successfully deleted data where {property_name}={property_value}"
)
except Exception as e:
logger.error(f"Error deleting data: {traceback.format_exc()}")
def get_all_collections(self):
"""获取所有集合的列表
Returns:
list: 所有集合名称的列表
"""
try:
collections = self.client.collections.list_all()
logger.info(f"Found {len(collections)} collections")
return collections
except Exception as e:
logger.error(f"Error getting collections: {traceback.format_exc()}")
return []
# 使用示例
if __name__ == "__main__":
# 创建助手实例
helper = WeaviateHelper(
http_host="x.x.x.x",
http_port=8080,
grpc_host="x.x.x.x",
grpc_port=50051,
ollama_endpoint="http://x.x.x.x:11434",
)
try:
# 获取所有集合示例
collections = helper.get_all_collections()
for collection in collections:
logger.info(f"Collection name: {collection}")
# 定义集合属性
# properties = [
# ("platform", DataType.TEXT),
# ("username", DataType.TEXT),
# ("content", DataType.TEXT),
# ]
# 创建集合
# helper.create_collection("weibo_collection", properties)
# 插入数据
# test_data = [{
# 'platform': 'weibo',
# 'username': 'username',
# 'content': '测试内容测试内容测试'
# }]
# helper.insert_data("weibo_collection", test_data)
# 混合搜索
# results = helper.semantic_search("weibo_collection", "weibo")
# for result in results:
# logger.info(f"帖子内容: {result.properties}")
# logger.info(f"语义距离: {result.metadata.distance}, BM25 分数: {result.metadata.score}")
# 获取所有数据示例
# all_data = helper.get_all_data("weibo_collection")
# logger.info("所有数据:")
# for item in all_data:
# logger.info(f"数据: {item.properties}")
# 删除集合示例(取消注释以执行)
# helper.delete_collection("weibo_collection")
# 删除特定属性的数据示例
# helper.delete_collection_by_property_name("weibo_collection", "platform", "weibo")
# 获取所有数据示例
# all_data = helper.get_all_data("weibo_collection")
# logger.info("所有数据:")
# for item in all_data:
# logger.info(f"数据: {item.properties}")
except Exception:
logger.error(traceback.format_exc())
finally:
# 关闭连接
helper.close()
总结与建议
✅ 本文优势
- 一站式封装 Weaviate 的常用接口;
- 适配本地 Ollama 模型,降低外部依赖;
- 完善的日志记录与异常捕捉;
- 支持多种搜索方式,满足不同业务场景。
参考文档:https://weaviate.io/developers/weaviate

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)