一、创建产品设备

我用的是MQTT+数据流

记住产品ID、设备名称、设备密钥、access_key。

二、安全鉴权

OneNET - 中国移动物联网开放平台

token生成工具

产品级key就是access_key,设备级key就是设备密钥。et是密码也就是token过期的时间。

三、MQTT协议上传模拟数据

python

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import json
import base64
import hmac
import time
from urllib.parse import quote
import random

# 配置信息
ServerUrl = "mqtts.heclouds.com"  # OneNET服务器地址
ServerPort = 1883                 # 端口
DeviceName = " "           # 设备名称
Productid = " "          # 产品ID
accesskey = " "

# 主题定义
Pub_topic1 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json"
Sub_topic1 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json/accepted"
Sub_topic2 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json/rejected"


# 动态生成倾斜角度数据
def generate_angle_data():
    angle = round(random.uniform(-15.5, 15.5), 1)  
    data = {
        "id": int(time.time()),  # 使用时间戳作为唯一ID
        "dp": {
            "Angle": [{"v": angle}]  # 倾斜角度数据点
        }
    }
    return json.dumps(data)




# MQTT 回调函数
def on_subscribe(client, userdata, mid, reason_code_list, properties):
    if reason_code_list[0].is_failure:
        print(f"Broker rejected subscription: {reason_code_list[0]}")
    else:
        print(f"Subscription QoS: {reason_code_list[0].value}")


def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
    if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
        print("Unsubscribe succeeded")
    else:
        print(f"Unsubscribe failed: {reason_code_list[0]}")


def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code.is_failure:
        print(f"连接失败: {reason_code}")
    else:
        print("连接成功:" + mqtt.connack_string(reason_code))
        client.subscribe(Sub_topic1)
        client.subscribe(Sub_topic2)


def on_message(client, userdata, message):
    print("收到消息:", str(message.payload, 'utf-8'))


def on_publish(client, userdata, mid):
    print(str(mid))


# 主程序
def main():
    passw = " "   #token
   

    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, DeviceName)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe

    mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)
    mqttc.username_pw_set(Productid, passw)
    mqttc.loop_start()

    while True:
        payload = generate_angle_data()
        mqttc.publish(Pub_topic1, payload, qos=0)
        print("已发送角度数据:", payload)
        time.sleep(1)
    



if __name__ == '__main__':
    main()
   

四、查询最新数据

OneNET - 中国移动物联网开放平台

知晓响应示例方便调取其中某一个数值。

python

import requests

DeviceName = " "           # 设备名称
Productid = " "          # 产品ID
def get_device_last_data(product_id, device_name):

    url = "http://iot-api.heclouds.com/datapoint/current-datapoints"
    params = {
        "product_id": product_id,
        "device_name": device_name
    }
    headers = {
        "authorization": " "   #token
    }
    
    response = requests.get(url, params=params, headers=headers)
    return response.json()

if __name__ == "__main__":

    print(get_device_last_data(Productid, DeviceName))

五、查询历史数据

OneNET - 中国移动物联网开放平台

python

import requests
DeviceName = " "           # 设备名称
Productid = " "          # 产品ID
Limit=6000  #可要可不要
def get_device_data_by_time(product_id, device_name, limit, start_time, end_time):
    url = "https://iot-api.heclouds.com/datapoint/history-datapoints"
    
    params = {
        "product_id": product_id,
        "device_name": device_name,
        "limit": limit,
        "start": start_time,
        "end": end_time
    }
    headers = {
        "authorization": " "  #token
    }
    response = requests.get(url, params=params, headers=headers)
    return response.json()

if __name__ == "__main__":
    from datetime import datetime, timedelta
    
    
    # Convert to millisecond timestamps
    start_ts = datetime(2025, 5, 5, 15, 0, 35).strftime("%Y-%m-%dT%H:%M:%S")
    end_ts = datetime(2025, 5, 5, 16, 0, 35).strftime("%Y-%m-%dT%H:%M:%S")
    
    print(get_device_data_by_time(
        Productid, 
        DeviceName, 
        Limit, 
        start_ts, 
        end_ts
    ))

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