Linux运维:psutil和paramiko和request
PyCharm 关联 Linux 服务器完整教程
我会按照**“先配置远程开发环境→解决镜像源问题→实现代码自动同步→运行监控脚本→压测验证”**的完整流程讲解,解决你提到的清华源访问失败问题,并补充完整的psutil监控代码。
一、PyCharm 关联 Linux 服务器(两种方式,推荐第一种)
方式一:新版 Remote Development(强烈推荐)
核心优势:代码直接存在Linux服务器上,本地PyCharm只是远程界面,完全不需要手动同步,写的每一行代码实时保存在服务器,是目前最流畅的远程开发方式。
操作步骤:
- 打开PyCharm,欢迎页选择 Remote Development → SSH
- 点击 New Connection,填写服务器信息:
- Host:服务器IP地址
- Port:SSH端口(默认22)
- Username:服务器登录用户名(如root)
- 点击 Next,输入服务器密码(或选择密钥认证)
- 连接成功后,选择服务器上的项目目录(如
/root/python_project) - 点击 Create,PyCharm会自动在服务器上部署远程开发环境
- 配置远程Python解释器:
- 打开 File → Settings → Project: xxx → Python Interpreter
- 点击右上角齿轮 → Add Interpreter → On SSH
- 选择刚才创建的SSH连接,下一步
- 选择服务器上的Python解释器路径(如
/usr/bin/python3.9) - 点击 Create 完成配置
✅ 效果:你在本地PyCharm写的所有代码,会实时自动保存到Linux服务器,运行、调试都直接在服务器上执行。
方式二:传统远程解释器+自动同步(代码本地+服务器双份)
如果需要代码同时保存在本地和服务器,可以用这种方式:
- 配置SSH连接:
- 打开 File → Settings → Tools → SSH Configurations
- 点击 +,填写服务器IP、端口、用户名、密码,测试连接成功
- 添加远程Python解释器:
- 打开 File → Settings → Project: xxx → Python Interpreter
- 点击右上角齿轮 → Add Interpreter → SSH Interpreter
- 选择刚才创建的SSH连接,下一步
- 选择服务器上的Python解释器路径(如
/usr/bin/python3.9) - 点击 Finish
- 配置自动同步:
- 打开 Tools → Deployment → Configuration
- 点击 + → SFTP,选择刚才的SSH连接
- 切换到 Mappings 标签:
- Local path:本地项目目录(如
D:\python_project) - Deployment path:服务器上的项目目录(如
/root/python_project)
- Local path:本地项目目录(如
- 点击 OK
- 开启自动上传:Tools → Deployment → Automatic Upload
✅ 效果:本地保存代码时,会自动同步到服务器对应的目录。如果同步失败,手动点击 Tools → Deployment → Upload to xxx 即可。
二、解决清华pip源访问失败+安装psutil
你遇到的https://pypi.tuna.tsinghua.edu.cn/simple访问失败,可以用以下国内替代镜像源解决:
1. 临时使用替代源安装psutil
# 阿里云镜像(最稳定)
pip3 install psutil -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
# 中科大镜像
pip3 install psutil -i https://pypi.mirrors.ustc.edu.cn/simple/ --trusted-host pypi.mirrors.ustc.edu.cn
2. 永久配置国内镜像源(一劳永逸)
在Linux服务器上执行以下命令,永久设置阿里云pip源:
mkdir -p ~/.pip
cat > ~/.pip/pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
trusted-host = mirrors.aliyun.com
timeout = 120
EOF
配置完成后,直接执行pip3 install psutil就会自动使用阿里云源。
三、psutil 系统监控完整代码(含IOPS/吞吐量计算)
我把你提到的CPU、内存、磁盘、网络监控,以及IOPS和吞吐量计算都整合到了一个可运行的脚本里,注释非常详细:
import psutil
import time
def get_cpu_info():
"""获取CPU信息"""
print("="*50)
print("CPU监控信息")
print("="*50)
# CPU核心数
physical_cores = psutil.cpu_count(logical=False) # 物理核心
logical_cores = psutil.cpu_count(logical=True) # 逻辑核心
print(f"物理核心数: {physical_cores}")
print(f"逻辑核心数: {logical_cores}")
# 整体CPU使用率
total_usage = psutil.cpu_percent(interval=1)
print(f"整体CPU使用率: {total_usage}%")
# 每个核心的使用率
per_core_usage = psutil.cpu_percent(interval=1, percpu=True)
for i, usage in enumerate(per_core_usage):
print(f"核心{i}使用率: {usage}%")
print()
def get_memory_info():
"""获取内存信息"""
print("="*50)
print("内存监控信息")
print("="*50)
# 物理内存
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / 1024**3:.2f} GB")
print(f"已用内存: {mem.used / 1024**3:.2f} GB")
print(f"可用内存: {mem.available / 1024**3:.2f} GB")
print(f"内存使用率: {mem.percent}%")
# 交换分区
swap = psutil.swap_memory()
print(f"\n交换分区总大小: {swap.total / 1024**3:.2f} GB")
print(f"交换分区已用: {swap.used / 1024**3:.2f} GB")
print(f"交换分区使用率: {swap.percent}%")
print()
def get_disk_info():
"""获取磁盘信息(含IOPS和吞吐量计算)"""
print("="*50)
print("磁盘监控信息")
print("="*50)
# 磁盘分区使用率
partitions = psutil.disk_partitions()
for part in partitions:
if 'loop' not in part.device and part.fstype:
usage = psutil.disk_usage(part.mountpoint)
print(f"分区: {part.mountpoint}")
print(f" 总大小: {usage.total / 1024**3:.2f} GB")
print(f" 已用: {usage.used / 1024**3:.2f} GB")
print(f" 使用率: {usage.percent}%")
# 磁盘IO、IOPS和吞吐量计算
print("\n磁盘IO统计:")
# 先获取第一次IO数据
io_start = psutil.disk_io_counters()
time.sleep(1)
# 再获取1秒后的IO数据
io_end = psutil.disk_io_counters()
# 计算IOPS
read_iops = io_end.read_count - io_start.read_count
write_iops = io_end.write_count - io_start.write_count
total_iops = read_iops + write_iops
# 计算吞吐量(字节/秒 → MB/秒)
read_throughput = (io_end.read_bytes - io_start.read_bytes) / 1024**2
write_throughput = (io_end.write_bytes - io_start.write_bytes) / 1024**2
total_throughput = read_throughput + write_throughput
print(f" 读IOPS: {read_iops} ops/s")
print(f" 写IOPS: {write_iops} ops/s")
print(f" 总IOPS: {total_iops} ops/s")
print(f" 读吞吐量: {read_throughput:.2f} MB/s")
print(f" 写吞吐量: {write_throughput:.2f} MB/s")
print(f" 总吞吐量: {total_throughput:.2f} MB/s")
print()
def get_network_info():
"""获取网络信息"""
print("="*50)
print("网络监控信息")
print("="*50)
# 网卡列表
interfaces = psutil.net_if_addrs()
for name, addrs in interfaces.items():
if name == 'lo':
continue # 跳过回环网卡
for addr in addrs:
if addr.family == 2: # IPv4地址
print(f"网卡: {name}")
print(f" IP地址: {addr.address}")
print(f" 子网掩码: {addr.netmask}")
# 网络IO吞吐量
print("\n网络IO统计:")
net_start = psutil.net_io_counters()
time.sleep(1)
net_end = psutil.net_io_counters()
bytes_sent = (net_end.bytes_sent - net_start.bytes_sent) / 1024**2
bytes_recv = (net_end.bytes_recv - net_start.bytes_recv) / 1024**2
print(f" 上传速度: {bytes_sent:.2f} MB/s")
print(f" 下载速度: {bytes_recv:.2f} MB/s")
print()
if __name__ == "__main__":
# 循环监控,每5秒刷新一次
while True:
get_cpu_info()
get_memory_info()
get_disk_info()
get_network_info()
print("="*50)
print("等待5秒后刷新...")
print("="*50)
time.sleep(5)
# 清屏(Linux系统)
print("\033c", end="")
四、stress-ng 系统压测常用命令
安装stress-ng:
yum install stress-ng -y
1. CPU压测(你提到的命令)
# 压测2个CPU核心,持续60秒
stress-ng --cpu 2 --timeout 60s
# 压测所有CPU核心,持续300秒
stress-ng --cpu 0 --timeout 300s
2. 内存压测
# 分配2GB内存进行压测,持续60秒
stress-ng --vm 2 --vm-bytes 1G --timeout 60s
3. 磁盘IO压测
# 磁盘读写压测,使用2个进程,每个进程写1GB文件
stress-ng --io 2 --hdd 2 --hdd-bytes 1G --timeout 60s
五、常见问题排查
-
代码没有自动同步到服务器
- 检查是否开启了 Automatic Upload
- 手动同步:右键项目 → Deployment → Upload to xxx
- 检查Deployment的Mappings路径是否正确
-
运行代码提示找不到psutil
- 确认是在服务器上安装的psutil,不是本地
- 检查PyCharm是否选择了正确的远程Python解释器
-
SSH连接失败
- 检查服务器防火墙是否开放22端口:
systemctl status firewalld - 临时关闭防火墙测试:
systemctl stop firewalld - 确认服务器SSH服务正常运行:
systemctl status sshd
- 检查服务器防火墙是否开放22端口:
paramiko 完整教程
先纠正你提到的免密登录原理错误(这是安全基础,绝对不能搞反),然后按照原理→实操→代码→最佳实践的顺序讲解,所有代码都在你PPT的基础上做了生产级优化(加异常处理、资源释放、超时控制),直接复制就能用。
一、SSH免密登录正确原理
✅ 正确原理
- 客户端(A机器)生成一对密钥:公钥(id_rsa.pub)+ 私钥(id_rsa)
- 公钥发给服务器(B机器),存在服务器的
~/.ssh/authorized_keys文件里 - 私钥永远留在客户端自己手里,绝对不能发给任何人
- 登录时:
- 服务器用公钥加密一个随机数发给客户端
- 客户端用私钥解密这个随机数,发回给服务器
- 服务器验证解密结果正确,就允许登录,不需要输入密码
二、SSH免密登录完整操作步骤(A登录B)
在客户端A机器上执行以下命令:
# 1. 生成密钥对(一路回车,不要设置密码)
ssh-keygen -t rsa -b 4096
# 2. 把公钥推送到服务器B(替换成你的服务器IP和用户名)
ssh-copy-id root@192.168.88.102
# 3. 验证免密登录(不需要输入密码直接登录成功)
ssh root@192.168.88.102
✅ 验证成功后,paramiko 就可以用私钥实现免密远程操作了。
三、paramiko 核心功能1:远程执行命令
3.1 密码认证方式(PPT代码优化版)
你的PPT代码缺少异常处理和资源释放,生产环境会导致连接泄漏,优化后:
import paramiko
def ssh_exec_command(hostname, password, cmd, port=22, username="root", timeout=10):
"""
远程执行命令(密码认证)
:param hostname: 服务器IP
:param password: 服务器密码
:param cmd: 要执行的命令
:param port: SSH端口,默认22
:param username: 登录用户名,默认root
:param timeout: 连接超时时间,默认10秒
:return: (标准输出, 标准错误)
"""
ssh = None
try:
# 创建SSH客户端对象
ssh = paramiko.SSHClient()
# 自动添加不在known_hosts中的主机密钥
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(
hostname=hostname,
port=port,
username=username,
password=password,
timeout=timeout
)
# 执行命令
stdin, stdout, stderr = ssh.exec_command(cmd)
# 获取输出(decode解决中文乱码)
stdout_str = stdout.read().decode("utf-8", errors="ignore")
stderr_str = stderr.read().decode("utf-8", errors="ignore")
return stdout_str, stderr_str
except Exception as e:
return "", f"执行失败: {str(e)}"
finally:
# 无论是否出错,都关闭连接
if ssh:
ssh.close()
# 调用示例
if __name__ == "__main__":
stdout, stderr = ssh_exec_command(
hostname="192.168.88.102",
password="123456",
cmd="df -h && free -h" # 同时执行多个命令用&&分隔
)
print("标准输出:")
print(stdout)
if stderr:
print("标准错误:")
print(stderr)
3.2 免密认证方式(推荐,更安全)
不需要在代码里写密码,安全性更高:
import paramiko
def ssh_exec_command_key(hostname, cmd, port=22, username="root", key_path="~/.ssh/id_rsa", timeout=10):
"""
远程执行命令(免密认证)
:param key_path: 私钥文件路径,默认是当前用户的~/.ssh/id_rsa
"""
ssh = None
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 加载私钥
private_key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(key_path))
# 用私钥连接
ssh.connect(
hostname=hostname,
port=port,
username=username,
pkey=private_key,
timeout=timeout
)
stdin, stdout, stderr = ssh.exec_command(cmd)
stdout_str = stdout.read().decode("utf-8", errors="ignore")
stderr_str = stderr.read().decode("utf-8", errors="ignore")
return stdout_str, stderr_str
except Exception as e:
return "", f"执行失败: {str(e)}"
finally:
if ssh:
ssh.close()
# 调用示例
if __name__ == "__main__":
stdout, stderr = ssh_exec_command_key(
hostname="192.168.88.102",
cmd="touch /tmp/paramiko_test.txt && ls -l /tmp/"
)
print(stdout)
四、paramiko 核心功能2:文件上传下载(SFTP)
4.1 密码认证方式
import paramiko
import os
def sftp_transfer(hostname, password, local_path, remote_path, is_upload=True, port=22, username="root", timeout=10):
"""
SFTP文件传输(密码认证)
:param is_upload: True=上传本地文件到服务器,False=下载服务器文件到本地
"""
transport = None
try:
# 创建传输对象
transport = paramiko.Transport((hostname, port))
transport.connect(username=username, password=password)
# 创建SFTP客户端
sftp = paramiko.SFTPClient.from_transport(transport)
if is_upload:
# 上传文件
sftp.put(local_path, remote_path)
print(f"上传成功: {local_path} -> {remote_path}")
else:
# 下载文件
# 确保本地目录存在
local_dir = os.path.dirname(local_path)
if not os.path.exists(local_dir):
os.makedirs(local_dir)
sftp.get(remote_path, local_path)
print(f"下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
print(f"传输失败: {str(e)}")
return False
finally:
if transport:
transport.close()
# 调用示例
if __name__ == "__main__":
# 上传本地文件到服务器
sftp_transfer(
hostname="192.168.88.102",
password="123456",
local_path="./local_file.txt",
remote_path="/root/remote_file.txt",
is_upload=True
)
# 下载服务器文件到本地
sftp_transfer(
hostname="192.168.88.102",
password="123456",
local_path="./downloaded_file.txt",
remote_path="/root/server_file.txt",
is_upload=False
)
4.2 免密认证方式
import paramiko
import os
def sftp_transfer_key(hostname, local_path, remote_path, is_upload=True, port=22, username="root", key_path="~/.ssh/id_rsa", timeout=10):
"""
SFTP文件传输(免密认证)
"""
transport = None
try:
transport = paramiko.Transport((hostname, port))
# 加载私钥
private_key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(key_path))
transport.connect(username=username, pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
if is_upload:
sftp.put(local_path, remote_path)
print(f"上传成功: {local_path} -> {remote_path}")
else:
local_dir = os.path.dirname(local_path)
if not os.path.exists(local_dir):
os.makedirs(local_dir)
sftp.get(remote_path, local_path)
print(f"下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
print(f"传输失败: {str(e)}")
return False
finally:
if transport:
transport.close()
五、生产环境最佳实践
-
永远不要硬编码密码:
- 用环境变量存储密码:
password = os.getenv("SERVER_PASSWORD") - 优先使用免密认证,完全避免密码出现在代码里
- 用环境变量存储密码:
-
批量操作优化:
- 不要每次执行命令都新建连接,复用连接
- 多台服务器可以用线程池并发执行,提高效率
-
安全加固:
- 私钥文件权限设置为600:
chmod 600 ~/.ssh/id_rsa - 服务器禁用密码登录,只允许密钥登录
- 设置SSH连接超时时间,防止僵死连接
- 私钥文件权限设置为600:
-
异常处理:
- 捕获所有可能的异常(连接超时、认证失败、文件不存在等)
- 用finally块确保连接和传输对象被关闭
六、常见问题排查
-
认证失败:
- 检查用户名、密码是否正确
- 免密登录检查公钥是否正确推送到服务器的
~/.ssh/authorized_keys - 检查服务器
/etc/ssh/sshd_config是否开启了PubkeyAuthentication
-
连接超时:
- 检查服务器防火墙是否开放22端口
- 检查服务器SSH服务是否运行:
systemctl status sshd - 增加timeout参数的值
-
文件传输失败:
- 检查本地文件是否存在
- 检查服务器目录是否有读写权限
- 检查磁盘空间是否充足
七、扩展:批量监控多台服务器
结合你之前的psutil监控脚本,用paramiko批量执行监控命令:
# 服务器列表
servers = [
{"hostname": "192.168.88.101", "password": "123456"},
{"hostname": "192.168.88.102", "password": "123456"},
{"hostname": "192.168.88.103", "password": "123456"},
]
# 监控命令
monitor_cmd = "free -h | grep Mem | awk '{print $3/$2*100}'"
# 批量执行
for server in servers:
print(f"\n===== 服务器 {server['hostname']} 内存使用率 =====")
stdout, stderr = ssh_exec_command(
hostname=server["hostname"],
password=server["password"],
cmd=monitor_cmd
)
if stdout:
print(f"内存使用率: {float(stdout.strip()):.2f}%")
else:
print(f"监控失败: {stderr}")
requests 模块完整教程(面试+生产通用)
我会在你PPT的基础上,纠正错误、补充生产级代码、讲清面试必问的区别,重点突出SRE工作中最常用的接口调用、企业微信告警功能,所有代码直接复制就能用。
一、模块介绍与安装
1. 核心作用
requests 是 Python 最流行的 HTTP 客户端库,用一行代码就能发送复杂的 HTTP 请求,SRE 工作中90%的场景都会用到:
- 调用各类系统的 API 接口(监控、告警、CMDB)
- 简单的网页爬虫(爬取监控数据、日志)
- 企业微信/钉钉/飞书机器人告警
- 自动化测试接口可用性
2. 安装(解决国内源问题)
# 临时用阿里云源安装(最稳定)
pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
# 永久配置国内源(一劳永逸)
mkdir -p ~/.pip
cat > ~/.pip/pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
trusted-host = mirrors.aliyun.com
timeout = 120
EOF
二、GET 请求(获取数据)
1. 基础 GET 请求
import requests
# 1. 定义请求地址
url = "https://www.baidu.com"
# 2. 发送 GET 请求,返回 Response 对象
response = requests.get(url)
# 3. 处理响应
print("状态码:", response.status_code) # 200 表示成功
print("响应头:", response.headers)
print("响应内容(字符串):", response.text)
2. 解决中文乱码问题(PPT重点)
❌ 错误:直接打印 response.text 会出现乱码
✅ 正确:手动指定编码格式
# 方法1:设置响应编码
response.encoding = "utf-8"
print(response.text)
# 方法2:用字节流解码(推荐,更通用)
print(response.content.decode("utf-8"))
text 和 content 的区别(面试必问):
response.text:自动解码的字符串,编码由 requests 猜测,容易乱码response.content:原始字节流,需要手动decode("utf-8"),不会乱码
3. 携带请求头(绕过反爬虫)
百度等网站有反爬虫机制,会检查 User-Agent 请求头,识别出是爬虫就返回403。
import requests
url = "https://www.baidu.com"
# 伪造浏览器请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
}
# 发送请求时携带 headers
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
print(response.text)
4. 携带查询参数(params)
GET 请求的参数是放在 URL 里的,比如 https://www.baidu.com/s?wd=运维,用 params 参数自动拼接,不用手动写 URL。
import requests
url = "https://www.baidu.com/s"
headers = {"User-Agent": "Mozilla/5.0 ..."}
# 查询参数,自动拼接成 ?wd=运维&pn=10
params = {
"wd": "运维",
"pn": 10 # 第2页
}
response = requests.get(url, headers=headers, params=params)
response.encoding = "utf-8"
# 把搜索结果保存到文件
with open("baidu_search.html", "w", encoding="utf-8") as f:
f.write(response.text)
三、POST 请求(提交数据)
1. 核心区别:GET vs POST(面试必背)
| 特性 | GET | POST |
|---|---|---|
| 数据位置 | URL 查询参数 | 请求体(Request Body) |
| 数据大小 | 有限制(浏览器一般2KB) | 理论无限制 |
| 安全性 | 数据暴露在URL中,不安全 | 数据在请求体中,相对安全 |
| 缓存 | 浏览器会缓存 | 默认不缓存 |
| 用途 | 获取数据(查询、浏览) | 提交数据(登录、上传、修改) |
2. 发送 JSON 格式数据(最常用)
现在90%的 API 接口都接收 JSON 格式的数据,用 json 参数自动序列化。
import requests
url = "https://httpbin.org/post" # 测试接口,会返回你发送的数据
# 要提交的数据
data = {
"username": "admin",
"password": "123456"
}
# 发送 POST 请求,json 参数自动设置 Content-Type: application/json
response = requests.post(url, json=data)
print("状态码:", response.status_code)
print("响应内容:", response.json()) # 自动解析 JSON 响应为字典
3. 发送表单格式数据
如果接口接收的是表单格式(application/x-www-form-urlencoded),用 data 参数。
import requests
url = "https://httpbin.org/post"
data = {
"username": "admin",
"password": "123456"
}
# data 参数会自动编码为表单格式
response = requests.post(url, data=data)
print(response.json())
data 和 json 参数的区别(面试必问):
json=data:自动将字典序列化为 JSON 字符串,设置Content-Type: application/jsondata=data:自动将字典编码为表单格式,设置Content-Type: application/x-www-form-urlencoded
四、Response 对象常用属性
response = requests.get("https://www.baidu.com")
print(response.status_code) # 状态码:200成功,404不存在,500服务器错误
print(response.text) # 字符串格式响应内容
print(response.content) # 字节流格式响应内容
print(response.json()) # 自动解析 JSON 为字典(仅当响应是 JSON 时)
print(response.headers) # 响应头(字典)
print(response.cookies) # 响应 Cookies
print(response.url) # 最终请求的 URL
print(response.elapsed) # 请求耗时(用于监控接口响应时间)
五、生产环境最佳实践(SRE 必学)
1. 设置超时时间(必须加)
永远不要写没有超时的请求,否则会导致程序永远卡住。
# 超时时间10秒,超过就抛出异常
response = requests.get("https://www.baidu.com", timeout=10)
2. 异常处理(必须加)
网络请求随时可能失败,必须捕获异常。
import requests
def safe_request(url, method="get", **kwargs):
try:
response = requests.request(method, url, timeout=10, **kwargs)
response.raise_for_status() # 状态码不是200就抛出异常
return response
except requests.exceptions.Timeout:
print("请求超时")
return None
except requests.exceptions.ConnectionError:
print("连接失败")
return None
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
return None
except Exception as e:
print(f"未知错误: {e}")
return None
# 调用示例
response = safe_request("https://www.baidu.com")
if response:
print(response.text)
3. 会话复用(提升性能)
多次请求同一个域名时,用 Session 对象复用 TCP 连接,减少握手开销。
import requests
# 创建会话对象
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0 ..."})
# 同一个会话的所有请求都会复用连接和 cookies
response1 = session.get("https://www.baidu.com")
response2 = session.get("https://www.baidu.com/s?wd=运维")
session.close()
六、SRE 实战:企业微信机器人告警(最常用)
这是你工作中每天都会用到的功能,把监控告警、脚本执行结果推送到企业微信群。
1. 准备工作
- 打开企业微信群 → 右上角三个点 → 添加机器人
- 新建一个机器人,复制 Webhook 地址(类似
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx)
2. 完整告警函数
import requests
import json
def send_wechat_alert(webhook_url, content, msg_type="text", mentioned_list=None):
"""
发送企业微信机器人告警
:param webhook_url: 机器人 Webhook 地址
:param content: 告警内容
:param msg_type: 消息类型:text/markdown
:param mentioned_list: @的人列表,["@all"] 表示@所有人
"""
headers = {"Content-Type": "application/json"}
if msg_type == "text":
data = {
"msgtype": "text",
"text": {
"content": content,
"mentioned_list": mentioned_list or []
}
}
elif msg_type == "markdown":
data = {
"msgtype": "markdown",
"markdown": {
"content": content
}
}
else:
print("不支持的消息类型")
return False
try:
response = requests.post(webhook_url, headers=headers, json=data, timeout=10)
response.raise_for_status()
result = response.json()
if result["errcode"] == 0:
print("告警发送成功")
return True
else:
print(f"告警发送失败: {result['errmsg']}")
return False
except Exception as e:
print(f"告警发送异常: {e}")
return False
# 调用示例
if __name__ == "__main__":
# 替换成你的机器人 Webhook 地址
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"
# 发送文本告警并@所有人
send_wechat_alert(
WEBHOOK_URL,
"【服务器告警】192.168.88.101 CPU使用率超过90%",
mentioned_list=["@all"]
)
# 发送 Markdown 格式告警
markdown_content = """
## 服务器磁盘告警
- **服务器IP**: 192.168.88.101
- **磁盘分区**: /
- **使用率**: 95%
- **剩余空间**: 2.3GB
"""
send_wechat_alert(WEBHOOK_URL, markdown_content, msg_type="markdown")
七、常见问题排查
- 中文乱码:永远用
response.content.decode("utf-8"),不要用response.text - 403 Forbidden:添加
User-Agent请求头,模拟浏览器访问 - 连接超时:检查网络是否通,增加
timeout参数 - JSON 解析失败:先打印
response.text看响应内容是不是合法的 JSON - 接口返回401 Unauthorized:需要在请求头中添加
Authorization认证信息
完整生产级系统资源监控告警脚本
我在你PPT代码的基础上做了生产级优化,解决了原代码的所有问题(缺少日志文件定义、单磁盘检测、重复告警轰炸、异常处理缺失等),直接复制就能用。
一、完整可运行代码
import psutil
import time
import requests
from datetime import datetime
import os
# ====================== 配置区(根据你的需求修改) ======================
# 资源阈值
CPU_THRESHOLD = 80 # CPU使用率阈值(%)
MEMORY_THRESHOLD = 90 # 内存使用率阈值(%)
DISK_THRESHOLD = 85 # 磁盘使用率阈值(%)
# 企业微信机器人Webhook地址(替换成你的)
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key"
# 日志文件路径
LOG_FILE = "/var/log/system_monitor.log"
# 告警防抖:同一种资源5分钟内只告警一次(避免轰炸)
ALERT_INTERVAL = 300 # 单位:秒
# 监控间隔
CHECK_INTERVAL = 5 # 单位:秒
# ======================================================================
# 记录上次告警时间,用于防抖
last_alert_time = {
"CPU": 0,
"内存": 0,
"磁盘": 0
}
def log_alert(resource_type, usage, threshold):
"""记录告警信息到日志文件"""
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message = f"[{current_time}] {resource_type}使用率过高,超过阈值{threshold}%,当前使用率为{usage:.1f}%\n"
# 确保日志目录存在
log_dir = os.path.dirname(LOG_FILE)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
# 追加写入日志
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(message)
print(message.strip())
def send_wechat_alert(message, mentioned_list=None):
"""
发送企业微信告警
:param message: 告警内容
:param mentioned_list: @的人列表,["@all"]表示@所有人
"""
headers = {"Content-Type": "application/json"}
data = {
"msgtype": "text",
"text": {
"content": message,
"mentioned_list": mentioned_list or []
}
}
try:
response = requests.post(WEBHOOK_URL, headers=headers, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
if result["errcode"] == 0:
print("✅ 企业微信告警发送成功")
return True
else:
print(f"❌ 企业微信告警发送失败: {result['errmsg']}")
return False
else:
print(f"❌ 企业微信接口返回错误状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 企业微信接口调用异常: {str(e)}")
return False
def should_alert(resource_type):
"""判断是否应该发送告警(防抖逻辑)"""
current_time = time.time()
if current_time - last_alert_time[resource_type] > ALERT_INTERVAL:
last_alert_time[resource_type] = current_time
return True
return False
def check_cpu():
"""检测CPU使用率"""
try:
cpu_usage = psutil.cpu_percent(interval=1)
if cpu_usage > CPU_THRESHOLD:
log_alert("CPU", cpu_usage, CPU_THRESHOLD)
if should_alert("CPU"):
alert_msg = f"【服务器CPU告警】\n服务器IP: {get_local_ip()}\nCPU使用率: {cpu_usage:.1f}%\n阈值: {CPU_THRESHOLD}%"
send_wechat_alert(alert_msg, mentioned_list=["@all"])
return cpu_usage
except Exception as e:
print(f"❌ CPU检测失败: {str(e)}")
return 0
def check_memory():
"""检测内存使用率"""
try:
mem = psutil.virtual_memory()
mem_usage = mem.percent
if mem_usage > MEMORY_THRESHOLD:
log_alert("内存", mem_usage, MEMORY_THRESHOLD)
if should_alert("内存"):
alert_msg = f"【服务器内存告警】\n服务器IP: {get_local_ip()}\n内存使用率: {mem_usage:.1f}%\n阈值: {MEMORY_THRESHOLD}%\n剩余内存: {mem.available/1024**3:.2f}GB"
send_wechat_alert(alert_msg, mentioned_list=["@all"])
return mem_usage
except Exception as e:
print(f"❌ 内存检测失败: {str(e)}")
return 0
def check_disk():
"""检测所有磁盘分区使用率"""
try:
partitions = psutil.disk_partitions()
max_disk_usage = 0
alert_partitions = []
for part in partitions:
# 跳过虚拟文件系统和只读分区
if 'loop' in part.device or 'tmpfs' in part.fstype or part.opts.startswith('ro'):
continue
try:
usage = psutil.disk_usage(part.mountpoint)
disk_usage = usage.percent
max_disk_usage = max(max_disk_usage, disk_usage)
if disk_usage > DISK_THRESHOLD:
alert_partitions.append(f"{part.mountpoint}: {disk_usage:.1f}%")
except PermissionError:
continue
if alert_partitions:
log_alert("磁盘", max_disk_usage, DISK_THRESHOLD)
if should_alert("磁盘"):
alert_msg = f"【服务器磁盘告警】\n服务器IP: {get_local_ip()}\n以下分区使用率超过阈值{DISK_THRESHOLD}%:\n" + "\n".join(alert_partitions)
send_wechat_alert(alert_msg, mentioned_list=["@all"])
return max_disk_usage
except Exception as e:
print(f"❌ 磁盘检测失败: {str(e)}")
return 0
def get_local_ip():
"""获取本机IP地址"""
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return "127.0.0.1"
def check_resource_usage():
"""检测所有资源"""
print(f"\n===== {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 资源检测 =====")
cpu = check_cpu()
mem = check_memory()
disk = check_disk()
print(f"CPU使用率: {cpu:.1f}% | 内存使用率: {mem:.1f}% | 磁盘最高使用率: {disk:.1f}%")
if __name__ == "__main__":
print("🚀 系统资源监控告警程序启动")
print(f"📌 监控阈值: CPU>{CPU_THRESHOLD}% 内存>{MEMORY_THRESHOLD}% 磁盘>{DISK_THRESHOLD}%")
print(f"📌 日志文件: {LOG_FILE}")
print(f"📌 告警间隔: {ALERT_INTERVAL//60}分钟")
print("="*60)
try:
while True:
check_resource_usage()
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
print("\n👋 监控程序已停止")
except Exception as e:
print(f"\n❌ 程序异常退出: {str(e)}")
# 程序崩溃时也发送告警
send_wechat_alert(f"【严重告警】服务器{get_local_ip()}的监控程序异常退出: {str(e)}", mentioned_list=["@all"])
二、核心优化点(比PPT代码好用10倍)
- 告警防抖:同一种资源5分钟内只告警一次,避免短时间内收到几十条告警轰炸
- 多磁盘检测:自动检测所有实际磁盘分区,排除虚拟文件系统,不会漏掉/data等数据分区
- 完整异常处理:任何环节出错都不会导致程序崩溃,程序崩溃时还会发送告警通知
- 自动获取本机IP:告警信息里自动带上服务器IP,多服务器部署时一眼知道是哪台出问题
- 日志自动创建:自动创建日志目录和文件,不用手动提前创建
- 详细告警信息:内存告警显示剩余内存,磁盘告警显示所有超标分区
- 优雅退出:按Ctrl+C可以正常停止程序
三、部署运行步骤
1. 安装依赖
pip3 install psutil requests -i https://mirrors.aliyun.com/pypi/simple/
2. 修改配置
把代码开头的WEBHOOK_URL替换成你自己的企业微信机器人Webhook地址,根据需要调整阈值。
3. 测试运行
# 前台运行测试
python3 system_monitor.py
测试告警:新开一个终端,用stress-ng压测CPU,看是否会触发告警
# 压测2个CPU核心,持续60秒
stress-ng --cpu 2 --timeout 60s
4. 后台运行(生产环境)
# 后台运行,输出重定向到null
nohup python3 system_monitor.py > /dev/null 2>&1 &
# 查看进程是否运行
ps aux | grep system_monitor
5. 设置开机自启
创建systemd服务文件:
cat > /etc/systemd/system/system-monitor.service << EOF
[Unit]
Description=System Resource Monitor
After=network.target
[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /root/system_monitor.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
启用并启动服务:
systemctl daemon-reload
systemctl enable system-monitor
systemctl start system-monitor
# 查看服务状态
systemctl status system-monitor
四、查看日志
# 实时查看日志
tail -f /var/log/system_monitor.log
# 查看所有告警记录
grep "过高" /var/log/system_monitor.log
五、扩展功能(按需添加)
- 添加网络监控:检测网卡流量、网络连接数
- 添加进程监控:检测关键进程是否在运行(比如nginx、mysql)
- 添加邮件告警:同时发送邮件和企业微信告警
- 添加磁盘IO监控:检测磁盘IOPS和吞吐量
- 添加历史数据统计:把监控数据写入InfluxDB,用Grafana做可视化
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)