资讯详情

Python 救救我!如何连接串口实现一个报警灯报警器

1. 背景说明

2.准备环境物料

3.初步调试测试

3.1 硬件产品测试

3.2 Python串口调试

4.源码开发

5.打包

6.效果演示


1. 背景说明

我们的一个项目已经在实施阶段,客户提出了一个新的要求:在特定的紧急情况下,将报警信号连接到他们(客户)的计算机设备上(报警灯闪烁) 发出蜂鸣警告)。

硬件开发可以追溯到我的大学生活。当时我们一开始就用汇编和C做单片机开发,后来再用。C 进行ARM开发,最后使用Z-Stack做物联网开发。可以离现在太远了,当初掌握的知识早已献给时间。

这个小要求摆在我面前,压力很大。我必须克服它,但我不打算为它付出很多代价。所以我的最终决定是:硬件设备部分,直接购买成型产品(暴露API或类似串口等调用方式);软件部分,与服务端建立websocket当接收到服务端推送的紧急信息时,调用硬件发送报警信息。

2.准备环境物料

当我在宝藏中搜索关键词报警灯 二次开发 api】这样的关键词很少找到合适的产品,只有少数外观和价格不令人满意。最后,当关键字被报警灯取代时 串口 开发】之后,我找到了我想要的东西(售价60米)。内建USB提供相应的串口指令集。

快速网购,物流,快!硬件到手,为了简单开发,我要用Python开发串口通信和程序。

鉴于此次开发和使用Python,在此列出开发环境:

  1. Windows7 64

  2. Python3.6

  3. python依赖模块包括:websocket-client(websocket)、pyserial(串口)、pyinstaller(打包exe)

3.初步调试测试

3.1 硬件产品测试

首先,需要测试收到的产品是否正常运行,并使用串口发送指令进行控制。

第一步是设备硬件usb插入电脑

第二步是安装驱动程序:卖方提供驱动程序

第三步是找到特定的串口号(右键计算机)->管理->设备管理器->端口),我的串口是COM10(注意,不同的设备会有不同的口号,不同的插座会有不同的设备)

第四步是使用产品附带控制软件进行测试。打开串口后,测试无异常

3.2 Python串口调试

接下来进行使用Python控制硬件的调试。

01 05 00 00 ff 00 8C 3A 打开红灯(常亮) 01 05 00 00 00 00 CD CA 关闭红色  01 05 00 03 ff 00 7C 3A 打开蜂鸣器(一直响) 01 05 00 03 00 00 3D CA 关闭蜂鸣器   01 05 00 00 f0 00 89 CA 红灯闪烁(2Hz)  01 05 00 00 f1 00 88 5A 红灯闪烁(1Hz)  01 05 00 00 f2 00 88 AA 红灯闪烁(0.5Hz)  01 05 00 00 f3 00 89 3A 红灯闪烁(0.25Hz)   01 05 00 03 f0 00 79 CA 蜂鸣器模式1(2Hz)  01 05 00 03 f1 00 78 5A 蜂鸣器模式2(1Hz)  01 05 00 03 f2 00 78 AA 蜂鸣器模式2(0.5Hz)  01 05 00 03 f3 00 79 3A 蜂鸣器模式2(0.25Hz)

该设备提供上述指令集,串口和指令要求如下

串口设置: 9600,8,n,1。 控制命令,HEX格式发送(不能ASCII格式发送)

Python实现Demo如下

