资讯详情

基于树莓派+传感器+阿里云IoT的智能家居管理(代码实现)

视频教程已经在B站 请三连我 虽然我没有稚晖君那么强壮,虽然我没有稚晖君那么强壮

教程!基于树莓派 传感器 阿里云IoT智能家居管理(2)

主文件

 #!/usr/bin/python3  import aliLink,mqttd,rpi import time,json import Adafruit_DHT import time import LCD1602 import flame_sensor import buzzer_1 import rain_detector import gas_sensor import relay from threading import Thread  pin = 19  # DHT11 温湿度传感器管脚定义 Buzzer = 20    # 定义有源蜂鸣器管脚  # GPIO口定义 sensor = Adafruit_DHT.DHT11     # 三元素(iot后台获取) ProductKey = 'a11lzCDSgZP' DeviceName = 'IU6aSETyiImFPSkpcywm' DeviceSecret = "2551eb5f630c372743c538e9b87bfe6d" # topic (iot后台获取) POST = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post'  # 向云报告消息 POST_REPLY = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post_reply' SET = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/service/property/set'  # 订阅云指令   #窗户开关 window = 0 window_status = 0 Thread(target=relay.close).start()  # 消息回调(云发消息回调函数) def on_message(client, userdata, msg):     #print(msg.payload)     Msg = json.loads(msg.payload)      global window,window_status     window = Msg['params']['window']     print(msg.payload)  # 开关值     if window_status != window:         window_status = window         if window 
       
        == 
        1
        : Thread
        (target
        =relay
        .
        open
        )
        .start
        (
        ) 
        else
        : Thread
        (target
        =relay
        .close
        )
        .start
        (
        ) 
        #连接回调(与阿里云建立链接后的回调函数) 
        def 
        on_connect
        (client
        , userdata
        , flags
        , rc
        )
        : 
        pass 
        # 链接信息 Server
        ,ClientId
        ,userNmae
        ,Password 
        = aliLink
        .linkiot
        (DeviceName
        ,ProductKey
        ,DeviceSecret
        ) 
        # mqtt链接 mqtt 
        = mqttd
        .MQTT
        (Server
        ,ClientId
        ,userNmae
        ,Password
        ) mqtt
        .subscribe
        (SET
        ) 
        # 订阅服务器下发消息topic mqtt
        .begin
        (on_message
        ,on_connect
        ) 
        # 信息获取上报,每2秒钟上报一次系统参数 
        while 
        True
        : 
        #获取指示灯状态 power_stats
        =
        int
        (rpi
        .getLed
        (
        )
        ) 
        if
        (power_stats 
        == 
        0
        )
        : power_LED 
        = 
        0 
        else
        : power_LED 
        = 
        1 
        # CPU 信息 CPU_temp 
        = 
        float
        (rpi
        .getCPUtemperature
        (
        )
        ) 
        # 温度 ℃ CPU_usage 
        = 
        float
        (rpi
        .getCPUuse
        (
        )
        ) 
        # 占用率 % 
        # RAM 信息 RAM_stats 
        =rpi
        .getRAMinfo
        (
        ) RAM_total 
        =
        round
        (
        int
        (RAM_stats
        [
        0
        ]
        ) 
        /
        1000
        ,
        1
        ) 
        # RAM_used 
        =
        round
        (
        int
        (RAM_stats
        [
        1
        ]
        ) 
        /
        1000
        ,
        1
        ) RAM_free 
        =
        round
        (
        int
        (RAM_stats
        [
        2
        ]
        ) 
        /
        1000
        ,
        1
        ) 
        # Disk 信息 DISK_stats 
        =rpi
        .getDiskSpace
        (
        ) DISK_total 
        = 
        float
        (DISK_stats
        [
        0
        ]
        [
        :
        -
        1
        ]
        ) DISK_used 
        = 
        float
        (DISK_stats
        [
        1
        ]
        [
        :
        -
        1
        ]
        ) DISK_perc 
        = 
        float
        (DISK_stats
        [
        3
        ]
        [
        :
        -
        1
        ]
        ) 
        #温度,湿度 humidity
        , temperature 
        = Adafruit_DHT
        .read_retry
        (sensor
        , pin
        ) 
        # LCD显示 LCD 
        = 
        0 
        try
        : LCD1602
        .init
        (
        0x27
        , 
        1
        ) 
        # 初始化显示屏 LCD1602
        .write
        (
        0
        , 
        0
        , 
        'humidity: ' 
        + 
        str
        (
        int
        (humidity
        )
        ) 
        + 
        '%'
        ) LCD1602
        .write
        (
        0
        , 
        1
        , 
        'temperature: ' 
        + 
        str
        (
        int
        (temperature
        )
        ) 
        + 
        '\''
        ) 
        # 在第二行显示world! LCD 
        = 
        1 
        except
        : 
        print
        (
        "显示屏连接不稳定,请检查"
        ) LCD 
        = 
        0 
        # 蜂鸣器 buzzer 
        = 
        0 
        # 火焰传感器 flame_sensor
        .setup
        (
        ) 
        if flame_sensor
        .fire
        (
        ) 
        == 
        0
        : flame 
        = 
        1 buzzer_1
        .buzzer_on
        (
        ) 
        #让铃声叫 buzzer 
        = 
        1 
        else
        : flame 
        = 
        0 buzzer_1
        .buzzer_off
        (
        ) 
        #铃声不叫 buzzer 
        = 
        0 
        # 烟雾传感器 gas 
        = gas_sensor
        .gas
        (
        ) 
        # 雨滴传感器 rain_detector
        .setup
        (
        ) 
        if rain_detector
        .rain
        (
        ) 
        == 
        0
        : rain 
        = 
        1 
        else
        : rain 
        = 
        0 
        # 构建与云端模型一致的消息结构 updateMsn 
        = 
        { 
          
        'cpu_temperature'
        :CPU_temp
        , 
        'cpu_usage'
        :CPU_usage
        , 
        'RAM_total'
        :RAM_total
        , 
        'RAM_used'
        :RAM_used
        , 
        'RAM_free'
        :RAM_free
        , 
        'DISK_total'
        :DISK_total
        , 
        'DISK_used_space'
        :DISK_used
        , 
        'DISK_used_percentage'
        :DISK_perc
        , 
        'PowerLed'
        :power_LED
        , 
        'temperature'
        :temperature
        , 
        'humidity'
        :humidity
        , 
        'window'
        :window
        , 
        'LCD'
        :LCD
        , 
        'buzzer'
        :buzzer
        , 
        'flame'
        :flame
        , 
        'rain'
        :rain
        , 
        'gas'
        :gas 
        } JsonUpdataMsn 
        = aliLink
        .Alink
        (updateMsn
        ) 
        print
        (JsonUpdataMsn
        ) mqtt
        .push
        (POST
        ,JsonUpdataMsn
        ) 
        # 定时向阿里云IOT推送我们构建好的Alink协议数据 time
        .sleep
        (
        3
        ) 
       

