20250630015432

写在前边

本篇介绍 Zulip API 的使用方法:如何获取 Zulip 中的聊天数据用于集成到 AI 对话等场景。

在开始教程前,我想先做一些重要的声明和倡议。

  1. 隐私保护:在将论坛数据用于训练或其他用途时,请务必保护用户隐私,遵守相关法律法规
  2. 技术中立:技术本身是中立的,请不要滥用这些技术手段
  3. 合理使用:请确保你的使用符合目标 Zulip 实例的使用条款和政策

话说回来,好的方面是现在有很多辅助阅读的文档工具。通过 API 来实现 Zulip 内容的智能阅读和总结,确实是一个不错的选择。

什么是 Zulip

Zulip 是一款强大的、开源的团队沟通工具。与许多其他聊天应用不同,Zulip 的核心是基于话题(Topic)的线程化对话模型

Zulip 的独特之处

  • Stream + Topic 结构:每个对话都清晰地组织在特定的"流"(Stream,类似于频道)下的"话题"中
  • 信息组织:极大地减少了信息噪音,使得追踪多个并行对话变得异常轻松
  • 异步友好:即使你离开了一段时间,也能快速跟上进度
  • 功能丰富:强大的搜索、代码块高亮、Markdown 支持、表情回应等

Zulip API 概述

Zulip 提供了功能丰富的 REST API,支持几乎所有的操作。通过 Zulip API,你可以:

  1. 集成外部服务:将来自 GitHub、Jira、监控系统等的通知推送到指定的流和话题
  2. 构建交互式机器人:创建能够响应消息、执行任务的智能 Bot
  3. 自动化工作流程:自动发送报告、提醒,或触发其他系统操作
  4. 数据分析:获取聊天数据进行分析、总结,或与 AI 系统集成

我们本篇主要关注的是 Zulip 信息的获取:从频道信息、主题信息,到具体的对话内容。这样可以通过 AI 快速阅读总结相关帖子,阅读过程中也可以随时和 AI 交流。

准备工作

1. 获取 API 凭证

首先,你需要在 Zulip 中创建一个 Bot 账户:

  1. 登录你的 Zulip 实例
  2. 进入 SettingsYour bots
  3. 点击 Add a new bot
  4. 选择 Generic bot,填写必要信息
  5. 创建后会得到 Bot 的邮箱地址和 API Key

2. 安装依赖

pip install aiohttp asyncio

实现 Zulip API 客户端

下面是一个完整的异步 Zulip API 客户端实现:

import asyncio
import aiohttp
import json
import logging
from typing import List, Dict, Optional, Any
import os

ZULIP_BOT_EMAIL = os.getenv('ZULIP_BOT_EMAIL')
ZULIP_BOT_API_KEY = os.getenv('ZULIP_BOT_API_KEY')
ZULIP_SITE = os.getenv('ZULIP_SITE')

