Mqtt Serialport Commander物联网通信脚本(ADXL355/KTC355)
/********* ADXL355 KTC175 通信脚本*********/ const mqtt = require('mqtt'); const server = 'mqtt:192.168.1.10:1883'; const client = mqtt.connect(server); const serialPort = require('serialport'); const SERIAL_PORT = new serialPort('/dev/ttyS2', { //COM5 baudRate: 9600, // 波特率 dataBits: 8, // 数据位 parity: 'none', // 奇偶校验 stopBits: 1, // 停止位 flowControl: false }, false); const mysql = require('mysql'); const program = require('commander'); program .version('0.1') .option('-g, --gwid <type>', 'gwid') // test: 10 .option('-p, --ipAdr <type>', 'ip adress') // test: 192.168.1.5 .parse(process.argv); program.gwid = 10; program.ipAdr = '192.168.1.5'; let index = 0; let timer = null; const pool = mysql.createPool({ host: '127.0.0.1', //'192.168.1.10' user: 'dbuser', password: 'dbuser', port: 3306 }); let channelList = []; // 通道及其对应的stype和sid let data_collect = []; let isCollect = false; const { gwid, ipAdr } = program; if (!(gwid && ipAdr)) { console.error('need params!'); return exit(); } //退出程序 function exit(sec = 2) { setTimeout(() => { process.exit(0); }, sec * 1000); } //从数据库查询 channel stype sid function getChannelId(ip) { return new Promise((resolve, reject) => { let sql = `SELECT channel_id channel, stype_id stype, sensorid sid FROM config.msc_channel WHERE bid = ? AND msc_ip = ? AND link_true_sensor = 0 AND channel_id != 1`; pool.query(sql, [gwid, ipAdr], (err, result) => { if (err) return reject(err); // let channels = result.map(({channel_id}) => parseInt(channel_id)); // channels = Array.from(new Set(channels)); resolve(result); }); }); } //生成请求数据 function createBuffer(channelList, index) { let i = index; let id = parseInt(channelList[i].channel, 16) let ID = Buffer.alloc(1); ID.writeUInt8(id); let START = Buffer.alloc(1); let END = Buffer.alloc(1); let TYPE = Buffer.alloc(1); let DATA = Buffer.alloc(1); START.writeUInt8(ANCHOR.FA); END.writeUInt8(ANCHOR.FD) TYPE.writeUInt8(ANCHOR.TYPE); DATA.writeUInt8(ANCHOR.DATA); let crc = ID[0] + TYPE[0] + DATA[0]; let CRC = Buffer.alloc(1); CRC.writeUInt8(crc); let buf = Buffer.concat([START, ID, TYPE, DATA, CRC, END]); return buf; } //生成topic function creatTopic(channelList, index) { // console.log(index,channelList) let i = index - 1; let stype = channelList[i].stype; let sid = channelList[i].sid; let topic = `d/${ program.gwid}/${ stype}/${ sid}`; return topic; } // Buffer 分为start, id, content, crc, end 部分 const ANCHOR = { FA: parseInt('fa', 16), FD: parseInt('fd', 16), TYPE: parseInt('01', 16), DATA: parseInt('0', 16) } //打印内容 function show_log(...infos) { console.log([new Date().toLocaleString()], ...infos); } //向serialport写入数据 function writeBufferToPort(payload_type) { SERIAL_PORT.write(payload_type, (err) => { if (err) { return show_log('Error on write:' + err.message); } }); } //发送请求数据 function sendRequest(channelList) { if (index > channelList.length - 1) { index = 0; console.log('completed end'); } console.log('channel:', channelList[index].channel); writeBufferToPort(createBuffer(channelList, index++)); timer = setTimeout(() => { sendRequest(channelList); //超时请求下一个通道 }, 5000); } //将ID TYPE转换为对应的十进制数 function transIdType(m) { m = m.toString(16); let idtype = parseInt(m, 16); return idtype; } // 两个低四位转换为一个十六进制对应的十进制数 function lowFourToDecimal(h, l) { h = h.toString(16); // 16进制 l = l.toString(16); // 16进制 let hex = parseInt(h + l, 16); // 小4位,字符串拼接,返回16进制对应的10进制数 return hex; } // 计算出crc (num=id+type+data) function decimalToOneByteHex(num) { return num % 256; } //拼接crc 并转化为16进制字符串 function transFormBufferCrc(buffer) { for (let i = 0; i < buffer.length - 1; i++) { let merge = lowFourToDecimal(buffer[i], buffer[i + 1]); return merge; } } // 将低四位Buffer转换为正常Buffer,且返回crc校验值(buffer不包含crc的两个字节) // parans {Buffer, Array} buffer 不含FA,FD,crc部分的buffer(数组为16进制转换为10进制的Array) function LowFourTobuffer(buffer) { let hexArray = []; let crc = 0; let crc_idtype = 0; for (let i = 0; i < 2; i++) { let decidtype = transIdType(buffer[i]); crc_idtype += decidtype; } for (let i = 2; i < buffer.length; i += 2) { let decimal = lowFourToDecimal(buffer[i], buffer[i + 1]); //123456 crc += decimal; hexArray.push(decimal); } 1 crc = decimalToOneByteHex(crc + crc_idtype); // crc: crc为16进制字符串,且只有最后一个字节; buffer: 不含crc的数据部分; return { crc, buffer: Buffer.from(hexArray) }; } //校验数据 function checkBuffer(buffer) { let len = buffer.length; let buffer_crc = buffer.slice(len - 2, len); let buffer_content = buffer.slice(0, len - 2); let contentObj = LowFourTobuffer(buffer_content); buffer_content = contentObj.buffer; let crc_count = contentObj.crc; let crc_check = transFormBufferCrc(buffer_crc); let crc = crc_count === crc_check; return { crc, buffer: buffer_content }; } //收集数据 function collectBuffer(buffer) { for (let i = 0; i < buffer.length; i++) { let hex = buffer[i]; if (hex === ANCHOR.FA) { isCollect = true; continue; } else if (hex === ANCHOR.FD) { clearTimeout(timer); setTimeout(() => { sendRequest(channelList); }, 2000); isCollect = false; let buffer = Buffer.from(data_collect); let checkRes = checkBuffer(buffer); data_collect = []; if (!checkRes.crc) { show_log('[crc fail]'); return; } // 发布
传感器response的有效数据 let send_buffer = checkRes.buffer; console.log('collect', send_buffer); // 发布数据 let topic = creatTopic(channelList, index); console.log('sendMessage:', '[topic]', topic, '[message]', send_buffer) // 发布数据 client.publish(topic, send_buffer); show_log('[send success]'); } if (isCollect) { data_collect.push(hex); } } } //sql请求成功 发送请求内容serioalport async function main() { try { channelList = await getChannelId(ipAdr); //发送第一次请求 sendRequest(channelList); } catch (err) { console.error(err); return exit(); } } main(); /********************************* SERIOALPORT **************************************/ // 错误监听 SERIAL_PORT.on('error', (err) => { console.log('[Error] ' + err); exit(); }); // 监听连接关闭状态 SERIAL_PORT.on('close', () => { console.log('[Close] '); exit(); }); //串口连接 SERIAL_PORT.on('open', () => { console.log('serial port connect ok'); }); //串口收到消息 SERIAL_PORT.on('data', (data) => { collectBuffer(data); }); /********************************* MQTT ********************************************/ client.on('connect', () => { console.log('mqtt connect ok'); }); client.on('message', function (topic, message) { console.log(topic, message); }); client.on('error', err => { console.log('client err:', err) exit(); }); client.on('close', () => { console.log('client close'); });
QW:1174208082