资讯详情

用python和tcp透传通过rs485获取传感器信息(项目实战经历)

大家好 这是我的第一篇文章 因为有很多朋友想参考,所以我把它寄到这里

首先要了解整体工程结构, 我在外包公司工作, 这个项目是外包项目, 公司只给了我三周的时间,所以我做这个项目的时候也很紧张, 自始至终只有我一个人。

本人技术背景:

我只是个负责人WebApi开发的普通程序员通常会写简单的业务界面。这个项目对我来说还是很难实现的,但还是成功了 相信很多人都会用其中的一些技术,希望大家能顺利发展,从中得到帮助。

项目的主要功能如下:

通过物联网设备对农业土壤信息和种植信息进行采集并且保存到云端提供调用

他的功能架构是:

1. 构建一套数据平台

2. 对接智能网关设备

3. 获取传感器数据

4. 清理和显示数据

5. 为公共接口开发提供第三方调用

项目整体架构

启动

  1. 会议当天回来 得知需求后 我的脑子还是一片空白,几天后 我在客户提供的基地去调研,看看怎么布电和设施,要采集的信息是那些。

    看了两个地方后 发现许多问题 比如强电不能进菜地等各种问题 最后选择了右边的种植基地。

  2. 现场调查后 开始购买设备,但我认为传感器应该非常成熟 所以我在淘宝上搜索项目所需的传感器设备,后来,我真的找到了合适的传感器 就是有点贵 一个500元,

    跟随传感器说明书 购买了rs485串口转USB的线 看了非常多RS485的文档 我试着写串口指令。 还是拿不到数据,后来问卖家 卖方写的指令可以使用 当时不知道为什么,但是成功拿到数据就好。

    但是当你得到传感器的数据时,你必须找到如何接收传感器的数据并发布传感器指令的方法?

  3. 在传感器购买页面下 我看到一种推荐的商品 叫做DTU 智能网关,马上买了一个回来研究 可惜研究了很久 都不知道如何使用python和该设备进行通讯,不断地查阅说明书和设备的官方文档后 发现设备的工作原理是

    通过tcp连接设备 → 转发tcp指令给串口 → 接收串口返回的数据 → 回传服务器

    所以我跟着设备教程 使用了TCP测试工具等 串口指令是手动编写的 传感器数据信息通过网关设备成功获取, 那是不是用python连接设备 你可以通过发送串口数据获取数据吗?但是我从来没有写过socket编程 很痛。

  4. 后来我在js找到了一篇好文章 开始尝试手写接口https://www.jianshu.com/p/c0b13dd11c6e

    # -*- coding:utf-8 -*-  import socket import threading from Config import TCP_SERVER_PORT from Main import client_handler from concurrent.futures import ThreadPoolExecutor  def Activate():     """输出服务器状态到控制台"""      while True:         time.sleep(5)         # print("[活动中的线程]",threading.activeCount())         print("[DB]", App.db)  if __name__ == '__main__':      # 创建服务器, 端口不支持复用     server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)     server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)      # 绑定端口, 打印操作信息     server_address = ('', TCP_SERVER_PORT)     server_sock.bind(server_address)     print('TCP Server starting up on port {}'.format(server_address[1]))      # 设置socket被动连接, 默认为主动连接     server_sock.listen(5)      # 线程池, 任务池     pool = ThreadPoolExecutor(max_workers=8)     task_pool = []      # # 监控服务器活动     # Activate = threading.Thread(target=Activate)     # Activate.start()      try:          while True:                      # 主流程仅用于监控新客户连接             print('Main Thread [{}], 等待客户端连接...'.format(                 threading.current_thread().name))              # client_sock专门为客户端服务socket,client_addr包括客户端IP元组和端口             client_sock, client_addr = server_sock.accept()             print('Main Thread [{}], client {} 已连接'.format(                 threading.current_thread().name, client_addr))              # 1.将新接收到的连接交给下一步             # client_handler(client_sock, client_addr)              # 2.多线程             # client = threading.Thread(target=client_handler, args=(client_sock, client_addr))             # client.setDaemon(True)             # client.start()              # 3.可控线程池             pool.submit(client_handler, client_sock, client_addr)             print("提交线程池")      except Exception as e:          # 异常结束         print(e)         server_sock.close()      finally:          # 关闭监听socket,不再响应其他客户端连接         server_sock.close()         print("关闭服务器") 

    上面的代码定义了一个socket服务器, 监听到的socket连接 通过pool.submit(client_handler, client_sock, client_addr) 将连接提交到线程池 通过以下方法验证网关设备是否合法连接 在网关设备中 网关握手时可以发送什么字符串? 作为注册包 所以我设计了一个表 用于存储网关设备的密钥 相当于 密钥是网关设备的身份证ID 下面的方法会把注册包的内容 通过请求服务器进行判断 密钥是否合法 如果合法 就会开启一个新的线程 threading.Thread(target=client_thread, args=(client_sock, client_addr, gatewaydata)) 专门负责保持该网关设备与服务器的通信

    def client_handler(client_sock, client_addr):
        """对连接进行校验, 处理注册包, 合法连接新建线程"""
    
        print("[收到新的客户端连接]处理连接", "--"*40)
    
        data = client_sock.recv(1024)
        print("注册包内容: ", data)
    
        # 转译bytes字符串
        data = str(data, encoding='utf-8')
        type, gatewaydata = AuthGateway(data)
    
        # 请求主服务器检验注册包
        
        if type:
    
            # 检验成功
    
            # 为每个新的客户连接创建一个线程,用来处理客户数据
            client = threading.Thread(target=client_thread, args=(client_sock, client_addr, gatewaydata))
            client.start()
    
            print("[注册包核验成功]合法连接, 新增子线程进行跟踪")
    
        else:
    
            # 检验失败
    
            client_sock.close()
    
            print("[注册包信息异常]非法连接, 服务器主动断开连接")
    
    

    图中新增的线程函数 client_thread 主要负责的工作是 获取该网关需要询问的指令 并且发送指令给串口 因为上面说到的 网关的工作方式是 负责发送数据给串口和接收串口返回的数据 以下代码的工作流程为 1. 生成任务id 该任务主要负责每隔多久执行一次 询问服务器获取指令并且获取数据发送给服务器 2. 创建任务调度器

    def client_thread(client_sock, client_addr, gatewaydata):
        '''处理下发任务'''
    
        try:
            # 生成任务ID
            TaskName = RandomStr()
            print("[生成定时任务ID]",TaskName)
    
        except Exception as e:
            print("[发生错误:地址1]",e)
            client_sock.close()
    
        try:
            # 任务参数
            Args = [{
                "client_sock":client_sock,
                "gatewayID":gatewaydata,
                "taskID":TaskName
            }]
    
            # 添加任务调度
            scheduler.add_job(
                TaskCheduler,
                "interval",
                id=TaskName,
                args=Args,
                seconds=60,
                jobstore="default",
                executor="default",
            )
    
        except Exception as e:
            print("[发生错误:地址2]",e)
            client_sock.close()
    

    任务调度器的工作: 1. GatAllGatewayInstructions(ChedulerData['gatewayID'])  通过当前连接的网关设备的网关ID 询问服务器获取该网关设备下的指令

    def TaskCheduler(ChedulerData):
        """任务调度器"""
    
        print("--"*40)
        print("执行任务")
    
        # 请求服务器获取指令集
        type, data = GatAllGatewayInstructions(ChedulerData['gatewayID'])
        list = []
        for i in data['data']:
            # 限制每条指令执行时间间隔为1秒
            time.sleep(1)
            # print()
            # print("指令:", i)
            s = i["data"]
            msg = s['busadd'] + " " + s['featurescode'] + " " + s['registeraddr_start'] + " " + s['registeraddr_end'] + " " + s['read_start'] + " " + s['read_end']
            # print("串口指令:",msg)
            msg = GeneratorMsg(msg, s['crccheck'])
            # print("计算结果:",msg)
            clienttype, data = ModbusRTUIO(ChedulerData['client_sock'],i['equipmentId'], msg)
            print(clienttype, data)
            if clienttype == False:
                scheduler.remove_job(ChedulerData['taskID'])
                print("设备连接异常 自动断开连接 销毁任务")
                return False
            list.append(data)
    
        # 推送数据
        PushData(list)
    
    def GatAllGatewayInstructions(gatewayID):
        """获取该网关全部指令"""
        print("[SDK.py][GatAllGatewayInstructions]=>获取该网关全部指令","GatewayID: ",gatewayID)
    
        request_url = ResultServer + '/iot/gateway/get/instructions'
        headers = {
            "Content-Type": "application/json"
        }
        data = {
            'gatewayID':gatewayID
        }
        
        result = requests.post(request_url, json=data, headers=headers)
        if result.status_code == 200:
            jsondata = result.json()
    
            if jsondata["code"] == 200:
                # print(jsondata)
                return True, jsondata
            else:
                return False, None
    
        return False, None
    

    图中为我做的后台, 里面设置的是我要询问的网关设备下的传感器的数据时所需的串口指令 基于RS485协议  

    class Equipment(BaseModel, db.Model):
        """设备参数"""
    
        __tablename__ = 'equipment'
        gateway_id = db.Column(db.Integer)      # 网关ID
        name = db.Column(db.String(255))        # 传感器名
        
        body = db.Column(db.Integer)            # body ID
        paramtemplateid = db.Column(db.Integer) # 参数模板
        # 总线信息
        modbus_busadd = db.Column(db.String(255))       # 从机地址(总线地址)
        modbus_featurescode = db.Column(db.String(255), default="4") # 功能码(功能号)
        # 寄存器起始地址
        modbus_registeraddr_start = db.Column(db.String(255))      # 起始位
        modbus_registeraddr_end = db.Column(db.String(255))        # 结束位置
        # 读取寄存器个数
        modbus_read_start = db.Column(db.String(255))              # 起始位
        modbus_read_end = db.Column(db.String(255))                # 结束位置
    
        # 包含crc校验码的计算完成的数据
        modbus_crccheck = db.Column(db.String(255))
    
        def toDict(self):
            paramTemplateitem = ParamTemplateItem.query.get(self.paramtemplateid)
            return dict(
                gateway_id = self.gateway_id,
                name = self.name,
                bodyid = self.body,
                modbus_busadd = self.modbus_busadd,
                modbus_featurescode = self.modbus_featurescode,
                modbus_registeraddr_start = self.modbus_registeraddr_start,
                modbus_registeraddr_end = self.modbus_registeraddr_end,
                modbus_read_start = self.modbus_read_start,
                modbus_read_end = self.modbus_read_end,
                modbus_crccheck = self.modbus_crccheck,
                paramtemplateid = self.paramtemplateid,
                paramtemplate = paramTemplateitem.toDict() if paramTemplateitem else {},
                **self._base()
            )
    
        def _toModbusRTU(self):
            return dict(
                busadd = self.modbus_busadd,
                featurescode = self.modbus_featurescode,
                registeraddr_start = self.modbus_registeraddr_start,
                registeraddr_end = self.modbus_registeraddr_end,
                read_start = self.modbus_read_start,
                read_end = self.modbus_read_end,
                crccheck = self.modbus_crccheck,
            )
    

    在获取到了串口数据后循环所有的数据 对串口参数进行组合 并且通过ModbusRTUIO()方法 把指令发送给对应的网关设备 并等待串口返回数据  

    def ModbusRTUIO(client_sock, equipmentId, msg):
        """单次指令IO操作"""
        # print()
        print("[ModbusRTU-IO]")
        # print("发出指令:",msg)
        try:
            client_sock.send(msg)
        except Exception as e:
            print(e)
            # print("0")
            return False, {}
    
        # 发送后等到1秒在监听接收接收
        time.sleep(1)
    
        # try:
        while True:
            data = client_sock.recv(1024)
            # print("接收到指令返回的结果:",data)
    
            if not data or data == "":
                return False, {}
    
            return True, {
                "equipmentid":equipmentId,
                "serialization": ModbusTcpSerialization(data)
            }
    

    以下是生成CRC校验码的方法 和 切割返回的串口数据的方法

    def GeneratorMsg(data, crc):
        """生成串口消息 并自动组合CRC校验码 转为16进制返回"""
        # crc = calc_crc16(str, 6)
        a = '%04x' % (int(crc))
        # print(a,bytearray.fromhex(a))
        # print(bytearray.fromhex(str + a))
        # print("高低位计算结果",a)
        return bytearray.fromhex(data + a)
    
    def ModbusTcpSerialization(data):
        return {
            "busadd":data[0],
            "featurescode": data[1],
            "effectivebit": data[2],
            "bit0": data[3],
            "bit2": data[4],
            "crch": data[5],
            "crcl": data[6],
            "data":data[3]*256+data[4]
        }
    

     

