🤖 Telegram Bot 搭建与对接完全指南:从零到生产级部署

作者:万先生 | 最后更新:2026年2月 公司官网:https://hongshuotech.xyz/zh

上一期的tg搭建教程很火,特意二次更新

本文将从最基础的概念开始,手把手教你搭建一个功能完善的 Telegram Bot,并对接各种第三方服务。无论你是初学者还是有一定经验的开发者,都能在本文中找到有价值的内容。全文约 15000+ 字,建议收藏后慢慢阅读。


📑 目录


第一章:Telegram Bot 基础概念

1.1 什么是 Telegram Bot?

Telegram Bot 是运行在 Telegram 平台上的自动化程序。它们可以响应用户的消息、执行命令、发送通知、处理支付,甚至可以作为完整的应用程序运行。与其他即时通讯平台的机器人相比,Telegram Bot 拥有以下独特优势:

  • 完全免费的 API:Telegram 不对 Bot API 的调用收取任何费用
  • 丰富的消息类型:支持文本、图片、视频、文件、位置、联系人、投票等多种消息格式
  • Inline Mode:用户可以在任何聊天中直接调用你的 Bot
  • 支付集成:原生支持多种支付渠道
  • Web App:可以在 Bot 中嵌入完整的 Web 应用
  • 无需审核:创建 Bot 即可立即使用,无需等待平台审核

1.2 Bot 的工作原理

Telegram Bot 的核心工作流程如下:

用户发送消息 → Telegram 服务器 → Bot API → 你的服务器 → 处理逻辑 → Bot API → Telegram 服务器 → 用户收到回复

你的服务器与 Telegram 服务器之间的通信有两种模式:

  1. Polling(轮询):你的服务器主动向 Telegram 服务器请求新消息
  2. Webhook(推送):Telegram 服务器在有新消息时主动推送到你的服务器
┌─────────────────────────────────────────────────┐
│                  Polling 模式                     │
│                                                   │
│  你的服务器 ──请求──> Telegram ──返回消息──> 你的服务器 │
│     ↑                                      │      │
│     └──────────── 循环请求 ────────────────┘      │
└─────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────┐
│                 Webhook 模式                      │
│                                                   │
│  用户发消息 → Telegram ──推送──> 你的服务器(HTTPS)  │
│                          ←──响应──               │
└─────────────────────────────────────────────────┘

1.3 Bot API 的核心概念

在开始编码之前,你需要理解以下核心概念:

Update(更新):Bot 收到的每一条新消息、回调查询、编辑等事件都是一个 Update 对象。每个 Update 有唯一的 update_id

Chat(聊天):可以是私聊(private)、群组(group)、超级群组(supergroup)或频道(channel)。每个聊天都有唯一的 chat_id

Message(消息):用户发送的内容,包含消息文本、发送者信息、时间戳等。

Callback Query(回调查询):用户点击 Inline Keyboard 按钮时触发的事件。

Command(命令):以 / 开头的特殊消息,如 /start/help


第二章:环境准备与工具安装

2.1 选择你的服务器

搭建 Telegram Bot 首先需要一台服务器。以下是常见方案对比:

方案 优点 缺点 适合场景 月成本
自有 VPS 完全控制,灵活 需要运维 正式项目 $5-20
云函数(Serverless) 免运维,按量付费 冷启动延迟 轻量级 Bot $0-5
本地开发机 免费,方便调试 需要内网穿透 开发测试 $0
Docker 容器 可移植,环境一致 需要容器知识 团队协作 取决于宿主机
Heroku / Railway 部署简单 免费额度有限 个人项目 $0-7

推荐方案:对于初学者,建议使用一台 VPS(如 Vultr、DigitalOcean、Hetzner)配合 Docker 部署。

2.2 服务器基础环境配置

假设你已经有了一台 Ubuntu 22.04 / 24.04 的 VPS,按以下步骤配置基础环境:

# 1. 更新系统
sudo apt update && sudo apt upgrade -y

# 2. 安装基础工具
sudo apt install -y curl wget git vim htop unzip software-properties-common

# 3. 配置防火墙(重要!)
sudo ufw allow OpenSSH
sudo ufw allow 443/tcp    # Webhook 需要 HTTPS
sudo ufw allow 80/tcp     # 用于 Let's Encrypt 证书验证
sudo ufw enable

# 4. 创建专用用户(不要用 root 运行 Bot)
sudo adduser botuser
sudo usermod -aG sudo botuser
su - botuser

2.3 安装 Python 环境

# 安装 Python 3.11+
sudo apt install -y python3 python3-pip python3-venv

# 验证版本
python3 --version   # 应该显示 3.11.x 或更高

# 创建项目目录
mkdir -p ~/telegram-bot && cd ~/telegram-bot

# 创建虚拟环境(强烈推荐)
python3 -m venv venv
source venv/bin/activate

# 安装核心依赖
pip install python-telegram-bot[all]  # 官方推荐的 Python 库
pip install aiohttp                    # 异步 HTTP 请求
pip install python-dotenv              # 环境变量管理

2.4 安装 Node.js 环境

# 使用 nvm 安装(推荐方式)
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
source ~/.bashrc

# 安装 Node.js LTS
nvm install --lts
nvm use --lts

# 验证
node --version  # 应该显示 v20.x 或更高
npm --version

# 初始化项目
mkdir -p ~/telegram-bot-node && cd ~/telegram-bot-node
npm init -y

# 安装核心依赖
npm install telegraf        # 最流行的 Node.js Bot 框架
npm install dotenv          # 环境变量
npm install node-fetch      # HTTP 请求

2.5 安装 Docker(可选但推荐)

# 安装 Docker
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $USER
newgrp docker

# 安装 Docker Compose
sudo apt install -y docker-compose-plugin

# 验证
docker --version
docker compose version

第三章:创建你的第一个 Bot

3.1 通过 BotFather 创建 Bot

BotFather 是 Telegram 官方的"机器人之父",所有 Bot 的创建和管理都通过它完成。

