Milvus向量数据库连接池优化实现
·
Milvus 连接池优化方案
是的,每次调用 from_texts() 都会创建新连接,这样效率不高。我们可以使用连接池来复用连接。以下是优化后的完整代码:
import threading
from pymilvus import connections, utility, Collection
from queue import Queue, Empty
from typing import List, Dict, Optional, Any
from langchain.vectorstores.milvus import Milvus
class MilvusConnectionPool:
_instance = None
_lock = threading.Lock()
def __new__(cls, host: str, port: str, pool_size: int = 5):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(MilvusConnectionPool, cls).__new__(cls)
cls._instance._initialize_pool(host, port, pool_size)
return cls._instance
def _initialize_pool(self, host: str, port: str, pool_size: int):
self.host = host
self.port = port
self.pool_size = pool_size
self._pool = Queue(maxsize=pool_size)
self._lock = threading.Lock()
# 预先创建连接并放入池中
for i in range(pool_size):
alias = f"conn_{i}"
connections.connect(alias=alias, host=host, port=port)
self._pool.put(alias)
def get_connection(self) -> str:
"""从连接池获取一个连接别名"""
try:
alias = self._pool.get_nowait()
# 检查连接是否有效
try:
connections.get_connection_addr(alias)
return alias
except Exception:
# 如果连接无效,重新创建
connections.connect(alias=alias, host=self.host, port=self.port)
return alias
except Empty:
raise Exception("Connection pool exhausted")
def release_connection(self, alias: str):
"""释放连接回连接池"""
self._pool.put(alias)
def close_all(self):
"""关闭所有连接"""
while not self._pool.empty():
try:
alias = self._pool.get_nowait()
connections.remove_connection(alias)
except Empty:
break
class OptimizedMilvus(Milvus):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化连接池
self.connection_pool = MilvusConnectionPool(
host=self.milvus_host,
port=self.milvus_port,
pool_size=10 # 可根据需要调整
)
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Any,
metadatas: Optional[List[dict]] = None,
collection_name: str = "LangChainCollection",
connection_args: Dict[str, Any] = None,
**kwargs: Any,
) -> "Milvus":
"""优化后的from_texts方法,使用连接池"""
# 获取或创建连接池实例
if connection_args is None:
raise ValueError("connection_args must be provided")
connection_pool = MilvusConnectionPool(
host=connection_args["host"],
port=connection_args["port"],
pool_size=10
)
# 从连接池获取连接
alias = connection_pool.get_connection()
try:
# 使用获取的连接进行操作
connections.connect(alias=alias, **connection_args)
# 调用父类方法
instance = super().from_texts(
texts=texts,
embedding=embedding,
metadatas=metadatas,
collection_name=collection_name,
connection_args={"alias": alias}, # 使用现有连接
**kwargs
)
# 将连接池实例保存到对象中
instance.connection_pool = connection_pool
return instance
except Exception as e:
# 发生异常时释放连接
connection_pool.release_connection(alias)
raise e
finally:
# 正常完成时释放连接
connection_pool.release_connection(alias)
def __del__(self):
"""对象销毁时确保连接被释放"""
if hasattr(self, 'connection_pool'):
self.connection_pool.close_all()
使用示例
# 初始化
vector_db = OptimizedMilvus.from_texts(
texts=chunk_texts,
embedding=embeddings,
metadatas=metadata_list,
collection_name="my_collection",
connection_args={"host": "localhost", "port": "19530"}
)
# 后续操作会自动使用连接池
优化说明
-
连接池管理:
MilvusConnectionPool类管理多个 Milvus 连接,避免频繁创建和销毁连接。 -
连接复用:每次操作从连接池获取连接,使用后归还,而不是每次都新建连接。
-
线程安全:使用线程锁确保连接池在多线程环境下的安全访问。
-
自动清理:在对象销毁时自动关闭所有连接,防止资源泄漏。
-
兼容性:继承了原始的
Milvus类,保持原有功能不变。
这种优化可以显著提高 Milvus 操作的性能,特别是在高并发场景下。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)