MQTT订阅

  • 通过broker, port, topic, username, password, log_dir, client_id等信息连接MQTT,从而获取到msg
  • 通过eval()函数将str型数据转换为dict型
  • 将字典型数据传入clean_data()对数据进行过滤
from connect_mqttSubscribe_helper import broker, port, topic, username, password, log_dir, client_id
from paho.mqtt import client as mqtt_client

def connect_mqtt():
    """Create an MQTT client, attach the connect callback and connect to the broker.

    Reads the module-level ``client_id``, ``username``, ``password``,
    ``broker`` and ``port`` settings.

    Returns:
        The client object, after ``connect(broker, port)`` has been called on it.
    """
    def on_connect(client, userdata, flags, rc):
        """Connect callback: report success when ``rc`` is 0, failure otherwise."""
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            # Format with ``%``; passing rc as a second print() argument left
            # the literal "%d" in the output.
            print("Failed to connect, return code %d\n" % rc)

    # NOTE(review): the positional client_id form matches the paho-mqtt 1.x
    # constructor; paho-mqtt 2.x expects a callback API version first - confirm
    # which version is pinned.
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    """Subscribe ``client`` to ``topic`` and pass every received message to ``clean_data``.

    Args:
        client: The MQTT client returned by ``connect_mqtt``.
    """
    import ast  # function-scope import keeps this block self-contained

    def on_message(client, userdata, msg):
        """Message callback: print the payload, parse it into a dict and clean it."""
        payload = msg.payload.decode()
        print(f"Received `{payload}` from `{msg.topic}` topic")
        # SECURITY: the payload comes from the broker and is untrusted, so it
        # must not go through eval() (arbitrary code execution).
        # ast.literal_eval accepts the same literal dict text but nothing
        # executable. strip() keeps leading whitespace from being rejected on
        # older Python versions.
        msg_dict = ast.literal_eval(payload.strip())  # text -> dict
        clean_data(msg_dict)  # filter / correct / store the record

    client.subscribe(topic)
    client.on_message = on_message

日志写入

新建log文件夹(若不存在),并将log信息追加写入logs.log文件。

def write_log(log_msg):
    """Append ``log_msg`` to ``logs.log`` inside ``log_dir``, creating the folder if needed.

    Args:
        log_msg: Text to append. It is written as-is, so callers supply their
            own trailing newline.
    """
    # exist_ok=True replaces the racy "check exists, then create" pair.
    os.makedirs(log_dir, exist_ok=True)
    # os.path.join puts the file inside log_dir even when the configured path
    # has no trailing separator (plain concatenation would yield "<dir>logs.log").
    log_path = os.path.join(log_dir, 'logs.log')
    # The context manager closes the handle even if write() raises.
    with open(log_path, 'a', encoding='utf-8') as fw:
        fw.write(log_msg)

将数据写入mongo中

def save_data(msg_dict):
    """Store one cleaned record in the ``col_jxct_DataCleaning`` collection.

    Args:
        msg_dict: The document (dict) to insert.
    """
    # Assumes col_jxct_DataCleaning is a PyMongo Collection - confirm against
    # connect_to_mongodb(). Collection.insert() was deprecated in PyMongo 3 and
    # removed in PyMongo 4; insert_one() is the single-document replacement.
    col_jxct_DataCleaning.insert_one(msg_dict)

数据过滤

  • 通过time值与“2021”进行比较判断,从而剔除时间上错误的数据。时间错误的数据很难判断采集时间,所以剔除。
  • 剔除数值型数据全为0的数据。这种数据很可能是传感器出现异常,因此无法修正。
  • 根据为各采集项指定的有效范围判断数据的有效性;如果超出了有效范围,则用预先设定的默认值进行替换。
# (message key, storage key) for every numeric sensor field.
# The storage key is also the prefix of the '<key>_l' / '<key>_h' entries in
# fields_bound_value; the default lives in fields_default['<message key>_dft'].
_SENSOR_FIELDS = (
    ('Tem', 'tem'),            # air temperature
    ('Hum', 'hum'),            # relative humidity
    ('daqiya', 'daqiya'),      # atmospheric pressure
    ('Noisse', 'noisse'),      # noise
    ('PM25', 'pm25'),          # PM2.5
    ('PM10', 'pm10'),          # PM10
    ('CO2', 'co2'),            # carbon dioxide
    ('O3', 'o3'),              # ozone
    ('NO2', 'no2'),            # nitrogen dioxide
    ('SO2', 'so2'),            # sulfur dioxide
    ('tr_wendu', 'trwendu'),   # soil temperature
    ('tr_shidu', 'trshidu'),   # soil moisture
    ('tr_ph', 'trph'),         # soil pH
    ('N', 'n'),                # nitrogen
    ('P', 'p'),                # phosphorus
    ('K', 'k'),                # potassium
    ('VCC', 'vcc'),            # supply voltage of the circuit
)


def _log_prefix():
    """Return the '[YYYY-mm-dd HH:MM:SS] : ' prefix that starts every log entry."""
    return '[' + time.strftime("%Y-%m-%d %H:%M:%S") + '] : '


