Milvus 连接池优化方案

每次调用 from_texts() 都可能新建一个 Milvus 连接,频繁建连效率不高。我们可以使用连接池来复用连接。以下是优化后的完整代码:

import threading
from pymilvus import connections, utility, Collection
from queue import Queue, Empty
from typing import List, Dict, Optional, Any
from langchain.vectorstores.milvus import Milvus

class MilvusConnectionPool:
    """Process-wide singleton pool of pre-opened pymilvus connection aliases.

    The first construction opens ``pool_size`` connections named ``conn_0`` ..
    ``conn_{pool_size - 1}`` to ``host:port`` and keeps their aliases in a
    queue. Every later construction returns that same instance; ``pool_size``
    is then ignored, and a different ``host``/``port`` raises ``ValueError``
    rather than silently handing out connections to the wrong server.

    Typical use::

        alias = pool.get_connection()
        try:
            ...  # use `alias` with pymilvus
        finally:
            pool.release_connection(alias)
    """

    _instance = None
    # Class-level lock: serialises creation and retirement of the singleton.
    _lock = threading.Lock()

    def __new__(cls, host: str, port: str, pool_size: int = 5):
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._initialize_pool(host, port, pool_size)
                # Publish only after initialisation succeeded, so a failed
                # connect does not leave a half-built singleton behind and no
                # other thread can observe an instance without `_pool`.
                cls._instance = instance
            elif (str(cls._instance.host), str(cls._instance.port)) != (
                str(host),
                str(port),
            ):
                raise ValueError(
                    f"MilvusConnectionPool is already connected to "
                    f"{cls._instance.host}:{cls._instance.port}; "
                    f"cannot reuse it for {host}:{port}"
                )
            return cls._instance

    def _initialize_pool(self, host: str, port: str, pool_size: int):
        """Open ``pool_size`` connections and enqueue their aliases.

        Raises:
            ValueError: if ``pool_size`` is smaller than 1.
            Exception: whatever ``connections.connect`` raises; connections
                opened before the failure are removed first.
        """
        if pool_size < 1:
            raise ValueError(f"pool_size must be >= 1, got {pool_size}")
        self.host = host
        self.port = port
        self.pool_size = pool_size
        self._pool = Queue(maxsize=pool_size)
        # Instance-level lock guarding `_checked_out` and `_closed`.
        self._lock = threading.Lock()
        # Aliases currently handed out by get_connection().
        self._checked_out = set()
        # Set by close_all(); released aliases are then removed, not re-queued.
        self._closed = False

        # Pre-create the connections and put their aliases into the pool.
        opened = []
        try:
            for i in range(pool_size):
                alias = f"conn_{i}"
                connections.connect(alias=alias, host=host, port=port)
                opened.append(alias)
                self._pool.put_nowait(alias)
        except Exception:
            # Do not leak the connections that did open before the failure.
            for alias in opened:
                try:
                    connections.remove_connection(alias)
                except Exception:
                    # Best-effort cleanup; the original error is re-raised below.
                    pass
            raise

    def get_connection(self, timeout: Optional[float] = None) -> str:
        """Check out a connection alias from the pool.

        Args:
            timeout: ``None`` (default) fails immediately when no alias is
                idle; a number waits up to that many seconds for another
                caller to release one.

        Returns:
            The alias of a connection registered with ``pymilvus.connections``.

        Raises:
            Exception: "Connection pool exhausted" when no alias is available.
            Exception: whatever ``connections.connect`` raises while
                re-registering a missing alias (the alias stays in the pool).
        """
        try:
            if timeout is None:
                alias = self._pool.get_nowait()
            else:
                alias = self._pool.get(timeout=timeout)
        except Empty:
            raise Exception("Connection pool exhausted") from None

        try:
            # pymilvus' get_connection_addr() returns an empty dict for an
            # unknown alias instead of raising, so it cannot detect a dropped
            # alias; has_connection() reports whether it is still registered.
            # Note this does not probe the server itself.
            if not connections.has_connection(alias):
                connections.connect(alias=alias, host=self.host, port=self.port)
        except Exception:
            # Return the slot so a failed reconnect does not shrink the pool.
            self._pool.put_nowait(alias)
            raise

        with self._lock:
            self._checked_out.add(alias)
        return alias

    def release_connection(self, alias: str):
        """Return a checked-out alias to the pool.

        Releasing an alias that is not currently checked out (for example a
        second release of the same alias) is ignored: re-queueing it would
        duplicate the alias in the queue, or block forever once the queue is
        full. After ``close_all`` the connection is removed instead.
        """
        with self._lock:
            if alias not in self._checked_out:
                return
            self._checked_out.remove(alias)
            if self._closed:
                connections.remove_connection(alias)
            else:
                # Cannot be full: queued aliases are distinct and this one was out.
                self._pool.put_nowait(alias)

    def close_all(self):
        """Close the pool's connections and retire this pool instance.

        Idle connections are removed immediately; connections that are
        currently checked out are removed when they are released. Afterwards
        the next ``MilvusConnectionPool(...)`` call builds a fresh pool instead
        of returning this drained one.
        """
        cls = type(self)
        with cls._lock:
            if cls._instance is self:
                cls._instance = None
        with self._lock:
            self._closed = True
            while True:
                try:
                    alias = self._pool.get_nowait()
                except Empty:
                    break
                connections.remove_connection(alias)