class ZulipClient:
    def __init__(self, 
                 site_url: Optional[str]=None,
                 bot_email: Optional[str]=None,
                 bot_api_key: Optional[str]=None,
                 logger: Optional[logging.Logger] = None):
        """初始化 Zulip 客户端"""
        if site_url is None:
            site_url = ZULIP_SITE
        if bot_email is None:
            bot_email = ZULIP_BOT_EMAIL
        if bot_api_key is None:
            bot_api_key = ZULIP_BOT_API_KEY
        
        if not all([site_url, bot_email, bot_api_key]):
            raise ValueError("site_url, bot_email, 和 bot_api_key 不能为空")
        
        self.base_url = site_url.rstrip('/') + "/api/v1"
        self._auth = aiohttp.BasicAuth(login=bot_email, password=bot_api_key)
        self.logger = logger or logging.getLogger(self.__class__.__name__)
        self._session: Optional[aiohttp.ClientSession] = None
        self._closed = False  # 添加状态标记

    async def __aenter__(self):
        """进入异步上下文,创建并存储 aiohttp session"""
        if self._closed:
            raise RuntimeError("Client has been closed and cannot be reused")
            
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
            self.logger.info("Created new aiohttp session")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出异步上下文,关闭 aiohttp session"""
        await self.close()

    async def close(self):
        """显式关闭客户端"""
        if self._session and not self._session.closed:
            await self._session.close()
            self.logger.info("Closed aiohttp session")
        self._session = None
        self._closed = True

    def shutdown(self):
        """同步方法关闭客户端"""
        if not self._closed:
            self._run_async(self.close())

    async def _ensure_session(self):
        """确保 session 可用"""
        if self._closed:
            raise RuntimeError("Client has been closed and cannot be reused")
            
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
            self.logger.info("Created new aiohttp session")
        return self._session

    async def _request(self, method: str, endpoint: str, 
                      params: Optional[Dict] = None, 
                      data: Optional[Dict] = None) -> Optional[Dict]:
        """基础请求方法"""
        if self._closed:
            raise RuntimeError("Client has been closed and cannot be reused")
            
        session = await self._ensure_session()
        url = f"{self.base_url}{endpoint}"
        
        try:
            async with session.request(method, url, params=params, 
                                     data=data, auth=self._auth) as resp:
                if resp.status == 200:
                    return await resp.json()
                self.logger.warning(f"请求失败: {resp.status} - {url}")
                return None
        except Exception as e:
            self.logger.error(f"请求异常: {e}")
            return None

    def __del__(self):
        """析构函数"""
        if not self._closed:
            self.shutdown()

    def _run_async(self, coro):
        """运行异步代码的同步包装器"""
        try:
            asyncio.get_running_loop()
            raise RuntimeError("不能在异步环境中调用同步方法")
        except RuntimeError:
            return asyncio.run(coro)

    async def _get_messages_batch(self, narrow: List[Dict], 
                                anchor: str, 
                                batch_size: int = 100) -> Optional[Dict]:
        """获取一批消息的通用方法"""
        params = {
            "narrow": json.dumps(narrow),
            "anchor": anchor,
            "num_before": batch_size,
            "num_after": 0
        }
        return await self._request("GET", "/messages", params=params)

    # === 异步公共接口 ===
    async def get_channels_async(self) -> List[Dict]:
        """获取频道列表"""
        data = await self._request("GET", "/users/me/subscriptions")
        return data.get("subscriptions", []) if data else []

    async def get_topics_async(self, stream_id: int) -> List[Dict]:
        """获取话题列表"""
        data = await self._request("GET", f"/users/me/{stream_id}/topics")
        return data.get("topics", []) if data else []

    async def get_topic_history_async(self, stream_id: int, topic_name: str, 
                          batch_size: int = 100, latest:bool=True) -> List[Dict[str, Any]]:
        """
        获取指定频道和话题的完整消息历史记录。
    
        Args:
            stream_id: 频道ID
            topic_name: 话题名称
            batch_size: 每次请求获取的消息数量,建议不超过1000
    
        Returns:
            List[Dict]: 按时间顺序(从旧到新)排列的所有消息列表
        """
        self.logger.info(f"开始获取话题 '{topic_name}' (频道 {stream_id}) 的完整历史...")
        
        # 构建查询条件
        narrow = [
            {"operator": "stream", "operand": stream_id},
            {"operator": "topic", "operand": topic_name}
        ]
        
        all_messages = []
        anchor = "newest"
        request_count = 0
        
        while True:
            request_count += 1
            self.logger.debug(f"第 {request_count} 次请求: anchor='{anchor}'")
            
            # 使用基础请求方法获取数据
            params = {
                "narrow": json.dumps(narrow),
                "anchor": anchor,
                "num_before": batch_size,
                "num_after": 0,
                "apply_markdown": 'false'
            }
            
            data = await self._request("GET", "/messages", params=params)
            
            if not data or "result" not in data:
                self.logger.error(f"获取消息批次 #{request_count} 失败")
                return []
                
            messages = data.get("messages", [])
            if not messages:
                self.logger.debug("收到空消息批次,假定已达到最早消息")
                break
                
            # 将新消息添加到列表前面
            all_messages = messages + all_messages
            
            # 检查是否到达最早消息
            if data.get("found_oldest", False):
                self.logger.debug("已确认达到最早消息")
                break
                
            # 更新锚点为当前批次最早消息的ID
            anchor = str(messages[0]["id"])
            
            # 添加请求延迟
            await asyncio.sleep(0.5)
            
            # 安全检查:限制最大请求次数
            if request_count >= 1000:
                self.logger.warning("达到最大请求次数限制,返回可能不完整的结果")
                break
        
        self.logger.info(f"已完成话题历史获取:共 {len(all_messages)} 条消息,用了 {request_count} 次请求")
        
        # 按时间顺序排序
        messages = sorted(all_messages, key=lambda x: x["id"])
        if latest:
            msgs, msg_ids = [], set()
            for msg in messages:
                msg_id = msg['id']
                if msg_id in msg_ids:
                    msgs[-1] = msg
                else:
                    msg_ids.add(msg_id)
                    msgs.append(msg)
            self.logger.info(f"去重后:共 {len(msgs)} 条消息")
            return msgs
        else:
            return messages

    # === 同步公共接口 ===
    def get_channels(self) -> List[Dict]:
        """同步获取频道列表"""
        return self._run_async(self.get_channels_async())

    def get_topics(self, stream_id: int) -> List[Dict]:
        """同步获取话题列表"""
        return self._run_async(self.get_topics_async(stream_id))

    def get_topic_history(self, stream_id: int, topic_name: str, 
                         batch_size: int = 100, latest:bool=True) -> List[Dict]:
        """同步获取话题完整历史"""
        return self._run_async(
            self.get_topic_history_async(stream_id, topic_name, batch_size, latest)
        )

使用示例

基础用法

import logging

# 配置日志
logging.basicConfig(level=logging.INFO)

# 初始化客户端
client = ZulipClient(
    site_url="https://your-zulip-instance.zulipchat.com",
    bot_email="your-bot@yourdomain.com",
    bot_api_key="your-api-key"
)

# 获取频道列表
channels = client.get_channels()
print(f"找到 {len(channels)} 个频道")

for channel in channels[:5]:  # 显示前5个频道
    print(f"- {channel['name']} (ID: {channel['stream_id']})")

异步用法

import asyncio

async def main():
    async with ZulipClient(site_url, bot_email, bot_api_key) as client:
        # 获取频道
        channels = await client.get_channels_async()
        
        if channels:
            stream_id = channels[0]['stream_id']
            
            # 获取话题
            topics = await client.get_topics_async(stream_id)
            print(f"频道 {channels[0]['name']}{len(topics)} 个话题")
            
            if topics:
                # 获取第一个话题的历史消息
                topic_name = topics[0]['name']
                messages = await client.get_topic_history_async(stream_id, topic_name)
                
                print(f"话题 '{topic_name}' 有 {len(messages)} 条消息")
                
                # 显示最新的几条消息
                for msg in messages[-3:]:
                    print(f"[{msg['sender_full_name']}]: {msg['content'][:100]}...")

# 运行异步代码
asyncio.run(main())

通过 Zulip API,我们可以轻松地获取和处理 Zulip 中的聊天数据。这为构建 AI 辅助阅读、自动摘要、智能问答等应用奠定了基础。记住,技术的价值在于如何负责任地使用它来改善我们的工作和学习体验。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