操作步骤:

  1. 在 Telegram 中搜索 @BotFather 并打开对话
  2. 发送 /newbot 命令
  3. 输入你的 Bot 名称(显示名称,可以包含中文和空格)
  4. 输入你的 Bot 用户名(必须以 bot 结尾,如 my_awesome_bot
  5. BotFather 会返回你的 API Token
✅ 成功创建 Bot!

Bot 名称: My Awesome Bot
用户名: @my_awesome_bot
Token: 7123456789:AAH1234567890abcdefghijklmnopqrstuv

⚠️ 请务必妥善保管你的 Token!
任何拥有 Token 的人都可以完全控制你的 Bot。

安全提示:永远不要在代码中硬编码 Token,也不要将其提交到 Git 仓库。使用环境变量来管理。

3.2 配置 Bot 的基本信息

创建完 Bot 后,你可以通过 BotFather 进一步配置:

/setdescription  - 设置 Bot 的描述信息(搜索结果中显示)
/setabouttext    - 设置 Bot 的 "关于" 信息
/setuserpic      - 设置 Bot 的头像
/setcommands     - 设置 Bot 的命令列表(命令菜单)
/setprivacy      - 设置隐私模式(群组中是否只接收 @ 和命令)
/setjoingroups   - 设置是否允许被添加到群组
/setinline       - 启用 Inline 模式

设置命令列表示例:

向 BotFather 发送 /setcommands,然后输入:

start - 开始使用
help - 帮助信息
settings - 设置
status - 查看状态
subscribe - 订阅通知
unsubscribe - 取消订阅

3.3 使用环境变量管理配置

创建 .env 文件:

# .env 文件(绝对不要提交到 Git!)
BOT_TOKEN=7123456789:AAH1234567890abcdefghijklmnopqrstuv
BOT_USERNAME=my_awesome_bot
ADMIN_CHAT_ID=123456789
DATABASE_URL=postgresql://user:pass@localhost:5432/botdb
REDIS_URL=redis://localhost:6379
WEBHOOK_URL=https://bot.yourdomain.com
LOG_LEVEL=INFO

创建 .gitignore 文件:

.env
*.pyc
__pycache__/
venv/
node_modules/
*.log

第四章:Bot 开发框架选型

4.1 Python 框架对比

框架 Stars 特点 推荐度
python-telegram-bot 26k+ 官方推荐,功能完整,异步支持 ⭐⭐⭐⭐⭐
aiogram 5k+ 纯异步,性能优秀,现代设计 ⭐⭐⭐⭐⭐
pyTelegramBotAPI 8k+ 简单易用,适合入门 ⭐⭐⭐⭐
Pyrogram 4k+ MTProto 协议,功能更底层 ⭐⭐⭐

推荐选择

  • 新手入门python-telegram-bot(文档最全,社区最大)
  • 高性能场景aiogram(纯异步,设计更现代)
  • 快速原型pyTelegramBotAPI(API 最简洁)

4.2 Node.js 框架对比

框架 Stars 特点 推荐度
Telegraf 8k+ 中间件架构,类似 Koa ⭐⭐⭐⭐⭐
grammY 2k+ TypeScript 优先,现代设计 ⭐⭐⭐⭐⭐
node-telegram-bot-api 8k+ 轻量,低级别封装 ⭐⭐⭐⭐

推荐选择

  • JavaScript 项目Telegraf(生态成熟,插件丰富)
  • TypeScript 项目grammY(类型安全,文档优秀)

4.3 其他语言方案

如果你使用其他语言,以下是各语言的推荐库:

  • Go: telebot(gopkg.in/telebot.v3)
  • Rust: teloxide
  • Java/Kotlin: TelegramBots
  • C#: Telegram.Bot
  • PHP: telegram-bot-sdk

第五章:Python 版本完整实战

5.1 基础 Bot 骨架

# bot.py - 基础版本
import os
import logging
from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    ConversationHandler,
    filters,
    ContextTypes,
)

# 加载环境变量
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")

# 配置日志
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO
)
logger = logging.getLogger(__name__)


# ==================== 命令处理器 ====================

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理 /start 命令"""
    user = update.effective_user
    welcome_text = (
        f"👋 你好,{user.first_name}!\n\n"
        f"欢迎使用我们的 Bot!\n\n"
        f"可用命令:\n"
        f"/help - 查看帮助\n"
        f"/settings - 修改设置\n"
        f"/status - 查看状态"
    )

    # 创建 Inline Keyboard
    keyboard = [
        [
            InlineKeyboardButton("📚 使用教程", callback_data="tutorial"),
            InlineKeyboardButton("⚙️ 设置", callback_data="settings"),
        ],
        [
            InlineKeyboardButton("💬 联系我们", url="https://t.me/your_support"),
        ],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)

    await update.message.reply_text(welcome_text, reply_markup=reply_markup)


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理 /help 命令"""
    help_text = (
        "📖 *帮助中心*\n\n"
        "以下是我可以帮你做的事情:\n\n"
        "🔹 `/start` \\- 重新开始\n"
        "🔹 `/help` \\- 查看本帮助\n"
        "🔹 `/settings` \\- 个人设置\n"
        "🔹 `/status` \\- 系统状态\n"
        "🔹 `/subscribe` \\- 订阅通知\n\n"
        "直接发送文字消息,我也会回复哦!"
    )
    await update.message.reply_text(help_text, parse_mode="MarkdownV2")


async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理 /status 命令"""
    import psutil
    import time

    cpu = psutil.cpu_percent()
    memory = psutil.virtual_memory().percent
    uptime = time.time() - context.bot_data.get("start_time", time.time())
    hours = int(uptime // 3600)
    minutes = int((uptime % 3600) // 60)

    status_text = (
        "📊 *系统状态*\n\n"
        f"🖥️ CPU 使用率: {cpu}%\n"
        f"💾 内存使用率: {memory}%\n"
        f"⏱️ 运行时间: {hours}小时{minutes}分钟\n"
        f"🤖 Bot 状态: 正常运行中 ✅"
    )
    await update.message.reply_text(status_text, parse_mode="Markdown")


# ==================== 消息处理器 ====================

async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理普通文本消息"""
    text = update.message.text
    user = update.effective_user
    logger.info(f"收到来自 {user.first_name}({user.id}) 的消息: {text}")

    # 简单的关键词回复
    if "你好" in text or "hello" in text.lower():
        await update.message.reply_text(f"你好呀,{user.first_name}!有什么可以帮你的? 😊")
    elif "谢谢" in text:
        await update.message.reply_text("不客气!随时为你服务 🤗")
    else:
        await update.message.reply_text(
            f"收到你的消息:「{text}」\n\n"
            "如需帮助,请使用 /help 查看功能列表。"
        )


async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理图片消息"""
    photo = update.message.photo[-1]  # 获取最高分辨率的图片
    file = await photo.get_file()
    file_path = f"downloads/{photo.file_id}.jpg"
    await file.download_to_drive(file_path)

    await update.message.reply_text(
        f"📷 收到你的图片!\n"
        f"文件大小: {photo.file_size} bytes\n"
        f"尺寸: {photo.width}x{photo.height}"
    )


async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理文件消息"""
    document = update.message.document
    await update.message.reply_text(
        f"📄 收到文件:{document.file_name}\n"
        f"类型: {document.mime_type}\n"
        f"大小: {document.file_size} bytes"
    )


# ==================== 回调处理器 ====================

async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """处理 Inline Keyboard 按钮点击"""
    query = update.callback_query
    await query.answer()  # 必须回应回调查询

    if query.data == "tutorial":
        await query.edit_message_text(
            "📚 *使用教程*\n\n"
            "1️⃣ 首先,发送 /start 开始\n"
            "2️⃣ 然后,选择你需要的功能\n"
            "3️⃣ 按照提示操作即可\n\n"
            "就是这么简单!",
            parse_mode="Markdown"
        )
    elif query.data == "settings":
        keyboard = [
            [InlineKeyboardButton("🌍 语言设置", callback_data="lang")],
            [InlineKeyboardButton("🔔 通知设置", callback_data="notify")],
            [InlineKeyboardButton("🔙 返回", callback_data="back")],
        ]
        await query.edit_message_text(
            "⚙️ *设置*\n\n请选择要修改的设置项:",
            reply_markup=InlineKeyboardMarkup(keyboard),
            parse_mode="Markdown"
        )
    elif query.data == "back":
        await start(update, context)


# ==================== 错误处理 ====================

async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """全局错误处理"""
    logger.error(f"Update {update} caused error {context.error}")

    # 通知管理员
    admin_id = os.getenv("ADMIN_CHAT_ID")
    if admin_id:
        await context.bot.send_message(
            chat_id=admin_id,
            text=f"⚠️ Bot 出错了!\n\n错误信息: {context.error}"
        )


# ==================== 主函数 ====================

