MQTT订阅及数据过滤写入
目录:MQTT订阅 / 日志写入 / 将数据写入mongo中 / 数据过滤 / 完整代码
·
MQTT订阅及数据过滤写入
MQTT订阅
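- 通过 `broker`、`port`、`topic`、`username`、`password`、`log_dir`、`client_id` 等信息连接 MQTT,从而获取到消息 `msg`
- 通过 `eval()` 函数将 str 型数据转换为 dict 型(注意:对来自网络的不可信数据使用 `eval()` 存在任意代码执行风险,建议改用 `ast.literal_eval()` 或 `json.loads()`)
- 将字典型数据传入 `clean_data()` 对数据进行过滤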
from connect_mqttSubscribe_helper import broker, port, topic, username, password, log_dir, client_id
from paho.mqtt import client as mqtt_client
def connect_mqtt():
    """Create an MQTT client, attach credentials and the connect callback, then connect.

    Uses the module-level settings ``broker``, ``port``, ``username``,
    ``password`` and ``client_id``.

    :return: the client after ``connect()`` has been called; no network loop
        is started here.
    """

    def on_connect(client, userdata, flags, rc):
        """Connect callback: report whether the broker accepted the connection.

        ``rc == 0`` means success; any other value is the failure code.
        """
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            # Format rc into the message; passing it as a second print()
            # argument printed a literal "%d" followed by the number.
            print("Failed to connect, return code %d" % rc)

    # NOTE(review): a positional client_id matches the paho-mqtt 1.x
    # constructor; paho-mqtt 2.x expects a CallbackAPIVersion first.
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client
def subscribe(client: mqtt_client):
    """Subscribe ``client`` to ``topic`` and install the message callback.

    Each received payload is decoded, parsed into a dict and passed to
    ``clean_data``. Payloads that cannot be decoded or parsed into a dict are
    reported and skipped.
    """
    import ast  # local import keeps this block self-contained

    def on_message(client, userdata, msg):
        """Message callback: parse one payload and hand it to ``clean_data``.

        Example payload:
        {"time":"2021-05-25 14:28:00","Tem":29.5,"Hum":49.4, ... ,"VCC":0.6}
        """
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError:
            print(f"Discarded undecodable payload from `{msg.topic}` topic")
            return
        print(f"Received `{payload}` from `{msg.topic}` topic")
        # SECURITY: the payload comes from the network, so it must never be
        # passed to eval(). ast.literal_eval only accepts Python literals
        # (dicts, strings, numbers, ...), which covers the payload format.
        try:
            msg_dict = ast.literal_eval(payload)  # 数据转为字典 / parse into a dict
        except (ValueError, TypeError, SyntaxError, RecursionError):
            print(f"Discarded malformed payload from `{msg.topic}` topic")
            return
        if not isinstance(msg_dict, dict):
            print(f"Discarded non-dict payload from `{msg.topic}` topic")
            return
        clean_data(msg_dict)  # clean, log and store the reading

    client.subscribe(topic)
    client.on_message = on_message
日志写入
新建 `log` 文件夹,并将日志信息写入其中的 `logs.log` 文件
def write_log(log_msg):
    """Append ``log_msg`` to the ``logs.log`` file inside ``log_dir``.

    The directory is created first if it does not exist. ``log_msg`` is
    written unchanged, so callers supply their own trailing newline.
    """
    if log_dir:
        # exist_ok avoids a crash if the directory is created concurrently
        # between an existence check and makedirs.
        os.makedirs(log_dir, exist_ok=True)
    # os.path.join adds the separator that plain string concatenation only
    # got right when log_dir already ended with one.
    log_path = os.path.join(log_dir, 'logs.log')
    # The context manager closes the file even if write() raises.
    with open(log_path, 'a', encoding='utf-8') as fw:
        fw.write(log_msg)
将数据写入mongo中
def save_data(msg_dict):
    """Insert one cleaned reading into ``col_jxct_DataCleaning``.

    :param msg_dict: the document to insert.
    :return: the driver's insert result.
    """
    # Collection.insert() was deprecated in PyMongo 3.0 and removed in 4.0;
    # insert_one() is the single-document replacement.
    # NOTE(review): assumes col_jxct_DataCleaning is a PyMongo Collection,
    # which also adds an '_id' key to msg_dict in place.
    return col_jxct_DataCleaning.insert_one(msg_dict)
数据过滤
- 将 `time` 字段与字符串 “2021” 进行比较,剔除时间明显错误(早于 2021 年)的数据。时间错误的数据无法确定其真实采集时间,因此直接剔除。
- 剔除数值型字段全部为 0 的数据。这类数据很可能由传感器异常导致,无法修正,因此剔除。
- 为每个采集字段指定有效范围,并据此判断数据是否有效;若某个值超出有效范围,则用预先设定的默认值替换。
def clean_data(msg_dict):
    """Clean one sensor reading, log rejected/corrected data, and store it.

    Steps:

    1. Drop the reading when its ``time`` string compares below ``'2021'``
       (lexicographic string comparison).
    2. Drop the reading when every numeric field equals 0.
    3. Otherwise replace each value outside the inclusive range
       ``fields_bound_value['<prefix>_l'] .. fields_bound_value['<prefix>_h']``
       with ``fields_default['<field>_dft']``.

    Dropped readings and corrected readings are appended to the log through
    ``write_log``. Every reading that is not dropped is passed to
    ``save_data`` with lower-case keys plus the original ``time``.

    :param msg_dict: decoded payload dict; out-of-range values are replaced
        in place.
    """
    # (payload key, key prefix in fields_bound_value == key in stored document)
    sensor_fields = (
        ('Tem', 'tem'),            # air temperature
        ('Hum', 'hum'),            # relative humidity
        ('daqiya', 'daqiya'),      # atmospheric pressure
        ('Noisse', 'noisse'),      # noise
        ('PM25', 'pm25'),          # PM2.5
        ('PM10', 'pm10'),          # PM10
        ('CO2', 'co2'),            # carbon dioxide
        ('O3', 'o3'),              # ozone
        ('NO2', 'no2'),            # nitrogen dioxide
        ('SO2', 'so2'),            # sulfur dioxide
        ('tr_wendu', 'trwendu'),   # soil temperature
        ('tr_shidu', 'trshidu'),   # soil humidity
        ('tr_ph', 'trph'),         # soil pH
        ('N', 'n'),                # nitrogen
        ('P', 'p'),                # phosphorus
        ('K', 'k'),                # potassium
        ('VCC', 'vcc'),            # supply voltage
    )
    # Snapshot taken before any in-place correction, used in log entries.
    raw_data = str(msg_dict)

    def log_event(*parts):
        """Write one timestamped log entry built from ``parts``."""
        stime = '[' + str(time.strftime("%Y-%m-%d %H:%M:%S")) + '] : '
        write_log(stime + ''.join(parts))

    # 1. A timestamp before 2021 is clearly wrong; the real sampling time
    #    cannot be recovered, so the reading is dropped.
    if msg_dict['time'] < '2021':
        log_event('del_reason : ' + "Time error\n",
                  '\tdel_data : ' + raw_data + "\n")
        return

    # 2. All numeric fields equal to 0 is treated as an uncorrectable fault.
    if all(msg_dict[field] == 0 for field, _ in sensor_fields):
        log_event('del_reason : ' + "All valid values equal 0\n",
                  '\tdel_data : ' + raw_data + "\n")
        return

    # 3. Replace out-of-range values with the configured defaults.
    value_change_flag = False
    for field, prefix in sensor_fields:
        if not (fields_bound_value[prefix + '_l'] <= msg_dict[field]
                <= fields_bound_value[prefix + '_h']):
            # Write back under the same key that was checked; writing soil pH
            # to a different key would leave the invalid 'tr_ph' value stored.
            msg_dict[field] = fields_default[field + '_dft']
            value_change_flag = True

    # Log readings that were kept but had at least one value corrected.
    if value_change_flag:
        log_event('cor_reason : ' + "Some data is out of range\n",
                  "\traw_data : " + raw_data + '\n',
                  '\tcor_data : ' + str(msg_dict) + "\n")

    # Store the kept reading under the lower-case document keys.
    db_item_data = {prefix: msg_dict[field] for field, prefix in sensor_fields}
    db_item_data['time'] = msg_dict['time']
    save_data(db_item_data)
完整代码
from jxct_sensorData_boundValue import fields_bound_value, fields_default
from connect_mqttSubscribe_helper import broker, port, topic, username, password, log_dir, client_id
from connect_mongo import connect_to_mongodb
from paho.mqtt import client as mqtt_client
import time
import os
# Connect to MongoDB (runs at import time). connect_to_mongodb() returns two
# handles; only col_jxct_DataCleaning is used in this file (by save_data).
# NOTE(review): col_jxct looks unused here — confirm before removing.
col_jxct, col_jxct_DataCleaning = connect_to_mongodb()
def write_log(log_msg):
    """Append ``log_msg`` to the ``logs.log`` file inside ``log_dir``.

    The directory is created first if it does not exist. ``log_msg`` is
    written unchanged, so callers supply their own trailing newline.
    """
    if log_dir:
        # exist_ok avoids a crash if the directory is created concurrently
        # between an existence check and makedirs.
        os.makedirs(log_dir, exist_ok=True)
    # os.path.join adds the separator that plain string concatenation only
    # got right when log_dir already ended with one.
    log_path = os.path.join(log_dir, 'logs.log')
    # The context manager closes the file even if write() raises.
    with open(log_path, 'a', encoding='utf-8') as fw:
        fw.write(log_msg)
def save_data(msg_dict):
    """Insert one cleaned reading into ``col_jxct_DataCleaning``.

    :param msg_dict: the document to insert.
    :return: the driver's insert result.
    """
    # Collection.insert() was deprecated in PyMongo 3.0 and removed in 4.0;
    # insert_one() is the single-document replacement.
    # NOTE(review): assumes col_jxct_DataCleaning is a PyMongo Collection,
    # which also adds an '_id' key to msg_dict in place.
    return col_jxct_DataCleaning.insert_one(msg_dict)
def clean_data(msg_dict):
    """Clean one sensor reading, log rejected/corrected data, and store it.

    Steps:

    1. Drop the reading when its ``time`` string compares below ``'2021'``
       (lexicographic string comparison).
    2. Drop the reading when every numeric field equals 0.
    3. Otherwise replace each value outside the inclusive range
       ``fields_bound_value['<prefix>_l'] .. fields_bound_value['<prefix>_h']``
       with ``fields_default['<field>_dft']``.

    Dropped readings and corrected readings are appended to the log through
    ``write_log``. Every reading that is not dropped is passed to
    ``save_data`` with lower-case keys plus the original ``time``.

    :param msg_dict: decoded payload dict; out-of-range values are replaced
        in place.
    """
    # (payload key, key prefix in fields_bound_value == key in stored document)
    sensor_fields = (
        ('Tem', 'tem'),            # air temperature
        ('Hum', 'hum'),            # relative humidity
        ('daqiya', 'daqiya'),      # atmospheric pressure
        ('Noisse', 'noisse'),      # noise
        ('PM25', 'pm25'),          # PM2.5
        ('PM10', 'pm10'),          # PM10
        ('CO2', 'co2'),            # carbon dioxide
        ('O3', 'o3'),              # ozone
        ('NO2', 'no2'),            # nitrogen dioxide
        ('SO2', 'so2'),            # sulfur dioxide
        ('tr_wendu', 'trwendu'),   # soil temperature
        ('tr_shidu', 'trshidu'),   # soil humidity
        ('tr_ph', 'trph'),         # soil pH
        ('N', 'n'),                # nitrogen
        ('P', 'p'),                # phosphorus
        ('K', 'k'),                # potassium
        ('VCC', 'vcc'),            # supply voltage
    )
    # Snapshot taken before any in-place correction, used in log entries.
    raw_data = str(msg_dict)

    def log_event(*parts):
        """Write one timestamped log entry built from ``parts``."""
        stime = '[' + str(time.strftime("%Y-%m-%d %H:%M:%S")) + '] : '
        write_log(stime + ''.join(parts))

    # 1. A timestamp before 2021 is clearly wrong; the real sampling time
    #    cannot be recovered, so the reading is dropped.
    if msg_dict['time'] < '2021':
        log_event('del_reason : ' + "Time error\n",
                  '\tdel_data : ' + raw_data + "\n")
        return

    # 2. All numeric fields equal to 0 is treated as an uncorrectable fault.
    if all(msg_dict[field] == 0 for field, _ in sensor_fields):
        log_event('del_reason : ' + "All valid values equal 0\n",
                  '\tdel_data : ' + raw_data + "\n")
        return

    # 3. Replace out-of-range values with the configured defaults.
    value_change_flag = False
    for field, prefix in sensor_fields:
        if not (fields_bound_value[prefix + '_l'] <= msg_dict[field]
                <= fields_bound_value[prefix + '_h']):
            # Write back under the same key that was checked; writing soil pH
            # to a different key would leave the invalid 'tr_ph' value stored.
            msg_dict[field] = fields_default[field + '_dft']
            value_change_flag = True

    # Log readings that were kept but had at least one value corrected.
    if value_change_flag:
        log_event('cor_reason : ' + "Some data is out of range\n",
                  "\traw_data : " + raw_data + '\n',
                  '\tcor_data : ' + str(msg_dict) + "\n")

    # Store the kept reading under the lower-case document keys.
    db_item_data = {prefix: msg_dict[field] for field, prefix in sensor_fields}
    db_item_data['time'] = msg_dict['time']
    save_data(db_item_data)
def connect_mqtt():
    """Create an MQTT client, attach credentials and the connect callback, then connect.

    Uses the module-level settings ``broker``, ``port``, ``username``,
    ``password`` and ``client_id``.

    :return: the client after ``connect()`` has been called; no network loop
        is started here.
    """

    def on_connect(client, userdata, flags, rc):
        """Connect callback: report whether the broker accepted the connection.

        ``rc == 0`` means success; any other value is the failure code.
        """
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            # Format rc into the message; passing it as a second print()
            # argument printed a literal "%d" followed by the number.
            print("Failed to connect, return code %d" % rc)

    # NOTE(review): a positional client_id matches the paho-mqtt 1.x
    # constructor; paho-mqtt 2.x expects a CallbackAPIVersion first.
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client
def subscribe(client: mqtt_client):
    """Subscribe ``client`` to ``topic`` and install the message callback.

    Each received payload is decoded, parsed into a dict and passed to
    ``clean_data``. Payloads that cannot be decoded or parsed into a dict are
    reported and skipped.
    """
    import ast  # local import keeps this block self-contained

    def on_message(client, userdata, msg):
        """Message callback: parse one payload and hand it to ``clean_data``.

        Payload format:
        {"time":"2021-05-25 14:28:00","Tem":29.5,"Hum":49.4,"daqiya":1011.56,"Noisse":58.2,"PM25":23,
         "PM10":33,"CO2":1152,"O3":0.00,"NO2":1.02,"SO2":0.06,"tr_wendu":18.4,"tr_shidu":100.0,
         "tr_ph":6.09,"N":50,"P":67,"K":159,"VCC":0.6}
        """
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError:
            print(f"Discarded undecodable payload from `{msg.topic}` topic")
            return
        print(f"Received `{payload}` from `{msg.topic}` topic")
        # SECURITY: the payload comes from the network, so it must never be
        # passed to eval(). ast.literal_eval only accepts Python literals
        # (dicts, strings, numbers, ...), which covers the payload format.
        try:
            msg_dict = ast.literal_eval(payload)  # 数据转为字典 / parse into a dict
        except (ValueError, TypeError, SyntaxError, RecursionError):
            print(f"Discarded malformed payload from `{msg.topic}` topic")
            return
        if not isinstance(msg_dict, dict):
            print(f"Discarded non-dict payload from `{msg.topic}` topic")
            return
        clean_data(msg_dict)  # clean, log and store the reading

    client.subscribe(topic)
    client.on_message = on_message
def run():
    """Start the subscriber.

    Connects to the broker, registers the topic subscription and its message
    callback, then hands control to the client's blocking network loop.
    """
    mqtt_conn = connect_mqtt()
    subscribe(mqtt_conn)
    mqtt_conn.loop_forever()


# Start the blocking subscriber only when executed as a script, not on import.
if __name__ == '__main__':
    run()

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)