文章目录
说明
了解以下案例MQTT使用遥测、属性、单向双向连接器RPC。案例:有一个开关传感器,集成了一些温度和湿度,温度和湿度被上传到设备遥测模型上传到设备属性。RPC双向控制开关RPC获取开关状态。BUG:tb网关的MQTT连接器双向RPC有问题,不能成功,视频和文章将演示如何解决BUG。
过程演示
ThingsBoard网关 mqtt演示和双向连接器RPC的BUG修复
文字显示视频操作过程
修改网关配置
thingsboard_gateway/config/tb_gateway.yaml
thingsboard: host: 192.168.7.198 # TB mqtt ip port: 1883 # TB mqtt端口 remoteShell: false remoteConfiguration: false security: accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstoken qos: 0 storage: type: memory read_records_count: 100 max_records_count: 100000 connectors: # 打开MQTT连接器 - name: MQTT Broker Connector type: mqtt configuration: mqtt-test.json #指定mqtt连接器配置文件
MQTT连接器配置
{
"broker": {
"name":"EMQX Broker", "host":"192.168.7.190", "port":1883, "clientId": "ThingsBoard_gateway", "security": {
"type": "basic", "username":
"admin"
,
"password"
:
"admin"
}
}
,
"mapping"
:
[
{
"topicFilter"
:
"sensor/data"
,
"converter"
:
{
"type"
:
"json"
,
"deviceNameJsonExpression"
:
"${serialNumber}"
,
"deviceTypeJsonExpression"
:
"${sensorType}"
,
"timeout"
:
60000
,
"attributes"
:
[
{
"type"
:
"string"
,
"key"
:
"model"
,
"value"
:
"${sensorModel}"
}
,
{
"type"
:
"string"
,
"key"
:
"${sensorModel}"
,
"value"
:
"on"
}
]
,
"timeseries"
:
[
{
"type"
:
"double"
,
"key"
:
"temperature"
,
"value"
:
"${temp}"
}
,
{
"type"
:
"double"
,
"key"
:
"humidity"
,
"value"
:
"${hum}"
}
,
{
"type"
:
"boolean"
,
"key"
:
"occupancy"
,
"value"
:
"${occ}"
}
,
{
"type"
:
"int"
,
"key"
:
"state"
,
"value"
:
"${state}"
}
]
}
}
]
,
"connectRequests"
:
[
{
"topicFilter"
:
"sensor/connect"
,
"deviceNameJsonExpression"
:
"${serialNumber}"
}
]
,
"disconnectRequests"
:
[
{
"topicFilter"
:
"sensor/disconnect"
,
"deviceNameJsonExpression"
:
"${serialNumber}"
}
]
,
"attributeUpdates"
:
[
{
"deviceNameFilter"
:
".*"
,
"attributeFilter"
:
"uploadFrequency"
,
"topicExpression"
:
"sensor/${serialNumber}/${attributeKey}"
,
"valueExpression"
:
"{\"${attributeKey}\":\"${attributeValue}\"}"
}
]
,
"serverSideRpc"
:
[
{
"deviceNameFilter"
:
".*"
,
"methodFilter"
:
"getState"
,
"requestTopicExpression"
:
"sensor/${deviceName}/request/${methodName}"
,
"responseTopicExpression"
:
"sensor/${deviceName}/response/${methodName}"
,
"responseTimeout"
:
10000
,
"valueExpression"
:
"${params}"
}
,
{
"deviceNameFilter"
:
".*"
,
"methodFilter"
:
"setState"
,
"requestTopicExpression"
:
"sensor/${deviceName}/request/${methodName}"
,
"valueExpression"
:
"${params}"
}
]
}
JS模拟网关子设备
安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。
var mqtt = require('mqtt');
console.log('start mqtt_client,prepare connect');
//连接emqx broker
var client = mqtt.connect('mqtt://192.168.7.190:1883', {
username: 'admin',
password: 'admin',
});
//开关状态
state = 0;
var preTimer;
//设备连接回调
client.on('connect', function () {
console.log('connected');
//发送连接请求
publish('sensor/connect', {
serialNumber: 'SN-002',
});
//订阅服务端RPC
client.subscribe('sensor/+/request/+');
//订阅设备属性更新
client.subscribe('sensor/+/+');
//不断的上传遥测
preTimer = telemetry();
});
//收到消息回调
client.on('message', function (topic, message) {
console.log('on message:');
console.log(topic);
console.log(message.toString());
console.log('--------------------------------------');
let data = JSON.parse(message);
if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {
console.log('RegExp success');
var levels = topic.split('/');
if (levels[3] === 'getState') {
console.log('receive getState');
//回复 twoway RPC
publish('sensor/' + levels[1] + '/response/' + levels[3], {
state: state,
});
}
if (levels[3] === 'setState') {
console.log('receive setState');
state = data.stateValue;
//重新发送遥测 js没有双向绑定只能这样操作了
if (preTimer != null) {
clearInterval(preTimer);
}
preTimer = telemetry();
}
}
});
//发送遥测 返回定时器对象
function telemetry() {
return setInterval(() => {
publish('sensor/data', {
serialNumber: 'SN-002',
sensorType: 'default',
sensorModel: 'SN-model',
'SN-model': 'on',
temp: Math.random(),
hum: Math.random(),
occ: true,
state: state,
});
}, 3000);
}
//发送数据+打印日志
function publish(topic, message) {
client.publish(topic, JSON.stringify(message));
log(topic, message);
}
//控制台打印
function log(topic, message) {
console.log('send message:');
console.log(topic);
console.log(JSON.stringify(message));
console.log('===================================');
}
添加网关设备
启动网关
网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb
代表网关连接TB MQTT服务器成功。
""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"
启动js模拟设备
控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器
start mqtt_client,prepare connect
connected
接下来会发送一条连接请求,主题sensor/connect
到emqx broker,日志如下
send message:
sensor/connect
{
"serialNumber":"SN-002"}
===================================
对应mqtt-test.json
配置
"connectRequests": [
{
"topicFilter": "sensor/connect",
"deviceNameJsonExpression": "${serialNumber}"
}
],
网关收到打印如下,SN-002
就是网关解析的子设备名
""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"
此时打开tb页面,设备列表出现设备SN-002
。
接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。
创建开关小部件
创建Round switch``小部件
,图中的ADCDE部分一一解释。
A
机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)应该就是从遥测或属性中找state
数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?
B
通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。
C
通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。
D
参考B的解释
E
点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。
小部件创建好了,应该会自动发送一个getState双向RPC请求
来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!
MQTT连接器双向RPC的BUG修复
开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下:
Exception in thread Thread-6:
Traceback (most recent call last):
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3452, in _thread_main
self.loop_forever(retry_first_connection=True)
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1779, in loop_forever
rc = self.loop(timeout, max_packets)
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1181, in loop
rc = self.loop_read(max_packets)
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_read
rc = self._packet_read()
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_read
rc = self._packet_handle()
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2936, in _packet_handle
return self._handle_publish()
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3216, in _handle_publish
self._handle_on_message(message)
File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3444, in _handle_on_message
self.on_message(self, self._userdata, message)
File "/Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt/mqtt_connector.py", line 505, in _on_message
if message.topic in self.__gateway.rpc_requests_in_progress:
AttributeError: 'TBGatewayService' object has no attribute 'rpc_requests_in_progress'
关键就是最后一句'TBGatewayService' object has no attribute 'rpc_requests_in_progress'
说网关服务类没有属性rpc_requests_in_progress
打开网关服务类thingsboard_gateway/gateway/tb_gateway_service.py
找到这个属性看一下是这样的: self.__rpc_requests_in_progress = {}
python类中俩个下划线 __
代表私有属性,所以在报错位置thingsboard_gateway/connectors/mqtt/mqtt_connector.py
是调用不到这个属性的,报错是肯定的。 if message.topic in self.__gateway.rpc_requests_in_progress:
if message.topic in self.__gateway.rpc_requests_in_progress:
self.__gateway.rpc_with_reply_processing(message.topic, content)
return None
修复方法就是打开tb_gateway_service.py
将属性改为公开属性,即:替换所有self.__rpc_requests_in_progress
为self.rpc_requests_in_progress
你以为这就完事了? 下面俩处不解释了,照着改了,双向RPC就没问题了。
-
tb_gateway_service.py
的rpc_with_reply_processing
方法,注释图中一行 -
tb_gateway_service.py
的register_rpc_request_timeout
方法 没改之前是这样的 改成这样 到这里就好啦,再试试双向RPC吧!