def main():
    """启动 Bot"""
    import time

    app = Application.builder().token(BOT_TOKEN).build()

    # 记录启动时间
    app.bot_data["start_time"] = time.time()

    # 注册处理器
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("help", help_command))
    app.add_handler(CommandHandler("status", status))
    app.add_handler(CallbackQueryHandler(button_callback))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
    app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
    app.add_handler(MessageHandler(filters.Document.ALL, handle_document))

    # 注册错误处理器
    app.add_error_handler(error_handler)

    # 启动 Bot
    logger.info("Bot 启动中...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()

5.2 进阶:ConversationHandler(对话流程)

ConversationHandler 是实现多步交互的核心组件。以下示例实现了一个用户注册流程:

# conversation.py - 对话流程示例
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.ext import (
    ConversationHandler,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
)

# 定义对话状态
NAME, AGE, LOCATION, CONFIRM = range(4)


async def register_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """开始注册流程"""
    await update.message.reply_text(
        "📝 开始注册流程\n\n"
        "请输入你的名字(发送 /cancel 可随时取消):"
    )
    return NAME


async def get_name(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """获取用户名字"""
    context.user_data["name"] = update.message.text
    await update.message.reply_text(
        f"好的,{update.message.text}!\n\n请输入你的年龄:"
    )
    return AGE


async def get_age(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """获取用户年龄"""
    try:
        age = int(update.message.text)
        if age < 1 or age > 150:
            raise ValueError
        context.user_data["age"] = age
    except ValueError:
        await update.message.reply_text("请输入有效的年龄(1-150):")
        return AGE

    keyboard = [["🌏 亚洲", "🌍 欧洲"], ["🌎 美洲", "🌏 其他"]]
    await update.message.reply_text(
        "请选择你的所在地区:",
        reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True)
    )
    return LOCATION


async def get_location(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """获取用户地区"""
    context.user_data["location"] = update.message.text

    # 显示确认信息
    data = context.user_data
    confirm_text = (
        "📋 *请确认你的注册信息:*\n\n"
        f"姓名: {data['name']}\n"
        f"年龄: {data['age']}\n"
        f"地区: {data['location']}\n\n"
        "确认注册请发送 /confirm\n"
        "重新填写请发送 /cancel"
    )
    await update.message.reply_text(
        confirm_text,
        reply_markup=ReplyKeyboardRemove(),
        parse_mode="Markdown"
    )
    return CONFIRM


async def confirm_registration(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """确认注册"""
    data = context.user_data

    # 这里可以将数据保存到数据库
    # await db.save_user(update.effective_user.id, data)

    await update.message.reply_text(
        "✅ 注册成功!\n\n"
        f"欢迎你,{data['name']}!"
    )
    context.user_data.clear()
    return ConversationHandler.END


async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """取消注册"""
    context.user_data.clear()
    await update.message.reply_text(
        "❌ 已取消注册。",
        reply_markup=ReplyKeyboardRemove()
    )
    return ConversationHandler.END


# 创建 ConversationHandler
registration_handler = ConversationHandler(
    entry_points=[CommandHandler("register", register_start)],
    states={
        NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_name)],
        AGE: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_age)],
        LOCATION: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_location)],
        CONFIRM: [CommandHandler("confirm", confirm_registration)],
    },
    fallbacks=[CommandHandler("cancel", cancel)],
    conversation_timeout=300,  # 5分钟超时
)

5.3 进阶:中间件与装饰器

# middleware.py - 权限控制与限流
import time
from functools import wraps
from collections import defaultdict


# 管理员权限检查装饰器
def admin_only(func):
    @wraps(func)
    async def wrapper(update, context):
        admin_ids = [int(id) for id in os.getenv("ADMIN_IDS", "").split(",") if id]
        if update.effective_user.id not in admin_ids:
            await update.message.reply_text("⛔ 你没有权限执行此操作。")
            return
        return await func(update, context)
    return wrapper


# 频率限制装饰器
def rate_limit(max_calls: int, period: int):
    """限制用户调用频率"""
    user_calls = defaultdict(list)

    def decorator(func):
        @wraps(func)
        async def wrapper(update, context):
            user_id = update.effective_user.id
            now = time.time()

            # 清理过期记录
            user_calls[user_id] = [
                t for t in user_calls[user_id] if now - t < period
            ]

            if len(user_calls[user_id]) >= max_calls:
                remaining = int(period - (now - user_calls[user_id][0]))
                await update.message.reply_text(
                    f"⏳ 操作太频繁,请 {remaining} 秒后再试。"
                )
                return

            user_calls[user_id].append(now)
            return await func(update, context)
        return wrapper
    return decorator


# 使用示例
@admin_only
async def admin_broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """管理员群发消息"""
    # ... 群发逻辑
    pass


@rate_limit(max_calls=5, period=60)
async def ai_chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """AI 聊天(每分钟最多5次)"""
    # ... AI 对接逻辑
    pass

第六章:Node.js 版本完整实战

6.1 Telegraf 基础 Bot

// bot.js - Telegraf 版本
require('dotenv').config();
const { Telegraf, Markup, session } = require('telegraf');

const bot = new Telegraf(process.env.BOT_TOKEN);

// 使用 session 中间件
bot.use(session());

// 日志中间件
bot.use(async (ctx, next) => {
    const start = Date.now();
    await next();
    const ms = Date.now() - start;
    console.log(`[${new Date().toISOString()}] ${ctx.updateType} from ${ctx.from?.id} - ${ms}ms`);
});

// /start 命令
bot.start(async (ctx) => {
    const name = ctx.from.first_name;
    await ctx.reply(
        `👋 你好,${name}!欢迎使用我们的 Bot!\n\n可用命令:\n/help - 帮助\n/settings - 设置`,
        Markup.inlineKeyboard([
            [
                Markup.button.callback('📚 教程', 'tutorial'),
                Markup.button.callback('⚙️ 设置', 'settings'),
            ],
            [Markup.button.url('💬 联系我们', 'https://t.me/your_support')],
        ])
    );
});

// /help 命令
bot.help(async (ctx) => {
    await ctx.replyWithMarkdown(
        '*📖 帮助中心*\n\n' +
        '🔹 /start - 重新开始\n' +
        '🔹 /help - 查看帮助\n' +
        '🔹 /settings - 个人设置\n' +
        '🔹 /status - 系统状态'
    );
});

// 回调查询处理
bot.action('tutorial', async (ctx) => {
    await ctx.answerCbQuery();
    await ctx.editMessageText(
        '📚 *使用教程*\n\n1️⃣ 发送 /start 开始\n2️⃣ 选择功能\n3️⃣ 按提示操作',
        { parse_mode: 'Markdown' }
    );
});

bot.action('settings', async (ctx) => {
    await ctx.answerCbQuery();
    await ctx.editMessageText(
        '⚙️ *设置*\n\n请选择:',
        {
            parse_mode: 'Markdown',
            ...Markup.inlineKeyboard([
                [Markup.button.callback('🌍 语言', 'lang')],
                [Markup.button.callback('🔔 通知', 'notify')],
                [Markup.button.callback('🔙 返回', 'back')],
            ]),
        }
    );
});

// 文本消息处理
bot.on('text', async (ctx) => {
    const text = ctx.message.text;
    if (text.includes('你好') || text.toLowerCase().includes('hello')) {
        await ctx.reply(`你好呀!😊`);
    } else {
        await ctx.reply(`收到你的消息:「${text}」\n\n使用 /help 查看功能列表。`);
    }
});

