资讯详情

ThingsBoard网关mqtt连接器案例及双向RPC的BUG修复

文章目录

  • 说明
  • 过程演示
  • 文字显示视频操作过程
    • 修改网关配置
    • MQTT连接器配置
    • JS模拟网关子设备
    • 添加网关设备
    • 启动网关
    • 启动js模拟设备
    • 创建开关小部件
      • A
      • B
      • C
      • D
      • E
  • MQTT连接器双向RPC的BUG修复

说明

了解以下案例MQTT使用遥测、属性、单向双向连接器RPC。案例:有一个开关传感器,集成了一些温度和湿度,温度和湿度被上传到设备遥测模型上传到设备属性。RPC双向控制开关RPC获取开关状态。BUG:tb网关的MQTT连接器双向RPC有问题,不能成功,视频和文章将演示如何解决BUG。

过程演示

ThingsBoard网关 mqtt演示和双向连接器RPC的BUG修复

文字显示视频操作过程

修改网关配置

thingsboard_gateway/config/tb_gateway.yaml

thingsboard:   host: 192.168.7.198 # TB mqtt ip   port: 1883 # TB mqtt端口   remoteShell: false   remoteConfiguration: false   security:     accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstoken   qos: 0 storage:   type: memory   read_records_count: 100   max_records_count: 100000 connectors: # 打开MQTT连接器   -     name: MQTT Broker Connector     type: mqtt     configuration: mqtt-test.json #指定mqtt连接器配置文件 

MQTT连接器配置

{ 
           "broker": { 
             "name":"EMQX Broker",     "host":"192.168.7.190",     "port":1883,     "clientId": "ThingsBoard_gateway",     "security": { 
               "type": "basic",       "username": 
       
        "admin"
        , 
        "password"
        : 
        "admin" 
        } 
        }
        , 
        "mapping"
        : 
        [ 
        { 
          
        "topicFilter"
        : 
        "sensor/data"
        , 
        "converter"
        : 
        { 
          
        "type"
        : 
        "json"
        , 
        "deviceNameJsonExpression"
        : 
        "${serialNumber}"
        , 
        "deviceTypeJsonExpression"
        : 
        "${sensorType}"
        , 
        "timeout"
        : 
        60000
        , 
        "attributes"
        : 
        [ 
        { 
          
        "type"
        : 
        "string"
        , 
        "key"
        : 
        "model"
        , 
        "value"
        : 
        "${sensorModel}" 
        }
        , 
        { 
          
        "type"
        : 
        "string"
        , 
        "key"
        : 
        "${sensorModel}"
        , 
        "value"
        : 
        "on" 
        } 
        ]
        , 
        "timeseries"
        : 
        [ 
        { 
          
        "type"
        : 
        "double"
        , 
        "key"
        : 
        "temperature"
        , 
        "value"
        : 
        "${temp}" 
        }
        , 
        { 
          
        "type"
        : 
        "double"
        , 
        "key"
        : 
        "humidity"
        , 
        "value"
        : 
        "${hum}" 
        }
        , 
        { 
          
        "type"
        : 
        "boolean"
        , 
        "key"
        : 
        "occupancy"
        , 
        "value"
        : 
        "${occ}" 
        }
        , 
        { 
          
        "type"
        : 
        "int"
        , 
        "key"
        : 
        "state"
        , 
        "value"
        : 
        "${state}" 
        } 
        ] 
        } 
        } 
        ]
        , 
        "connectRequests"
        : 
        [ 
        { 
          
        "topicFilter"
        : 
        "sensor/connect"
        , 
        "deviceNameJsonExpression"
        : 
        "${serialNumber}" 
        } 
        ]
        , 
        "disconnectRequests"
        : 
        [ 
        { 
          
        "topicFilter"
        : 
        "sensor/disconnect"
        , 
        "deviceNameJsonExpression"
        : 
        "${serialNumber}" 
        } 
        ]
        , 
        "attributeUpdates"
        : 
        [ 
        { 
          
        "deviceNameFilter"
        : 
        ".*"
        , 
        "attributeFilter"
        : 
        "uploadFrequency"
        , 
        "topicExpression"
        : 
        "sensor/${serialNumber}/${attributeKey}"
        , 
        "valueExpression"
        : 
        "{\"${attributeKey}\":\"${attributeValue}\"}" 
        } 
        ]
        , 
        "serverSideRpc"
        : 
        [ 
        { 
          
        "deviceNameFilter"
        : 
        ".*"
        , 
        "methodFilter"
        : 
        "getState"
        , 
        "requestTopicExpression"
        : 
        "sensor/${deviceName}/request/${methodName}"
        , 
        "responseTopicExpression"
        : 
        "sensor/${deviceName}/response/${methodName}"
        , 
        "responseTimeout"
        : 
        10000
        , 
        "valueExpression"
        : 
        "${params}" 
        }
        , 
        { 
          
        "deviceNameFilter"
        : 
        ".*"
        , 
        "methodFilter"
        : 
        "setState"
        , 
        "requestTopicExpression"
        : 
        "sensor/${deviceName}/request/${methodName}"
        , 
        "valueExpression"
        : 
        "${params}" 
        } 
        ] 
        } 
       

JS模拟网关子设备

安装依赖 npm install mqttjs文件名 mqtt_client.js 注意下面的日志格式,后面不在赘述。