import serial import time  #### 串口号,右键计算机->点击设备->点击端口->查看获取 SERIAL_PORT = 'COM10' ser=serial.Serial(SERIAL_PORT,9600,timeout指令声明 ## 红灯常亮 RED_LIGHT_CMD_ALWAYS = b'\x01\x05\x00\x00\xff\x00\x8C\x3A' ## 关闭红灯 RED_LIGHT_CMD_CLOSE = b'\x01\x05\x00\x00\x00\x00\xCD\xCA' ## 红灯闪烁2HZ RED_LIGHT_CMD_2HZ = b'\x01\x05\x00\x00\xf0\x00\x89\xCA' ## 红灯闪烁1HZ RED_LIGHT_CMD_1HZ = b'\x01\x05\x00\x00\xf1\x00\x88\x5A' ## 红灯闪烁0.5HZ RED_LIGHT_CMD_0_5HZ = b'\x01\x05\x00\x00\xf2\x00\x88\xAA' ## 红灯闪烁0.25HZ RED_LIGHT_CMD_0_25HZ = b'\x01\x05\x00\x00\xf3\x00\x89\x3A' ## 蜂鸣器常响 BUZZ_CMD_ALWAYS = b'\x01\x05\x00\x03\xff\x00\x7C\x3A' ## 关闭蜂鸣器 BUZZ_CMD_CLOSE = b'\x01\x05\x00\x03\x00\x00\x3D\xCA' ## 蜂鸣器模式2HZ BUZZ_CMD_2HZ = b'\x01\x05\x00\x03\xf0\x00\x79\xCA' ## 蜂鸣器模式1HZ BUZZ_CMD_1HZ = b'\x01\x05\x00\x03\xf1\x00\x78\x5A' ## 蜂鸣器模式0.5HZ BUZZ_CMD_0_5HZ = b'\x01\x05\x00\x03\xf2\x00\x78\xAA' ## 蜂鸣器模式0.25HZ BUZZ_CMD_0_25HZ = b'\x01\x05\x00\x03\xf3\x00\x79\x3A'  #### #   打开端口 #### def openSerial():     if not ser.isOpen():         ser.close()         ser.open()     print(成功连接到串口!')  #### #   向设备发送指令 #### def sendCmdToDevice(cmd, t=0.1):     openSerial()     ser.write(cmd)     ##默认延迟0.如果红灯亮10秒,可以控制指令的持续时间,防止指令叠加发送导致指令失效。可传t=10     time.sleep(t)     ser.close()  if __name__ == '__main__':     #sendCmdToDevice(RED_LIGHT_CMD_ALWAYS)     sendCmdToDevice(RED_LIGHT_CMD_CLOSE)     #sendCmdToDevice(RED_LIGHT_CMD_2HZ)     #sendCmdToDevice(RED_LIGHT_CMD_1HZ)     #sendCmdToDevice(RED_LIGHT_CMD_0_5HZ)     #sendCmdToDevice(RED_LIGHT_CMD_0_25HZ)     #sendCmdToDevice(BZZ_CMD_ALWAYS)
    #sendCmdToDevice(BUZZ_CMD_CLOSE)
    #sendCmdToDevice(BUZZ_CMD_2HZ)
    #sendCmdToDevice(BUZZ_CMD_1HZ)
    #sendCmdToDevice(BUZZ_CMD_0_5HZ)
    #sendCmdToDevice(BUZZ_CMD_0_25HZ)

如上,对所有的指令进行16进制转换后,使用函数打开端口,使用函数向端口写指令,发送指令后掉。对所有操作都测试无误后进入下阶段的开发。

4.源码开发

websocket的服务端使用Java开发,不再赘述。

接下来使用Python开发websocket客户端,监听到报警信息,调用设备发出报警。源码如下:

{
	"USER_ID": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
	"WARN_TIME": "3",
	"WEB_SOCKET_URL": "wss://{
    
      {DOMAIN}}/websocket/websocket/",
	"SERIAL_PORT": "COM10",
	"IF_BUZZ": "True"
}
  • USER_ID:用于区分socket的中的不同用户

  • WARN_TIME:警报持续时间(单位秒)

  • WEB_SOCKET_URL:websocket的服务端地址

  • SERIAL_PORT:串口号

  • IF_BUZZ:是否开启蜂鸣警报(True/False)

import websocket
import threading
import time
import serial
import json

#### 读取配置文件
jsonConfigStr = ''
with open("config.json", "r") as f:
    jsonConfigStr = f.read()
configData = json.loads(jsonConfigStr)

#### !!!!用户ID
DEPT_ID = configData['USER_ID']
#### !!!!警报持续时长(秒)
WARN_TIME = int(configData['WARN_TIME'])
WEB_SOCKET_URL = configData['WEB_SOCKET_URL']
WS_URL = WEB_SOCKET_URL + DEPT_ID + '-' + str(int(time.time()))

#### !!!!串口号,右键计算机->点击设备->点击端口->查看获取
SERIAL_PORT = configData['SERIAL_PORT']
ser=serial.Serial(SERIAL_PORT,9600,timeout=0.5)

