使用 Python 构建 Weaviate 工具类:实现数据插入、语义搜索、混合检索与集合管理

随着 AI 与大模型的广泛应用,向量数据库成为文本、图像、音频等非结构化数据管理的核心组件。本文将通过封装一个基于 Python 的 WeaviateHelper 类,演示如何结合本地向量模型(如 Ollama 部署的 nomic-embed-text)实现数据管理、插入、语义检索与混合搜索。


目录

  1. 技术栈概览
  2. 核心功能介绍
  3. 完整代码讲解
  4. 使用示例与效果

技术栈概览

技术 说明
Weaviate 向量数据库,支持多模态数据管理与语义搜索
Ollama 本地大模型部署平台,用于运行 nomic-embed-text 向量模型
Loguru 更友好的日志库,替代 Python 原生 logging
Python 封装 Weaviate 功能逻辑,主导语言

核心功能介绍

WeaviateHelper 封装了常见的操作接口,包括但不限于:

  • 创建集合并配置向量化器;
  • 插入单条或多条数据;
  • 语义搜索(near_text);
  • 混合搜索(Hybrid Search);
  • 按属性删除集合中的数据;
  • 获取集合中的所有数据;
  • 删除集合、列出集合等管理操作。

完整代码讲解

以下是完整工具类代码(为保证可运行性,部分配置需根据实际环境调整):

pip install weaviate-client

1. 初始化连接

self.client = weaviate.connect_to_custom(
    http_host=http_host,
    http_port=http_port,
    grpc_host=grpc_host,
    grpc_port=grpc_port,
    skip_init_checks=True,
    http_secure=False,
    grpc_secure=False,
)

✅ 支持通过 IP + 端口连接 Weaviate 服务,同时允许使用本地部署的 Embedding 模型。


2. 创建集合并指定文本向量化模型

self.client.collections.create(
    weibo_collection,
    vectorizer_config=Configure.Vectorizer.text2vec_ollama(
        model="nomic-embed-text",
        api_endpoint=self.ollama_endpoint
    ),
    properties=[
        Property(name=name, data_type=data_type)
        for name, data_type in properties
    ],
)

✅ 通过 text2vec_ollama 接入 Ollama 本地模型;避免依赖 OpenAI/Replicate API,有利于私有部署与成本控制。


3. 插入数据支持批量导入

collection = self.client.collections.get(weibo_collection)
collection.data.insert_many(data)

✅ 推荐使用 insert_many 批量导入,提升性能。


4. 混合搜索与语义搜索

# 混合搜索:结合 BM25 和语义向量
collection.query.hybrid(
    query=query,
    alpha=alpha,
    limit=limit,
    return_metadata=MetadataQuery(distance=True, score=True)
)
# 语义搜索:向量相似度搜索
collection.query.near_text(
    query=query,
    limit=limit,
    return_metadata=MetadataQuery(distance=True)
)

hybrid 模式可调整 alpha 参数平衡关键词权重与向量语义。


5. 数据删除与集合管理

# 删除属性匹配的数据
where_filter = Filter.by_property(property_name).like(property_value)
collection.data.delete_many(where=where_filter)
# 删除集合
self.client.collections.delete(weibo_collection)

✅ 精细化控制数据生命周期,适用于定期清理历史数据或临时集合。


使用示例与效果

示例代码片段:

helper = WeaviateHelper(...)
helper.create_collection("weibo_collection", [
    ("platform", DataType.TEXT),
    ("username", DataType.TEXT),
    ("content", DataType.TEXT),
])

helper.insert_data("weibo_collection", [{
    'platform': 'weibo',
    'username': 'user01',
    'content': '测试内容测试内容'
}])

results = helper.semantic_search("weibo_collection", "内容")
for result in results:
    print(result.properties)

✅ 示例操作展示了集合创建 → 数据导入 → 语义搜索的完整流程。

完整代码

import traceback
import weaviate
from weaviate.classes.config import Configure, Property
from weaviate.classes.query import MetadataQuery, Filter
from loguru import logger