// 图片处理
bot.on('photo', async (ctx) => {
    const photo = ctx.message.photo.pop();
    await ctx.reply(
        `📷 收到图片!\n尺寸: ${photo.width}x${photo.height}\n大小: ${photo.file_size} bytes`
    );
});

// 错误处理
bot.catch((err, ctx) => {
    console.error(`Error for ${ctx.updateType}:`, err);
});

// 启动
bot.launch()
    .then(() => console.log('🤖 Bot 已启动!'))
    .catch(err => console.error('启动失败:', err));

// 优雅退出
process.once('SIGINT', () => bot.stop('SIGINT'));
process.once('SIGTERM', () => bot.stop('SIGTERM'));

6.2 grammY(TypeScript)版本

// bot.ts - grammY 版本
import { Bot, Context, session, InlineKeyboard, GrammyError, HttpError } from "grammy";
import { type Conversation, type ConversationFlavor, conversations, createConversation } from "@grammyjs/conversations";

// 定义 Context 类型
type MyContext = Context & ConversationFlavor;
type MyConversation = Conversation<MyContext>;

// 创建 Bot 实例
const bot = new Bot<MyContext>(process.env.BOT_TOKEN!);

// 中间件
bot.use(session({ initial: () => ({}) }));
bot.use(conversations());

// 注册对话
async function registration(conversation: MyConversation, ctx: MyContext) {
    await ctx.reply("📝 请输入你的名字:");
    const nameCtx = await conversation.wait();
    const name = nameCtx.message?.text;

    await ctx.reply("请输入你的邮箱:");
    const emailCtx = await conversation.wait();
    const email = emailCtx.message?.text;

    await ctx.reply(
        `✅ 注册成功!\n\n姓名: ${name}\n邮箱: ${email}`
    );
}

bot.use(createConversation(registration));

// 命令
bot.command("start", async (ctx) => {
    const keyboard = new InlineKeyboard()
        .text("📚 教程", "tutorial")
        .text("⚙️ 设置", "settings")
        .row()
        .url("💬 联系我们", "https://t.me/your_support");

    await ctx.reply(`👋 欢迎,${ctx.from?.first_name}`, {
        reply_markup: keyboard,
    });
});

bot.command("register", async (ctx) => {
    await ctx.conversation.enter("registration");
});

// 错误处理
bot.catch((err) => {
    const ctx = err.ctx;
    console.error(`Error while handling update ${ctx.update.update_id}:`);
    const e = err.error;
    if (e instanceof GrammyError) {
        console.error("Error in request:", e.description);
    } else if (e instanceof HttpError) {
        console.error("Could not contact Telegram:", e);
    } else {
        console.error("Unknown error:", e);
    }
});

bot.start();

第七章:Webhook 与 Polling 模式详解

7.1 两种模式的选择

特性 Polling Webhook
实现难度 ⭐ 简单 ⭐⭐⭐ 需要 HTTPS
实时性 略有延迟 接近实时
资源消耗 持续占用连接 按需触发
调试便利 ✅ 本地直接运行 ❌ 需要公网地址
适合场景 开发、小型项目 生产环境
可靠性 依赖网络连接 更稳定

结论:开发阶段使用 Polling,生产环境使用 Webhook。

7.2 Webhook 搭建详细步骤

方式一:Nginx 反向代理 + Let’s Encrypt

# 1. 安装 Nginx 和 Certbot
sudo apt install -y nginx certbot python3-certbot-nginx

# 2. 配置 Nginx
sudo vim /etc/nginx/sites-available/bot
# /etc/nginx/sites-available/bot
server {
    listen 80;
    server_name bot.yourdomain.com;

    location / {
        return 301 https://$server_name$request_uri;
    }
}