#### !!!!是否开启蜂鸣
IF_BUZZ = configData['IF_BUZZ'] == 'True'

######## 指令声明
## 红灯常亮
RED_LIGHT_CMD_ALWAYS = b'\x01\x05\x00\x00\xff\x00\x8C\x3A'
## 关闭红灯
RED_LIGHT_CMD_CLOSE = b'\x01\x05\x00\x00\x00\x00\xCD\xCA'
## 红灯闪烁2HZ
RED_LIGHT_CMD_2HZ = b'\x01\x05\x00\x00\xf0\x00\x89\xCA'
## 红灯闪烁1HZ
RED_LIGHT_CMD_1HZ = b'\x01\x05\x00\x00\xf1\x00\x88\x5A'
## 红灯闪烁0.5HZ
RED_LIGHT_CMD_0_5HZ = b'\x01\x05\x00\x00\xf2\x00\x88\xAA'
## 红灯闪烁0.25HZ
RED_LIGHT_CMD_0_25HZ = b'\x01\x05\x00\x00\xf3\x00\x89\x3A'
## 蜂鸣器常响
BUZZ_CMD_ALWAYS = b'\x01\x05\x00\x03\xff\x00\x7C\x3A'
## 关闭蜂鸣器
BUZZ_CMD_CLOSE = b'\x01\x05\x00\x03\x00\x00\x3D\xCA'
## 蜂鸣器模式2HZ
BUZZ_CMD_2HZ = b'\x01\x05\x00\x03\xf0\x00\x79\xCA'
## 蜂鸣器模式1HZ
BUZZ_CMD_1HZ = b'\x01\x05\x00\x03\xf1\x00\x78\x5A'
## 蜂鸣器模式0.5HZ
BUZZ_CMD_0_5HZ = b'\x01\x05\x00\x03\xf2\x00\x78\xAA'
## 蜂鸣器模式0.25HZ
BUZZ_CMD_0_25HZ = b'\x01\x05\x00\x03\xf3\x00\x79\x3A'

####
#   延时
####
def delay(s):
    try:
        time.sleep(s)
    except:
        delay(s)

####
#   打开端口
####
def openSerial():
    if not ser.isOpen():
        ser.close()
        ser.open()
    print('成功连接到串口!')

####
#   向设备发送指令
####
def sendCmdToDevice(cmd, t=0.1):
    openSerial()
    ser.write(cmd)
    delay(t)
    ser.close()

#####
#   心跳线程
#####
class heartBeatThread (threading.Thread):
    def __init__(self, ws):
        threading.Thread.__init__(self)
        self.ws = ws
    def run(self):
        while True:
            ws.send('ping')
            print('ping!')
            delay(3)

#####
#   警报线程:红灯2HZ闪烁
#####
class warnThread (threading.Thread):
    def __init__(self, ws):
        threading.Thread.__init__(self)
        self.ws = ws
    def run(self):
        sendCmdToDevice(RED_LIGHT_CMD_2HZ)
        if IF_BUZZ:
            sendCmdToDevice(BUZZ_CMD_2HZ)
        delay(WARN_TIME)
        sendCmdToDevice(RED_LIGHT_CMD_CLOSE)
        if IF_BUZZ:
            sendCmdToDevice(BUZZ_CMD_CLOSE)

###
#   websocket-client:接受消息回调
###
def on_message(ws, message):
    if 'warninfo' in message:
      	print('检测到报警信息')
        warnThread(ws).start()
    else:
        print(message)

###
#   websocket-client:发生错误回调
###
def on_error(ws, error):
    print("链接出现错误")

###
#   websocket-client:链接断开回调
###
def on_close(ws):
    print("链接已经断开")

###
#   websocket-client:链接建立回调
###
def on_open(ws):
    print('检测到链接建立')
    heartBeatThread(ws).start()
    

if __name__ == '__main__':
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(WS_URL, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)
    try:
        ws.run_forever()
    except:
        ws.close()

 

截止到此,开发全部完成

以上涉及到的源码其实还是有不足和缺陷的,如果你发现的话,不妨在讨论区交流哦~

5.打包

pyinstaller -D -i i.ico daemon.py

在dist下生成了可执行文件和相关依赖

6.效果演示





收到报警信息后红灯闪烁并蜂鸣,点击查看视频效果

标签: f1压力继电器f0r连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台