rpi

# 树莓派数据与控制

import os
# Return CPU temperature as a character string 

def getCPUtemperature():
    res =os.popen('vcgencmd measure_temp').readline()
    return(res.replace("temp=","").replace("'C\n",""))
 
# Return RAM information (unit=kb) in a list 
# Index 0: total RAM 
# Index 1: used RAM 
# Index 2: free RAM 
def getRAMinfo():
    p =os.popen('free')
    i =0
    while 1:
        i =i +1
        line =p.readline()
        if i==2:
            return(line.split()[1:4])
 
# Return % of CPU used by user as a character string 
def getCPUuse():
    data = os.popen("top -n1 | awk '/Cpu\(s\):/ {print $2}'").readline().strip()
    return(data)
 
# Return information about disk space as a list (unit included) 
# Index 0: total disk space 
# Index 1: used disk space 
# Index 2: remaining disk space 
# Index 3: percentage of disk used 
def getDiskSpace():
    p =os.popen("df -h /")
    i =0
    while True:
        i =i +1
        line =p.readline()
        if i==2:
            return(line.split()[1:5])
def  powerLed(swatch):
    led = open('/sys/class/leds/led1/brightness', 'w', 1)
    led.write(str(swatch))
    led.close()

# LED灯状态检测
def getLed():
	led = open('/sys/class/leds/led1/brightness', 'r', 1)
	state=led.read()
	led.close()
	return state
    
if __name__ == "__main__":
 
    # CPU informatiom
    CPU_temp =getCPUtemperature()
    CPU_usage =getCPUuse()
    print(CPU_usage)
    # RAM information
    # Output is in kb, here I convert it in Mb for readability
    RAM_stats =getRAMinfo()

    RAM_total = round(int(RAM_stats[0]) /1000,1)
    RAM_used = round(int(RAM_stats[1]) /1000,1)
    RAM_free = round(int(RAM_stats[2]) /1000,1)
    print(RAM_total,RAM_used,RAM_free)
    # Disk information
    DISK_stats =getDiskSpace()

    DISK_total = DISK_stats[0][:-1]
    DISK_used = DISK_stats[1][:-1]
    DISK_perc = DISK_stats[3][:-1]
    print(DISK_total,DISK_used,DISK_perc)

继电器

import RPi.GPIO as GPIO
import time
import asyncio

GPIO.setmode(GPIO.BCM)              # 管脚映射,采用BCM编码
GPIO.setwarnings(False)             # 忽略GPIO 警告

CH1 = 17
CH2 = 16 #继电器输入信号管脚


GPIO.setup(CH1,GPIO.OUT)
GPIO.setup(CH2,GPIO.OUT)

def open():
    GPIO.output(CH1, GPIO.LOW)
    time.sleep(10)
    GPIO.output(CH1, GPIO.HIGH)

def close():
    GPIO.output(CH2, GPIO.LOW)
    time.sleep(10)
    GPIO.output(CH2, GPIO.HIGH)

阿里云连接

import time,json,random
import hmac,hashlib