server {
    listen 443 ssl;
    server_name bot.yourdomain.com;

    # SSL 证书(由 Certbot 自动配置)
    ssl_certificate /etc/letsencrypt/live/bot.yourdomain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/bot.yourdomain.com/privkey.pem;

    # 安全头
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;

    # Bot Webhook 端点
    location /webhook {
        proxy_pass http://127.0.0.1:8443;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 超时设置
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # 健康检查端点
    location /health {
        proxy_pass http://127.0.0.1:8443;
    }
}
# 3. 启用站点和获取证书
sudo ln -s /etc/nginx/sites-available/bot /etc/nginx/sites-enabled/
sudo certbot --nginx -d bot.yourdomain.com
sudo nginx -t && sudo systemctl reload nginx

方式二:使用 Caddy(更简单的替代方案)

# Caddyfile
bot.yourdomain.com {
    reverse_proxy /webhook localhost:8443
    reverse_proxy /health localhost:8443
}

Caddy 会自动获取和续期 SSL 证书,非常方便。

7.3 Python Webhook 代码

# webhook.py - Webhook 模式启动
import os
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application

load_dotenv()

async def main():
    app = Application.builder().token(os.getenv("BOT_TOKEN")).build()

    # ... 注册各种 handler(同之前的代码)

    # 设置 Webhook
    webhook_url = os.getenv("WEBHOOK_URL")  # https://bot.yourdomain.com/webhook
    secret_token = os.getenv("WEBHOOK_SECRET", "your-secret-token")

    await app.bot.set_webhook(
        url=f"{webhook_url}/webhook",
        secret_token=secret_token,
        allowed_updates=Update.ALL_TYPES,
        drop_pending_updates=True,  # 忽略离线期间的消息
    )

    # 启动 Webhook 服务器
    await app.run_webhook(
        listen="0.0.0.0",
        port=8443,
        url_path="/webhook",
        secret_token=secret_token,
        webhook_url=f"{webhook_url}/webhook",
    )

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

7.4 本地开发使用 ngrok

在本地开发时,可以使用 ngrok 创建临时隧道来测试 Webhook:

# 安装 ngrok
curl -s https://ngrok-agent.s3.amazonaws.com/ngrok.asc | \
  sudo tee /etc/apt/trusted.gpg.d/ngrok.asc >/dev/null && \
  echo "deb https://ngrok-agent.s3.amazonaws.com buster main" | \
  sudo tee /etc/apt/sources.list.d/ngrok.list && \
  sudo apt update && sudo apt install ngrok

# 配置 auth token
ngrok config add-authtoken YOUR_AUTH_TOKEN

# 启动隧道
ngrok http 8443

# ngrok 会输出类似:
# Forwarding  https://abc123.ngrok-free.app -> http://localhost:8443

然后将 ngrok 的 HTTPS 地址设置为 Webhook URL 即可。


第八章:对接数据库

8.1 SQLite(适合小型项目)

# database/sqlite_db.py
import aiosqlite
import os

DB_PATH = os.getenv("DB_PATH", "bot_data.db")


async def init_db():
    """初始化数据库"""
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                user_id INTEGER PRIMARY KEY,
                username TEXT,
                first_name TEXT,
                language TEXT DEFAULT 'zh',
                is_premium BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                message_text TEXT,
                message_type TEXT DEFAULT 'text',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(user_id)
            )
        """)
        await db.execute("""
            CREATE TABLE IF NOT EXISTS subscriptions (
                user_id INTEGER PRIMARY KEY,
                plan TEXT DEFAULT 'free',
                expires_at TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(user_id)
            )
        """)
        await db.commit()


async def upsert_user(user_id: int, username: str, first_name: str):
    """创建或更新用户"""
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("""
            INSERT INTO users (user_id, username, first_name, last_active)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(user_id) DO UPDATE SET
                username = excluded.username,
                first_name = excluded.first_name,
                last_active = CURRENT_TIMESTAMP
        """, (user_id, username, first_name))
        await db.commit()


async def get_user(user_id: int):
    """获取用户信息"""
    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM users WHERE user_id = ?", (user_id,)
        ) as cursor:
            return await cursor.fetchone()


async def save_message(user_id: int, text: str, msg_type: str = "text"):
    """保存消息记录"""
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO messages (user_id, message_text, message_type) VALUES (?, ?, ?)",
            (user_id, text, msg_type)
        )
        await db.commit()


async def get_user_stats():
    """获取用户统计"""
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute("SELECT COUNT(*) FROM users") as cursor:
            total_users = (await cursor.fetchone())[0]
        async with db.execute(
            "SELECT COUNT(*) FROM users WHERE last_active > datetime('now', '-1 day')"
        ) as cursor:
            active_users = (await cursor.fetchone())[0]
        async with db.execute("SELECT COUNT(*) FROM messages") as cursor:
            total_messages = (await cursor.fetchone())[0]
    return {
        "total_users": total_users,
        "active_users": active_users,
        "total_messages": total_messages,
    }

8.2 PostgreSQL(推荐生产环境)

# database/pg_db.py
import asyncpg
import os


class Database:
    def __init__(self):
        self.pool = None

    async def connect(self):
        """创建连接池"""
        self.pool = await asyncpg.create_pool(
            dsn=os.getenv("DATABASE_URL"),
            min_size=5,
            max_size=20,
            command_timeout=60,
        )
        await self._init_tables()

    async def _init_tables(self):
        """初始化表结构"""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id BIGINT PRIMARY KEY,
                    username VARCHAR(255),
                    first_name VARCHAR(255),
                    language VARCHAR(10) DEFAULT 'zh',
                    is_premium BOOLEAN DEFAULT FALSE,
                    settings JSONB DEFAULT '{}',
                    created_at TIMESTAMPTZ DEFAULT NOW(),
                    last_active TIMESTAMPTZ DEFAULT NOW()
                );

                CREATE INDEX IF NOT EXISTS idx_users_last_active
                ON users(last_active);

                CREATE TABLE IF NOT EXISTS messages (
                    id BIGSERIAL PRIMARY KEY,
                    user_id BIGINT REFERENCES users(user_id),
                    chat_id BIGINT,
                    message_text TEXT,
                    message_type VARCHAR(50) DEFAULT 'text',
                    metadata JSONB DEFAULT '{}',
                    created_at TIMESTAMPTZ DEFAULT NOW()
                );

                CREATE INDEX IF NOT EXISTS idx_messages_user
                ON messages(user_id, created_at DESC);
            """)

    async def upsert_user(self, user_id: int, username: str, first_name: str):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO users (user_id, username, first_name, last_active)
                VALUES ($1, $2, $3, NOW())
                ON CONFLICT (user_id) DO UPDATE SET
                    username = EXCLUDED.username,
                    first_name = EXCLUDED.first_name,
                    last_active = NOW()
            """, user_id, username, first_name)

    async def get_user(self, user_id: int):
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT * FROM users WHERE user_id = $1", user_id
            )

    async def close(self):
        if self.pool:
            await self.pool.close()


# 全局数据库实例
db = Database()

8.3 Redis 缓存层

# database/redis_cache.py
import redis.asyncio as redis
import json
import os


class RedisCache:
    def __init__(self):
        self.redis = None

    async def connect(self):
        self.redis = redis.from_url(
            os.getenv("REDIS_URL", "redis://localhost:6379"),
            decode_responses=True,
        )

    async def get_user_cache(self, user_id: int):
        """获取用户缓存"""
        data = await self.redis.get(f"user:{user_id}")
        return json.loads(data) if data else None

    async def set_user_cache(self, user_id: int, data: dict, ttl: int = 3600):
        """设置用户缓存(默认1小时过期)"""
        await self.redis.setex(
            f"user:{user_id}",
            ttl,
            json.dumps(data, ensure_ascii=False),
        )

    async def increment_counter(self, key: str, ttl: int = 86400):
        """递增计数器"""
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, ttl)
        results = await pipe.execute()
        return results[0]

    async def rate_limit_check(self, user_id: int, limit: int = 10, window: int = 60):
        """滑动窗口限流"""
        import time
        key = f"rate:{user_id}"
        now = time.time()
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window)
        pipe.zadd(key, {str(now): now})
        pipe.zcard(key)
        pipe.expire(key, window)
        results = await pipe.execute()
        return results[2] <= limit

    async def close(self):
        if self.redis:
            await self.redis.close()


cache = RedisCache()

第九章:对接 AI 服务(OpenAI / Claude)

9.1 对接 OpenAI GPT

# services/openai_service.py
import os
import openai
from typing import List, Dict, Optional

client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# 用户对话历史存储
user_conversations: Dict[int, List[Dict]] = {}
MAX_HISTORY = 20  # 最大对话轮数


async def chat_with_gpt(
    user_id: int,
    message: str,
    system_prompt: Optional[str] = None
) -> str:
    """与 GPT 对话"""

    # 获取用户对话历史
    if user_id not in user_conversations:
        user_conversations[user_id] = []

    history = user_conversations[user_id]

    # 构建消息列表
    messages = []

    # 系统提示
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    else:
        messages.append({
            "role": "system",
            "content": (
                "你是一个友好的 Telegram Bot 助手。"
                "用简洁、友好的语气回复用户。"
                "如果用户使用中文,用中文回复。"
                "回复应该简短,适合聊天场景。"
            )
        })

    # 添加历史消息
    messages.extend(history[-MAX_HISTORY:])

    # 添加当前消息
    messages.append({"role": "user", "content": message})

    try:
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            max_tokens=1000,
            temperature=0.7,
        )

        reply = response.choices[0].message.content

        # 更新对话历史
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": reply})

        # 限制历史长度
        if len(history) > MAX_HISTORY * 2:
            user_conversations[user_id] = history[-MAX_HISTORY * 2:]

        return reply

    except openai.RateLimitError:
        return "⏳ AI 服务繁忙,请稍后再试。"
    except openai.APIError as e:
        return f"❌ AI 服务出错: {str(e)}"


def clear_history(user_id: int):
    """清除用户对话历史"""
    user_conversations.pop(user_id, None)

9.2 对接 Anthropic Claude

# services/claude_service.py
import os
import anthropic
from typing import List, Dict, Optional

client = anthropic.AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

user_conversations: Dict[int, List[Dict]] = {}
MAX_HISTORY = 20


async def chat_with_claude(
    user_id: int,
    message: str,
    system_prompt: Optional[str] = None
) -> str:
    """与 Claude 对话"""

    if user_id not in user_conversations:
        user_conversations[user_id] = []

    history = user_conversations[user_id]

    system = system_prompt or (
        "你是一个友好的 Telegram Bot 助手。"
        "用简洁、友好的语气回复用户。"
        "回复应该简短,适合聊天场景。"
    )

    messages = history[-MAX_HISTORY:] + [{"role": "user", "content": message}]

    try:
        response = await client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1000,
            system=system,
            messages=messages,
        )

        reply = response.content[0].text

        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": reply})

        if len(history) > MAX_HISTORY * 2:
            user_conversations[user_id] = history[-MAX_HISTORY * 2:]

        return reply

    except anthropic.RateLimitError:
        return "⏳ AI 服务繁忙,请稍后再试。"
    except anthropic.APIError as e:
        return f"❌ AI 服务出错: {str(e)}"

9.3 在 Bot 中集成 AI 对话

# 在 bot.py 中添加
from services.openai_service import chat_with_gpt, clear_history
# 或者 from services.claude_service import chat_with_claude, clear_history


async def ai_chat_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """AI 对话处理"""
    user_id = update.effective_user.id
    message = update.message.text

    # 发送"正在输入"状态
    await context.bot.send_chat_action(
        chat_id=update.effective_chat.id,
        action="typing"
    )

    # 调用 AI
    reply = await chat_with_gpt(user_id, message)
    # 或者: reply = await chat_with_claude(user_id, message)

    # 发送回复(处理超长消息)
    if len(reply) > 4096:
        # Telegram 单条消息最长 4096 字符
        for i in range(0, len(reply), 4096):
            await update.message.reply_text(reply[i:i+4096])
    else:
        await update.message.reply_text(reply)


async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/clear 命令 - 清除对话历史"""
    clear_history(update.effective_user.id)
    await update.message.reply_text("🗑️ 对话历史已清除!")