var mqtt = require('mqtt');
console.log('start mqtt_client,prepare connect');
//连接emqx broker
var client = mqtt.connect('mqtt://192.168.7.190:1883', { 
        
	username: 'admin',
	password: 'admin',
});

//开关状态
state = 0;
var preTimer;
//设备连接回调
client.on('connect', function () { 
        
	console.log('connected');
	//发送连接请求
	publish('sensor/connect', { 
        
		serialNumber: 'SN-002',
	});
	//订阅服务端RPC
	client.subscribe('sensor/+/request/+');
	//订阅设备属性更新
	client.subscribe('sensor/+/+');

	//不断的上传遥测
	preTimer = telemetry();
});

//收到消息回调
client.on('message', function (topic, message) { 
        
	console.log('on message:');
	console.log(topic);
	console.log(message.toString());
	console.log('--------------------------------------');

	let data = JSON.parse(message);

	if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) { 
        
		console.log('RegExp success');
		var levels = topic.split('/');
		if (levels[3] === 'getState') { 
        
			console.log('receive getState');
			//回复 twoway RPC
			publish('sensor/' + levels[1] + '/response/' + levels[3], { 
        
				state: state,
			});
		}
		if (levels[3] === 'setState') { 
        
			console.log('receive setState');
			state = data.stateValue;
			//重新发送遥测 js没有双向绑定只能这样操作了
			if (preTimer != null) { 
        
				clearInterval(preTimer);
			}
			preTimer = telemetry();
		}
	}
});

//发送遥测 返回定时器对象
function telemetry() { 
        
	return setInterval(() => { 
        
		publish('sensor/data', { 
        
			serialNumber: 'SN-002',
			sensorType: 'default',
			sensorModel: 'SN-model',
			'SN-model': 'on',
			temp: Math.random(),
			hum: Math.random(),
			occ: true,
			state: state,
		});
	}, 3000);
}
//发送数据+打印日志
function publish(topic, message) { 
        
	client.publish(topic, JSON.stringify(message));
	log(topic, message);
}

//控制台打印
function log(topic, message) { 
        
	console.log('send message:');
	console.log(topic);
	console.log(JSON.stringify(message));
	console.log('===================================');
}

添加网关设备

image.png

启动网关

网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb 代表网关连接TB MQTT服务器成功。

""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"

启动js模拟设备

控制台执行 node mqtt_client.js前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器

start mqtt_client,prepare connect
connected

接下来会发送一条连接请求,主题sensor/connect到emqx broker,日志如下

send message:
sensor/connect
{ 
        "serialNumber":"SN-002"}
===================================

对应mqtt-test.json配置

"connectRequests": [
    { 
        
      "topicFilter": "sensor/connect",
      "deviceNameJsonExpression": "${serialNumber}"
    }
  ],

网关收到打印如下,SN-002就是网关解析的子设备名

""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"

此时打开tb页面,设备列表出现设备SN-002


接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。

创建开关小部件

创建Round switch``小部件,图中的ADCDE部分一一解释。

A

机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)应该就是从遥测或属性中找state数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?

B

通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。

C

通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。

D

参考B的解释

E

点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。


小部件创建好了,应该会自动发送一个getState双向RPC请求来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!

MQTT连接器双向RPC的BUG修复

开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下:

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3452, in _thread_main
    self.loop_forever(retry_first_connection=True)
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1779, in loop_forever
    rc = self.loop(timeout, max_packets)
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1181, in loop
    rc = self.loop_read(max_packets)
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 1572, in loop_read
    rc = self._packet_read()
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2310, in _packet_read
    rc = self._packet_handle()
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 2936, in _packet_handle
    return self._handle_publish()
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3216, in _handle_publish
    self._handle_on_message(message)
  File "/Library/Python/3.7/site-packages/paho/mqtt/client.py", line 3444, in _handle_on_message
    self.on_message(self, self._userdata, message)
  File "/Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt/mqtt_connector.py", line 505, in _on_message
    if message.topic in self.__gateway.rpc_requests_in_progress:
AttributeError: 'TBGatewayService' object has no attribute 'rpc_requests_in_progress'

关键就是最后一句'TBGatewayService' object has no attribute 'rpc_requests_in_progress' 说网关服务类没有属性rpc_requests_in_progress 打开网关服务类thingsboard_gateway/gateway/tb_gateway_service.py找到这个属性看一下是这样的: self.__rpc_requests_in_progress = {} python类中俩个下划线 __代表私有属性,所以在报错位置thingsboard_gateway/connectors/mqtt/mqtt_connector.py是调用不到这个属性的,报错是肯定的。 if message.topic in self.__gateway.rpc_requests_in_progress:

if message.topic in self.__gateway.rpc_requests_in_progress:
  self.__gateway.rpc_with_reply_processing(message.topic, content)
  return None

修复方法就是打开tb_gateway_service.py将属性改为公开属性,即:替换所有self.__rpc_requests_in_progressself.rpc_requests_in_progress 你以为这就完事了? 下面俩处不解释了,照着改了,双向RPC就没问题了。

  1. tb_gateway_service.pyrpc_with_reply_processing方法,注释图中一行

  2. tb_gateway_service.pyregister_rpc_request_timeout方法 没改之前是这样的 改成这样 到这里就好啦,再试试双向RPC吧!

标签: 1连接器d连接器型号10075025b1501连接器971c连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台