class OptimizedMilvus(Milvus):
    """LangChain ``Milvus`` store that borrows its connection from a shared pool.

    The pool is the process-wide ``MilvusConnectionPool`` singleton, shared by
    every instance. It is therefore deliberately not closed when an instance is
    garbage-collected. Closing it from one instance's finaliser would drain it
    for every other user. Call ``close_all()`` on the pool at shutdown instead.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): the parent class is not visible here, and nothing shown
        # establishes that it defines `milvus_host` / `milvus_port`; fall back to
        # the host/port in `connection_args` instead of raising AttributeError.
        conn_args = kwargs.get("connection_args") or {}
        host = getattr(self, "milvus_host", conn_args.get("host"))
        port = getattr(self, "milvus_port", conn_args.get("port"))
        # Stays None when no host/port is known (e.g. an alias-only
        # connection_args); from_texts() assigns the pool afterwards.
        self.connection_pool = None
        if host is not None and port is not None:
            self.connection_pool = MilvusConnectionPool(
                host=host,
                port=port,
                pool_size=10,  # adjust as needed
            )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Any,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = "LangChainCollection",
        connection_args: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> "Milvus":
        """Build a store from ``texts`` using a connection borrowed from the pool.

        Args:
            texts: Texts to embed and insert.
            embedding: Embedding object forwarded to the parent ``from_texts``.
            metadatas: Optional per-text metadata dicts.
            collection_name: Target Milvus collection.
            connection_args: Must contain ``host`` and ``port``; every key
                except ``alias`` is passed to ``connections.connect``.
            **kwargs: Forwarded to the parent ``from_texts``.

        Returns:
            The instance built by the parent ``from_texts``, with
            ``connection_pool`` set to the shared pool.

        Raises:
            ValueError: if ``connection_args`` is missing or lacks host/port.
            Exception: "Connection pool exhausted" when no alias is idle.
        """
        if connection_args is None:
            raise ValueError("connection_args must be provided")
        missing = [key for key in ("host", "port") if key not in connection_args]
        if missing:
            raise ValueError(
                f"connection_args is missing required key(s): {missing}"
            )

        # Get (or lazily create) the shared pool, then check out one alias.
        connection_pool = MilvusConnectionPool(
            host=connection_args["host"],
            port=connection_args["port"],
            pool_size=10,
        )
        alias = connection_pool.get_connection()

        try:
            # Re-register the pooled alias with the caller's full arguments.
            # An "alias" key would clash with the alias= keyword, so drop it.
            connect_args = {
                key: value for key, value in connection_args.items() if key != "alias"
            }
            connections.connect(alias=alias, **connect_args)

            instance = super().from_texts(
                texts=texts,
                embedding=embedding,
                metadatas=metadatas,
                collection_name=collection_name,
                # NOTE(review): assumes the parent accepts an alias-only
                # connection_args and reuses the existing connection -- confirm
                # against the installed langchain version. The instance keeps
                # using this alias after it is released below, so the alias may
                # be shared with later borrowers.
                connection_args={"alias": alias},
                **kwargs,
            )

            # Keep a handle on the shared pool for callers that need it.
            instance.connection_pool = connection_pool
            return instance
        finally:
            # Release exactly once, on success and on failure alike. Releasing
            # in an extra `except` branch as well would put the alias back twice.
            connection_pool.release_connection(alias)

使用示例

# Initialize: build the vector store from texts via OptimizedMilvus.from_texts,
# which borrows a connection alias from the shared pool for the duration of the call.
# NOTE(review): chunk_texts, embeddings and metadata_list are placeholders the
# caller must define (texts: List[str], metadatas: Optional[List[dict]] per the
# from_texts signature; embeddings is presumably a LangChain embedding object).
vector_db = OptimizedMilvus.from_texts(
    texts=chunk_texts,
    embedding=embeddings,
    metadatas=metadata_list,
    collection_name="my_collection",
    connection_args={"host": "localhost", "port": "19530"}
)

# NOTE(review): later operations on vector_db go through the parent Milvus class;
# no method shown here draws from the pool again after from_texts -- verify before
# relying on pooled connections for subsequent calls.

优化说明

  1. 连接池管理:MilvusConnectionPool 类管理多个 Milvus 连接,避免频繁创建和销毁连接。

  2. 连接复用:每次操作从连接池获取连接,使用后归还,而不是每次都新建连接。

  3. 线程安全:连接别名存放在线程安全的 queue.Queue 中,单例连接池的创建由类级线程锁保护,多线程环境下可以安全地获取和归还连接。

  4. 自动清理:在对象销毁时自动关闭所有连接,防止资源泄漏。

  5. 兼容性:继承了原始的 Milvus 类,保持原有功能不变。

这种优化可以显著提高 Milvus 操作的性能,特别是在高并发场景下。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