# 注册 handler
# app.add_handler(CommandHandler("clear", clear_context))
# app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, ai_chat_handler))

9.4 多 AI 模型切换

# services/ai_router.py - AI 路由器
from enum import Enum
from services.openai_service import chat_with_gpt
from services.claude_service import chat_with_claude

class AIModel(Enum):
    GPT4 = "gpt-4o"
    CLAUDE = "claude-sonnet"

# 用户模型偏好
user_model_prefs: Dict[int, AIModel] = {}


async def ai_chat(user_id: int, message: str) -> str:
    """根据用户偏好路由到不同 AI"""
    model = user_model_prefs.get(user_id, AIModel.GPT4)

    if model == AIModel.GPT4:
        return await chat_with_gpt(user_id, message)
    elif model == AIModel.CLAUDE:
        return await chat_with_claude(user_id, message)


async def switch_model(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """切换 AI 模型"""
    keyboard = [
        [InlineKeyboardButton("🤖 GPT-4o", callback_data="model_gpt4")],
        [InlineKeyboardButton("🧠 Claude", callback_data="model_claude")],
    ]
    await update.message.reply_text(
        "请选择 AI 模型:",
        reply_markup=InlineKeyboardMarkup(keyboard)
    )

第十章:对接支付系统

10.1 Telegram 内置支付

Telegram Bot 原生支持通过 Stripe、YooMoney 等支付渠道收款:

# payments/telegram_pay.py
from telegram import LabeledPrice, ShippingOption

PAYMENT_PROVIDER_TOKEN = os.getenv("PAYMENT_PROVIDER_TOKEN")


async def send_invoice(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """发送支付发票"""
    prices = [
        LabeledPrice("月度会员", 999),      # 金额单位:分
        LabeledPrice("新用户折扣", -200),    # 负数为折扣
    ]

    await context.bot.send_invoice(
        chat_id=update.effective_chat.id,
        title="Bot 高级会员",
        description="解锁全部高级功能,包括无限 AI 对话、优先响应等",
        payload="premium_monthly",           # 自定义数据,用于识别订单
        provider_token=PAYMENT_PROVIDER_TOKEN,
        currency="USD",
        prices=prices,
        start_parameter="premium",
        photo_url="https://example.com/premium.jpg",
        photo_width=640,
        photo_height=480,
        need_name=True,
        need_email=True,
        is_flexible=False,
    )


async def precheckout_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """预结账回调 - 必须在10秒内响应"""
    query = update.pre_checkout_query
    if query.invoice_payload != "premium_monthly":
        await query.answer(ok=False, error_message="出错了,请重试。")
    else:
        await query.answer(ok=True)


async def successful_payment(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """支付成功回调"""
    payment = update.message.successful_payment
    user_id = update.effective_user.id

    # 更新用户会员状态
    # await db.upgrade_user(user_id, "premium", days=30)

    await update.message.reply_text(
        "🎉 支付成功!\n\n"
        f"支付金额: {payment.total_amount / 100} {payment.currency}\n"
        "你的高级会员已激活,有效期 30 天。\n\n"
        "感谢你的支持!"
    )


# 注册支付相关 handler
# app.add_handler(CommandHandler("buy", send_invoice))
# app.add_handler(PreCheckoutQueryHandler(precheckout_callback))
# app.add_handler(MessageHandler(filters.SUCCESSFUL_PAYMENT, successful_payment))

10.2 对接第三方支付(USDT / 加密货币)

很多 Telegram Bot 需要接受加密货币支付。以下是对接 TON 区块链支付的示例架构:

# payments/crypto_pay.py - TON 支付示例
"""
TON 支付流程:
1. 用户点击"支付" → Bot 生成唯一订单ID
2. Bot 返回包含订单ID的 TON 转账地址(深链接或二维码)
3. 用户在 TON 钱包中完成转账
4. 后端监听 TON 区块链交易
5. 匹配到订单后,激活用户权益
"""

import hashlib
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Order:
    order_id: str
    user_id: int
    amount: float
    currency: str
    status: str  # pending, paid, expired
    created_at: float
    paid_at: Optional[float] = None


# 订单存储(生产环境应使用数据库)
orders: dict = {}


async def create_crypto_order(
    update: Update, context: ContextTypes.DEFAULT_TYPE
):
    """创建加密货币支付订单"""
    user_id = update.effective_user.id
    amount_ton = 5.0  # 5 TON

    # 生成唯一订单ID
    order_id = hashlib.sha256(
        f"{user_id}:{time.time()}".encode()
    ).hexdigest()[:16]

    order = Order(
        order_id=order_id,
        user_id=user_id,
        amount=amount_ton,
        currency="TON",
        status="pending",
        created_at=time.time(),
    )
    orders[order_id] = order

    # TON 支付深链接(memo 中包含订单ID用于匹配)
    ton_address = os.getenv("TON_WALLET_ADDRESS")
    ton_link = f"ton://transfer/{ton_address}?amount={int(amount_ton * 1e9)}&text={order_id}"

    keyboard = [
        [InlineKeyboardButton("💎 使用 TON 支付", url=ton_link)],
        [InlineKeyboardButton("🔄 检查支付状态", callback_data=f"check_{order_id}")],
    ]

    await update.message.reply_text(
        f"📋 *订单信息*\n\n"
        f"订单号: `{order_id}`\n"
        f"金额: {amount_ton} TON\n"
        f"收款地址:\n`{ton_address}`\n\n"
        f"请在 30 分钟内完成支付。\n"
        f"备注(memo)中务必填写订单号!",
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )

第十一章:对接消息队列与任务调度

11.1 使用 Celery 处理异步任务

# tasks/celery_config.py
from celery import Celery
import os

celery_app = Celery(
    "bot_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/1"),
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_soft_time_limit=300,
    task_time_limit=600,
)


# tasks/worker.py
from tasks.celery_config import celery_app
import httpx


@celery_app.task(bind=True, max_retries=3)
def send_notification(self, chat_id: int, text: str):
    """异步发送通知"""
    try:
        token = os.getenv("BOT_TOKEN")
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        response = httpx.post(url, json={
            "chat_id": chat_id,
            "text": text,
            "parse_mode": "Markdown",
        })
        response.raise_for_status()
    except Exception as exc:
        self.retry(exc=exc, countdown=60)


@celery_app.task
def process_image(file_path: str, user_id: int):
    """异步处理图片(如 AI 识别、压缩等)"""
    # ... 图片处理逻辑
    pass


@celery_app.task
def generate_report(user_id: int, report_type: str):
    """异步生成报告"""
    # ... 报告生成逻辑
    pass

11.2 定时任务(APScheduler)

# scheduler/jobs.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")


async def daily_summary(context):
    """每日摘要推送"""
    # 获取所有订阅用户
    subscribers = await db.get_subscribers()
    stats = await db.get_daily_stats()

    for user_id in subscribers:
        try:
            await context.bot.send_message(
                chat_id=user_id,
                text=(
                    "📊 *每日摘要*\n\n"
                    f"今日新增用户: {stats['new_users']}\n"
                    f"今日消息数: {stats['messages']}\n"
                    f"活跃用户: {stats['active_users']}"
                ),
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.error(f"Failed to send summary to {user_id}: {e}")


async def cleanup_expired_orders():
    """清理过期订单"""
    expired = await db.get_expired_orders(hours=24)
    for order in expired:
        await db.update_order_status(order.id, "expired")
    logger.info(f"Cleaned up {len(expired)} expired orders")


def setup_scheduler(application):
    """配置定时任务"""
    # 每天早上 9 点发送摘要
    scheduler.add_job(
        daily_summary,
        CronTrigger(hour=9, minute=0),
        args=[application],
        id="daily_summary",
    )

    # 每小时清理过期订单
    scheduler.add_job(
        cleanup_expired_orders,
        CronTrigger(minute=0),
        id="cleanup_orders",
    )

    scheduler.start()

第十二章:部署方案全解析

12.1 Docker 部署(推荐)

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 创建非 root 用户
RUN useradd -m botuser
USER botuser

# 启动 Bot
CMD ["python", "bot.py"]
# docker-compose.yml
version: "3.8"

services:
  bot:
    build: .
    restart: unless-stopped
    env_file: .env
    depends_on:
      - postgres
      - redis
    volumes:
      - ./downloads:/app/downloads
    networks:
      - bot-network
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  postgres:
    image: postgres:16-alpine
    restart: unless-stopped
    environment:
      POSTGRES_DB: botdb
      POSTGRES_USER: botuser
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
    networks:
      - bot-network

  redis:
    image: redis:7-alpine
    restart: unless-stopped
    command: redis-server --appendonly yes
    volumes:
      - redisdata:/data
    networks:
      - bot-network

  nginx:
    image: nginx:alpine
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/conf.d:/etc/nginx/conf.d
      - ./certbot/conf:/etc/letsencrypt
      - ./certbot/www:/var/www/certbot
    depends_on:
      - bot
    networks:
      - bot-network

volumes:
  pgdata:
  redisdata:

networks:
  bot-network:
    driver: bridge
# 部署命令
docker compose up -d --build

# 查看日志
docker compose logs -f bot

# 更新重启
docker compose pull
docker compose up -d --build

# 备份数据库
docker compose exec postgres pg_dump -U botuser botdb > backup.sql

12.2 Systemd 服务部署

如果不使用 Docker,可以配置 systemd 服务:

# /etc/systemd/system/telegram-bot.service
[Unit]
Description=Telegram Bot Service
After=network.target postgresql.service redis.service

[Service]
Type=simple
User=botuser
Group=botuser
WorkingDirectory=/home/botuser/telegram-bot
EnvironmentFile=/home/botuser/telegram-bot/.env
ExecStart=/home/botuser/telegram-bot/venv/bin/python bot.py
Restart=always
RestartSec=10
StartLimitBurst=5
StartLimitIntervalSec=60

# 安全限制
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/home/botuser/telegram-bot

# 日志
StandardOutput=journal
StandardError=journal
SyslogIdentifier=telegram-bot

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable telegram-bot
sudo systemctl start telegram-bot
sudo systemctl status telegram-bot
sudo journalctl -u telegram-bot -f  # 查看日志

12.3 Serverless 部署(AWS Lambda)

# lambda_handler.py - AWS Lambda Webhook 处理
import json
import os
from telegram import Update, Bot
from telegram.ext import Application

# 冷启动时初始化
bot = Bot(token=os.getenv("BOT_TOKEN"))
app = Application.builder().token(os.getenv("BOT_TOKEN")).build()

# 注册 handlers...

def lambda_handler(event, context):
    """AWS Lambda 入口"""
    try:
        body = json.loads(event.get("body", "{}"))
        update = Update.de_json(body, bot)

        # 异步处理
        import asyncio
        asyncio.get_event_loop().run_until_complete(
            app.process_update(update)
        )

        return {"statusCode": 200, "body": "OK"}
    except Exception as e:
        print(f"Error: {e}")
        return {"statusCode": 500, "body": str(e)}

第十三章:安全加固与最佳实践

13.1 Token 安全

# 永远不要这样做:
BOT_TOKEN = "7123456789:AAH..."  # ❌ 硬编码 Token

# 正确做法:
BOT_TOKEN = os.getenv("BOT_TOKEN")  # ✅ 从环境变量读取
if not BOT_TOKEN:
    raise ValueError("BOT_TOKEN 未设置!")

13.2 Webhook 安全验证

# 方式一:使用 Secret Token(推荐)
# Telegram 会在每个 Webhook 请求中包含 X-Telegram-Bot-Api-Secret-Token 头
secret_token = os.getenv("WEBHOOK_SECRET")

# 在 set_webhook 时指定
await bot.set_webhook(url=webhook_url, secret_token=secret_token)

# 方式二:限制来源 IP
# Telegram Webhook 请求来自以下 IP 段:
# 149.154.160.0/20
# 91.108.4.0/22

13.3 输入验证与防注入

import re
import html


def sanitize_input(text: str, max_length: int = 4096) -> str:
    """清理用户输入"""
    # 限制长度
    text = text[:max_length]
    # HTML 转义
    text = html.escape(text)
    # 移除控制字符
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)
    return text


def validate_command_args(args: list, expected_types: list) -> bool:
    """验证命令参数"""
    if len(args) != len(expected_types):
        return False
    for arg, expected_type in zip(args, expected_types):
        try:
            expected_type(arg)
        except (ValueError, TypeError):
            return False
    return True

13.4 防滥用策略

# security/anti_abuse.py
from collections import defaultdict
import time


class AntiAbuse:
    def __init__(self):
        self.blocked_users = set()
        self.warning_count = defaultdict(int)
        self.message_timestamps = defaultdict(list)

    def is_spam(self, user_id: int, window: int = 5, max_messages: int = 10) -> bool:
        """检测是否为垃圾消息(5秒内超过10条)"""
        now = time.time()
        timestamps = self.message_timestamps[user_id]
        timestamps.append(now)

        # 清理旧记录
        self.message_timestamps[user_id] = [
            t for t in timestamps if now - t < window
        ]

        if len(self.message_timestamps[user_id]) > max_messages:
            self.warning_count[user_id] += 1
            if self.warning_count[user_id] >= 3:
                self.blocked_users.add(user_id)
            return True
        return False

    def is_blocked(self, user_id: int) -> bool:
        return user_id in self.blocked_users

    def unblock(self, user_id: int):
        self.blocked_users.discard(user_id)
        self.warning_count[user_id] = 0


anti_abuse = AntiAbuse()


# 在 Bot 中使用
async def security_middleware(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """安全中间件 - 放在所有 handler 之前"""
    user_id = update.effective_user.id

    if anti_abuse.is_blocked(user_id):
        return  # 静默忽略被封禁用户

    if anti_abuse.is_spam(user_id):
        await update.message.reply_text(
            "⚠️ 消息发送过于频繁,请稍后再试。"
        )
        return

    # 继续处理...

第十四章:监控、日志与运维

14.1 结构化日志

# utils/logger.py
import logging
import json
from datetime import datetime


class JSONFormatter(logging.Formatter):
    """JSON 格式日志"""
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        if hasattr(record, "user_id"):
            log_entry["user_id"] = record.user_id
        if hasattr(record, "chat_id"):
            log_entry["chat_id"] = record.chat_id
        return json.dumps(log_entry, ensure_ascii=False)


def setup_logging():
    """配置日志"""
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # 控制台输出
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)

    # 文件输出(带轮转)
    from logging.handlers import RotatingFileHandler
    file_handler = RotatingFileHandler(
        "logs/bot.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5,
        encoding="utf-8",
    )
    file_handler.setFormatter(JSONFormatter())
    logger.addHandler(file_handler)

14.2 健康检查端点

# health.py
from aiohttp import web
import time


async def health_check(request):
    """健康检查端点"""
    uptime = time.time() - app_start_time
    stats = await db.get_stats()

    return web.json_response({
        "status": "healthy",
        "uptime_seconds": int(uptime),
        "database": "connected",
        "redis": "connected",
        "total_users": stats["total_users"],
        "messages_today": stats["messages_today"],
    })


# 在 Docker 中配置健康检查
# docker-compose.yml:
#   healthcheck:
#     test: ["CMD", "curl", "-f", "http://localhost:8443/health"]
#     interval: 30s
#     timeout: 10s
#     retries: 3

14.3 Prometheus 指标收集

# metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# 定义指标
MESSAGES_TOTAL = Counter(
    "bot_messages_total",
    "Total messages received",
    ["type", "command"],
)
RESPONSE_TIME = Histogram(
    "bot_response_seconds",
    "Response time in seconds",
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
ACTIVE_USERS = Gauge(
    "bot_active_users",
    "Number of active users",
)
ERRORS_TOTAL = Counter(
    "bot_errors_total",
    "Total errors",
    ["type"],
)

# 启动 Prometheus HTTP 服务器
start_http_server(9090)

# 在处理消息时记录指标
import time

async def metrics_middleware(update, context):
    start = time.time()
    MESSAGES_TOTAL.labels(type="text", command="none").inc()

    # ... 处理逻辑

    duration = time.time() - start
    RESPONSE_TIME.observe(duration)

第十五章:常见问题与排错指南

15.1 常见错误及解决方案

错误 原因 解决方案
Conflict: terminated by other getUpdates request 有多个实例在轮询 确保只有一个实例运行
Unauthorized Token 无效 检查 Token 是否正确,是否包含多余空格
Bad Request: chat not found 用户从未与 Bot 交互 用户必须先 /start 才能收到消息
Flood control exceeded 发送消息过于频繁 实现消息队列,控制发送速率
Connection timeout 网络问题 检查服务器网络,考虑使用代理
Webhook was already set Webhook 和 Polling 冲突 清除 Webhook 后再使用 Polling
SSL error 证书问题 检查 SSL 证书是否有效

15.2 调试技巧

# 1. 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)

# 2. 查看 Webhook 信息
import httpx
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/getWebhookInfo")
print(resp.json())

# 3. 手动删除 Webhook(切换回 Polling)
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/deleteWebhook")
print(resp.json())

# 4. 获取 Bot 信息
resp = httpx.get(f"https://api.telegram.org/bot{TOKEN}/getMe")
print(resp.json())

15.3 性能优化清单

  1. 使用连接池:数据库和 Redis 都应使用连接池
  2. 异步处理:所有 I/O 操作都应异步执行
  3. 消息队列:耗时操作放入消息队列异步处理
  4. 缓存策略:频繁读取的数据使用 Redis 缓存
  5. 批量操作:群发消息时使用批量发送而非逐条发送
  6. 限流保护:对所有用户操作实施频率限制
  7. 资源清理:定期清理过期数据和临时文件
  8. 日志级别:生产环境设置为 INFO 或 WARNING

附录:完整项目结构参考

telegram-bot/
├── bot.py                    # 主入口
├── config.py                 # 配置管理
├── requirements.txt          # Python 依赖
├── Dockerfile                # Docker 构建文件
├── docker-compose.yml        # Docker Compose 编排
├── .env                      # 环境变量(不提交到 Git)
├── .env.example              # 环境变量模板
├── .gitignore
│
├── handlers/                 # 消息处理器
│   ├── __init__.py
│   ├── commands.py           # 命令处理
│   ├── messages.py           # 消息处理
│   ├── callbacks.py          # 回调查询处理
│   └── conversations.py      # 对话流程
│
├── services/                 # 外部服务对接
│   ├── __init__.py
│   ├── openai_service.py     # OpenAI 对接
│   ├── claude_service.py     # Claude 对接
│   └── payment_service.py    # 支付服务
│
├── database/                 # 数据库
│   ├── __init__.py
│   ├── models.py             # 数据模型
│   ├── pg_db.py              # PostgreSQL
│   ├── redis_cache.py        # Redis 缓存
│   └── migrations/           # 数据库迁移
│
├── middleware/                # 中间件
│   ├── __init__.py
│   ├── auth.py               # 权限验证
│   ├── rate_limit.py         # 频率限制
│   └── logging.py            # 日志中间件
│
├── tasks/                    # 异步任务
│   ├── __init__.py
│   ├── celery_config.py
│   └── worker.py
│
├── scheduler/                # 定时任务
│   ├── __init__.py
│   └── jobs.py
│
├── utils/                    # 工具函数
│   ├── __init__.py
│   ├── helpers.py
│   └── validators.py
│
├── security/                 # 安全模块
│   ├── __init__.py
│   └── anti_abuse.py
│
├── nginx/                    # Nginx 配置
│   └── conf.d/
│       └── bot.conf
│
├── logs/                     # 日志目录
│
├── tests/                    # 测试
│   ├── test_handlers.py
│   ├── test_services.py
│   └── test_database.py
│
└── docs/                     # 文档
    ├── API.md
    └── DEPLOYMENT.md

总结

通过本文,我们完整地走过了 Telegram Bot 从创建到部署的全流程:

  1. 基础概念:理解了 Bot 的工作原理和核心概念
  2. 环境配置:搭建了完整的开发环境
  3. Bot 创建:通过 BotFather 创建并配置了 Bot
  4. 框架选型:对比了各语言的主流框架
  5. 代码实战:分别用 Python 和 Node.js 实现了完整的 Bot
  6. Webhook:配置了生产级的 Webhook 服务
  7. 数据库对接:实现了 SQLite、PostgreSQL、Redis 的完整对接
  8. AI 对接:集成了 OpenAI 和 Claude 的 AI 对话能力
  9. 支付对接:实现了 Telegram 原生支付和加密货币支付
  10. 任务调度:配置了异步任务和定时任务
  11. 部署方案:提供了 Docker、Systemd、Serverless 三种部署方案
  12. 安全加固:实现了完整的安全防护措施
  13. 监控运维:配置了日志、健康检查和指标收集

希望这篇教程能帮助你成功搭建自己的 Telegram Bot!如有问题,欢迎在评论区讨论。


相关资源:


本文首发于万先生,转载请注明出处。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