def clean_data(msg_dict):
    """Filter and correct one sensor record, log what was done, then store it.

    Steps:
      1. Drop the record if its ``time`` string sorts before ``'2021'``.
      2. Drop the record if every numeric field equals 0.
      3. Replace each out-of-range value with its configured default.
      4. Save the record via ``save_data`` under lower-case keys.

    Dropped and corrected records are logged through ``write_log``.

    Args:
        msg_dict: Parsed sensor message holding ``time`` plus every message
            key listed in ``_SENSOR_FIELDS``. Corrected values are written
            back into this dict in place.

    Raises:
        KeyError: If a required field or configuration entry is missing.
    """
    raw_data = str(msg_dict)  # snapshot before any correction, for the log

    # 1. A timestamp before 2021 is treated as wrong: drop the record.
    if msg_dict['time'] < '2021':
        write_log(_log_prefix() + 'del_reason : Time error\n'
                  + '\tdel_data : ' + raw_data + '\n')
        return

    # 2. Drop the record when every numeric field equals 0.
    if all(msg_dict[msg_key] == 0 for msg_key, _ in _SENSOR_FIELDS):
        write_log(_log_prefix() + 'del_reason : All valid values equal 0\n'
                  + '\tdel_data : ' + raw_data + '\n')
        return

    # 3. Replace every out-of-range value with its configured default.
    value_changed = False
    for msg_key, db_key in _SENSOR_FIELDS:
        low = fields_bound_value[db_key + '_l']
        high = fields_bound_value[db_key + '_h']
        if not low <= msg_dict[msg_key] <= high:
            # Write back under the key that was checked. The old code stored
            # the soil-pH default under 'trph', so the bad 'tr_ph' value was
            # still saved to the database.
            msg_dict[msg_key] = fields_default[msg_key + '_dft']
            value_changed = True

    # Log kept-but-corrected records with both the raw and corrected content.
    if value_changed:
        write_log(_log_prefix() + 'cor_reason : Some data is out of range\n'
                  + '\traw_data : ' + raw_data + '\n'
                  + '\tcor_data : ' + str(msg_dict) + '\n')

    # 4. Store the surviving record under the lower-case storage keys.
    db_item_data = {db_key: msg_dict[msg_key] for msg_key, db_key in _SENSOR_FIELDS}
    db_item_data['time'] = msg_dict['time']
    save_data(db_item_data)

完整代码

from jxct_sensorData_boundValue import fields_bound_value, fields_default
from connect_mqttSubscribe_helper import broker, port, topic, username, password, log_dir, client_id
from connect_mongo import connect_to_mongodb

from paho.mqtt import client as mqtt_client
import time
import os


# Connect to the two databases; connect_to_mongodb() returns both handles.
col_jxct, col_jxct_DataCleaning = connect_to_mongodb()


def write_log(log_msg):
    """Append ``log_msg`` to ``logs.log`` inside ``log_dir``, creating the folder if needed.

    Args:
        log_msg: Text to append. It is written as-is, so callers supply their
            own trailing newline.
    """
    # exist_ok=True replaces the racy "check exists, then create" pair.
    os.makedirs(log_dir, exist_ok=True)
    # os.path.join puts the file inside log_dir even when the configured path
    # has no trailing separator (plain concatenation would yield "<dir>logs.log").
    log_path = os.path.join(log_dir, 'logs.log')
    # The context manager closes the handle even if write() raises.
    with open(log_path, 'a', encoding='utf-8') as fw:
        fw.write(log_msg)

def save_data(msg_dict):
    """Store one cleaned record in the ``col_jxct_DataCleaning`` collection.

    Args:
        msg_dict: The document (dict) to insert.
    """
    # Assumes col_jxct_DataCleaning is a PyMongo Collection - confirm against
    # connect_to_mongodb(). Collection.insert() was deprecated in PyMongo 3 and
    # removed in PyMongo 4; insert_one() is the single-document replacement.
    col_jxct_DataCleaning.insert_one(msg_dict)

# (message key, storage key) for every numeric sensor field.
# The storage key is also the prefix of the '<key>_l' / '<key>_h' entries in
# fields_bound_value; the default lives in fields_default['<message key>_dft'].
_SENSOR_FIELDS = (
    ('Tem', 'tem'),            # air temperature
    ('Hum', 'hum'),            # relative humidity
    ('daqiya', 'daqiya'),      # atmospheric pressure
    ('Noisse', 'noisse'),      # noise
    ('PM25', 'pm25'),          # PM2.5
    ('PM10', 'pm10'),          # PM10
    ('CO2', 'co2'),            # carbon dioxide
    ('O3', 'o3'),              # ozone
    ('NO2', 'no2'),            # nitrogen dioxide
    ('SO2', 'so2'),            # sulfur dioxide
    ('tr_wendu', 'trwendu'),   # soil temperature
    ('tr_shidu', 'trshidu'),   # soil moisture
    ('tr_ph', 'trph'),         # soil pH
    ('N', 'n'),                # nitrogen
    ('P', 'p'),                # phosphorus
    ('K', 'k'),                # potassium
    ('VCC', 'vcc'),            # supply voltage of the circuit
)


