
通过ThingsBoard gateway将数据传输至ThingsBoard平台
此处配置是为了通过host和accessToken连接到ThingsBoard平台中指定的网关设备,然后通过connectors定义的连接器中的json文件,来对数据进行处理并传输到ThingsBoard平台的指定设备中。(3)进入EMQX的主题界面,添加主题,查看该主题的消息流入之后是流出还是被丢弃,如果被丢弃就重新检查前两项内容以及mqtt协议的版本(一般有3.1和5.0,默认使用的是5.0)
目录
1、MQTTX发送消息后ThingsBoard平台未收到消息
一、安装ThingsBoard gateway
1、下载项目文件
https://github.com/thingsboard/thingsboard-gateway
下载完成后使用IDEA打开,目录结构如下:
(其中thingsboard_gateway目录下的tb_gateway.py文件是项目启动文件)
2、配置IDEA的Python环境
(前置条件是系统已安装好Python,并配好环境变量)
(1)File -> Settings -> Plugins,搜索Python插件并安装;
(2)打开thingsboard_gateway目录下的tb_gateway.py文件,此时会弹出信息,点击Configure Python interpreter,选择Python的SDK:
(如果没有弹出信息,通过File->Project Structure进行配置)
(3)运行tb_gateway.py,根据报错信息添加缺失的模块。
二、MQTT Broker路线
1、工具准备(默认已搭建好ThingsBoard平台)
(1)EMQX:开源的MQTT消息服务器,具备设备接入、消息路由、主题监控等功能
- 下载地址:EMQX: 大规模分布式 MQTT 消息服务器
- 安装完成后进入bin文件夹,进入cmd终端,运行emqx start(每次电脑重启都需要重新start)
C:\tools\emqx-5.1.0-windows-amd64\bin>emqx start
- 浏览器进入: http://localhost:18083/ ,默认账号:admin,密码:public
(2)MQQTX:MQTT客户端,也可以使用MQTT.fx或者mosquiitto等
- 下载地址:MQTTX:全功能 MQTT 客户端工具
- 进入MQTTX后选择“New Connection”创建连接
- Name:自定义
- broker.emqx.io:EMQX的服务器地址,本地部署是localhost
- Username与Password:EMQX登录的账号与密码
- 指定Topic,注意消息体要使用json格式
(3)创建一个ThingsBoard网关设备
2、网关配置
(1)打开thingsboard_gateway/config/tb_gateway.yaml文件
需修改三处:
- host:ThingsBoard平台服务器地址
- accessToken:填入上述创建的网关设备的访问令牌
- connectors部分:查看是否有MQTT Broker Connector,若没有则按下述代码编写一个
此处配置是为了通过host和accessToken连接到ThingsBoard平台中指定的网关设备,然后通过connectors定义的连接器中的json文件,来对数据进行处理并传输到ThingsBoard平台的指定设备中。
thingsboard:
host: thingsboard.cloud
port: 1883
remoteShell: false
remoteConfiguration: false
statistics:
enable: true
statsSendPeriodInSeconds: 60
# configuration: statistics.json
deviceFiltering:
enable: false
filterFile: list.json
maxPayloadSizeBytes: 1024
minPackSendDelayMS: 200
minPackSizeToSend: 500
checkConnectorsConfigurationInSeconds: 60
handleDeviceRenaming: true
checkingDeviceActivity:
checkDeviceInactivity: false
inactivityTimeoutSeconds: 120
inactivityCheckPeriodSeconds: 10
security:
accessToken: PUT_YOUR_GW_ACCESS_TOKEN_HERE
qos: 1
storage:
type: memory
read_records_count: 100
max_records_count: 100000
# type: file
# data_folder_path: ./data/
# max_file_count: 10
# max_read_records_count: 10
# max_records_per_file: 10000
# type: sqlite
# data_file_path: ./data/data.db
# messages_ttl_check_in_hours: 1
# messages_ttl_in_days: 7
grpc:
enabled: false
serverPort: 9595
keepaliveTimeMs: 10000
keepaliveTimeoutMs: 5000
keepalivePermitWithoutCalls: true
maxPingsWithoutData: 0
minTimeBetweenPingsMs: 10000
minPingIntervalWithoutDataMs: 5000
connectors:
-
name: MQTT Broker Connector
type: mqtt
configuration: mqtt.json
(2)打开thingsboard_gateway/config/mqtt.json文件
如果mqtt.json文件不存在,则在tb_gateway.yaml同目录下创建一个。
- broker部分:MQTT服务器的连接配置,需修改下列内容
- host:EMQX服务器地址
- username/password:EMQX登录账号与密码
(如果使用其他的MQTT服务器,按照对应信息修改即可)
"broker": {
"name": "Default Local Broker",
"host": "127.0.0.1",
"port": 1883,
"clientId": "ThingsBoard_gateway",
"version": 5,
"maxMessageNumberPerWorker": 10,
"maxNumberOfWorkers": 100,
"sendDataOnlyOnChange": false,
"security": {
"type": "basic",
"username": "user",
"password": "password"
}
}
- mapping部分:mqtt连接所订阅的主题,其中规定了消息体格式,可按需修改,以第一个topic为例:
- topicFilter:主题过滤器,此处规定主题为“/sensor/data”时会进入这个过滤器中;
- converter:数据格式转换器,将type指定的json格式转换成ThingsBoard平台能够处理的格式;
-
deviceNameJsonExpression:设备名称,如果设备名称已存在则对其进行修改,如果不存在则自动创建设备;
-
deviceTypeJsonExpression:设备类型;
-
attributes:设备客户端属性;
-
timeseries:设备时序信息,也是遥测界面监测的信息;
"topicFilter": "/sensor/data",
"converter": {
"type": "json",
"deviceNameJsonExpression": "${serialNumber}",
"deviceTypeJsonExpression": "${sensorType}",
"sendDataOnlyOnChange": false,
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "model",
"value": "${sensorModel}"
},
{
"type": "string",
"key": "${sensorModel}",
"value": "on"
}
],
"timeseries": [
{
"type": "double",
"key": "temperature",
"value": "${temp}"
},
{
"type": "double",
"key": "humidity",
"value": "${hum}"
},
{
"type": "string",
"key": "combine",
"value": "${hum}:${temp}"
}
]
}
},
3、使用MQTTX发送消息进行测试
向/sensor/data主题发送的消息体格式如下:
如果一切正常,可以在ThingsBoard平台看到名为“MQTTDevice”的设备已经被创建(需要刷新页面才能显示)。
三、ODBC路线
1、工具准备
(1)MySQL数据库
- 安装地址(windows版):MySQL :: Download MySQL Installer,第一个是在线安装,第二个是离线安装,一般选择第二个文件进行安装。
- 安装下列工具:(注意必须安装Connector/ODBC,x64和x86选择一个安装即可,建议安装x64)
- 创建一个MySQL数据库,然后创建一个数据表用于后续测试(此处创建了数据库devicemanagetest以及数据表devicestate,测试字段如下)
(2)配置ODBC数据源
- 安装ODBC驱动程序,本例中使用的是MySQL附带的Connector/ODBC作为驱动程序。
(其他驱动程序还有Devart ODBC Driver for MySQL:https://www.devart.com/odbc/mysql/;
MariaDB Connector/ODBC:https://mariadb.com/products/connectors/odbc/;注意要与所用的数据库匹配)
- 打开:控制面板->windows工具->ODBC数据源(64位)->驱动程序,检查是否存在
- MySQL ODBC 8.0 ANSI Driver
- MySQL ODBC 8.0 Unicode Driver
若存在则说明驱动程序安装成功。上述两个驱动程序仅编码语言不同,Unicode使用的语言更广泛,在本例中选择两者均可。
- 进入系统DSN,点击添加,选择一个Driver
- Data Source Name:数据源名称,是数据源的标识,本例中不会用到,但有些情况需要通过名称来指定数据源;
- TCP/IP Server:MySQL服务器地址,本地部署MySQL地址为localhost
- User/Password:MySQL登录账号与密码
- Database:本数据源所连接的数据库名称
填写完信息后点击Test,若显示连接成功则说明配置正确。
2、网关配置
(1)thingsboard_gateway/config/tb_gateway.yaml文件
- 在connectors部分新增一个连接器。
connectors:
-
name: MQTT Broker Connector
type: mqtt
configuration: mqtt.json
-
name: ODBC Connector
type: odbc
configuration: odbc.json
- 如果网关设备有改变,记得修改accessToken。
(2)odbc.json文件
下列是需要修改的部分
- str(关键配置,注意检查不能出错)
- Driver:根据注册的数据源所使用的ODBC驱动程序来填写
- Server:MySQL服务器的地址
- Port:3306,默认MySQL端口,不需更改
- Database:需要读取的数据库名称
- Uid/Pwd:MySQL登录账号与密码
- polling:定义需要执行的SQL语句和过滤条件
- query:SQL语句,根据实际用到的数据表和需要查找的字段来修改,最后的LIMIT 1是限制每次只读取一条记录。
- iterator:过滤条件,每次选取主键iddevicestate最大的一条记录进行读取。
- mapping:指定数据传送的设备对象
- device:指定设备类型与设备名称,注意设备类型必须是odbc,设备名称不能与网关设备相同,需要在ThingsBoard平台新建一个设备。
- attributes:指定需要写入设备属性的字段。
- timeseries:指定需要写入设备遥测界面的字段。
{
"connection": {
"str": "Driver={MySQL ODBC 8.0 ANSI Driver};Server=localhost;Port=3306;Database=devicemanagetest;Uid=postgres;Pwd=postgres;",
"attributes": {
"autocommit": true,
"timeout": 0
},
"encoding": "utf-8",
"decoding": {
"char": "utf-8",
"wchar": "utf-8",
"metadata": "utf-16le"
},
"reconnect": true,
"reconnectPeriod": 60
},
"pyodbc": {
"pooling": false
},
"polling": {
"query": "SELECT * FROM devicestate WHERE iddevicestate > ? ORDER BY iddevicestate ASC LIMIT 1",
"period": 10,
"iterator": {
"column": "iddevicestate",
"query": "SELECT MAX(iddevicestate)-1 FROM devicestate",
"persistent": false
}
},
"mapping": {
"device": {
"type": "'odbc'",
"name": "'ODBC1'"
},
"sendDataOnlyOnChange": false,
"attributes": "*",
"timeseries": [
{
"name": "value",
"value": "[i for i in [str_v, long_v, dbl_v,bool_v] if i is not None][0]"
}
},
"serverSideRpc": {
"enableUnknownRpc": false,
"overrideRpcConfig": false,
"methods": [
"procedureOne",
{
"name": "procedureTwo",
"args": [
"One",
2,
3.0
]
}
]
}
}
(3)list.json文件
在allow部分添加新构建的ODBC连接器
{
"deny": {
"MQTT Broker Connector": [
"Temperature Device",
"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$)"
],
"Modbus Connector": [
"My Modbus Device"
]
},
"allow": {
"MQTT Broker Connector": [
"My Temperature Sensor"
],
"ODBC Connector": [
"My ODBC Device"
]
}
}
完成上述配置后,启动 tb_gateway.py项目,就可以在ThingsBoard平台收到从数据库读取的数据(需要刷新页面)。
3、多设备连接
(1)为每个设备创建一个连接器
connectors:
-
name: MQTT Broker Connector
type: mqtt
configuration: mqtt.json
-
name: ODBC Connector
type: odbc
configuration: odbc.json
- name: ODBC Connector
type: odbc
configuration: odbc_a.json
- name: ODBC Connector
type: odbc
configuration: odbc_b.json
(2)创建odbc_a.json、odbc_b.json等配置文件,文件内容参考odbc.json,通过修改device的内容就可以连接指定设备。
四、可能存在的问题
1、MQTTX发送消息后ThingsBoard平台未收到消息
(1)检查Topic,注意开头有没有‘\’;
(2)检查消息体,与mqtt.json文件中对应主题的消息体格式是否一致,Payload是否选择了JSON格式;
(3)进入EMQX的主题界面,添加主题,查看该主题的消息流入之后是流出还是被丢弃,如果被丢弃就重新检查前两项内容以及mqtt协议的版本(一般有3.1和5.0,默认使用的是5.0),若mqtt协议版本出错则修改MQTTX连接的配置。
2、ODBC驱动程序报错
报错信息如下:
|ERROR| - [odbc_connector.py] - odbc_connector - __init_connection - 342 -
[ODBC Connector] Failed to connect to database: ('IM003', '[IM003] 由于系统错误
126: 找不到指定的模块。 (MySQL ODBC 8.0 ANSI Driver,
C:\\Program Files\\MySQL\\Connector ODBC 8.0\\myodbc8a.dll),指定驱动程序无法加载。
(160) (SQLDriverConnect)')"
报错信息显示在指定目录下找不到myodbc8a.dll,解决方案如下:
(1)检查C:\\Program Files\\MySQL目录下是否有Connector ODBC 8.0文件夹,因为x86版本会默认安装在C:\\Program Files(x86)\\MySQL目录下;
(2)通过mysql安装程序将错误目录下的Connector ODBC卸载,再重新安装(建议只安装x64版本,其默认安装位置是正确的);
(3)重新前往数据面板配置数据源。
3、SQL语句编写错误
|ERROR| - [odbc_connector.py] - odbc_connector - run - 195 - [ODBC Connector]
Error while polling database: ('The SQL contains 0 parameter markers,
but 1 parameters were supplied', 'HY000')"
报错原因是设置了过滤条件,但是SQL语句中并没有使用到条件语句(WHERE语句),解决方案:
(1)检查odbc.json文件的polling部分,如果iterator设置了过滤条件,但是query语句并没有连接WHERE语句部分就会出现上述报错,只要添加上WHERE语句即可。iterator的过滤结果会作为参数传递给WHERE语句的‘?’。
4、找不到指定设备
|WARNING| - [odbc_connector.py] - odbc_connector - __process_row - 265 -
[ODBC Connector] Failed to process database row: name 'ODBC1' is not defined"
(1)检查ThingsBoard平台是否创建了该设备,以及设备类型是否是odbc;
(2)检查odbc.json的mapping部分中device的type和name是否在双引号内添加了单引号。
五、如何创建一个gateway未自带的连接路径
gateway内部自带了mqtt、odbc、ocpp、opcua等连接器,所以在使用时只需要在tb_gateway.yaml文件中引入一个连接器并配置相应的json即可,但是如果需要通过https或者SNMP等路径传递数据,就需要做出更多配置,下面会给出一个大概思路,具体是否可行未经过测试。
以https连接为例:
1、在thingsboard_gateway/gateway/tb_gateway_service.py文件中注册connector
2、构造Https Connector
参考thingsboard_gateway/connectors/odbc
(1)创建https_connector.py文件,此处仅分析odbc_connector.py文件内的方法,实际要使构造的方法需根据需求设计:
- open:启动connector
- close:断开connector
- get_name:获取connector的名称,需要现在__init__()方法中设置name属性值,此处的name也就是后续在tb_gateway.yaml文件中引入connector时用到的name属性。
- is_connected:判断connector是否已经连接了数据库
-
on_attributes_update:属性更新的处理方法
-
server_side_rpc_handler:处理服务器端的RPC请求
-
run:主循环方法,用于启动连接器并进行连接、轮询操作
-
__poll:数据库轮询方法,执行odbc.json中定义的SQL语句并处理查询返回的数据
-
__process_row:处理数据库中的一行数据,将其转换成ThingsBoard格式,并且读取odbc.json中定义的device信息,将数据发送到指定设备
-
row_to_dict:将数据库行对象转换为字典对象
-
__check_and_send:将数据发送给指定设备的详细逻辑
-
__init_connection:初始化数据库连接,连接到指定数据库并将连接对象保存
-
__resolve_iterator_file:解析迭代器文件名,根据连接到的数据库相关信息生成文件名
-
__init_iterator:初始化迭代器
(2)创建https_converter.py文件
该文件是一个数据转换器,将json格式消息转换成ThingsBoard格式,文件内仅提供一个HtttpsConverter类接口供调用。
(3)创建https_uplink_converter.py文件
继承https_converter.py的HtttpsConverter类,是数据转换器功能的具体实现。
3、在tb_gateway.yaml中引入转换器
需注意,name属性要与https_connector.py文件中设置的一致,type属性要与tb_gateway_service.py文件中注册的一致。
connectors:
-
name: HTTPS Connector
type: https
configuration: https.json
4、编写https.json文件
需根据https报文传输的特性来编写配置,以odbc.json文件为例,可以确定的是,json文件内的各个模块,例如connection、polling、mapping等,都会在odbc_connector.py中被使用到,json文件的主要功能是:
- 指定设备
- 对数据形式作出一定的规定

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)