def linkiot(DeviceName,ProductKey,DeviceSecret,server = 'iot-as-mqtt.cn-shanghai.aliyuncs.com'):
    serverUrl = server
    ClientIdSuffix = "|securemode=3,signmethod=hmacsha256,timestamp="

    # 拼合
    Times = str(int(time.time()))  # 获取登录时间戳
    Server = ProductKey+'.'+serverUrl    # 服务器地址
    ClientId = DeviceName + ClientIdSuffix + Times +'|'  # ClientId
    userNmae = DeviceName + "&" + ProductKey
    PasswdClear = "clientId" + DeviceName + "deviceName" + DeviceName +"productKey"+ProductKey + "timestamp" + Times  # 明文密码

    # 加密
    h = hmac.new(bytes(DeviceSecret,encoding= 'UTF-8'),digestmod=hashlib.sha256)  # 使用密钥
    h.update(bytes(PasswdClear,encoding = 'UTF-8'))
    Passwd = h.hexdigest()
    return Server,ClientId,userNmae,Passwd

# 阿里Alink协议实现(字典传入,json str返回)
def Alink(params):
    AlinkJson = { 
        }
    AlinkJson["id"] = random.randint(0,999999)
    AlinkJson["version"] = "1.0"
    AlinkJson["params"] = params
    AlinkJson["method"] = "thing.event.property.post"
    return json.dumps(AlinkJson)

if __name__ == "__main__":
    pass

蜂鸣器

import RPi.GPIO as GPIO
import time

BuzzerPin = 20    # 有源蜂鸣器管脚定义

# GPIO设置函数
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)                       # 关闭GPIO警告提示
GPIO.setup(BuzzerPin, GPIO.OUT)     # 设置有源蜂鸣器管脚为输出模式
GPIO.output(BuzzerPin, GPIO.HIGH)   # 蜂鸣器设置为高电平,关闭蜂鸟器

# 打开蜂鸣器
def buzzer_on():
	GPIO.output(BuzzerPin, GPIO.LOW)  # 蜂鸣器为低电平触发,所以使能蜂鸣器让其发声
# 关闭蜂鸣器
def buzzer_off():
	GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器

# 控制蜂鸣器鸣叫
def beep(x):
	buzzer_on()     # 打开蜂鸣器控制
	time.sleep(x)            # 延时时间
	buzzer_off()    # 关闭蜂鸣器控制
	time.sleep(x)            # 延时时间

# 循环函数
def loop():
	while True:
		beep(1) # 控制蜂鸣器鸣叫,延时时间为500mm

def destroy():
	GPIO.output(BuzzerPin, GPIO.HIGH) # 关闭蜂鸣器鸣叫
	GPIO.cleanup()                     # 释放资源

# 程序入口
if __name__ == '__main__':
	try:                            # 检测异常
		loop()                      # 调用循环函数
	except KeyboardInterrupt:  # 当按下Ctrl+C时,将执行destroy()子程序。
		destroy()              # 释放资源

火焰传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 26  # 火焰传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码

# 初始化工作
def setup():
	ADC.setup(0x48)    # 设置PCF8591模块地址
	GPIO.setup(DO, GPIO.IN) # 设置火焰传感器数字IO口为输入模式

# 打印信息,打印出火焰传感器的状态值
def Print(x):
	if x == 1:      # 安全
		print ('')
		print (' *******************')
		print (' * Makerobo Safe~ *')
		print (' *******************')
		print ('')
	if x == 0:     # 有火焰
		print ('')
		print (' ******************')
		print (' * Makerobo Fire! *')
		print (' ******************')
		print ('')

# 功能函数
def fire():
    status = 1      # 状态值
    # 读取火焰传感器数字IO口
    return GPIO.input(DO)

# 程序入口
if __name__ == '__main__':
	try:
		setup() # 初始化
		fire()
	except KeyboardInterrupt:
		pass	

烟雾传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import math

DO = 18                    # 烟雾传感器数字IO口
GPIO.setmode(GPIO.BCM)              # 管脚映射,采用BCM编码
GPIO.setwarnings(False)             # 忽略GPIO 警告

# 初始化工作
def setup():
	ADC.setup(0x48)                      # 设置PCF8591模块地址
	GPIO.setup	(DO,GPIO.IN)    # 烟雾传感器数字IO口,设置为输入模式

# 打印信息,打印出是否检测到烟雾信息
def Print(x):
	if x == 1:     # 安全
		print ('')
		print (' ******************')
		print (' * Makerobo Safe~ *')
		print (' ******************')
		print ('')
	if x == 0:    # 检测到烟雾
		print ('')
		print (' ************************')
		print (' * Makerobo Danger Gas! *')
		print (' ************************')
		print ('')

# 循环函数
def gas():
    setup()
    return GPIO.input(DO)  # 读取GAS烟雾传感器数字IO口值



# 程序入口
if __name__ == '__main__':
	setup()   # 初始化函数
	loop()    # 循环函数

LCD1602

import time
import smbus

BUS = smbus.SMBus(1)

# IIC LCD1602 液晶模块写入字
def write_word(addr, data):
	global BLEN
	temp = data
	if BLEN == 1:
		temp |= 0x08
	else:
		temp &= 0xF7
	BUS.write_byte(addr ,temp) # 设置IIC L

标签: pw传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台