OneNET云平台查询最新数据和历史数据(数据流)
产品级key就是access_key,设备级key就是设备密钥。et是密码也就是token过期的时间。记住产品ID、设备名称、设备密钥、access_key。知晓响应示例方便调取其中某一个数值。三、MQTT协议上传模拟数据。我用的是MQTT+数据流。
·
一、创建产品设备

我用的是MQTT+数据流



记住产品ID、设备名称、设备密钥、access_key。
二、安全鉴权
token生成工具

产品级key就是access_key,设备级key就是设备密钥。et是密码也就是token过期的时间。


三、MQTT协议上传模拟数据




python
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import json
import base64
import hmac
import time
from urllib.parse import quote
import random
# 配置信息
ServerUrl = "mqtts.heclouds.com" # OneNET服务器地址
ServerPort = 1883 # 端口
DeviceName = " " # 设备名称
Productid = " " # 产品ID
accesskey = " "
# 主题定义
Pub_topic1 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json"
Sub_topic1 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json/accepted"
Sub_topic2 = "$sys/" + Productid + "/" + DeviceName + "/dp/post/json/rejected"
# 动态生成倾斜角度数据
def generate_angle_data():
angle = round(random.uniform(-15.5, 15.5), 1)
data = {
"id": int(time.time()), # 使用时间戳作为唯一ID
"dp": {
"Angle": [{"v": angle}] # 倾斜角度数据点
}
}
return json.dumps(data)
# MQTT 回调函数
def on_subscribe(client, userdata, mid, reason_code_list, properties):
if reason_code_list[0].is_failure:
print(f"Broker rejected subscription: {reason_code_list[0]}")
else:
print(f"Subscription QoS: {reason_code_list[0].value}")
def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
print("Unsubscribe succeeded")
else:
print(f"Unsubscribe failed: {reason_code_list[0]}")
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
print(f"连接失败: {reason_code}")
else:
print("连接成功:" + mqtt.connack_string(reason_code))
client.subscribe(Sub_topic1)
client.subscribe(Sub_topic2)
def on_message(client, userdata, message):
print("收到消息:", str(message.payload, 'utf-8'))
def on_publish(client, userdata, mid):
print(str(mid))
# 主程序
def main():
passw = " " #token
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, DeviceName)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)
mqttc.username_pw_set(Productid, passw)
mqttc.loop_start()
while True:
payload = generate_angle_data()
mqttc.publish(Pub_topic1, payload, qos=0)
print("已发送角度数据:", payload)
time.sleep(1)
if __name__ == '__main__':
main()
四、查询最新数据


知晓响应示例方便调取其中某一个数值。

python
import requests
DeviceName = " " # 设备名称
Productid = " " # 产品ID
def get_device_last_data(product_id, device_name):
url = "http://iot-api.heclouds.com/datapoint/current-datapoints"
params = {
"product_id": product_id,
"device_name": device_name
}
headers = {
"authorization": " " #token
}
response = requests.get(url, params=params, headers=headers)
return response.json()
if __name__ == "__main__":
print(get_device_last_data(Productid, DeviceName))
五、查询历史数据




python
import requests
DeviceName = " " # 设备名称
Productid = " " # 产品ID
Limit=6000 #可要可不要
def get_device_data_by_time(product_id, device_name, limit, start_time, end_time):
url = "https://iot-api.heclouds.com/datapoint/history-datapoints"
params = {
"product_id": product_id,
"device_name": device_name,
"limit": limit,
"start": start_time,
"end": end_time
}
headers = {
"authorization": " " #token
}
response = requests.get(url, params=params, headers=headers)
return response.json()
if __name__ == "__main__":
from datetime import datetime, timedelta
# Convert to millisecond timestamps
start_ts = datetime(2025, 5, 5, 15, 0, 35).strftime("%Y-%m-%dT%H:%M:%S")
end_ts = datetime(2025, 5, 5, 16, 0, 35).strftime("%Y-%m-%dT%H:%M:%S")
print(get_device_data_by_time(
Productid,
DeviceName,
Limit,
start_ts,
end_ts
))
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)