提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合
本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。
1、背景
日志是非常重要的信息资源。它们记录了应用程序的运行状态、错误和异常情况,帮助我们了解系统的健康状况以及发现潜在的问题。为了高效地管理和分析日志数据,许多组织采用了Elasticsearch、Logstash和Kibana(ELK)堆栈作为日志收集和分析的解决方案。
开发一个实时监控和告警脚本,专门用于监控ELK平台中的错误日志,并及时发送告警通知给相关人员。该系统将通过扫描Elasticsearch中的日志数据,筛选出等级为ERROR的错误日志,并根据预设的告警规则进行处理。
2、目的
使用Python从Elasticsearch中查询特定级别为ERROR的错误日志,并通过钉钉机器人实现告警聚合和发送,以提高错误日志的处理效率和及时响应能力。
为什么开发这个脚本?
因为目前我们这边没有监控日志的信息,出现问题不能及时发现 和预知
优势
1、消息进行聚合,每个项目的多条告警信息,汇总一条发送。突破钉钉机器人每分钟只能发送20条的限制
2、告警信息you太多的重复,进行去重处理,添加告警次数发送。防止被钉钉限流
3、原理
- 使用Python的Elasticsearch库连接到Elasticsearch集群。
- 构建Elasticsearch查询DSL(领域专用语言),过滤出级别为ERROR的日志记录。
- 执行查询并获取结果。
- 对查询结果进行聚合,统计每个项目的错误次数。
- 根据聚合结果,生成告警消息的Markdown格式内容。
- 使用钉钉机器人发送告警消息到指定的钉钉群。
4、流程
- 导入必要的Python库,包括
elasticsearch
和requests
。 - 创建Elasticsearch连接,指定Elasticsearch集群的主机和端口。
- 构建Elasticsearch查询DSL,设置查询条件为日志级别为ERROR。
- 执行查询,获取查询结果。
- 对查询结果进行处理,聚合每个项目的错误次数。
- 根据聚合结果生成告警消息的Markdown内容。
- 使用钉钉机器人API发送告警消息到指定的钉钉群。
5、实现代码
es版本是7.2的 : elasticsearch-7.2.0 kibana-7.2.0-linux-x86_64
python 使用的插件:pip install elasticsearch==7.5.1
# -*- coding: utf-8 -*-
# @Time : 2023/6/17 18:11
# @Author : 南宫乘风
# @Email : 1794748404@qq.com
# @File : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedelta
import requests
from elasticsearch import Elasticsearch
from monitor.es_ding import send_pretty_message
# Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),
sniff_on_start=True, # 连接前测试
sniff_on_connection_fail=True, # 节点无响应时刷新节点
sniff_timeout=300, # 设置超时时间
headers={'Content-Type': 'application/json'})
def format_timestamp(timestamp):
"""格式化时间为Elasticsearch接受的字符串格式"""
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
def search_errors():
"""执行查询,获取错误日志数据"""
current_time = datetime.now()
one_minute_ago = current_time - timedelta(minutes=10)
current_time_str = format_timestamp(current_time)
one_minute_ago_str = format_timestamp(one_minute_ago)
index = 'app-prod-*' # 替换为实际的索引名称
query = {
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": one_minute_ago_str,
"lt": current_time_str,
"format": "yyyy-MM-dd HH:mm:ss",
"time_zone": "+08:00"
}
}
},
{
"match": {
"loglevel": "ERROR" #匹配项目错误等级
}
},
{
"bool": {
"must_not": [
{
"match": {
"projectname": "fox-data-spiderman" # 需要屏蔽的项目
}
}
]
}
}
]
}
},
"_source": [ ## 输出的字段
"date",
"projectname",
"threadname",
"msg"
],
"from": 0,
"size": 10000, # 返回查询的条数
}
result = es.search(index=index, body=query)
total_documents = result["hits"]["total"]["value"]
print(f"总共匹配到 {total_documents} 条文档")
result = result['hits']['hits']
all_result = []
for i in result:
all_result.append(i['_source'])
msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)
results = []
for d in all_result:
if 'msg' in d and d['msg'] in msg_counter:
count = msg_counter[d['msg']]
del msg_counter[d['msg']]
d['count'] = count
d['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')
results.append(d)
return results
def aggregate_errors(results):
"""按项目名称聚合错误日志"""
aggregated_data = {}
for d in results:
projectname = d.get('projectname')
if projectname:
if projectname not in aggregated_data:
aggregated_data[projectname] = []
aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})
return aggregated_data
def generate_summary(projectname, messages):
"""生成Markdown格式的消息摘要"""
markdown_text = f'### {projectname} \n\n'
for message in messages:
markdown_text += f"**时间:** {message['date']}\n\n"
markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"
markdown_text += f"{message['msg']}\n\n---\n\n"
return markdown_text
def send_message_summary(projectname, messages):
"""发送摘要消息给钉钉机器人"""
summary = generate_summary(projectname, messages)
data = {
'msgtype': 'markdown',
'markdown': {
'title': f'{projectname}消息告警',
'text': summary
}
}
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx' # 替换为实际的Webhook URL
response = requests.post(webhook_url, json=data)
if response.status_code == 200:
print('消息发送成功')
else:
print('消息发送失败')
if __name__ == '__main__':
errors = search_errors()
aggregated_errors = aggregate_errors(errors)
for projectname, messages in aggregated_errors.items():
print(f"{projectname}:")
print(messages)
完整的代码
# -*- coding: utf-8 -*-
# @Time : 2023/6/15 15:11
# @Author : 南宫乘风
# @Email : 1794748404@qq.com
# @File : es_monitor.py
# @Software: PyCharm
import json
from collections import Counter
from datetime import datetime, timedelta
import requests
from elasticsearch import Elasticsearch
# 获取当前时间和一分钟前的时间
current_time = datetime.now()
one_minute_ago = current_time - timedelta(minutes=2)
# 格式化时间为Elasticsearch接受的字符串格式
current_time_str = current_time.strftime("%Y-%m-%d %H:%M:%S")
one_minute_ago_str = one_minute_ago.strftime("%Y-%m-%d %H:%M:%S")
# # 大
# current_time_str = '2023-06-16 12:16:10'
# # 小
# one_minute_ago_str = '2023-06-16 12:00:10'
print(current_time_str, one_minute_ago_str)
# 创建Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxx'),
sniff_on_start=True, # 连接前测试
sniff_on_connection_fail=True, # 节点无响应时刷新节点
sniff_timeout=300, # 设置超时时间
headers={'Content-Type': 'application/json'})
# 执行查询
index = 'app-prod-*' # 替换为实际的索引名称
# 构建查询语句
query = {
"query": {
"bool": {
"filter": [{
"range": {
"@timestamp": {
"gte": one_minute_ago_str,
"lt": current_time_str,
"format": "yyyy-MM-dd HH:mm:ss",
"time_zone": "+08:00"
}
}
},
{
"match": {
"loglevel": "ERROR"
}
},
{
"bool": {
"must_not": [
{
"match": {
"projectname": "fox-data-spiderman"
}
}
]
}
}]
}
},
"_source": [
"date",
"projectname",
"threadname",
"msg"
],
"from": 0,
"size": 10000,
}
def count_errors(result):
# 使用Counter统计msg字段的值出现的次数
msg_counter = Counter(hit['_source']['projectname'] for hit in result)
# 将Counter对象转换为字符串,并以漂亮的格式序列化
formatted_output = (dict(msg_counter))
# 按照数值降序对字典进行排序
sorted_counter = {k: v for k, v in sorted(formatted_output.items(), key=lambda x: x[1], reverse=True)}
title = f" **{one_minute_ago_str} 至 {current_time_str}** \n\n **共有 {total_documents} 条错误日志告警**"
# 发送钉钉消息
send_pretty_message(title, sorted_counter)
def generate_markdown_message(aggregated_data):
markdown_message = ""
for projectname, messages in aggregated_data.items():
markdown_message += f"**{projectname}:**\n\n"
for message in messages:
truncated_msg = message['msg'][:100] + ('...' if len(message['msg']) > 100 else '')
markdown_message += f"- {truncated_msg} Count: {message['count']}\n"
markdown_message += "\n"
return markdown_message
################################错误告摘要############################################
# 生成Markdown格式的消息摘要
def generate_summary(projectname, messages):
markdown_text = f'### {projectname} \n\n'
for message in messages:
markdown_text += f"**时间:** {message['date']}\n\n"
markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"
markdown_text += f"{message['msg']}\n\n---\n\n"
return markdown_text
def send_message_summary(projectname, messages):
# 构建摘要消息
summary = generate_summary(projectname, messages)
# 构建消息数据
data = {
'msgtype': 'markdown',
'markdown': {
'title': f'{projectname}消息告警',
'text': summary
}
}
# 发送消息给钉钉机器人
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxx'
response = requests.post(webhook_url, json=data)
if response.status_code == 200:
print('消息发送成功')
else:
print('消息发送失败')
if __name__ == '__main__': # 执行查询
result = es.search(index=index, body=query)
# print(result)
# 获取匹配的文档数量
# 错误日志的数量
total_documents = result["hits"]["total"]["value"]
print(f"总共匹配到 {total_documents} 条文档")
# 获取匹配的文档
result = result['hits']['hits']
all_result = []
for i in result:
all_result.append(i['_source'])
# 统计重复的msg出现的次数
msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)
# 新建一个空列表,用于存储结果
results = []
# 遍历原始数据,只保留第一次出现的具有重复msg的字典,并添加count字段
for d in all_result:
if 'msg' in d and d['msg'] in msg_counter:
count = msg_counter[d['msg']]
del msg_counter[d['msg']]
d['count'] = count
d['msg'] = d['msg'][:600] + (d['msg'][600:] and '...')
results.append(d)
from collections import defaultdict
aggregated_data = defaultdict(list)
for d in results:
date = d.get('date')
projectname = d.get('projectname')
msg = d.get('msg')
count = d.get('count')
if projectname:
aggregated_data[projectname].append({'date': date, 'msg': msg, 'count': count})
for projectname, messages in aggregated_data.items():
print(f"{projectname}:")
print(messages)
send_message_summary(projectname, messages)
6、Crontab添加定时任务
也可以用采用:Jenkins与GitLab的定时任务工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501
#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u es_monitor.py >> es_error_info.log 2>&1
该定时任务的含义是每隔2分钟执行一次指定目录下的 es_monitor.py 脚本,并将输出信息追加到 es_error_info.log 文件中。这样可以定期监控 Elasticsearch 的错误日志,并记录相关信息以便后续查看和分析。
7、总结
本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)