def _log_prefix():
    """Return the '[YYYY-mm-dd HH:MM:SS] : ' prefix that starts every log entry."""
    return '[' + time.strftime("%Y-%m-%d %H:%M:%S") + '] : '


def clean_data(msg_dict):
    """Filter and correct one sensor record, log what was done, then store it.

    Steps:
      1. Drop the record if its ``time`` string sorts before ``'2021'``.
      2. Drop the record if every numeric field equals 0.
      3. Replace each out-of-range value with its configured default.
      4. Save the record via ``save_data`` under lower-case keys.

    Dropped and corrected records are logged through ``write_log``.

    Args:
        msg_dict: Parsed sensor message holding ``time`` plus every message
            key listed in ``_SENSOR_FIELDS``. Corrected values are written
            back into this dict in place.

    Raises:
        KeyError: If a required field or configuration entry is missing.
    """
    raw_data = str(msg_dict)  # snapshot before any correction, for the log

    # 1. A timestamp before 2021 is treated as wrong: drop the record.
    if msg_dict['time'] < '2021':
        write_log(_log_prefix() + 'del_reason : Time error\n'
                  + '\tdel_data : ' + raw_data + '\n')
        return

    # 2. Drop the record when every numeric field equals 0.
    if all(msg_dict[msg_key] == 0 for msg_key, _ in _SENSOR_FIELDS):
        write_log(_log_prefix() + 'del_reason : All valid values equal 0\n'
                  + '\tdel_data : ' + raw_data + '\n')
        return

    # 3. Replace every out-of-range value with its configured default.
    value_changed = False
    for msg_key, db_key in _SENSOR_FIELDS:
        low = fields_bound_value[db_key + '_l']
        high = fields_bound_value[db_key + '_h']
        if not low <= msg_dict[msg_key] <= high:
            # Write back under the key that was checked. The old code stored
            # the soil-pH default under 'trph', so the bad 'tr_ph' value was
            # still saved to the database.
            msg_dict[msg_key] = fields_default[msg_key + '_dft']
            value_changed = True

    # Log kept-but-corrected records with both the raw and corrected content.
    if value_changed:
        write_log(_log_prefix() + 'cor_reason : Some data is out of range\n'
                  + '\traw_data : ' + raw_data + '\n'
                  + '\tcor_data : ' + str(msg_dict) + '\n')

    # 4. Store the surviving record under the lower-case storage keys.
    db_item_data = {db_key: msg_dict[msg_key] for msg_key, db_key in _SENSOR_FIELDS}
    db_item_data['time'] = msg_dict['time']
    save_data(db_item_data)


def connect_mqtt():
    """Create an MQTT client, attach the connect callback and connect to the broker.

    Reads the module-level ``client_id``, ``username``, ``password``,
    ``broker`` and ``port`` settings.

    Returns:
        The client object, after ``connect(broker, port)`` has been called on it.
    """

    def on_connect(client, userdata, flags, rc):
        """Connect callback: report success when ``rc`` is 0, failure otherwise."""
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            # Format with ``%``; passing rc as a second print() argument left
            # the literal "%d" in the output.
            print("Failed to connect, return code %d\n" % rc)

    # NOTE(review): the positional client_id form matches the paho-mqtt 1.x
    # constructor; paho-mqtt 2.x expects a callback API version first - confirm
    # which version is pinned.
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    """Subscribe ``client`` to ``topic`` and pass every received message to ``clean_data``.

    Args:
        client: The MQTT client returned by ``connect_mqtt``.
    """
    import ast  # function-scope import keeps this block self-contained

    def on_message(client, userdata, msg):
        """Message callback: print the payload, parse it into a dict and clean it."""
        # Expected payload format:
        #   {"time":"2021-05-25 14:28:00","Tem":29.5,"Hum":49.4,"daqiya":1011.56,"Noisse":58.2,"PM25":23,
        #    "PM10":33,"CO2":1152,"O3":0.00,"NO2":1.02,"SO2":0.06,"tr_wendu":18.4,"tr_shidu":100.0,
        #    "tr_ph":6.09,"N":50,"P":67,"K":159,"VCC":0.6}
        payload = msg.payload.decode()
        print(f"Received `{payload}` from `{msg.topic}` topic")
        # SECURITY: the payload comes from the broker and is untrusted, so it
        # must not go through eval() (arbitrary code execution).
        # ast.literal_eval accepts the same literal dict text but nothing
        # executable. strip() keeps leading whitespace from being rejected on
        # older Python versions.
        msg_dict = ast.literal_eval(payload.strip())  # text -> dict
        clean_data(msg_dict)  # filter / correct / store the record

    client.subscribe(topic)
    client.on_message = on_message


def run():
    """Connect to the broker, register the subscription, then enter the client loop."""
    mqtt = connect_mqtt()
    subscribe(mqtt)
    # Hand control to the client's loop so incoming messages reach on_message.
    mqtt.loop_forever()


# Start the subscriber only when this file is executed as a script, not on import.
if __name__ == '__main__':
    run()
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