class WeaviateHelper:
    def __init__(self, http_host, http_port, grpc_host, grpc_port, ollama_endpoint):
        self.client = weaviate.connect_to_custom(
            http_host=http_host,
            http_port=http_port,
            http_secure=False,
            grpc_host=grpc_host,
            grpc_port=grpc_port,
            grpc_secure=False,
            skip_init_checks=True,
        )
        self.ollama_endpoint = ollama_endpoint

    def create_collection(self, weibo_collection, properties):
        """创建集合
        Args:
            weibo_collection (str): 集合名称
            properties (list): 属性列表,例如 [("platform", DataType.TEXT), ("username", DataType.TEXT)]
        """
        try:
            self.client.collections.create(
                weibo_collection,
                vectorizer_config=Configure.Vectorizer.text2vec_ollama(
                    model="nomic-embed-text", api_endpoint=self.ollama_endpoint
                ),
                properties=[
                    Property(name=name, data_type=data_type)
                    for name, data_type in properties
                ],
            )
        except Exception as e:
            logger.warning(
                f"Collection '{weibo_collection}' already exists or error: {e}"
            )

    def insert_data(self, weibo_collection, data):
        """插入数据到集合
        Args:
            weibo_collection (str): 集合名称
            data (list): 要插入的数据列表
        """
        collection = self.client.collections.get(weibo_collection)
        result = collection.data.insert_many(data)
        logger.info(f"Insertion response: {result}")
        return result

    def hybrid_search(self, weibo_collection, query, alpha=0.5, limit=1):
        """混合搜索(结合向量搜索和关键词搜索)
        Args:
            weibo_collection (str): 集合名称
            query (str): 搜索查询
            alpha (float): 向量搜索和关键词搜索的权重比例
            limit (int): 返回结果数量
        """
        collection = self.client.collections.get(weibo_collection)
        response = collection.query.hybrid(
            query=query,
            alpha=alpha,
            limit=limit,
            return_metadata=MetadataQuery(distance=True, score=True),
        )
        return response.objects

    def semantic_search(self, weibo_collection, query, limit=1):
        """语义搜索
        Args:
            weibo_collection (str): 集合名称
            query (str): 搜索查询
            limit (int): 返回结果数量
        """
        collection = self.client.collections.get(weibo_collection)
        response = collection.query.near_text(
            query=query, limit=limit, return_metadata=MetadataQuery(distance=True)
        )
        return response.objects

    def close(self):
        """关闭客户端连接"""
        self.client.close()

    def get_all_data(self, weibo_collection):
        """获取集合中的所有数据
        Args:
            weibo_collection (str): 集合名称
        Returns:
            list: 集合中的所有对象
        """
        collection = self.client.collections.get(weibo_collection)
        response = collection.query.fetch_objects(
            limit=10000
        )  # 设置较大的限制以获取所有数据
        return response.objects

    def delete_collection(self, weibo_collection):
        """删除指定的集合
        Args:
            weibo_collection (str): 要删除的集合名称
        """
        try:
            self.client.collections.delete(weibo_collection)
            logger.info(f"Collection '{weibo_collection}' has been deleted successfully")
        except Exception as e:
            logger.error(f"Error deleting collection '{weibo_collection}': {e}")

    def delete_collection_by_property_name(
        self, weibo_collection, property_name, property_value
    ):
        """根据属性名称删除集合中的数据
        Args:
            weibo_collection (str): 集合名称
            property_name (str): 属性名称
            property_value (str): 属性值
        """
        try:
            collection = self.client.collections.get(weibo_collection)

            where_filter = Filter.by_property(property_name).like(property_value)
            collection.data.delete_many(where=where_filter)
            logger.info(
                f"Successfully deleted data where {property_name}={property_value}"
            )
        except Exception as e:
            logger.error(f"Error deleting data: {traceback.format_exc()}")

    def get_all_collections(self):
        """获取所有集合的列表
        Returns:
            list: 所有集合名称的列表
        """
        try:
            collections = self.client.collections.list_all()
            logger.info(f"Found {len(collections)} collections")
            return collections
        except Exception as e:
            logger.error(f"Error getting collections: {traceback.format_exc()}")
            return []


# 使用示例
if __name__ == "__main__":
    # 创建助手实例

    helper = WeaviateHelper(
        http_host="x.x.x.x",
        http_port=8080,
        grpc_host="x.x.x.x",
        grpc_port=50051,
        ollama_endpoint="http://x.x.x.x:11434",
    )
    try:
        # 获取所有集合示例
        collections = helper.get_all_collections()
        for collection in collections:
            logger.info(f"Collection name: {collection}")

        # 定义集合属性
        # properties = [
        #     ("platform", DataType.TEXT),
        #     ("username", DataType.TEXT),
        #     ("content", DataType.TEXT),
        # ]

        # 创建集合
        # helper.create_collection("weibo_collection", properties)

        # 插入数据
        # test_data = [{
        #     'platform': 'weibo',
        #     'username': 'username',
        #     'content': '测试内容测试内容测试'
        # }]
        # helper.insert_data("weibo_collection", test_data)

        # 混合搜索
        # results = helper.semantic_search("weibo_collection", "weibo")
        # for result in results:
        #     logger.info(f"帖子内容: {result.properties}")
        #     logger.info(f"语义距离: {result.metadata.distance}, BM25 分数: {result.metadata.score}")

        # 获取所有数据示例
        # all_data = helper.get_all_data("weibo_collection")
        # logger.info("所有数据:")
        # for item in all_data:
        #     logger.info(f"数据: {item.properties}")

        # 删除集合示例(取消注释以执行)
        # helper.delete_collection("weibo_collection")

        # 删除特定属性的数据示例
        # helper.delete_collection_by_property_name("weibo_collection", "platform", "weibo")

        # 获取所有数据示例
        # all_data = helper.get_all_data("weibo_collection")
        # logger.info("所有数据:")
        # for item in all_data:
        #     logger.info(f"数据: {item.properties}")
    except Exception:
        logger.error(traceback.format_exc())
    finally:
        # 关闭连接
        helper.close()

总结与建议

✅ 本文优势

  • 一站式封装 Weaviate 的常用接口;
  • 适配本地 Ollama 模型,降低外部依赖;
  • 完善的日志记录与异常捕捉;
  • 支持多种搜索方式,满足不同业务场景。

参考文档:https://weaviate.io/developers/weaviate

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