Telegram-Bot-搭建与对接完全指南-看一次就懂了
Telegram Bot 是运行在 Telegram 平台上的自动化程序。它们可以响应用户的消息、执行命令、发送通知、处理支付,甚至可以作为完整的应用程序运行。与其他即时通讯平台的机器人相比,Telegram Bot 拥有以下独特优势:Telegram Bot 的核心工作流程如下:你的服务器与 Telegram 服务器之间的通信有两种模式:1.3 Bot API 的核心概念在开始编码之前,你需要理
🤖 Telegram Bot 搭建与对接完全指南:从零到生产级部署
作者:万先生 | 最后更新:2026年2月 公司官网:https://hongshuotech.xyz/zh
上一期的tg搭建教程很火,特意二次更新
本文将从最基础的概念开始,手把手教你搭建一个功能完善的 Telegram Bot,并对接各种第三方服务。无论你是初学者还是有一定经验的开发者,都能在本文中找到有价值的内容。全文约 15000+ 字,建议收藏后慢慢阅读。
📑 目录
- 第一章:Telegram Bot 基础概念
- 第二章:环境准备与工具安装
- 第三章:创建你的第一个 Bot
- 第四章:Bot 开发框架选型
- 第五章:Python 版本完整实战
- 第六章:Node.js 版本完整实战
- 第七章:Webhook 与 Polling 模式详解
- 第八章:对接数据库
- 第九章:对接 AI 服务(OpenAI / Claude)
- 第十章:对接支付系统
- 第十一章:对接消息队列与任务调度
- 第十二章:部署方案全解析
- 第十三章:安全加固与最佳实践
- 第十四章:监控、日志与运维
- 第十五章:常见问题与排错指南
- 附录:完整项目结构参考
第一章:Telegram Bot 基础概念
1.1 什么是 Telegram Bot?
Telegram Bot 是运行在 Telegram 平台上的自动化程序。它们可以响应用户的消息、执行命令、发送通知、处理支付,甚至可以作为完整的应用程序运行。与其他即时通讯平台的机器人相比,Telegram Bot 拥有以下独特优势:
- 完全免费的 API:Telegram 不对 Bot API 的调用收取任何费用
- 丰富的消息类型:支持文本、图片、视频、文件、位置、联系人、投票等多种消息格式
- Inline Mode:用户可以在任何聊天中直接调用你的 Bot
- 支付集成:原生支持多种支付渠道
- Web App:可以在 Bot 中嵌入完整的 Web 应用
- 无需审核:创建 Bot 即可立即使用,无需等待平台审核
1.2 Bot 的工作原理
Telegram Bot 的核心工作流程如下:
用户发送消息 → Telegram 服务器 → Bot API → 你的服务器 → 处理逻辑 → Bot API → Telegram 服务器 → 用户收到回复
你的服务器与 Telegram 服务器之间的通信有两种模式:
- Polling(轮询):你的服务器主动向 Telegram 服务器请求新消息
- Webhook(推送):Telegram 服务器在有新消息时主动推送到你的服务器
┌─────────────────────────────────────────────────┐
│ Polling 模式 │
│ │
│ 你的服务器 ──请求──> Telegram ──返回消息──> 你的服务器 │
│ ↑ │ │
│ └──────────── 循环请求 ────────────────┘ │
└─────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────┐
│ Webhook 模式 │
│ │
│ 用户发消息 → Telegram ──推送──> 你的服务器(HTTPS) │
│ ←──响应── │
└─────────────────────────────────────────────────┘
1.3 Bot API 的核心概念
在开始编码之前,你需要理解以下核心概念:
Update(更新):Bot 收到的每一条新消息、回调查询、编辑等事件都是一个 Update 对象。每个 Update 有唯一的 update_id。
Chat(聊天):可以是私聊(private)、群组(group)、超级群组(supergroup)或频道(channel)。每个聊天都有唯一的 chat_id。
Message(消息):用户发送的内容,包含消息文本、发送者信息、时间戳等。
Callback Query(回调查询):用户点击 Inline Keyboard 按钮时触发的事件。
Command(命令):以 / 开头的特殊消息,如 /start、/help。
第二章:环境准备与工具安装
2.1 选择你的服务器
搭建 Telegram Bot 首先需要一台服务器。以下是常见方案对比:
| 方案 | 优点 | 缺点 | 适合场景 | 月成本 |
|---|---|---|---|---|
| 自有 VPS | 完全控制,灵活 | 需要运维 | 正式项目 | $5-20 |
| 云函数(Serverless) | 免运维,按量付费 | 冷启动延迟 | 轻量级 Bot | $0-5 |
| 本地开发机 | 免费,方便调试 | 需要内网穿透 | 开发测试 | $0 |
| Docker 容器 | 可移植,环境一致 | 需要容器知识 | 团队协作 | 取决于宿主机 |
| Heroku / Railway | 部署简单 | 免费额度有限 | 个人项目 | $0-7 |
推荐方案:对于初学者,建议使用一台 VPS(如 Vultr、DigitalOcean、Hetzner)配合 Docker 部署。
2.2 服务器基础环境配置
假设你已经有了一台 Ubuntu 22.04 / 24.04 的 VPS,按以下步骤配置基础环境:
# 1. 更新系统
sudo apt update && sudo apt upgrade -y
# 2. 安装基础工具
sudo apt install -y curl wget git vim htop unzip software-properties-common
# 3. 配置防火墙(重要!)
sudo ufw allow OpenSSH
sudo ufw allow 443/tcp # Webhook 需要 HTTPS
sudo ufw allow 80/tcp # 用于 Let's Encrypt 证书验证
sudo ufw enable
# 4. 创建专用用户(不要用 root 运行 Bot)
sudo adduser botuser
sudo usermod -aG sudo botuser
su - botuser
2.3 安装 Python 环境
# 安装 Python 3.11+
sudo apt install -y python3 python3-pip python3-venv
# 验证版本
python3 --version # 应该显示 3.11.x 或更高
# 创建项目目录
mkdir -p ~/telegram-bot && cd ~/telegram-bot
# 创建虚拟环境(强烈推荐)
python3 -m venv venv
source venv/bin/activate
# 安装核心依赖
pip install python-telegram-bot[all] # 官方推荐的 Python 库
pip install aiohttp # 异步 HTTP 请求
pip install python-dotenv # 环境变量管理
2.4 安装 Node.js 环境
# 使用 nvm 安装(推荐方式)
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
source ~/.bashrc
# 安装 Node.js LTS
nvm install --lts
nvm use --lts
# 验证
node --version # 应该显示 v20.x 或更高
npm --version
# 初始化项目
mkdir -p ~/telegram-bot-node && cd ~/telegram-bot-node
npm init -y
# 安装核心依赖
npm install telegraf # 最流行的 Node.js Bot 框架
npm install dotenv # 环境变量
npm install node-fetch # HTTP 请求
2.5 安装 Docker(可选但推荐)
# 安装 Docker
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $USER
newgrp docker
# 安装 Docker Compose
sudo apt install -y docker-compose-plugin
# 验证
docker --version
docker compose version
第三章:创建你的第一个 Bot
3.1 通过 BotFather 创建 Bot
BotFather 是 Telegram 官方的"机器人之父",所有 Bot 的创建和管理都通过它完成。
操作步骤:
- 在 Telegram 中搜索
@BotFather并打开对话 - 发送
/newbot命令 - 输入你的 Bot 名称(显示名称,可以包含中文和空格)
- 输入你的 Bot 用户名(必须以
bot结尾,如my_awesome_bot) - BotFather 会返回你的 API Token
✅ 成功创建 Bot!
Bot 名称: My Awesome Bot
用户名: @my_awesome_bot
Token: 7123456789:AAH1234567890abcdefghijklmnopqrstuv
⚠️ 请务必妥善保管你的 Token!
任何拥有 Token 的人都可以完全控制你的 Bot。
安全提示:永远不要在代码中硬编码 Token,也不要将其提交到 Git 仓库。使用环境变量来管理。
3.2 配置 Bot 的基本信息
创建完 Bot 后,你可以通过 BotFather 进一步配置:
/setdescription - 设置 Bot 的描述信息(搜索结果中显示)
/setabouttext - 设置 Bot 的 "关于" 信息
/setuserpic - 设置 Bot 的头像
/setcommands - 设置 Bot 的命令列表(命令菜单)
/setprivacy - 设置隐私模式(群组中是否只接收 @ 和命令)
/setjoingroups - 设置是否允许被添加到群组
/setinline - 启用 Inline 模式
设置命令列表示例:
向 BotFather 发送 /setcommands,然后输入:
start - 开始使用
help - 帮助信息
settings - 设置
status - 查看状态
subscribe - 订阅通知
unsubscribe - 取消订阅
3.3 使用环境变量管理配置
创建 .env 文件:
# .env 文件(绝对不要提交到 Git!)
BOT_TOKEN=7123456789:AAH1234567890abcdefghijklmnopqrstuv
BOT_USERNAME=my_awesome_bot
ADMIN_CHAT_ID=123456789
DATABASE_URL=postgresql://user:pass@localhost:5432/botdb
REDIS_URL=redis://localhost:6379
WEBHOOK_URL=https://bot.yourdomain.com
LOG_LEVEL=INFO
创建 .gitignore 文件:
.env
*.pyc
__pycache__/
venv/
node_modules/
*.log
第四章:Bot 开发框架选型
4.1 Python 框架对比
| 框架 | Stars | 特点 | 推荐度 |
|---|---|---|---|
| python-telegram-bot | 26k+ | 官方推荐,功能完整,异步支持 | ⭐⭐⭐⭐⭐ |
| aiogram | 5k+ | 纯异步,性能优秀,现代设计 | ⭐⭐⭐⭐⭐ |
| pyTelegramBotAPI | 8k+ | 简单易用,适合入门 | ⭐⭐⭐⭐ |
| Pyrogram | 4k+ | MTProto 协议,功能更底层 | ⭐⭐⭐ |
推荐选择:
- 新手入门:
python-telegram-bot(文档最全,社区最大) - 高性能场景:
aiogram(纯异步,设计更现代) - 快速原型:
pyTelegramBotAPI(API 最简洁)
4.2 Node.js 框架对比
| 框架 | Stars | 特点 | 推荐度 |
|---|---|---|---|
| Telegraf | 8k+ | 中间件架构,类似 Koa | ⭐⭐⭐⭐⭐ |
| grammY | 2k+ | TypeScript 优先,现代设计 | ⭐⭐⭐⭐⭐ |
| node-telegram-bot-api | 8k+ | 轻量,低级别封装 | ⭐⭐⭐⭐ |
推荐选择:
- JavaScript 项目:
Telegraf(生态成熟,插件丰富) - TypeScript 项目:
grammY(类型安全,文档优秀)
4.3 其他语言方案
如果你使用其他语言,以下是各语言的推荐库:
- Go:
telebot(gopkg.in/telebot.v3) - Rust:
teloxide - Java/Kotlin:
TelegramBots - C#:
Telegram.Bot - PHP:
telegram-bot-sdk
第五章:Python 版本完整实战
5.1 基础 Bot 骨架
# bot.py - 基础版本
import os
import logging
from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
ConversationHandler,
filters,
ContextTypes,
)
# 加载环境变量
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")
# 配置日志
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO
)
logger = logging.getLogger(__name__)
# ==================== 命令处理器 ====================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /start 命令"""
user = update.effective_user
welcome_text = (
f"👋 你好,{user.first_name}!\n\n"
f"欢迎使用我们的 Bot!\n\n"
f"可用命令:\n"
f"/help - 查看帮助\n"
f"/settings - 修改设置\n"
f"/status - 查看状态"
)
# 创建 Inline Keyboard
keyboard = [
[
InlineKeyboardButton("📚 使用教程", callback_data="tutorial"),
InlineKeyboardButton("⚙️ 设置", callback_data="settings"),
],
[
InlineKeyboardButton("💬 联系我们", url="https://t.me/your_support"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(welcome_text, reply_markup=reply_markup)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /help 命令"""
help_text = (
"📖 *帮助中心*\n\n"
"以下是我可以帮你做的事情:\n\n"
"🔹 `/start` \\- 重新开始\n"
"🔹 `/help` \\- 查看本帮助\n"
"🔹 `/settings` \\- 个人设置\n"
"🔹 `/status` \\- 系统状态\n"
"🔹 `/subscribe` \\- 订阅通知\n\n"
"直接发送文字消息,我也会回复哦!"
)
await update.message.reply_text(help_text, parse_mode="MarkdownV2")
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /status 命令"""
import psutil
import time
cpu = psutil.cpu_percent()
memory = psutil.virtual_memory().percent
uptime = time.time() - context.bot_data.get("start_time", time.time())
hours = int(uptime // 3600)
minutes = int((uptime % 3600) // 60)
status_text = (
"📊 *系统状态*\n\n"
f"🖥️ CPU 使用率: {cpu}%\n"
f"💾 内存使用率: {memory}%\n"
f"⏱️ 运行时间: {hours}小时{minutes}分钟\n"
f"🤖 Bot 状态: 正常运行中 ✅"
)
await update.message.reply_text(status_text, parse_mode="Markdown")
# ==================== 消息处理器 ====================
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理普通文本消息"""
text = update.message.text
user = update.effective_user
logger.info(f"收到来自 {user.first_name}({user.id}) 的消息: {text}")
# 简单的关键词回复
if "你好" in text or "hello" in text.lower():
await update.message.reply_text(f"你好呀,{user.first_name}!有什么可以帮你的? 😊")
elif "谢谢" in text:
await update.message.reply_text("不客气!随时为你服务 🤗")
else:
await update.message.reply_text(
f"收到你的消息:「{text}」\n\n"
"如需帮助,请使用 /help 查看功能列表。"
)
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理图片消息"""
photo = update.message.photo[-1] # 获取最高分辨率的图片
file = await photo.get_file()
file_path = f"downloads/{photo.file_id}.jpg"
await file.download_to_drive(file_path)
await update.message.reply_text(
f"📷 收到你的图片!\n"
f"文件大小: {photo.file_size} bytes\n"
f"尺寸: {photo.width}x{photo.height}"
)
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文件消息"""
document = update.message.document
await update.message.reply_text(
f"📄 收到文件:{document.file_name}\n"
f"类型: {document.mime_type}\n"
f"大小: {document.file_size} bytes"
)
# ==================== 回调处理器 ====================
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 Inline Keyboard 按钮点击"""
query = update.callback_query
await query.answer() # 必须回应回调查询
if query.data == "tutorial":
await query.edit_message_text(
"📚 *使用教程*\n\n"
"1️⃣ 首先,发送 /start 开始\n"
"2️⃣ 然后,选择你需要的功能\n"
"3️⃣ 按照提示操作即可\n\n"
"就是这么简单!",
parse_mode="Markdown"
)
elif query.data == "settings":
keyboard = [
[InlineKeyboardButton("🌍 语言设置", callback_data="lang")],
[InlineKeyboardButton("🔔 通知设置", callback_data="notify")],
[InlineKeyboardButton("🔙 返回", callback_data="back")],
]
await query.edit_message_text(
"⚙️ *设置*\n\n请选择要修改的设置项:",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="Markdown"
)
elif query.data == "back":
await start(update, context)
# ==================== 错误处理 ====================
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
"""全局错误处理"""
logger.error(f"Update {update} caused error {context.error}")
# 通知管理员
admin_id = os.getenv("ADMIN_CHAT_ID")
if admin_id:
await context.bot.send_message(
chat_id=admin_id,
text=f"⚠️ Bot 出错了!\n\n错误信息: {context.error}"
)
# ==================== 主函数 ====================
def main():
"""启动 Bot"""
import time
app = Application.builder().token(BOT_TOKEN).build()
# 记录启动时间
app.bot_data["start_time"] = time.time()
# 注册处理器
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("help", help_command))
app.add_handler(CommandHandler("status", status))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
# 注册错误处理器
app.add_error_handler(error_handler)
# 启动 Bot
logger.info("Bot 启动中...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()
5.2 进阶:ConversationHandler(对话流程)
ConversationHandler 是实现多步交互的核心组件。以下示例实现了一个用户注册流程:
# conversation.py - 对话流程示例
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import (
ConversationHandler,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
# 定义对话状态
NAME, AGE, LOCATION, CONFIRM = range(4)
async def register_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""开始注册流程"""
await update.message.reply_text(
"📝 开始注册流程\n\n"
"请输入你的名字(发送 /cancel 可随时取消):"
)
return NAME
async def get_name(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""获取用户名字"""
context.user_data["name"] = update.message.text
await update.message.reply_text(
f"好的,{update.message.text}!\n\n请输入你的年龄:"
)
return AGE
async def get_age(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""获取用户年龄"""
try:
age = int(update.message.text)
if age < 1 or age > 150:
raise ValueError
context.user_data["age"] = age
except ValueError:
await update.message.reply_text("请输入有效的年龄(1-150):")
return AGE
keyboard = [["🌏 亚洲", "🌍 欧洲"], ["🌎 美洲", "🌏 其他"]]
await update.message.reply_text(
"请选择你的所在地区:",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True)
)
return LOCATION
async def get_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""获取用户地区"""
context.user_data["location"] = update.message.text
# 显示确认信息
data = context.user_data
confirm_text = (
"📋 *请确认你的注册信息:*\n\n"
f"姓名: {data['name']}\n"
f"年龄: {data['age']}\n"
f"地区: {data['location']}\n\n"
"确认注册请发送 /confirm\n"
"重新填写请发送 /cancel"
)
await update.message.reply_text(
confirm_text,
reply_markup=ReplyKeyboardRemove(),
parse_mode="Markdown"
)
return CONFIRM
async def confirm_registration(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""确认注册"""
data = context.user_data
# 这里可以将数据保存到数据库
# await db.save_user(update.effective_user.id, data)
await update.message.reply_text(
"✅ 注册成功!\n\n"
f"欢迎你,{data['name']}!"
)
context.user_data.clear()
return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""取消注册"""
context.user_data.clear()
await update.message.reply_text(
"❌ 已取消注册。",
reply_markup=ReplyKeyboardRemove()
)
return ConversationHandler.END
# 创建 ConversationHandler
registration_handler = ConversationHandler(
entry_points=[CommandHandler("register", register_start)],
states={
NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_name)],
AGE: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_age)],
LOCATION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_location)],
CONFIRM: [CommandHandler("confirm", confirm_registration)],
},
fallbacks=[CommandHandler("cancel", cancel)],
conversation_timeout=300, # 5分钟超时
)
5.3 进阶:中间件与装饰器
# middleware.py - 权限控制与限流
import time
from functools import wraps
from collections import defaultdict
# 管理员权限检查装饰器
def admin_only(func):
@wraps(func)
async def wrapper(update, context):
admin_ids = [int(id) for id in os.getenv("ADMIN_IDS", "").split(",") if id]
if update.effective_user.id not in admin_ids:
await update.message.reply_text("⛔ 你没有权限执行此操作。")
return
return await func(update, context)
return wrapper
# 频率限制装饰器
def rate_limit(max_calls: int, period: int):
"""限制用户调用频率"""
user_calls = defaultdict(list)
def decorator(func):
@wraps(func)
async def wrapper(update, context):
user_id = update.effective_user.id
now = time.time()
# 清理过期记录
user_calls[user_id] = [
t for t in user_calls[user_id] if now - t < period
]
if len(user_calls[user_id]) >= max_calls:
remaining = int(period - (now - user_calls[user_id][0]))
await update.message.reply_text(
f"⏳ 操作太频繁,请 {remaining} 秒后再试。"
)
return
user_calls[user_id].append(now)
return await func(update, context)
return wrapper
return decorator
# 使用示例
@admin_only
async def admin_broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""管理员群发消息"""
# ... 群发逻辑
pass
@rate_limit(max_calls=5, period=60)
async def ai_chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""AI 聊天(每分钟最多5次)"""
# ... AI 对接逻辑
pass
第六章:Node.js 版本完整实战
6.1 Telegraf 基础 Bot
// bot.js - Telegraf 版本
require('dotenv').config();
const { Telegraf, Markup, session } = require('telegraf');
const bot = new Telegraf(process.env.BOT_TOKEN);
// 使用 session 中间件
bot.use(session());
// 日志中间件
bot.use(async (ctx, next) => {
const start = Date.now();
await next();
const ms = Date.now() - start;
console.log(`[${new Date().toISOString()}] ${ctx.updateType} from ${ctx.from?.id} - ${ms}ms`);
});
// /start 命令
bot.start(async (ctx) => {
const name = ctx.from.first_name;
await ctx.reply(
`👋 你好,${name}!欢迎使用我们的 Bot!\n\n可用命令:\n/help - 帮助\n/settings - 设置`,
Markup.inlineKeyboard([
[
Markup.button.callback('📚 教程', 'tutorial'),
Markup.button.callback('⚙️ 设置', 'settings'),
],
[Markup.button.url('💬 联系我们', 'https://t.me/your_support')],
])
);
});
// /help 命令
bot.help(async (ctx) => {
await ctx.replyWithMarkdown(
'*📖 帮助中心*\n\n' +
'🔹 /start - 重新开始\n' +
'🔹 /help - 查看帮助\n' +
'🔹 /settings - 个人设置\n' +
'🔹 /status - 系统状态'
);
});
// 回调查询处理
bot.action('tutorial', async (ctx) => {
await ctx.answerCbQuery();
await ctx.editMessageText(
'📚 *使用教程*\n\n1️⃣ 发送 /start 开始\n2️⃣ 选择功能\n3️⃣ 按提示操作',
{ parse_mode: 'Markdown' }
);
});
bot.action('settings', async (ctx) => {
await ctx.answerCbQuery();
await ctx.editMessageText(
'⚙️ *设置*\n\n请选择:',
{
parse_mode: 'Markdown',
...Markup.inlineKeyboard([
[Markup.button.callback('🌍 语言', 'lang')],
[Markup.button.callback('🔔 通知', 'notify')],
[Markup.button.callback('🔙 返回', 'back')],
]),
}
);
});
// 文本消息处理
bot.on('text', async (ctx) => {
const text = ctx.message.text;
if (text.includes('你好') || text.toLowerCase().includes('hello')) {
await ctx.reply(`你好呀!😊`);
} else {
await ctx.reply(`收到你的消息:「${text}」\n\n使用 /help 查看功能列表。`);
}
});
// 图片处理
bot.on('photo', async (ctx) => {
const photo = ctx.message.photo.pop();
await ctx.reply(
`📷 收到图片!\n尺寸: ${photo.width}x${photo.height}\n大小: ${photo.file_size} bytes`
);
});
// 错误处理
bot.catch((err, ctx) => {
console.error(`Error for ${ctx.updateType}:`, err);
});
// 启动
bot.launch()
.then(() => console.log('🤖 Bot 已启动!'))
.catch(err => console.error('启动失败:', err));
// 优雅退出
process.once('SIGINT', () => bot.stop('SIGINT'));
process.once('SIGTERM', () => bot.stop('SIGTERM'));
6.2 grammY(TypeScript)版本
// bot.ts - grammY 版本
import { Bot, Context, session, InlineKeyboard, GrammyError, HttpError } from "grammy";
import { type Conversation, type ConversationFlavor, conversations, createConversation } from "@grammyjs/conversations";
// 定义 Context 类型
type MyContext = Context & ConversationFlavor;
type MyConversation = Conversation<MyContext>;
// 创建 Bot 实例
const bot = new Bot<MyContext>(process.env.BOT_TOKEN!);
// 中间件
bot.use(session({ initial: () => ({}) }));
bot.use(conversations());
// 注册对话
async function registration(conversation: MyConversation, ctx: MyContext) {
await ctx.reply("📝 请输入你的名字:");
const nameCtx = await conversation.wait();
const name = nameCtx.message?.text;
await ctx.reply("请输入你的邮箱:");
const emailCtx = await conversation.wait();
const email = emailCtx.message?.text;
await ctx.reply(
`✅ 注册成功!\n\n姓名: ${name}\n邮箱: ${email}`
);
}
bot.use(createConversation(registration));
// 命令
bot.command("start", async (ctx) => {
const keyboard = new InlineKeyboard()
.text("📚 教程", "tutorial")
.text("⚙️ 设置", "settings")
.row()
.url("💬 联系我们", "https://t.me/your_support");
await ctx.reply(`👋 欢迎,${ctx.from?.first_name}!`, {
reply_markup: keyboard,
});
});
bot.command("register", async (ctx) => {
await ctx.conversation.enter("registration");
});
// 错误处理
bot.catch((err) => {
const ctx = err.ctx;
console.error(`Error while handling update ${ctx.update.update_id}:`);
const e = err.error;
if (e instanceof GrammyError) {
console.error("Error in request:", e.description);
} else if (e instanceof HttpError) {
console.error("Could not contact Telegram:", e);
} else {
console.error("Unknown error:", e);
}
});
bot.start();
第七章:Webhook 与 Polling 模式详解
7.1 两种模式的选择
| 特性 | Polling | Webhook |
|---|---|---|
| 实现难度 | ⭐ 简单 | ⭐⭐⭐ 需要 HTTPS |
| 实时性 | 略有延迟 | 接近实时 |
| 资源消耗 | 持续占用连接 | 按需触发 |
| 调试便利 | ✅ 本地直接运行 | ❌ 需要公网地址 |
| 适合场景 | 开发、小型项目 | 生产环境 |
| 可靠性 | 依赖网络连接 | 更稳定 |
结论:开发阶段使用 Polling,生产环境使用 Webhook。
7.2 Webhook 搭建详细步骤
方式一:Nginx 反向代理 + Let’s Encrypt
# 1. 安装 Nginx 和 Certbot
sudo apt install -y nginx certbot python3-certbot-nginx
# 2. 配置 Nginx
sudo vim /etc/nginx/sites-available/bot
# /etc/nginx/sites-available/bot
server {
listen 80;
server_name bot.yourdomain.com;
location / {
return 301 https://$server_name$request_uri;
}
}
server {
listen 443 ssl;
server_name bot.yourdomain.com;
# SSL 证书(由 Certbot 自动配置)
ssl_certificate /etc/letsencrypt/live/bot.yourdomain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/bot.yourdomain.com/privkey.pem;
# 安全头
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
# Bot Webhook 端点
location /webhook {
proxy_pass http://127.0.0.1:8443;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
# 健康检查端点
location /health {
proxy_pass http://127.0.0.1:8443;
}
}
# 3. 启用站点和获取证书
sudo ln -s /etc/nginx/sites-available/bot /etc/nginx/sites-enabled/
sudo certbot --nginx -d bot.yourdomain.com
sudo nginx -t && sudo systemctl reload nginx
方式二:使用 Caddy(更简单的替代方案)
# Caddyfile
bot.yourdomain.com {
reverse_proxy /webhook localhost:8443
reverse_proxy /health localhost:8443
}
Caddy 会自动获取和续期 SSL 证书,非常方便。
7.3 Python Webhook 代码
# webhook.py - Webhook 模式启动
import os
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application
load_dotenv()
async def main():
app = Application.builder().token(os.getenv("BOT_TOKEN")).build()
# ... 注册各种 handler(同之前的代码)
# 设置 Webhook
webhook_url = os.getenv("WEBHOOK_URL") # https://bot.yourdomain.com/webhook
secret_token = os.getenv("WEBHOOK_SECRET", "your-secret-token")
await app.bot.set_webhook(
url=f"{webhook_url}/webhook",
secret_token=secret_token,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True, # 忽略离线期间的消息
)
# 启动 Webhook 服务器
await app.run_webhook(
listen="0.0.0.0",
port=8443,
url_path="/webhook",
secret_token=secret_token,
webhook_url=f"{webhook_url}/webhook",
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
7.4 本地开发使用 ngrok
在本地开发时,可以使用 ngrok 创建临时隧道来测试 Webhook:
# 安装 ngrok
curl -s https://ngrok-agent.s3.amazonaws.com/ngrok.asc | \
sudo tee /etc/apt/trusted.gpg.d/ngrok.asc >/dev/null && \
echo "deb https://ngrok-agent.s3.amazonaws.com buster main" | \
sudo tee /etc/apt/sources.list.d/ngrok.list && \
sudo apt update && sudo apt install ngrok
# 配置 auth token
ngrok config add-authtoken YOUR_AUTH_TOKEN
# 启动隧道
ngrok http 8443
# ngrok 会输出类似:
# Forwarding https://abc123.ngrok-free.app -> http://localhost:8443
然后将 ngrok 的 HTTPS 地址设置为 Webhook URL 即可。
第八章:对接数据库
8.1 SQLite(适合小型项目)
# database/sqlite_db.py
import aiosqlite
import os
DB_PATH = os.getenv("DB_PATH", "bot_data.db")
async def init_db():
"""初始化数据库"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
language TEXT DEFAULT 'zh',
is_premium BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
message_text TEXT,
message_type TEXT DEFAULT 'text',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS subscriptions (
user_id INTEGER PRIMARY KEY,
plan TEXT DEFAULT 'free',
expires_at TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
await db.commit()
async def upsert_user(user_id: int, username: str, first_name: str):
"""创建或更新用户"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("""
INSERT INTO users (user_id, username, first_name, last_active)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(user_id) DO UPDATE SET
username = excluded.username,
first_name = excluded.first_name,
last_active = CURRENT_TIMESTAMP
""", (user_id, username, first_name))
await db.commit()
async def get_user(user_id: int):
"""获取用户信息"""
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM users WHERE user_id = ?", (user_id,)
) as cursor:
return await cursor.fetchone()
async def save_message(user_id: int, text: str, msg_type: str = "text"):
"""保存消息记录"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO messages (user_id, message_text, message_type) VALUES (?, ?, ?)",
(user_id, text, msg_type)
)
await db.commit()
async def get_user_stats():
"""获取用户统计"""
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute("SELECT COUNT(*) FROM users") as cursor:
total_users = (await cursor.fetchone())[0]
async with db.execute(
"SELECT COUNT(*) FROM users WHERE last_active > datetime('now', '-1 day')"
) as cursor:
active_users = (await cursor.fetchone())[0]
async with db.execute("SELECT COUNT(*) FROM messages") as cursor:
total_messages = (await cursor.fetchone())[0]
return {
"total_users": total_users,
"active_users": active_users,
"total_messages": total_messages,
}
8.2 PostgreSQL(推荐生产环境)
# database/pg_db.py
import asyncpg
import os
class Database:
def __init__(self):
self.pool = None
async def connect(self):
"""创建连接池"""
self.pool = await asyncpg.create_pool(
dsn=os.getenv("DATABASE_URL"),
min_size=5,
max_size=20,
command_timeout=60,
)
await self._init_tables()
async def _init_tables(self):
"""初始化表结构"""
async with self.pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id BIGINT PRIMARY KEY,
username VARCHAR(255),
first_name VARCHAR(255),
language VARCHAR(10) DEFAULT 'zh',
is_premium BOOLEAN DEFAULT FALSE,
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
last_active TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_users_last_active
ON users(last_active);
CREATE TABLE IF NOT EXISTS messages (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT REFERENCES users(user_id),
chat_id BIGINT,
message_text TEXT,
message_type VARCHAR(50) DEFAULT 'text',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_messages_user
ON messages(user_id, created_at DESC);
""")
async def upsert_user(self, user_id: int, username: str, first_name: str):
async with self.pool.acquire() as conn:
await conn.execute("""
INSERT INTO users (user_id, username, first_name, last_active)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (user_id) DO UPDATE SET
username = EXCLUDED.username,
first_name = EXCLUDED.first_name,
last_active = NOW()
""", user_id, username, first_name)
async def get_user(self, user_id: int):
async with self.pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM users WHERE user_id = $1", user_id
)
async def close(self):
if self.pool:
await self.pool.close()
# 全局数据库实例
db = Database()
8.3 Redis 缓存层
# database/redis_cache.py
import redis.asyncio as redis
import json
import os
class RedisCache:
def __init__(self):
self.redis = None
async def connect(self):
self.redis = redis.from_url(
os.getenv("REDIS_URL", "redis://localhost:6379"),
decode_responses=True,
)
async def get_user_cache(self, user_id: int):
"""获取用户缓存"""
data = await self.redis.get(f"user:{user_id}")
return json.loads(data) if data else None
async def set_user_cache(self, user_id: int, data: dict, ttl: int = 3600):
"""设置用户缓存(默认1小时过期)"""
await self.redis.setex(
f"user:{user_id}",
ttl,
json.dumps(data, ensure_ascii=False),
)
async def increment_counter(self, key: str, ttl: int = 86400):
"""递增计数器"""
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, ttl)
results = await pipe.execute()
return results[0]
async def rate_limit_check(self, user_id: int, limit: int = 10, window: int = 60):
"""滑动窗口限流"""
import time
key = f"rate:{user_id}"
now = time.time()
pipe = self.redis.pipeline()
pipe.zremrangebyscore(key, 0, now - window)
pipe.zadd(key, {str(now): now})
pipe.zcard(key)
pipe.expire(key, window)
results = await pipe.execute()
return results[2] <= limit
async def close(self):
if self.redis:
await self.redis.close()
cache = RedisCache()
第九章:对接 AI 服务(OpenAI / Claude)
9.1 对接 OpenAI GPT
# services/openai_service.py
import os
import openai
from typing import List, Dict, Optional
client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# 用户对话历史存储
user_conversations: Dict[int, List[Dict]] = {}
MAX_HISTORY = 20 # 最大对话轮数
async def chat_with_gpt(
user_id: int,
message: str,
system_prompt: Optional[str] = None
) -> str:
"""与 GPT 对话"""
# 获取用户对话历史
if user_id not in user_conversations:
user_conversations[user_id] = []
history = user_conversations[user_id]
# 构建消息列表
messages = []
# 系统提示
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
else:
messages.append({
"role": "system",
"content": (
"你是一个友好的 Telegram Bot 助手。"
"用简洁、友好的语气回复用户。"
"如果用户使用中文,用中文回复。"
"回复应该简短,适合聊天场景。"
)
})
# 添加历史消息
messages.extend(history[-MAX_HISTORY:])
# 添加当前消息
messages.append({"role": "user", "content": message})
try:
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=1000,
temperature=0.7,
)
reply = response.choices[0].message.content
# 更新对话历史
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": reply})
# 限制历史长度
if len(history) > MAX_HISTORY * 2:
user_conversations[user_id] = history[-MAX_HISTORY * 2:]
return reply
except openai.RateLimitError:
return "⏳ AI 服务繁忙,请稍后再试。"
except openai.APIError as e:
return f"❌ AI 服务出错: {str(e)}"
def clear_history(user_id: int):
"""清除用户对话历史"""
user_conversations.pop(user_id, None)
9.2 对接 Anthropic Claude
# services/claude_service.py
import os
import anthropic
from typing import List, Dict, Optional
client = anthropic.AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
user_conversations: Dict[int, List[Dict]] = {}
MAX_HISTORY = 20
async def chat_with_claude(
user_id: int,
message: str,
system_prompt: Optional[str] = None
) -> str:
"""与 Claude 对话"""
if user_id not in user_conversations:
user_conversations[user_id] = []
history = user_conversations[user_id]
system = system_prompt or (
"你是一个友好的 Telegram Bot 助手。"
"用简洁、友好的语气回复用户。"
"回复应该简短,适合聊天场景。"
)
messages = history[-MAX_HISTORY:] + [{"role": "user", "content": message}]
try:
response = await client.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=1000,
system=system,
messages=messages,
)
reply = response.content[0].text
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": reply})
if len(history) > MAX_HISTORY * 2:
user_conversations[user_id] = history[-MAX_HISTORY * 2:]
return reply
except anthropic.RateLimitError:
return "⏳ AI 服务繁忙,请稍后再试。"
except anthropic.APIError as e:
return f"❌ AI 服务出错: {str(e)}"
9.3 在 Bot 中集成 AI 对话
# 在 bot.py 中添加
from services.openai_service import chat_with_gpt, clear_history
# 或者 from services.claude_service import chat_with_claude, clear_history
async def ai_chat_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""AI 对话处理"""
user_id = update.effective_user.id
message = update.message.text
# 发送"正在输入"状态
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action="typing"
)
# 调用 AI
reply = await chat_with_gpt(user_id, message)
# 或者: reply = await chat_with_claude(user_id, message)
# 发送回复(处理超长消息)
if len(reply) > 4096:
# Telegram 单条消息最长 4096 字符
for i in range(0, len(reply), 4096):
await update.message.reply_text(reply[i:i+4096])
else:
await update.message.reply_text(reply)
async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/clear 命令 - 清除对话历史"""
clear_history(update.effective_user.id)
await update.message.reply_text("🗑️ 对话历史已清除!")
# 注册 handler
# app.add_handler(CommandHandler("clear", clear_context))
# app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, ai_chat_handler))
9.4 多 AI 模型切换
# services/ai_router.py - AI 路由器
from enum import Enum
from services.openai_service import chat_with_gpt
from services.claude_service import chat_with_claude
class AIModel(Enum):
GPT4 = "gpt-4o"
CLAUDE = "claude-sonnet"
# 用户模型偏好
user_model_prefs: Dict[int, AIModel] = {}
async def ai_chat(user_id: int, message: str) -> str:
"""根据用户偏好路由到不同 AI"""
model = user_model_prefs.get(user_id, AIModel.GPT4)
if model == AIModel.GPT4:
return await chat_with_gpt(user_id, message)
elif model == AIModel.CLAUDE:
return await chat_with_claude(user_id, message)
async def switch_model(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换 AI 模型"""
keyboard = [
[InlineKeyboardButton("🤖 GPT-4o", callback_data="model_gpt4")],
[InlineKeyboardButton("🧠 Claude", callback_data="model_claude")],
]
await update.message.reply_text(
"请选择 AI 模型:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
第十章:对接支付系统
10.1 Telegram 内置支付
Telegram Bot 原生支持通过 Stripe、YooMoney 等支付渠道收款:
# payments/telegram_pay.py
from telegram import LabeledPrice, ShippingOption
PAYMENT_PROVIDER_TOKEN = os.getenv("PAYMENT_PROVIDER_TOKEN")
async def send_invoice(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""发送支付发票"""
prices = [
LabeledPrice("月度会员", 999), # 金额单位:分
LabeledPrice("新用户折扣", -200), # 负数为折扣
]
await context.bot.send_invoice(
chat_id=update.effective_chat.id,
title="Bot 高级会员",
description="解锁全部高级功能,包括无限 AI 对话、优先响应等",
payload="premium_monthly", # 自定义数据,用于识别订单
provider_token=PAYMENT_PROVIDER_TOKEN,
currency="USD",
prices=prices,
start_parameter="premium",
photo_url="https://example.com/premium.jpg",
photo_width=640,
photo_height=480,
need_name=True,
need_email=True,
is_flexible=False,
)
async def precheckout_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""预结账回调 - 必须在10秒内响应"""
query = update.pre_checkout_query
if query.invoice_payload != "premium_monthly":
await query.answer(ok=False, error_message="出错了,请重试。")
else:
await query.answer(ok=True)
async def successful_payment(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""支付成功回调"""
payment = update.message.successful_payment
user_id = update.effective_user.id
# 更新用户会员状态
# await db.upgrade_user(user_id, "premium", days=30)
await update.message.reply_text(
"🎉 支付成功!\n\n"
f"支付金额: {payment.total_amount / 100} {payment.currency}\n"
"你的高级会员已激活,有效期 30 天。\n\n"
"感谢你的支持!"
)
# 注册支付相关 handler
# app.add_handler(CommandHandler("buy", send_invoice))
# app.add_handler(PreCheckoutQueryHandler(precheckout_callback))
# app.add_handler(MessageHandler(filters.SUCCESSFUL_PAYMENT, successful_payment))
10.2 对接第三方支付(USDT / 加密货币)
很多 Telegram Bot 需要接受加密货币支付。以下是对接 TON 区块链支付的示例架构:
# payments/crypto_pay.py - TON 支付示例
"""
TON 支付流程:
1. 用户点击"支付" → Bot 生成唯一订单ID
2. Bot 返回包含订单ID的 TON 转账地址(深链接或二维码)
3. 用户在 TON 钱包中完成转账
4. 后端监听 TON 区块链交易
5. 匹配到订单后,激活用户权益
"""
import hashlib
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class Order:
order_id: str
user_id: int
amount: float
currency: str
status: str # pending, paid, expired
created_at: float
paid_at: Optional[float] = None
# 订单存储(生产环境应使用数据库)
orders: dict = {}
async def create_crypto_order(
update: Update, context: ContextTypes.DEFAULT_TYPE
):
"""创建加密货币支付订单"""
user_id = update.effective_user.id
amount_ton = 5.0 # 5 TON
# 生成唯一订单ID
order_id = hashlib.sha256(
f"{user_id}:{time.time()}".encode()
).hexdigest()[:16]
order = Order(
order_id=order_id,
user_id=user_id,
amount=amount_ton,
currency="TON",
status="pending",
created_at=time.time(),
)
orders[order_id] = order
# TON 支付深链接(memo 中包含订单ID用于匹配)
ton_address = os.getenv("TON_WALLET_ADDRESS")
ton_link = f"ton://transfer/{ton_address}?amount={int(amount_ton * 1e9)}&text={order_id}"
keyboard = [
[InlineKeyboardButton("💎 使用 TON 支付", url=ton_link)],
[InlineKeyboardButton("🔄 检查支付状态", callback_data=f"check_{order_id}")],
]
await update.message.reply_text(
f"📋 *订单信息*\n\n"
f"订单号: `{order_id}`\n"
f"金额: {amount_ton} TON\n"
f"收款地址:\n`{ton_address}`\n\n"
f"请在 30 分钟内完成支付。\n"
f"备注(memo)中务必填写订单号!",
parse_mode="Markdown",
reply_markup=InlineKeyboardMarkup(keyboard),
)
第十一章:对接消息队列与任务调度
11.1 使用 Celery 处理异步任务
# tasks/celery_config.py
from celery import Celery
import os
celery_app = Celery(
"bot_tasks",
broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
backend=os.getenv("REDIS_URL", "redis://localhost:6379/1"),
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
task_soft_time_limit=300,
task_time_limit=600,
)
# tasks/worker.py
from tasks.celery_config import celery_app
import httpx
@celery_app.task(bind=True, max_retries=3)
def send_notification(self, chat_id: int, text: str):
"""异步发送通知"""
try:
token = os.getenv("BOT_TOKEN")
url = f"https://api.telegram.org/bot{token}/sendMessage"
response = httpx.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown",
})
response.raise_for_status()
except Exception as exc:
self.retry(exc=exc, countdown=60)
@celery_app.task
def process_image(file_path: str, user_id: int):
"""异步处理图片(如 AI 识别、压缩等)"""
# ... 图片处理逻辑
pass
@celery_app.task
def generate_report(user_id: int, report_type: str):
"""异步生成报告"""
# ... 报告生成逻辑
pass
11.2 定时任务(APScheduler)
# scheduler/jobs.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
async def daily_summary(context):
"""每日摘要推送"""
# 获取所有订阅用户
subscribers = await db.get_subscribers()
stats = await db.get_daily_stats()
for user_id in subscribers:
try:
await context.bot.send_message(
chat_id=user_id,
text=(
"📊 *每日摘要*\n\n"
f"今日新增用户: {stats['new_users']}\n"
f"今日消息数: {stats['messages']}\n"
f"活跃用户: {stats['active_users']}"
),
parse_mode="Markdown",
)
except Exception as e:
logger.error(f"Failed to send summary to {user_id}: {e}")
async def cleanup_expired_orders():
"""清理过期订单"""
expired = await db.get_expired_orders(hours=24)
for order in expired:
await db.update_order_status(order.id, "expired")
logger.info(f"Cleaned up {len(expired)} expired orders")
def setup_scheduler(application):
"""配置定时任务"""
# 每天早上 9 点发送摘要
scheduler.add_job(
daily_summary,
CronTrigger(hour=9, minute=0),
args=[application],
id="daily_summary",
)
# 每小时清理过期订单
scheduler.add_job(
cleanup_expired_orders,
CronTrigger(minute=0),
id="cleanup_orders",
)
scheduler.start()
第十二章:部署方案全解析
12.1 Docker 部署(推荐)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 创建非 root 用户
RUN useradd -m botuser
USER botuser
# 启动 Bot
CMD ["python", "bot.py"]
# docker-compose.yml
version: "3.8"
services:
bot:
build: .
restart: unless-stopped
env_file: .env
depends_on:
- postgres
- redis
volumes:
- ./downloads:/app/downloads
networks:
- bot-network
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
postgres:
image: postgres:16-alpine
restart: unless-stopped
environment:
POSTGRES_DB: botdb
POSTGRES_USER: botuser
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
networks:
- bot-network
redis:
image: redis:7-alpine
restart: unless-stopped
command: redis-server --appendonly yes
volumes:
- redisdata:/data
networks:
- bot-network
nginx:
image: nginx:alpine
restart: unless-stopped
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/conf.d:/etc/nginx/conf.d
- ./certbot/conf:/etc/letsencrypt
- ./certbot/www:/var/www/certbot
depends_on:
- bot
networks:
- bot-network
volumes:
pgdata:
redisdata:
networks:
bot-network:
driver: bridge
# 部署命令
docker compose up -d --build
# 查看日志
docker compose logs -f bot
# 更新重启
docker compose pull
docker compose up -d --build
# 备份数据库
docker compose exec postgres pg_dump -U botuser botdb > backup.sql
12.2 Systemd 服务部署
如果不使用 Docker,可以配置 systemd 服务:
# /etc/systemd/system/telegram-bot.service
[Unit]
Description=Telegram Bot Service
After=network.target postgresql.service redis.service
[Service]
Type=simple
User=botuser
Group=botuser
WorkingDirectory=/home/botuser/telegram-bot
EnvironmentFile=/home/botuser/telegram-bot/.env
ExecStart=/home/botuser/telegram-bot/venv/bin/python bot.py
Restart=always
RestartSec=10
StartLimitBurst=5
StartLimitIntervalSec=60
# 安全限制
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/home/botuser/telegram-bot
# 日志
StandardOutput=journal
StandardError=journal
SyslogIdentifier=telegram-bot
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable telegram-bot
sudo systemctl start telegram-bot
sudo systemctl status telegram-bot
sudo journalctl -u telegram-bot -f # 查看日志
12.3 Serverless 部署(AWS Lambda)
# lambda_handler.py - AWS Lambda Webhook 处理
import json
import os
from telegram import Update, Bot
from telegram.ext import Application
# 冷启动时初始化
bot = Bot(token=os.getenv("BOT_TOKEN"))
app = Application.builder().token(os.getenv("BOT_TOKEN")).build()
# 注册 handlers...
def lambda_handler(event, context):
"""AWS Lambda 入口"""
try:
body = json.loads(event.get("body", "{}"))
update = Update.de_json(body, bot)
# 异步处理
import asyncio
asyncio.get_event_loop().run_until_complete(
app.process_update(update)
)
return {"statusCode": 200, "body": "OK"}
except Exception as e:
print(f"Error: {e}")
return {"statusCode": 500, "body": str(e)}
第十三章:安全加固与最佳实践
13.1 Token 安全
# 永远不要这样做:
BOT_TOKEN = "7123456789:AAH..." # ❌ 硬编码 Token
# 正确做法:
BOT_TOKEN = os.getenv("BOT_TOKEN") # ✅ 从环境变量读取
if not BOT_TOKEN:
raise ValueError("BOT_TOKEN 未设置!")
13.2 Webhook 安全验证
# 方式一:使用 Secret Token(推荐)
# Telegram 会在每个 Webhook 请求中包含 X-Telegram-Bot-Api-Secret-Token 头
secret_token = os.getenv("WEBHOOK_SECRET")
# 在 set_webhook 时指定
await bot.set_webhook(url=webhook_url, secret_token=secret_token)
# 方式二:限制来源 IP
# Telegram Webhook 请求来自以下 IP 段:
# 149.154.160.0/20
# 91.108.4.0/22
13.3 输入验证与防注入
import re
import html
def sanitize_input(text: str, max_length: int = 4096) -> str:
"""清理用户输入"""
# 限制长度
text = text[:max_length]
# HTML 转义
text = html.escape(text)
# 移除控制字符
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
return text
def validate_command_args(args: list, expected_types: list) -> bool:
"""验证命令参数"""
if len(args) != len(expected_types):
return False
for arg, expected_type in zip(args, expected_types):
try:
expected_type(arg)
except (ValueError, TypeError):
return False
return True
13.4 防滥用策略
# security/anti_abuse.py
from collections import defaultdict
import time
class AntiAbuse:
def __init__(self):
self.blocked_users = set()
self.warning_count = defaultdict(int)
self.message_timestamps = defaultdict(list)
def is_spam(self, user_id: int, window: int = 5, max_messages: int = 10) -> bool:
"""检测是否为垃圾消息(5秒内超过10条)"""
now = time.time()
timestamps = self.message_timestamps[user_id]
timestamps.append(now)
# 清理旧记录
self.message_timestamps[user_id] = [
t for t in timestamps if now - t < window
]
if len(self.message_timestamps[user_id]) > max_messages:
self.warning_count[user_id] += 1
if self.warning_count[user_id] >= 3:
self.blocked_users.add(user_id)
return True
return False
def is_blocked(self, user_id: int) -> bool:
return user_id in self.blocked_users
def unblock(self, user_id: int):
self.blocked_users.discard(user_id)
self.warning_count[user_id] = 0
anti_abuse = AntiAbuse()
# 在 Bot 中使用
async def security_middleware(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""安全中间件 - 放在所有 handler 之前"""
user_id = update.effective_user.id
if anti_abuse.is_blocked(user_id):
return # 静默忽略被封禁用户
if anti_abuse.is_spam(user_id):
await update.message.reply_text(
"⚠️ 消息发送过于频繁,请稍后再试。"
)
return
# 继续处理...
第十四章:监控、日志与运维
14.1 结构化日志
# utils/logger.py
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""JSON 格式日志"""
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
if hasattr(record, "user_id"):
log_entry["user_id"] = record.user_id
if hasattr(record, "chat_id"):
log_entry["chat_id"] = record.chat_id
return json.dumps(log_entry, ensure_ascii=False)
def setup_logging():
"""配置日志"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 控制台输出
console_handler = logging.StreamHandler()
console_handler.setFormatter(JSONFormatter())
logger.addHandler(console_handler)
# 文件输出(带轮转)
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
"logs/bot.log",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
file_handler.setFormatter(JSONFormatter())
logger.addHandler(file_handler)
14.2 健康检查端点
# health.py
from aiohttp import web
import time
async def health_check(request):
"""健康检查端点"""
uptime = time.time() - app_start_time
stats = await db.get_stats()
return web.json_response({
"status": "healthy",
"uptime_seconds": int(uptime),
"database": "connected",
"redis": "connected",
"total_users": stats["total_users"],
"messages_today": stats["messages_today"],
})
# 在 Docker 中配置健康检查
# docker-compose.yml:
# healthcheck:
# test: ["CMD", "curl", "-f", "http://localhost:8443/health"]
# interval: 30s
# timeout: 10s
# retries: 3
14.3 Prometheus 指标收集
# metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# 定义指标
MESSAGES_TOTAL = Counter(
"bot_messages_total",
"Total messages received",
["type", "command"],
)
RESPONSE_TIME = Histogram(
"bot_response_seconds",
"Response time in seconds",
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
ACTIVE_USERS = Gauge(
"bot_active_users",
"Number of active users",
)
ERRORS_TOTAL = Counter(
"bot_errors_total",
"Total errors",
["type"],
)
# 启动 Prometheus HTTP 服务器
start_http_server(9090)
# 在处理消息时记录指标
import time
async def metrics_middleware(update, context):
start = time.time()
MESSAGES_TOTAL.labels(type="text", command="none").inc()
# ... 处理逻辑
duration = time.time() - start
RESPONSE_TIME.observe(duration)
第十五章:常见问题与排错指南
15.1 常见错误及解决方案
| 错误 | 原因 | 解决方案 |
|---|---|---|
Conflict: terminated by other getUpdates request |
有多个实例在轮询 | 确保只有一个实例运行 |
Unauthorized |
Token 无效 | 检查 Token 是否正确,是否包含多余空格 |
Bad Request: chat not found |
用户从未与 Bot 交互 | 用户必须先 /start 才能收到消息 |
Flood control exceeded |
发送消息过于频繁 | 实现消息队列,控制发送速率 |
Connection timeout |
网络问题 | 检查服务器网络,考虑使用代理 |
Webhook was already set |
Webhook 和 Polling 冲突 | 清除 Webhook 后再使用 Polling |
SSL error |
证书问题 | 检查 SSL 证书是否有效 |
15.2 调试技巧
# 1. 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# 2. 查看 Webhook 信息
import httpx
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/getWebhookInfo")
print(resp.json())
# 3. 手动删除 Webhook(切换回 Polling)
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/deleteWebhook")
print(resp.json())
# 4. 获取 Bot 信息
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/getMe")
print(resp.json())
15.3 性能优化清单
- 使用连接池:数据库和 Redis 都应使用连接池
- 异步处理:所有 I/O 操作都应异步执行
- 消息队列:耗时操作放入消息队列异步处理
- 缓存策略:频繁读取的数据使用 Redis 缓存
- 批量操作:群发消息时使用批量发送而非逐条发送
- 限流保护:对所有用户操作实施频率限制
- 资源清理:定期清理过期数据和临时文件
- 日志级别:生产环境设置为 INFO 或 WARNING
附录:完整项目结构参考
telegram-bot/
├── bot.py # 主入口
├── config.py # 配置管理
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 构建文件
├── docker-compose.yml # Docker Compose 编排
├── .env # 环境变量(不提交到 Git)
├── .env.example # 环境变量模板
├── .gitignore
│
├── handlers/ # 消息处理器
│ ├── __init__.py
│ ├── commands.py # 命令处理
│ ├── messages.py # 消息处理
│ ├── callbacks.py # 回调查询处理
│ └── conversations.py # 对话流程
│
├── services/ # 外部服务对接
│ ├── __init__.py
│ ├── openai_service.py # OpenAI 对接
│ ├── claude_service.py # Claude 对接
│ └── payment_service.py # 支付服务
│
├── database/ # 数据库
│ ├── __init__.py
│ ├── models.py # 数据模型
│ ├── pg_db.py # PostgreSQL
│ ├── redis_cache.py # Redis 缓存
│ └── migrations/ # 数据库迁移
│
├── middleware/ # 中间件
│ ├── __init__.py
│ ├── auth.py # 权限验证
│ ├── rate_limit.py # 频率限制
│ └── logging.py # 日志中间件
│
├── tasks/ # 异步任务
│ ├── __init__.py
│ ├── celery_config.py
│ └── worker.py
│
├── scheduler/ # 定时任务
│ ├── __init__.py
│ └── jobs.py
│
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── helpers.py
│ └── validators.py
│
├── security/ # 安全模块
│ ├── __init__.py
│ └── anti_abuse.py
│
├── nginx/ # Nginx 配置
│ └── conf.d/
│ └── bot.conf
│
├── logs/ # 日志目录
│
├── tests/ # 测试
│ ├── test_handlers.py
│ ├── test_services.py
│ └── test_database.py
│
└── docs/ # 文档
├── API.md
└── DEPLOYMENT.md
总结
通过本文,我们完整地走过了 Telegram Bot 从创建到部署的全流程:
- 基础概念:理解了 Bot 的工作原理和核心概念
- 环境配置:搭建了完整的开发环境
- Bot 创建:通过 BotFather 创建并配置了 Bot
- 框架选型:对比了各语言的主流框架
- 代码实战:分别用 Python 和 Node.js 实现了完整的 Bot
- Webhook:配置了生产级的 Webhook 服务
- 数据库对接:实现了 SQLite、PostgreSQL、Redis 的完整对接
- AI 对接:集成了 OpenAI 和 Claude 的 AI 对话能力
- 支付对接:实现了 Telegram 原生支付和加密货币支付
- 任务调度:配置了异步任务和定时任务
- 部署方案:提供了 Docker、Systemd、Serverless 三种部署方案
- 安全加固:实现了完整的安全防护措施
- 监控运维:配置了日志、健康检查和指标收集
希望这篇教程能帮助你成功搭建自己的 Telegram Bot!如有问题,欢迎在评论区讨论。
相关资源:
本文首发于万先生,转载请注明出处。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)