视频教程已经在B站 请三连我 虽然我没有稚晖君那么强壮,虽然我没有稚晖君那么强壮
教程!基于树莓派 传感器 阿里云IoT智能家居管理(2)
主文件
#!/usr/bin/python3 import aliLink,mqttd,rpi import time,json import Adafruit_DHT import time import LCD1602 import flame_sensor import buzzer_1 import rain_detector import gas_sensor import relay from threading import Thread pin = 19 # DHT11 温湿度传感器管脚定义 Buzzer = 20 # 定义有源
蜂鸣器管脚 # GPIO口定义 sensor = Adafruit_DHT.DHT11 # 三元素(iot后台获取) ProductKey = 'a11lzCDSgZP' DeviceName = 'IU6aSETyiImFPSkpcywm' DeviceSecret = "2551eb5f630c372743c538e9b87bfe6d" # topic (iot后台获取) POST = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post' # 向云报告消息 POST_REPLY = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post_reply' SET = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/service/property/set' # 订阅云指令 #窗户开关 window = 0 window_status = 0 Thread(target=relay.close).start() # 消息回调(云发消息回调函数) def on_message(client, userdata, msg): #print(msg.payload) Msg = json.loads(msg.payload) global window,window_status window = Msg['params']['window'] print(msg.payload) # 开关值 if window_status != window: window_status = window if window
== 1 : Thread (target =relay . open ) .start ( ) else : Thread (target =relay .close ) .start ( ) #连接回调(与阿里云建立链接后的回调函数) def on_connect (client , userdata , flags , rc ) : pass # 链接信息 Server ,ClientId ,userNmae ,Password = aliLink .linkiot (DeviceName ,ProductKey ,DeviceSecret ) # mqtt链接 mqtt = mqttd .MQTT (Server ,ClientId ,userNmae ,Password ) mqtt .subscribe (SET ) # 订阅服务器下发消息topic mqtt .begin (on_message ,on_connect ) # 信息获取上报,每2秒钟上报一次系统参数 while True : #获取指示灯状态 power_stats = int (rpi .getLed ( ) ) if (power_stats == 0 ) : power_LED = 0 else : power_LED = 1 # CPU 信息 CPU_temp = float (rpi .getCPUtemperature ( ) ) # 温度 ℃ CPU_usage = float (rpi .getCPUuse ( ) ) # 占用率 % # RAM 信息 RAM_stats =rpi .getRAMinfo ( ) RAM_total = round ( int (RAM_stats [ 0 ] ) / 1000 , 1 ) # RAM_used = round ( int (RAM_stats [ 1 ] ) / 1000 , 1 ) RAM_free = round ( int (RAM_stats [ 2 ] ) / 1000 , 1 ) # Disk 信息 DISK_stats =rpi .getDiskSpace ( ) DISK_total = float (DISK_stats [ 0 ] [ : - 1 ] ) DISK_used = float (DISK_stats [ 1 ] [ : - 1 ] ) DISK_perc = float (DISK_stats [ 3 ] [ : - 1 ] ) #温度,湿度 humidity , temperature = Adafruit_DHT .read_retry (sensor , pin ) # LCD显示 LCD = 0 try : LCD1602 .init ( 0x27 , 1 ) # 初始化显示屏 LCD1602 .write ( 0 , 0 , 'humidity: ' + str ( int (humidity ) ) + '%' ) LCD1602 .write ( 0 , 1 , 'temperature: ' + str ( int (temperature ) ) + '\'' ) # 在第二行显示world! LCD = 1 except : print ( "显示屏连接不稳定,请检查" ) LCD = 0 # 蜂鸣器 buzzer = 0 # 火焰传感器 flame_sensor .setup ( ) if flame_sensor .fire ( ) == 0 : flame = 1 buzzer_1 .buzzer_on ( ) #让铃声叫 buzzer = 1 else : flame = 0 buzzer_1 .buzzer_off ( ) #铃声不叫 buzzer = 0 # 烟雾传感器 gas = gas_sensor .gas ( ) # 雨滴传感器 rain_detector .setup ( ) if rain_detector .rain ( ) == 0 : rain = 1 else : rain = 0 # 构建与云端模型一致的消息结构 updateMsn = { 'cpu_temperature' :CPU_temp , 'cpu_usage' :CPU_usage , 'RAM_total' :RAM_total , 'RAM_used' :RAM_used , 'RAM_free' :RAM_free , 'DISK_total' :DISK_total , 'DISK_used_space' :DISK_used , 'DISK_used_percentage' :DISK_perc , 'PowerLed' :power_LED , 'temperature' :temperature , 'humidity' :humidity , 'window' :window , 'LCD' :LCD , 'buzzer' :buzzer , 'flame' :flame , 'rain' :rain , 'gas' :gas } JsonUpdataMsn = aliLink .Alink (updateMsn ) print (JsonUpdataMsn ) mqtt .push (POST ,JsonUpdataMsn ) # 定时向阿里云IOT推送我们构建好的Alink协议数据 time .sleep ( 3 )
rpi
# 树莓派数据与控制
import os
# Return CPU temperature as a character string
def getCPUtemperature():
res =os.popen('vcgencmd measure_temp').readline()
return(res.replace("temp=","").replace("'C\n",""))
# Return RAM information (unit=kb) in a list
# Index 0: total RAM
# Index 1: used RAM
# Index 2: free RAM
def getRAMinfo():
p =os.popen('free')
i =0
while 1:
i =i +1
line =p.readline()
if i==2:
return(line.split()[1:4])
# Return % of CPU used by user as a character string
def getCPUuse():
data = os.popen("top -n1 | awk '/Cpu\(s\):/ {print $2}'").readline().strip()
return(data)
# Return information about disk space as a list (unit included)
# Index 0: total disk space
# Index 1: used disk space
# Index 2: remaining disk space
# Index 3: percentage of disk used
def getDiskSpace():
p =os.popen("df -h /")
i =0
while True:
i =i +1
line =p.readline()
if i==2:
return(line.split()[1:5])
def powerLed(swatch):
led = open('/sys/class/leds/led1/brightness', 'w', 1)
led.write(str(swatch))
led.close()
# LED灯状态检测
def getLed():
led = open('/sys/class/leds/led1/brightness', 'r', 1)
state=led.read()
led.close()
return state
if __name__ == "__main__":
# CPU informatiom
CPU_temp =getCPUtemperature()
CPU_usage =getCPUuse()
print(CPU_usage)
# RAM information
# Output is in kb, here I convert it in Mb for readability
RAM_stats =getRAMinfo()
RAM_total = round(int(RAM_stats[0]) /1000,1)
RAM_used = round(int(RAM_stats[1]) /1000,1)
RAM_free = round(int(RAM_stats[2]) /1000,1)
print(RAM_total,RAM_used,RAM_free)
# Disk information
DISK_stats =getDiskSpace()
DISK_total = DISK_stats[0][:-1]
DISK_used = DISK_stats[1][:-1]
DISK_perc = DISK_stats[3][:-1]
print(DISK_total,DISK_used,DISK_perc)
继电器
import RPi.GPIO as GPIO
import time
import asyncio
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码
GPIO.setwarnings(False) # 忽略GPIO 警告
CH1 = 17
CH2 = 16 #继电器输入信号管脚
GPIO.setup(CH1,GPIO.OUT)
GPIO.setup(CH2,GPIO.OUT)
def open():
GPIO.output(CH1, GPIO.LOW)
time.sleep(10)
GPIO.output(CH1, GPIO.HIGH)
def close():
GPIO.output(CH2, GPIO.LOW)
time.sleep(10)
GPIO.output(CH2, GPIO.HIGH)
阿里云连接
import time,json,random
import hmac,hashlib
def linkiot(DeviceName,ProductKey,DeviceSecret,server = 'iot-as-mqtt.cn-shanghai.aliyuncs.com'):
serverUrl = server
ClientIdSuffix = "|securemode=3,signmethod=hmacsha256,timestamp="
# 拼合
Times = str(int(time.time())) # 获取登录时间戳
Server = ProductKey+'.'+serverUrl # 服务器地址
ClientId = DeviceName + ClientIdSuffix + Times +'|' # ClientId
userNmae = DeviceName + "&" + ProductKey
PasswdClear = "clientId" + DeviceName + "deviceName" + DeviceName +"productKey"+ProductKey + "timestamp" + Times # 明文密码
# 加密
h = hmac.new(bytes(DeviceSecret,encoding= 'UTF-8'),digestmod=hashlib.sha256) # 使用密钥
h.update(bytes(PasswdClear,encoding = 'UTF-8'))
Passwd = h.hexdigest()
return Server,ClientId,userNmae,Passwd
# 阿里Alink协议实现(字典传入,json str返回)
def Alink(params):
AlinkJson = {
}
AlinkJson["id"] = random.randint(0,999999)
AlinkJson["version"] = "1.0"
AlinkJson["params"] = params
AlinkJson["method"] = "thing.event.property.post"
return json.dumps(AlinkJson)
if __name__ == "__main__":
pass
蜂鸣器
import RPi.GPIO as GPIO
import time
BuzzerPin = 20 # 有源蜂鸣器管脚定义
# GPIO设置函数
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False) # 关闭GPIO警告提示
GPIO.setup(BuzzerPin, GPIO.OUT) # 设置有源蜂鸣器管脚为输出模式
GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器
# 打开蜂鸣器
def buzzer_on():
GPIO.output(BuzzerPin, GPIO.LOW) # 蜂鸣器为低电平触发,所以使能蜂鸣器让其发声
# 关闭蜂鸣器
def buzzer_off():
GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器
# 控制蜂鸣器鸣叫
def beep(x):
buzzer_on() # 打开蜂鸣器控制
time.sleep(x) # 延时时间
buzzer_off() # 关闭蜂鸣器控制
time.sleep(x) # 延时时间
# 循环函数
def loop():
while True:
beep(1) # 控制蜂鸣器鸣叫,延时时间为500mm
def destroy():
GPIO.output(BuzzerPin, GPIO.HIGH) # 关闭蜂鸣器鸣叫
GPIO.cleanup() # 释放资源
# 程序入口
if __name__ == '__main__':
try: # 检测异常
loop() # 调用循环函数
except KeyboardInterrupt: # 当按下Ctrl+C时,将执行destroy()子程序。
destroy() # 释放资源
火焰传感器
import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math
DO = 26 # 火焰传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码
# 初始化工作
def setup():
ADC.setup(0x48) # 设置PCF8591模块地址
GPIO.setup(DO, GPIO.IN) # 设置火焰传感器数字IO口为输入模式
# 打印信息,打印出火焰传感器的状态值
def Print(x):
if x == 1: # 安全
print ('')
print (' *******************')
print (' * Makerobo Safe~ *')
print (' *******************')
print ('')
if x == 0: # 有火焰
print ('')
print (' ******************')
print (' * Makerobo Fire! *')
print (' ******************')
print ('')
# 功能函数
def fire():
status = 1 # 状态值
# 读取火焰传感器数字IO口
return GPIO.input(DO)
# 程序入口
if __name__ == '__main__':
try:
setup() # 初始化
fire()
except KeyboardInterrupt:
pass
烟雾传感器
import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math
DO = 18 # 烟雾传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码
GPIO.setwarnings(False) # 忽略GPIO 警告
# 初始化工作
def setup():
ADC.setup(0x48) # 设置PCF8591模块地址
GPIO.setup (DO,GPIO.IN) # 烟雾传感器数字IO口,设置为输入模式
# 打印信息,打印出是否检测到烟雾信息
def Print(x):
if x == 1: # 安全
print ('')
print (' ******************')
print (' * Makerobo Safe~ *')
print (' ******************')
print ('')
if x == 0: # 检测到烟雾
print ('')
print (' ************************')
print (' * Makerobo Danger Gas! *')
print (' ************************')
print ('')
# 循环函数
def gas():
setup()
return GPIO.input(DO) # 读取GAS烟雾传感器数字IO口值
# 程序入口
if __name__ == '__main__':
setup() # 初始化函数
loop() # 循环函数
LCD1602
import time
import smbus
BUS = smbus.SMBus(1)
# IIC LCD1602 液晶模块写入字
def write_word(addr, data):
global BLEN
temp = data
if BLEN == 1:
temp |= 0x08
else:
temp &= 0xF7
BUS.write_byte(addr ,temp) # 设置IIC L