平台开发

主体 = 用户, 网关 = DTU 智能网关设备, 设备 = 传感器  

 

class Gateway(BaseModel, db.Model):
    """网关"""

    __tablename__ = 'gateway'
    principal_id = db.Column(db.Integer)    # 主体ID
    name = db.Column(db.String(255))            # 网关设备名
    accesskey = db.Column(db.Text)          # 请求许可证
    accesskeyhex = db.Column(db.Text)          # 请求许可证16进制

    def toDict(self):
        return dict(
            name = self.name,
            principal_id = self.principal_id,
            accesskey = self.accesskey,
            hex = self.accesskeyhex,
            **self._base()
        )

平台和TCPserver之间的通讯接口

TCPserver

import requests
from Config import ResultServer
import json

def AuthGateway(accesskey):
    print("[SDK.py][AuthGateway]=>校验注册包",accesskey)

    request_url = ResultServer + '/iot/gateway/auth'
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        'accesskey':accesskey
    }
    
    result = requests.post(request_url, json=data, headers=headers)
    if result.status_code == 200:
        jsondata = result.json()

        if jsondata["code"] == 200:
            # print(jsondata)
            return True, jsondata["data"]['gatewayID']
        else:
            return False, None

    return False, None

def GatAllGatewayInstructions(gatewayID):
    """获取该网关全部指令"""
    print("[SDK.py][GatAllGatewayInstructions]=>获取该网关全部指令","GatewayID: ",gatewayID)

    request_url = ResultServer + '/iot/gateway/get/instructions'
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        'gatewayID':gatewayID
    }
    
    result = requests.post(request_url, json=data, headers=headers)
    if result.status_code == 200:
        jsondata = result.json()

        if jsondata["code"] == 200:
            # print(jsondata)
            return True, jsondata
        else:
            return False, None

    return False, None

