Zulip API 实战 | 获取频道,话题和聊天数据
Zulip API 使用指南:构建智能阅读与AI对话工具 本文介绍如何利用Zulip API获取频道、话题和消息数据,构建智能阅读和AI对话集成工具。文章首先强调了隐私保护和技术合理使用的重要性,随后解释了Zulip基于话题的线程化对话模型特点。主要内容包括Zulip API的功能概述(如集成外部服务、构建机器人等)、获取API凭证的步骤,以及一个完整的异步Zulip API客户端实现代码。该客户
写在前边
本篇介绍 Zulip API 的使用方法:如何获取 Zulip 中的聊天数据用于集成到 AI 对话等场景。
在开始教程前,我想先做一些重要的声明和倡议。
- 隐私保护:在将论坛数据用于训练或其他用途时,请务必保护用户隐私,遵守相关法律法规
- 技术中立:技术本身是中立的,请不要滥用这些技术手段
- 合理使用:请确保你的使用符合目标 Zulip 实例的使用条款和政策
话说回来,好的方面是现在有很多辅助阅读的文档工具。通过 API 来实现 Zulip 内容的智能阅读和总结,确实是一个不错的选择。
什么是 Zulip
Zulip 是一款强大的、开源的团队沟通工具。与许多其他聊天应用不同,Zulip 的核心是基于话题(Topic)的线程化对话模型。
Zulip 的独特之处
- Stream + Topic 结构:每个对话都清晰地组织在特定的"流"(Stream,类似于频道)下的"话题"中
- 信息组织:极大地减少了信息噪音,使得追踪多个并行对话变得异常轻松
- 异步友好:即使你离开了一段时间,也能快速跟上进度
- 功能丰富:强大的搜索、代码块高亮、Markdown 支持、表情回应等
Zulip API 概述
Zulip 提供了功能丰富的 REST API,支持几乎所有的操作。通过 Zulip API,你可以:
- 集成外部服务:将来自 GitHub、Jira、监控系统等的通知推送到指定的流和话题
- 构建交互式机器人:创建能够响应消息、执行任务的智能 Bot
- 自动化工作流程:自动发送报告、提醒,或触发其他系统操作
- 数据分析:获取聊天数据进行分析、总结,或与 AI 系统集成
我们本篇主要关注的是 Zulip 信息的获取:从频道信息、主题信息,到具体的对话内容。这样可以通过 AI 快速阅读总结相关帖子,阅读过程中也可以随时和 AI 交流。
准备工作
1. 获取 API 凭证
首先,你需要在 Zulip 中创建一个 Bot 账户:
- 登录你的 Zulip 实例
- 进入
Settings
→Your bots
- 点击
Add a new bot
- 选择
Generic bot
,填写必要信息 - 创建后会得到 Bot 的邮箱地址和 API Key
2. 安装依赖
pip install aiohttp asyncio
实现 Zulip API 客户端
下面是一个完整的异步 Zulip API 客户端实现:
import asyncio
import aiohttp
import json
import logging
from typing import List, Dict, Optional, Any
import os
ZULIP_BOT_EMAIL = os.getenv('ZULIP_BOT_EMAIL')
ZULIP_BOT_API_KEY = os.getenv('ZULIP_BOT_API_KEY')
ZULIP_SITE = os.getenv('ZULIP_SITE')
class ZulipClient:
def __init__(self,
site_url: Optional[str]=None,
bot_email: Optional[str]=None,
bot_api_key: Optional[str]=None,
logger: Optional[logging.Logger] = None):
"""初始化 Zulip 客户端"""
if site_url is None:
site_url = ZULIP_SITE
if bot_email is None:
bot_email = ZULIP_BOT_EMAIL
if bot_api_key is None:
bot_api_key = ZULIP_BOT_API_KEY
if not all([site_url, bot_email, bot_api_key]):
raise ValueError("site_url, bot_email, 和 bot_api_key 不能为空")
self.base_url = site_url.rstrip('/') + "/api/v1"
self._auth = aiohttp.BasicAuth(login=bot_email, password=bot_api_key)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self._session: Optional[aiohttp.ClientSession] = None
self._closed = False # 添加状态标记
async def __aenter__(self):
"""进入异步上下文,创建并存储 aiohttp session"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
self.logger.info("Created new aiohttp session")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出异步上下文,关闭 aiohttp session"""
await self.close()
async def close(self):
"""显式关闭客户端"""
if self._session and not self._session.closed:
await self._session.close()
self.logger.info("Closed aiohttp session")
self._session = None
self._closed = True
def shutdown(self):
"""同步方法关闭客户端"""
if not self._closed:
self._run_async(self.close())
async def _ensure_session(self):
"""确保 session 可用"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
self.logger.info("Created new aiohttp session")
return self._session
async def _request(self, method: str, endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None) -> Optional[Dict]:
"""基础请求方法"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")
session = await self._ensure_session()
url = f"{self.base_url}{endpoint}"
try:
async with session.request(method, url, params=params,
data=data, auth=self._auth) as resp:
if resp.status == 200:
return await resp.json()
self.logger.warning(f"请求失败: {resp.status} - {url}")
return None
except Exception as e:
self.logger.error(f"请求异常: {e}")
return None
def __del__(self):
"""析构函数"""
if not self._closed:
self.shutdown()
def _run_async(self, coro):
"""运行异步代码的同步包装器"""
try:
asyncio.get_running_loop()
raise RuntimeError("不能在异步环境中调用同步方法")
except RuntimeError:
return asyncio.run(coro)
async def _get_messages_batch(self, narrow: List[Dict],
anchor: str,
batch_size: int = 100) -> Optional[Dict]:
"""获取一批消息的通用方法"""
params = {
"narrow": json.dumps(narrow),
"anchor": anchor,
"num_before": batch_size,
"num_after": 0
}
return await self._request("GET", "/messages", params=params)
# === 异步公共接口 ===
async def get_channels_async(self) -> List[Dict]:
"""获取频道列表"""
data = await self._request("GET", "/users/me/subscriptions")
return data.get("subscriptions", []) if data else []
async def get_topics_async(self, stream_id: int) -> List[Dict]:
"""获取话题列表"""
data = await self._request("GET", f"/users/me/{stream_id}/topics")
return data.get("topics", []) if data else []
async def get_topic_history_async(self, stream_id: int, topic_name: str,
batch_size: int = 100, latest:bool=True) -> List[Dict[str, Any]]:
"""
获取指定频道和话题的完整消息历史记录。
Args:
stream_id: 频道ID
topic_name: 话题名称
batch_size: 每次请求获取的消息数量,建议不超过1000
Returns:
List[Dict]: 按时间顺序(从旧到新)排列的所有消息列表
"""
self.logger.info(f"开始获取话题 '{topic_name}' (频道 {stream_id}) 的完整历史...")
# 构建查询条件
narrow = [
{"operator": "stream", "operand": stream_id},
{"operator": "topic", "operand": topic_name}
]
all_messages = []
anchor = "newest"
request_count = 0
while True:
request_count += 1
self.logger.debug(f"第 {request_count} 次请求: anchor='{anchor}'")
# 使用基础请求方法获取数据
params = {
"narrow": json.dumps(narrow),
"anchor": anchor,
"num_before": batch_size,
"num_after": 0,
"apply_markdown": 'false'
}
data = await self._request("GET", "/messages", params=params)
if not data or "result" not in data:
self.logger.error(f"获取消息批次 #{request_count} 失败")
return []
messages = data.get("messages", [])
if not messages:
self.logger.debug("收到空消息批次,假定已达到最早消息")
break
# 将新消息添加到列表前面
all_messages = messages + all_messages
# 检查是否到达最早消息
if data.get("found_oldest", False):
self.logger.debug("已确认达到最早消息")
break
# 更新锚点为当前批次最早消息的ID
anchor = str(messages[0]["id"])
# 添加请求延迟
await asyncio.sleep(0.5)
# 安全检查:限制最大请求次数
if request_count >= 1000:
self.logger.warning("达到最大请求次数限制,返回可能不完整的结果")
break
self.logger.info(f"已完成话题历史获取:共 {len(all_messages)} 条消息,用了 {request_count} 次请求")
# 按时间顺序排序
messages = sorted(all_messages, key=lambda x: x["id"])
if latest:
msgs, msg_ids = [], set()
for msg in messages:
msg_id = msg['id']
if msg_id in msg_ids:
msgs[-1] = msg
else:
msg_ids.add(msg_id)
msgs.append(msg)
self.logger.info(f"去重后:共 {len(msgs)} 条消息")
return msgs
else:
return messages
# === 同步公共接口 ===
def get_channels(self) -> List[Dict]:
"""同步获取频道列表"""
return self._run_async(self.get_channels_async())
def get_topics(self, stream_id: int) -> List[Dict]:
"""同步获取话题列表"""
return self._run_async(self.get_topics_async(stream_id))
def get_topic_history(self, stream_id: int, topic_name: str,
batch_size: int = 100, latest:bool=True) -> List[Dict]:
"""同步获取话题完整历史"""
return self._run_async(
self.get_topic_history_async(stream_id, topic_name, batch_size, latest)
)
使用示例
基础用法
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
# 初始化客户端
client = ZulipClient(
site_url="https://your-zulip-instance.zulipchat.com",
bot_email="your-bot@yourdomain.com",
bot_api_key="your-api-key"
)
# 获取频道列表
channels = client.get_channels()
print(f"找到 {len(channels)} 个频道")
for channel in channels[:5]: # 显示前5个频道
print(f"- {channel['name']} (ID: {channel['stream_id']})")
异步用法
import asyncio
async def main():
async with ZulipClient(site_url, bot_email, bot_api_key) as client:
# 获取频道
channels = await client.get_channels_async()
if channels:
stream_id = channels[0]['stream_id']
# 获取话题
topics = await client.get_topics_async(stream_id)
print(f"频道 {channels[0]['name']} 有 {len(topics)} 个话题")
if topics:
# 获取第一个话题的历史消息
topic_name = topics[0]['name']
messages = await client.get_topic_history_async(stream_id, topic_name)
print(f"话题 '{topic_name}' 有 {len(messages)} 条消息")
# 显示最新的几条消息
for msg in messages[-3:]:
print(f"[{msg['sender_full_name']}]: {msg['content'][:100]}...")
# 运行异步代码
asyncio.run(main())
通过 Zulip API,我们可以轻松地获取和处理 Zulip 中的聊天数据。这为构建 AI 辅助阅读、自动摘要、智能问答等应用奠定了基础。记住,技术的价值在于如何负责任地使用它来改善我们的工作和学习体验。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)