PyCharm 关联 Linux 服务器完整教程

我会按照**“先配置远程开发环境→解决镜像源问题→实现代码自动同步→运行监控脚本→压测验证”**的完整流程讲解,解决你提到的清华源访问失败问题,并补充完整的psutil监控代码。


一、PyCharm 关联 Linux 服务器(两种方式,推荐第一种)

方式一:新版 Remote Development(强烈推荐)

核心优势:代码直接存在Linux服务器上,本地PyCharm只是远程界面,完全不需要手动同步,写的每一行代码实时保存在服务器,是目前最流畅的远程开发方式。

操作步骤:

  1. 打开PyCharm,欢迎页选择 Remote Development → SSH
  2. 点击 New Connection,填写服务器信息:
    • Host:服务器IP地址
    • Port:SSH端口(默认22)
    • Username:服务器登录用户名(如root)
    • 点击 Next,输入服务器密码(或选择密钥认证)
  3. 连接成功后,选择服务器上的项目目录(如/root/python_project
  4. 点击 Create,PyCharm会自动在服务器上部署远程开发环境
  5. 配置远程Python解释器:
    • 打开 File → Settings → Project: xxx → Python Interpreter
    • 点击右上角齿轮 → Add Interpreter → On SSH
    • 选择刚才创建的SSH连接,下一步
    • 选择服务器上的Python解释器路径(如/usr/bin/python3.9
    • 点击 Create 完成配置

✅ 效果:你在本地PyCharm写的所有代码,会实时自动保存到Linux服务器,运行、调试都直接在服务器上执行。


方式二:传统远程解释器+自动同步(代码本地+服务器双份)

如果需要代码同时保存在本地和服务器,可以用这种方式:

  1. 配置SSH连接:
    • 打开 File → Settings → Tools → SSH Configurations
    • 点击 +,填写服务器IP、端口、用户名、密码,测试连接成功
  2. 添加远程Python解释器:
    • 打开 File → Settings → Project: xxx → Python Interpreter
    • 点击右上角齿轮 → Add Interpreter → SSH Interpreter
    • 选择刚才创建的SSH连接,下一步
    • 选择服务器上的Python解释器路径(如/usr/bin/python3.9
    • 点击 Finish
  3. 配置自动同步:
    • 打开 Tools → Deployment → Configuration
    • 点击 + → SFTP,选择刚才的SSH连接
    • 切换到 Mappings 标签:
      • Local path:本地项目目录(如D:\python_project
      • Deployment path:服务器上的项目目录(如/root/python_project
    • 点击 OK
    • 开启自动上传:Tools → Deployment → Automatic Upload

✅ 效果:本地保存代码时,会自动同步到服务器对应的目录。如果同步失败,手动点击 Tools → Deployment → Upload to xxx 即可。


二、解决清华pip源访问失败+安装psutil

你遇到的https://pypi.tuna.tsinghua.edu.cn/simple访问失败,可以用以下国内替代镜像源解决:

1. 临时使用替代源安装psutil

# 阿里云镜像(最稳定)
pip3 install psutil -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

# 中科大镜像
pip3 install psutil -i https://pypi.mirrors.ustc.edu.cn/simple/ --trusted-host pypi.mirrors.ustc.edu.cn

2. 永久配置国内镜像源(一劳永逸)

在Linux服务器上执行以下命令,永久设置阿里云pip源:

mkdir -p ~/.pip
cat > ~/.pip/pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
trusted-host = mirrors.aliyun.com
timeout = 120
EOF

配置完成后,直接执行pip3 install psutil就会自动使用阿里云源。


三、psutil 系统监控完整代码(含IOPS/吞吐量计算)

我把你提到的CPU、内存、磁盘、网络监控,以及IOPS和吞吐量计算都整合到了一个可运行的脚本里,注释非常详细:

import psutil
import time

def get_cpu_info():
    """获取CPU信息"""
    print("="*50)
    print("CPU监控信息")
    print("="*50)
    # CPU核心数
    physical_cores = psutil.cpu_count(logical=False)  # 物理核心
    logical_cores = psutil.cpu_count(logical=True)    # 逻辑核心
    print(f"物理核心数: {physical_cores}")
    print(f"逻辑核心数: {logical_cores}")
    
    # 整体CPU使用率
    total_usage = psutil.cpu_percent(interval=1)
    print(f"整体CPU使用率: {total_usage}%")
    
    # 每个核心的使用率
    per_core_usage = psutil.cpu_percent(interval=1, percpu=True)
    for i, usage in enumerate(per_core_usage):
        print(f"核心{i}使用率: {usage}%")
    print()

def get_memory_info():
    """获取内存信息"""
    print("="*50)
    print("内存监控信息")
    print("="*50)
    # 物理内存
    mem = psutil.virtual_memory()
    print(f"总内存: {mem.total / 1024**3:.2f} GB")
    print(f"已用内存: {mem.used / 1024**3:.2f} GB")
    print(f"可用内存: {mem.available / 1024**3:.2f} GB")
    print(f"内存使用率: {mem.percent}%")
    
    # 交换分区
    swap = psutil.swap_memory()
    print(f"\n交换分区总大小: {swap.total / 1024**3:.2f} GB")
    print(f"交换分区已用: {swap.used / 1024**3:.2f} GB")
    print(f"交换分区使用率: {swap.percent}%")
    print()

def get_disk_info():
    """获取磁盘信息(含IOPS和吞吐量计算)"""
    print("="*50)
    print("磁盘监控信息")
    print("="*50)
    # 磁盘分区使用率
    partitions = psutil.disk_partitions()
    for part in partitions:
        if 'loop' not in part.device and part.fstype:
            usage = psutil.disk_usage(part.mountpoint)
            print(f"分区: {part.mountpoint}")
            print(f"  总大小: {usage.total / 1024**3:.2f} GB")
            print(f"  已用: {usage.used / 1024**3:.2f} GB")
            print(f"  使用率: {usage.percent}%")
    
    # 磁盘IO、IOPS和吞吐量计算
    print("\n磁盘IO统计:")
    # 先获取第一次IO数据
    io_start = psutil.disk_io_counters()
    time.sleep(1)
    # 再获取1秒后的IO数据
    io_end = psutil.disk_io_counters()
    
    # 计算IOPS
    read_iops = io_end.read_count - io_start.read_count
    write_iops = io_end.write_count - io_start.write_count
    total_iops = read_iops + write_iops
    
    # 计算吞吐量(字节/秒 → MB/秒)
    read_throughput = (io_end.read_bytes - io_start.read_bytes) / 1024**2
    write_throughput = (io_end.write_bytes - io_start.write_bytes) / 1024**2
    total_throughput = read_throughput + write_throughput
    
    print(f"  读IOPS: {read_iops} ops/s")
    print(f"  写IOPS: {write_iops} ops/s")
    print(f"  总IOPS: {total_iops} ops/s")
    print(f"  读吞吐量: {read_throughput:.2f} MB/s")
    print(f"  写吞吐量: {write_throughput:.2f} MB/s")
    print(f"  总吞吐量: {total_throughput:.2f} MB/s")
    print()

def get_network_info():
    """获取网络信息"""
    print("="*50)
    print("网络监控信息")
    print("="*50)
    # 网卡列表
    interfaces = psutil.net_if_addrs()
    for name, addrs in interfaces.items():
        if name == 'lo':
            continue  # 跳过回环网卡
        for addr in addrs:
            if addr.family == 2:  # IPv4地址
                print(f"网卡: {name}")
                print(f"  IP地址: {addr.address}")
                print(f"  子网掩码: {addr.netmask}")
    
    # 网络IO吞吐量
    print("\n网络IO统计:")
    net_start = psutil.net_io_counters()
    time.sleep(1)
    net_end = psutil.net_io_counters()
    
    bytes_sent = (net_end.bytes_sent - net_start.bytes_sent) / 1024**2
    bytes_recv = (net_end.bytes_recv - net_start.bytes_recv) / 1024**2
    
    print(f"  上传速度: {bytes_sent:.2f} MB/s")
    print(f"  下载速度: {bytes_recv:.2f} MB/s")
    print()

if __name__ == "__main__":
    # 循环监控,每5秒刷新一次
    while True:
        get_cpu_info()
        get_memory_info()
        get_disk_info()
        get_network_info()
        print("="*50)
        print("等待5秒后刷新...")
        print("="*50)
        time.sleep(5)
        # 清屏(Linux系统)
        print("\033c", end="")

四、stress-ng 系统压测常用命令

安装stress-ng:

yum install stress-ng -y

1. CPU压测(你提到的命令)

# 压测2个CPU核心,持续60秒
stress-ng --cpu 2 --timeout 60s

# 压测所有CPU核心,持续300秒
stress-ng --cpu 0 --timeout 300s

2. 内存压测

# 分配2GB内存进行压测,持续60秒
stress-ng --vm 2 --vm-bytes 1G --timeout 60s

3. 磁盘IO压测

# 磁盘读写压测,使用2个进程,每个进程写1GB文件
stress-ng --io 2 --hdd 2 --hdd-bytes 1G --timeout 60s

五、常见问题排查

  1. 代码没有自动同步到服务器

    • 检查是否开启了 Automatic Upload
    • 手动同步:右键项目 → Deployment → Upload to xxx
    • 检查Deployment的Mappings路径是否正确
  2. 运行代码提示找不到psutil

    • 确认是在服务器上安装的psutil,不是本地
    • 检查PyCharm是否选择了正确的远程Python解释器
  3. SSH连接失败

    • 检查服务器防火墙是否开放22端口:systemctl status firewalld
    • 临时关闭防火墙测试:systemctl stop firewalld
    • 确认服务器SSH服务正常运行:systemctl status sshd

paramiko 完整教程

纠正你提到的免密登录原理错误(这是安全基础,绝对不能搞反),然后按照原理→实操→代码→最佳实践的顺序讲解,所有代码都在你PPT的基础上做了生产级优化(加异常处理、资源释放、超时控制),直接复制就能用。


一、SSH免密登录正确原理

正确原理

  1. 客户端(A机器)生成一对密钥:公钥(id_rsa.pub)+ 私钥(id_rsa)
  2. 公钥发给服务器(B机器),存在服务器的 ~/.ssh/authorized_keys 文件里
  3. 私钥永远留在客户端自己手里,绝对不能发给任何人
  4. 登录时:
    • 服务器用公钥加密一个随机数发给客户端
    • 客户端用私钥解密这个随机数,发回给服务器
    • 服务器验证解密结果正确,就允许登录,不需要输入密码

二、SSH免密登录完整操作步骤(A登录B)

客户端A机器上执行以下命令:

# 1. 生成密钥对(一路回车,不要设置密码)
ssh-keygen -t rsa -b 4096

# 2. 把公钥推送到服务器B(替换成你的服务器IP和用户名)
ssh-copy-id root@192.168.88.102

# 3. 验证免密登录(不需要输入密码直接登录成功)
ssh root@192.168.88.102

✅ 验证成功后,paramiko 就可以用私钥实现免密远程操作了。


三、paramiko 核心功能1:远程执行命令

3.1 密码认证方式(PPT代码优化版)

你的PPT代码缺少异常处理和资源释放,生产环境会导致连接泄漏,优化后:

import paramiko

def ssh_exec_command(hostname, password, cmd, port=22, username="root", timeout=10):
    """
    远程执行命令(密码认证)
    :param hostname: 服务器IP
    :param password: 服务器密码
    :param cmd: 要执行的命令
    :param port: SSH端口,默认22
    :param username: 登录用户名,默认root
    :param timeout: 连接超时时间,默认10秒
    :return: (标准输出, 标准错误)
    """
    ssh = None
    try:
        # 创建SSH客户端对象
        ssh = paramiko.SSHClient()
        # 自动添加不在known_hosts中的主机密钥
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接服务器
        ssh.connect(
            hostname=hostname,
            port=port,
            username=username,
            password=password,
            timeout=timeout
        )
        # 执行命令
        stdin, stdout, stderr = ssh.exec_command(cmd)
        # 获取输出(decode解决中文乱码)
        stdout_str = stdout.read().decode("utf-8", errors="ignore")
        stderr_str = stderr.read().decode("utf-8", errors="ignore")
        return stdout_str, stderr_str
    except Exception as e:
        return "", f"执行失败: {str(e)}"
    finally:
        # 无论是否出错,都关闭连接
        if ssh:
            ssh.close()

# 调用示例
if __name__ == "__main__":
    stdout, stderr = ssh_exec_command(
        hostname="192.168.88.102",
        password="123456",
        cmd="df -h && free -h"  # 同时执行多个命令用&&分隔
    )
    print("标准输出:")
    print(stdout)
    if stderr:
        print("标准错误:")
        print(stderr)

3.2 免密认证方式(推荐,更安全)

不需要在代码里写密码,安全性更高:

import paramiko

def ssh_exec_command_key(hostname, cmd, port=22, username="root", key_path="~/.ssh/id_rsa", timeout=10):
    """
    远程执行命令(免密认证)
    :param key_path: 私钥文件路径,默认是当前用户的~/.ssh/id_rsa
    """
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 加载私钥
        private_key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(key_path))
        # 用私钥连接
        ssh.connect(
            hostname=hostname,
            port=port,
            username=username,
            pkey=private_key,
            timeout=timeout
        )
        stdin, stdout, stderr = ssh.exec_command(cmd)
        stdout_str = stdout.read().decode("utf-8", errors="ignore")
        stderr_str = stderr.read().decode("utf-8", errors="ignore")
        return stdout_str, stderr_str
    except Exception as e:
        return "", f"执行失败: {str(e)}"
    finally:
        if ssh:
            ssh.close()

# 调用示例
if __name__ == "__main__":
    stdout, stderr = ssh_exec_command_key(
        hostname="192.168.88.102",
        cmd="touch /tmp/paramiko_test.txt && ls -l /tmp/"
    )
    print(stdout)

四、paramiko 核心功能2:文件上传下载(SFTP)

4.1 密码认证方式

import paramiko
import os

def sftp_transfer(hostname, password, local_path, remote_path, is_upload=True, port=22, username="root", timeout=10):
    """
    SFTP文件传输(密码认证)
    :param is_upload: True=上传本地文件到服务器,False=下载服务器文件到本地
    """
    transport = None
    try:
        # 创建传输对象
        transport = paramiko.Transport((hostname, port))
        transport.connect(username=username, password=password)
        # 创建SFTP客户端
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        if is_upload:
            # 上传文件
            sftp.put(local_path, remote_path)
            print(f"上传成功: {local_path} -> {remote_path}")
        else:
            # 下载文件
            # 确保本地目录存在
            local_dir = os.path.dirname(local_path)
            if not os.path.exists(local_dir):
                os.makedirs(local_dir)
            sftp.get(remote_path, local_path)
            print(f"下载成功: {remote_path} -> {local_path}")
        return True
    except Exception as e:
        print(f"传输失败: {str(e)}")
        return False
    finally:
        if transport:
            transport.close()

# 调用示例
if __name__ == "__main__":
    # 上传本地文件到服务器
    sftp_transfer(
        hostname="192.168.88.102",
        password="123456",
        local_path="./local_file.txt",
        remote_path="/root/remote_file.txt",
        is_upload=True
    )
    
    # 下载服务器文件到本地
    sftp_transfer(
        hostname="192.168.88.102",
        password="123456",
        local_path="./downloaded_file.txt",
        remote_path="/root/server_file.txt",
        is_upload=False
    )

4.2 免密认证方式

import paramiko
import os

def sftp_transfer_key(hostname, local_path, remote_path, is_upload=True, port=22, username="root", key_path="~/.ssh/id_rsa", timeout=10):
    """
    SFTP文件传输(免密认证)
    """
    transport = None
    try:
        transport = paramiko.Transport((hostname, port))
        # 加载私钥
        private_key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(key_path))
        transport.connect(username=username, pkey=private_key)
        sftp = paramiko.SFTPClient.from_transport(transport)
        
        if is_upload:
            sftp.put(local_path, remote_path)
            print(f"上传成功: {local_path} -> {remote_path}")
        else:
            local_dir = os.path.dirname(local_path)
            if not os.path.exists(local_dir):
                os.makedirs(local_dir)
            sftp.get(remote_path, local_path)
            print(f"下载成功: {remote_path} -> {local_path}")
        return True
    except Exception as e:
        print(f"传输失败: {str(e)}")
        return False
    finally:
        if transport:
            transport.close()

五、生产环境最佳实践

  1. 永远不要硬编码密码

    • 用环境变量存储密码:password = os.getenv("SERVER_PASSWORD")
    • 优先使用免密认证,完全避免密码出现在代码里
  2. 批量操作优化

    • 不要每次执行命令都新建连接,复用连接
    • 多台服务器可以用线程池并发执行,提高效率
  3. 安全加固

    • 私钥文件权限设置为600:chmod 600 ~/.ssh/id_rsa
    • 服务器禁用密码登录,只允许密钥登录
    • 设置SSH连接超时时间,防止僵死连接
  4. 异常处理

    • 捕获所有可能的异常(连接超时、认证失败、文件不存在等)
    • 用finally块确保连接和传输对象被关闭

六、常见问题排查

  1. 认证失败

    • 检查用户名、密码是否正确
    • 免密登录检查公钥是否正确推送到服务器的~/.ssh/authorized_keys
    • 检查服务器/etc/ssh/sshd_config是否开启了PubkeyAuthentication
  2. 连接超时

    • 检查服务器防火墙是否开放22端口
    • 检查服务器SSH服务是否运行:systemctl status sshd
    • 增加timeout参数的值
  3. 文件传输失败

    • 检查本地文件是否存在
    • 检查服务器目录是否有读写权限
    • 检查磁盘空间是否充足

七、扩展:批量监控多台服务器

结合你之前的psutil监控脚本,用paramiko批量执行监控命令:

# 服务器列表
servers = [
    {"hostname": "192.168.88.101", "password": "123456"},
    {"hostname": "192.168.88.102", "password": "123456"},
    {"hostname": "192.168.88.103", "password": "123456"},
]

# 监控命令
monitor_cmd = "free -h | grep Mem | awk '{print $3/$2*100}'"

# 批量执行
for server in servers:
    print(f"\n===== 服务器 {server['hostname']} 内存使用率 =====")
    stdout, stderr = ssh_exec_command(
        hostname=server["hostname"],
        password=server["password"],
        cmd=monitor_cmd
    )
    if stdout:
        print(f"内存使用率: {float(stdout.strip()):.2f}%")
    else:
        print(f"监控失败: {stderr}")

requests 模块完整教程(面试+生产通用)

我会在你PPT的基础上,纠正错误、补充生产级代码、讲清面试必问的区别,重点突出SRE工作中最常用的接口调用、企业微信告警功能,所有代码直接复制就能用。


一、模块介绍与安装

1. 核心作用

requests 是 Python 最流行的 HTTP 客户端库,用一行代码就能发送复杂的 HTTP 请求,SRE 工作中90%的场景都会用到:

  • 调用各类系统的 API 接口(监控、告警、CMDB)
  • 简单的网页爬虫(爬取监控数据、日志)
  • 企业微信/钉钉/飞书机器人告警
  • 自动化测试接口可用性

2. 安装(解决国内源问题)

# 临时用阿里云源安装(最稳定)
pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

# 永久配置国内源(一劳永逸)
mkdir -p ~/.pip
cat > ~/.pip/pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
trusted-host = mirrors.aliyun.com
timeout = 120
EOF

二、GET 请求(获取数据)

1. 基础 GET 请求

import requests

# 1. 定义请求地址
url = "https://www.baidu.com"

# 2. 发送 GET 请求,返回 Response 对象
response = requests.get(url)

# 3. 处理响应
print("状态码:", response.status_code)  # 200 表示成功
print("响应头:", response.headers)
print("响应内容(字符串):", response.text)

2. 解决中文乱码问题(PPT重点)

❌ 错误:直接打印 response.text 会出现乱码
✅ 正确:手动指定编码格式

# 方法1:设置响应编码
response.encoding = "utf-8"
print(response.text)

# 方法2:用字节流解码(推荐,更通用)
print(response.content.decode("utf-8"))

text 和 content 的区别(面试必问)

  • response.text:自动解码的字符串,编码由 requests 猜测,容易乱码
  • response.content:原始字节流,需要手动 decode("utf-8"),不会乱码

3. 携带请求头(绕过反爬虫)

百度等网站有反爬虫机制,会检查 User-Agent 请求头,识别出是爬虫就返回403。

import requests

url = "https://www.baidu.com"

# 伪造浏览器请求头
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
}

# 发送请求时携带 headers
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
print(response.text)

4. 携带查询参数(params)

GET 请求的参数是放在 URL 里的,比如 https://www.baidu.com/s?wd=运维,用 params 参数自动拼接,不用手动写 URL。

import requests

url = "https://www.baidu.com/s"
headers = {"User-Agent": "Mozilla/5.0 ..."}

# 查询参数,自动拼接成 ?wd=运维&pn=10
params = {
    "wd": "运维",
    "pn": 10  # 第2页
}

response = requests.get(url, headers=headers, params=params)
response.encoding = "utf-8"

# 把搜索结果保存到文件
with open("baidu_search.html", "w", encoding="utf-8") as f:
    f.write(response.text)

三、POST 请求(提交数据)

1. 核心区别:GET vs POST(面试必背)

特性 GET POST
数据位置 URL 查询参数 请求体(Request Body)
数据大小 有限制(浏览器一般2KB) 理论无限制
安全性 数据暴露在URL中,不安全 数据在请求体中,相对安全
缓存 浏览器会缓存 默认不缓存
用途 获取数据(查询、浏览) 提交数据(登录、上传、修改)

2. 发送 JSON 格式数据(最常用)

现在90%的 API 接口都接收 JSON 格式的数据,用 json 参数自动序列化。

import requests

url = "https://httpbin.org/post"  # 测试接口,会返回你发送的数据

# 要提交的数据
data = {
    "username": "admin",
    "password": "123456"
}

# 发送 POST 请求,json 参数自动设置 Content-Type: application/json
response = requests.post(url, json=data)

print("状态码:", response.status_code)
print("响应内容:", response.json())  # 自动解析 JSON 响应为字典

3. 发送表单格式数据

如果接口接收的是表单格式(application/x-www-form-urlencoded),用 data 参数。

import requests

url = "https://httpbin.org/post"
data = {
    "username": "admin",
    "password": "123456"
}

# data 参数会自动编码为表单格式
response = requests.post(url, data=data)
print(response.json())

data 和 json 参数的区别(面试必问)

  • json=data:自动将字典序列化为 JSON 字符串,设置 Content-Type: application/json
  • data=data:自动将字典编码为表单格式,设置 Content-Type: application/x-www-form-urlencoded

四、Response 对象常用属性

response = requests.get("https://www.baidu.com")

print(response.status_code)  # 状态码:200成功,404不存在,500服务器错误
print(response.text)         # 字符串格式响应内容
print(response.content)      # 字节流格式响应内容
print(response.json())       # 自动解析 JSON 为字典(仅当响应是 JSON 时)
print(response.headers)      # 响应头(字典)
print(response.cookies)      # 响应 Cookies
print(response.url)          # 最终请求的 URL
print(response.elapsed)      # 请求耗时(用于监控接口响应时间)

五、生产环境最佳实践(SRE 必学)

1. 设置超时时间(必须加)

永远不要写没有超时的请求,否则会导致程序永远卡住。

# 超时时间10秒,超过就抛出异常
response = requests.get("https://www.baidu.com", timeout=10)

2. 异常处理(必须加)

网络请求随时可能失败,必须捕获异常。

import requests

def safe_request(url, method="get", **kwargs):
    try:
        response = requests.request(method, url, timeout=10, **kwargs)
        response.raise_for_status()  # 状态码不是200就抛出异常
        return response
    except requests.exceptions.Timeout:
        print("请求超时")
        return None
    except requests.exceptions.ConnectionError:
        print("连接失败")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")
        return None
    except Exception as e:
        print(f"未知错误: {e}")
        return None

# 调用示例
response = safe_request("https://www.baidu.com")
if response:
    print(response.text)

3. 会话复用(提升性能)

多次请求同一个域名时,用 Session 对象复用 TCP 连接,减少握手开销。

import requests

# 创建会话对象
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 ..."})

# 同一个会话的所有请求都会复用连接和 cookies
response1 = session.get("https://www.baidu.com")
response2 = session.get("https://www.baidu.com/s?wd=运维")

session.close()

六、SRE 实战:企业微信机器人告警(最常用)

这是你工作中每天都会用到的功能,把监控告警、脚本执行结果推送到企业微信群。

1. 准备工作

  1. 打开企业微信群 → 右上角三个点 → 添加机器人
  2. 新建一个机器人,复制 Webhook 地址(类似 https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx

2. 完整告警函数

import requests
import json

def send_wechat_alert(webhook_url, content, msg_type="text", mentioned_list=None):
    """
    发送企业微信机器人告警
    :param webhook_url: 机器人 Webhook 地址
    :param content: 告警内容
    :param msg_type: 消息类型:text/markdown
    :param mentioned_list: @的人列表,["@all"] 表示@所有人
    """
    headers = {"Content-Type": "application/json"}
    
    if msg_type == "text":
        data = {
            "msgtype": "text",
            "text": {
                "content": content,
                "mentioned_list": mentioned_list or []
            }
        }
    elif msg_type == "markdown":
        data = {
            "msgtype": "markdown",
            "markdown": {
                "content": content
            }
        }
    else:
        print("不支持的消息类型")
        return False
    
    try:
        response = requests.post(webhook_url, headers=headers, json=data, timeout=10)
        response.raise_for_status()
        result = response.json()
        if result["errcode"] == 0:
            print("告警发送成功")
            return True
        else:
            print(f"告警发送失败: {result['errmsg']}")
            return False
    except Exception as e:
        print(f"告警发送异常: {e}")
        return False

# 调用示例
if __name__ == "__main__":
    # 替换成你的机器人 Webhook 地址
    WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"
    
    # 发送文本告警并@所有人
    send_wechat_alert(
        WEBHOOK_URL,
        "【服务器告警】192.168.88.101 CPU使用率超过90%",
        mentioned_list=["@all"]
    )
    
    # 发送 Markdown 格式告警
    markdown_content = """
## 服务器磁盘告警
- **服务器IP**: 192.168.88.101
- **磁盘分区**: /
- **使用率**: 95%
- **剩余空间**: 2.3GB
"""
    send_wechat_alert(WEBHOOK_URL, markdown_content, msg_type="markdown")

七、常见问题排查

  1. 中文乱码:永远用 response.content.decode("utf-8"),不要用 response.text
  2. 403 Forbidden:添加 User-Agent 请求头,模拟浏览器访问
  3. 连接超时:检查网络是否通,增加 timeout 参数
  4. JSON 解析失败:先打印 response.text 看响应内容是不是合法的 JSON
  5. 接口返回401 Unauthorized:需要在请求头中添加 Authorization 认证信息

完整生产级系统资源监控告警脚本

我在你PPT代码的基础上做了生产级优化,解决了原代码的所有问题(缺少日志文件定义、单磁盘检测、重复告警轰炸、异常处理缺失等),直接复制就能用。

一、完整可运行代码

import psutil
import time
import requests
from datetime import datetime
import os

# ====================== 配置区(根据你的需求修改) ======================
# 资源阈值
CPU_THRESHOLD = 80       # CPU使用率阈值(%)
MEMORY_THRESHOLD = 90    # 内存使用率阈值(%)
DISK_THRESHOLD = 85      # 磁盘使用率阈值(%)

# 企业微信机器人Webhook地址(替换成你的)
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"

# 日志文件路径
LOG_FILE = "/var/log/system_monitor.log"

# 告警防抖:同一种资源5分钟内只告警一次(避免轰炸)
ALERT_INTERVAL = 300  # 单位:秒

# 监控间隔
CHECK_INTERVAL = 5    # 单位:秒
# ======================================================================

# 记录上次告警时间,用于防抖
last_alert_time = {
    "CPU": 0,
    "内存": 0,
    "磁盘": 0
}

def log_alert(resource_type, usage, threshold):
    """记录告警信息到日志文件"""
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    message = f"[{current_time}] {resource_type}使用率过高,超过阈值{threshold}%,当前使用率为{usage:.1f}%\n"
    
    # 确保日志目录存在
    log_dir = os.path.dirname(LOG_FILE)
    if log_dir and not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # 追加写入日志
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(message)
    
    print(message.strip())

def send_wechat_alert(message, mentioned_list=None):
    """
    发送企业微信告警
    :param message: 告警内容
    :param mentioned_list: @的人列表,["@all"]表示@所有人
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {
            "content": message,
            "mentioned_list": mentioned_list or []
        }
    }
    
    try:
        response = requests.post(WEBHOOK_URL, headers=headers, json=data, timeout=10)
        if response.status_code == 200:
            result = response.json()
            if result["errcode"] == 0:
                print("✅ 企业微信告警发送成功")
                return True
            else:
                print(f"❌ 企业微信告警发送失败: {result['errmsg']}")
                return False
        else:
            print(f"❌ 企业微信接口返回错误状态码: {response.status_code}")
            return False
    except Exception as e:
        print(f"❌ 企业微信接口调用异常: {str(e)}")
        return False

def should_alert(resource_type):
    """判断是否应该发送告警(防抖逻辑)"""
    current_time = time.time()
    if current_time - last_alert_time[resource_type] > ALERT_INTERVAL:
        last_alert_time[resource_type] = current_time
        return True
    return False

def check_cpu():
    """检测CPU使用率"""
    try:
        cpu_usage = psutil.cpu_percent(interval=1)
        if cpu_usage > CPU_THRESHOLD:
            log_alert("CPU", cpu_usage, CPU_THRESHOLD)
            if should_alert("CPU"):
                alert_msg = f"【服务器CPU告警】\n服务器IP: {get_local_ip()}\nCPU使用率: {cpu_usage:.1f}%\n阈值: {CPU_THRESHOLD}%"
                send_wechat_alert(alert_msg, mentioned_list=["@all"])
        return cpu_usage
    except Exception as e:
        print(f"❌ CPU检测失败: {str(e)}")
        return 0

def check_memory():
    """检测内存使用率"""
    try:
        mem = psutil.virtual_memory()
        mem_usage = mem.percent
        if mem_usage > MEMORY_THRESHOLD:
            log_alert("内存", mem_usage, MEMORY_THRESHOLD)
            if should_alert("内存"):
                alert_msg = f"【服务器内存告警】\n服务器IP: {get_local_ip()}\n内存使用率: {mem_usage:.1f}%\n阈值: {MEMORY_THRESHOLD}%\n剩余内存: {mem.available/1024**3:.2f}GB"
                send_wechat_alert(alert_msg, mentioned_list=["@all"])
        return mem_usage
    except Exception as e:
        print(f"❌ 内存检测失败: {str(e)}")
        return 0

def check_disk():
    """检测所有磁盘分区使用率"""
    try:
        partitions = psutil.disk_partitions()
        max_disk_usage = 0
        alert_partitions = []
        
        for part in partitions:
            # 跳过虚拟文件系统和只读分区
            if 'loop' in part.device or 'tmpfs' in part.fstype or part.opts.startswith('ro'):
                continue
            
            try:
                usage = psutil.disk_usage(part.mountpoint)
                disk_usage = usage.percent
                max_disk_usage = max(max_disk_usage, disk_usage)
                
                if disk_usage > DISK_THRESHOLD:
                    alert_partitions.append(f"{part.mountpoint}: {disk_usage:.1f}%")
            except PermissionError:
                continue
        
        if alert_partitions:
            log_alert("磁盘", max_disk_usage, DISK_THRESHOLD)
            if should_alert("磁盘"):
                alert_msg = f"【服务器磁盘告警】\n服务器IP: {get_local_ip()}\n以下分区使用率超过阈值{DISK_THRESHOLD}%:\n" + "\n".join(alert_partitions)
                send_wechat_alert(alert_msg, mentioned_list=["@all"])
        
        return max_disk_usage
    except Exception as e:
        print(f"❌ 磁盘检测失败: {str(e)}")
        return 0

def get_local_ip():
    """获取本机IP地址"""
    try:
        import socket
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except:
        return "127.0.0.1"

def check_resource_usage():
    """检测所有资源"""
    print(f"\n===== {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 资源检测 =====")
    
    cpu = check_cpu()
    mem = check_memory()
    disk = check_disk()
    
    print(f"CPU使用率: {cpu:.1f}% | 内存使用率: {mem:.1f}% | 磁盘最高使用率: {disk:.1f}%")

if __name__ == "__main__":
    print("🚀 系统资源监控告警程序启动")
    print(f"📌 监控阈值: CPU>{CPU_THRESHOLD}% 内存>{MEMORY_THRESHOLD}% 磁盘>{DISK_THRESHOLD}%")
    print(f"📌 日志文件: {LOG_FILE}")
    print(f"📌 告警间隔: {ALERT_INTERVAL//60}分钟")
    print("="*60)
    
    try:
        while True:
            check_resource_usage()
            time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        print("\n👋 监控程序已停止")
    except Exception as e:
        print(f"\n❌ 程序异常退出: {str(e)}")
        # 程序崩溃时也发送告警
        send_wechat_alert(f"【严重告警】服务器{get_local_ip()}的监控程序异常退出: {str(e)}", mentioned_list=["@all"])

二、核心优化点(比PPT代码好用10倍)

  1. 告警防抖:同一种资源5分钟内只告警一次,避免短时间内收到几十条告警轰炸
  2. 多磁盘检测:自动检测所有实际磁盘分区,排除虚拟文件系统,不会漏掉/data等数据分区
  3. 完整异常处理:任何环节出错都不会导致程序崩溃,程序崩溃时还会发送告警通知
  4. 自动获取本机IP:告警信息里自动带上服务器IP,多服务器部署时一眼知道是哪台出问题
  5. 日志自动创建:自动创建日志目录和文件,不用手动提前创建
  6. 详细告警信息:内存告警显示剩余内存,磁盘告警显示所有超标分区
  7. 优雅退出:按Ctrl+C可以正常停止程序

三、部署运行步骤

1. 安装依赖

pip3 install psutil requests -i https://mirrors.aliyun.com/pypi/simple/

2. 修改配置

把代码开头的WEBHOOK_URL替换成你自己的企业微信机器人Webhook地址,根据需要调整阈值。

3. 测试运行

# 前台运行测试
python3 system_monitor.py

测试告警:新开一个终端,用stress-ng压测CPU,看是否会触发告警

# 压测2个CPU核心,持续60秒
stress-ng --cpu 2 --timeout 60s

4. 后台运行(生产环境)

# 后台运行,输出重定向到null
nohup python3 system_monitor.py > /dev/null 2>&1 &

# 查看进程是否运行
ps aux | grep system_monitor

5. 设置开机自启

创建systemd服务文件:

cat > /etc/systemd/system/system-monitor.service << EOF
[Unit]
Description=System Resource Monitor
After=network.target

[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /root/system_monitor.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

启用并启动服务:

systemctl daemon-reload
systemctl enable system-monitor
systemctl start system-monitor

# 查看服务状态
systemctl status system-monitor

四、查看日志

# 实时查看日志
tail -f /var/log/system_monitor.log

# 查看所有告警记录
grep "过高" /var/log/system_monitor.log

五、扩展功能(按需添加)

  1. 添加网络监控:检测网卡流量、网络连接数
  2. 添加进程监控:检测关键进程是否在运行(比如nginx、mysql)
  3. 添加邮件告警:同时发送邮件和企业微信告警
  4. 添加磁盘IO监控:检测磁盘IOPS和吞吐量
  5. 添加历史数据统计:把监控数据写入InfluxDB,用Grafana做可视化
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