def PushData(data):
    """上报数据"""
    print("[SDK.py][PushData]=>上报数据")

    request_url = ResultServer + '/iot/push/data'
    headers = {
        "Content-Type": "application/json"
    }
    data = data
    for i in data:
        print(i)
    result = requests.post(request_url, json=data, headers=headers)
    print("request status : ", result, result.status_code)
    if result.status_code == 200:
        jsondata = result.json()

        if jsondata["code"] == 200:
            print("api status:", jsondata)
            return True, jsondata
        else:
            return False, None

    return False, None

平台接口

from app.Models import Gateway, Equipment, Collection
from app.Extensions import db

def GatAllGatewayInstructions(request):
    print("[取出指令]")
    gatewayID = request.get('gatewayID',None)
    print(gatewayID)

    gateway = Gateway.query.get(gatewayID)

    if not gateway:
        return 9000, "网关不存在", {}

    equipment = Equipment.query.filter(Equipment.gateway_id == gateway.id).all()

    instructions = []

    for i in equipment:
        instructions.append({
            "equipmentId": i.id,
            "data": i._toModbusRTU()
        })

    return 200, "", instructions

def GatewayAuth(request):
    print("[核验连接的设备]")
    accesskey = request.get('accesskey',None)

    if not accesskey:
        return 403, "参数有误", {}

    gateway = Gateway.query.filter(Gateway.accesskey == str(accesskey)).first()

    if not gateway:
        return 9000, "网关不存在", {}

    return 200, "验证成功", {
        "gatewayID": gateway.id
    }

def PushData(request):
    print("[接收数据]")
    for i in request:
        print(i)
    db.session.execute(
        # Table_name为表名
        Collection.__table__.insert(),
        # 列表生成式,包含大量的字典
        [{'equipment_id' : i['equipmentid'], 'data' : i['serialization']['data'], 'modbustcpdata': i['serialization']} for i in request],
    )
    db.session.commit()
    return 200, "", {}

做梦都没想到我一个程序员有下工地当苦力的一天

考虑到东西要长期在室外暴露

我在网上买了一个电箱 用来安装设备

为了安全起见还是弄一套N+1P的漏开

开始安装

淘宝上买的户外防水接线盒

rs485接线

正常运行半年了 目前一切正常

 

 

标签: py2系列位移传感器py2

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台