资讯详情

MQTT简介 利用mosquitto函数实现本机DS18B20发布订阅温度 linux c编程

文章目录

    • 1.MQTT简介
    • 2.MQTT安装与搭建
    • 3.mosquitto常用库函数
    • 4.基于DS18B20订阅并发布本机温度实例

1.MQTT简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"该协议构建在通信协议中TCP/IP协议上,由IBM在1999年发布。MQTT最大的优点是为连接远程设备提供实时可靠的信息服务,代码少,带宽有限。它作为一种低成本、低带宽占用的即时通信协议,广泛应用于物联网、小型设备和移动应用。 MQTT机器对机器(M2M)/物联网(IoT)连接协议。它被设计成一个极其轻量级的发布/订阅信息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使协议成为新兴的机器到机器(M2M)或物联网(IoT)世界连接设备,以及带宽和电池功率高的移动应用的理想选择。例如,它已被用于传感器通过卫星链理通信的传感器、与医疗服务提供商的拨号连接、一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,信息可以有效地分配给一个或多个接收器。MQTT实现模型如下。 在这里插入图片描述 ?MQTT协议设计为远程传感器和控制设备通信,具有以下主要特点:

(1)利用发布/订阅消息模式提供一对多的消息发布,解除应用程序耦合。

这与此非常相似XMPP,但是MQTT信息冗余远小于XMPP,,因为XMPP使用XML传递数据的格式文本。

(2)屏蔽负载内容的消息传输。

(3)使用TCP/IP提供网络连接。

主流的MQTT是基于TCP数据推送连接,但也是基于UDP版本叫MQTT-SN。由于这两个版本的连接方式不同,优缺点自然也不同。

(4)发布服务质量有三种新闻:

最多一次的消息发布完全依赖于底层TCP/IP网络。消息会丢失或重复。这个级别可以用于以下情况。环境传感器数据丢失一次读取记录并不重要,因为很快就会有第二次发送。这种方法主要是普通的APP如果您的智能设备在新闻推送过程中没有联网,则推送过去没有收到,则无法再次联网。

至少一次确保信息到达,但可能会重复。

只有一次,以确保信息达到一次。这种级别可用于一些严格的计费系统。在计费系统中,重复或丢失消息会导致错误的结果。这种最高质量的新闻发布服务也可用于即时通讯APP推送,确保用户只收到一次。

(5)小传输,费用小(固定长度的头部为2字节),协议交换最小化,减少网络流量。

这就是为什么在介绍中说它非常适合在物联网领域,传感器和服务器的通信和信息收集。我们应该知道,嵌入式设备的计算能力和带宽相对较弱,这是非常适合使用该协议传输信息的。

(6)使用Last Will和Testament特征通知各方客户端异常中断机制。

Last Will:即遗言机制,用于通知同一主题下其他设备发送遗言的设备已断开连接。

Testament:遗嘱机制与功能相似Last Will。 ?实现MQTT在通信过程中,需要完成客户端和服务器端的通信,MQTT协议中有三种身份:出版商:(Publish)、代理(Broker)(服务器),订阅者(Subscribe)。其中,新闻发布者和订阅者都是客户端,新闻代理是服务器,新闻发布者可以同时是订阅者。

MQTT传输的信息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为新闻类型,订阅者订阅(Subscribe)之后,您将收到主题的消息内容(payload);

(2)payload,可以理解为新闻的内容是指订阅者想要使用的内容 ?MQTT底层网络传输将建立:它将建立客户端与服务器的连接,并提供基于字节流的有序、无损的双向传输。

通过应用数据MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。 ?一个使用MQTT该协议的应用程序或设备总是建立在服务器的网络连接上。客户端可以:

(1)发布其他客户可能订阅的信息;

(2)订阅其他客户发布的消息;

(3)退订或删除应用程序消息;

(4)与服务器断开连接。 ?MQTT服务器被称为消息代理(Broker),它可以是应用程序或设备。它位于新闻发布者和订阅者之间,可以:

(1)接受来自客户的网络连接;

(2)接收客户发布的应用信息;

(3)处理客户端的订阅和退订请求;

(4)将应用程序消息转发给订阅客户 一、订阅(Subscription)

订阅包括主题筛选器(Topic Filter)以及最大的服务质量(QoS)。订阅会和会话(Session)关联。一个会话可以包含多个订阅。每个会话中的每个订阅都有不同的主题筛选器。

二、会话(Session)

每个客户端与服务器建立连接后,都是客户端与服务器之间的状态交互。会话存在于网络之间,也可能跨越客户端与服务器之间的多个连续网络连接。

三、主题名(Topic Name)

与服务器订阅相匹配的标签连接到应用程序消息的标签上。服务器将消息发送给订阅匹配标签的每个客户端。

四、主题筛选器(Topic Filter)

在订阅表达式中使用主题名通配符筛选器,表示订阅匹配的多个主题。

五、负载(Payload)

消息订阅者收到的具体内容 ?MQTT该协议定义了一些方法(也称为行动)来表示确定资源的操作。该资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。一般来说,资源是指服务器上的文件或输出。主要方法包括:

(1)Connect。等待与服务器连接。

(2)Disconnect。等待MQTT客户端完成工作,与服务器断开TCP/IP会话。

(3)Subscribe。等待完成订阅。

(4)UnSubscribe。一个或多个等待服务器取消客户端topics订阅。

(5)Publish。MQTT客户端发送消息请求,发送后返回应用程序线程。 ?在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)构成三部分。MQTT数据包结构如下:

(1)固定头(Fixed header)。存在于所有MQTT在数据包中,表示数据包的类型和分组标识。

(2)可变头(Variable header)。存在于部分MQTT在数据包中,数据包的类型决定了可变头是否存在及其具体内容。

(3)消息体(Payload)。存在于部分MQTT在数据包中,表示客户端收到的具体内容。 参考博客: https://blog.csdn.net/qq_28877125/article/details/78325003

2.MQTT安装与搭建

安装服务器端

sudo apt-get install mosquitto 

安装完成后,将建立服务器,系统将自动运行mosquitto,默认端口为1883。 2.安装客户端 前面的服务器端已经建成,但是客户端还没有安装。这一步是可选的,如果需要在终端上进行测试MQTT订阅/发布的通信需要执行此步骤,这里我们也安装了这些测试。

sudo apt 
       
        install mosquitto-clients 
       

3、查看运行状态

sudo systemctl status mosquitto

4、重启服务器程序 查看运行进程号:ps -aux | grep mosquitto 执行命令杀死进程:kill -9 进程号 启动:mosquitto -v -v 详细模式——启用所有日志记录类型。 关于启动参数:可以通过 --help 查看 5,测试(默认配置) 使用securecrt首先打开三个终端, 1、启动代理服务:mosquitto -v -v 详细模式 打印调试信息 2、订阅主题:mosquitto_sub -v -t hello -t 指定订阅的主题,主题为:hello -v 详细模式 打印调试信息 3、发布内容:mosquitto_pub -t hello -m world -t 指定订阅的主题,主题为:hello -m 指定发布的消息的内容 当发布者推送消息之后,订阅者获得其订阅的主题的内容,而代理服务器控制台中会出现——连接、消息发布和心跳等调试信息。通过代理服务器的调试输出可以对MQTT协议的相关过程有所了解。 1.安装mosquitto所需要依赖

sudo apt-get install libssl-dev
sudo apt-get install uuid-dev
sudo apt-get install cmake

2.下载源码包

wget http://mosquitto.org/files/source/mosquitto-2.0.14.tar.gz

下载地址:https://mosquitto.org/download/ 3、解压源码

tar -zxvf mosquitto-2.0.14.tar.gz

4.进入源码目录:

cd mosquitto-2.0.14/

5.编译与安装源码

make
sudo make install

6.可能遇到的问题: 【1】编译找不到openssl/ssl.h 【解决方法】——安装openssl sudo apt-get install libssl-dev 【2】编译过程g++命令未找到: sudo apt-get install g++ 【3】编译过程找不到ares.h sudo apt-get install libc-ares-dev 【4】编译过程找不到uuid/uuid.h sudo apt-get install uuid-dev 【5】使用过程中找不到libmosquitto.so.1 error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory 【解决方法】——修改libmosquitto.so位置 创建链接 sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1 更新动态链接库 sudo ldconfig 参考博客: https://blog.csdn.net/qq_33406883/article/details/107429946 https://blog.csdn.net/xukai871105/article/details/39252653 https://blog.csdn.net/lu_embedded/article/details/76305105

3.mosquitto常用库函数

1.mosquitto_lib_init

int mosquitto_lib_init(void)
  • 功能:使用mosquitto库函数前,要先初始化,使用之后就要清除。清除函数;int mosquitto_lib_cleanup();
  • 返回值:MOSQ_ERR_SUCCESS 总是
  1. mosquitto_new
struct mosquitto *mosquitto_new( const char * id, bool clean_session, void * obj )
  • 功能:创建一个新的mosquitto客户端实例,新建客户端 参数:
①id :用作客户端ID的字符串。如果为NULL,将生成一个随机客户端ID。如果id为NULL,clean_session必须为true。
②clean_session:设置为true以指示代理在断开连接时清除所有消息和订阅,设置为false以指示其保留它们,客户端将永远不会在断开连接时丢弃自己的传出消息。调用mosquitto_connect或mosquitto_reconnect将导致重新发送消息。使mosquitto_reinitialise将客户端重置为其原始状态。如果id参数为NULL,则必须将其设置为true。简言之:就是断开后是否保留订阅信息true/false
③obj: 用户指针,将作为参数传递给指定的任何回调,(回调参数)
  • 返回值:成功时返回结构mosquitto的指针,失败时返回NULL,询问errno以确定失败的原因:
 ENOMEM  内存不足。
 EINVAL  输入参数无效。
  1. mosquitto_connect
int mosquitto_connect( struct mosquitto * mosq, const char * host, int port, int keepalive )
  • 功能: 连接到MQTT代理/服务器(主题订阅要在连接服务器之后进行)
  • 参数:
   ①mosq : 有效的mosquitto实例,mosquitto_new()返回的mosq.
   ②host : 服务器ip地址
   ③port:服务器的端口号
   ④keepalive:保持连接的时间间隔, 单位秒。如果在这段时间内没有其他消息交换,则代理应该将PING消息发送到客户端的秒数。
  • 返回:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_ERRNO 如果系统调用返回错误。变量errno包含错误代码
  1. mosquitto_disconnect
int mosquitto_disconnect( struct mosquitto * mosq )
  • 功能:断开与代理/服务器的连接。
  • 返回:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
  1. mosquitto_publish
int mosquitto_publish( struct mosquitto * mosq, int * mid, const char * topic, int payloadlen, const void * payload, int qos, bool retain )
  • 功能:主题发布的函数
  • 参数:
①mosq:有效的mosquitto实例,客户端
②mid:指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与发布回调一起使用,以确定何时发送消息。请注意,尽管MQTT协议不对QoS = 0的消息使用消息ID,但libmosquitto为其分配了消息ID,以便可以使用此参数对其进行跟踪。
③topic:要发布的主题,以null结尾的字符串
④payloadlen:有效负载的大小(字节),有效值在0到268,435,455之间;主题消息的内容长度
⑤payload: 主题消息的内容,指向要发送的数据的指针,如果payloadlen >0,则它必须时有效的存储位置。
⑥qos:整数值0、1、2指示要用于消息的服务质量。
⑦retain:设置为true以保留消息。
  • 返回:
 MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NOMEM 如果发生内存不足的情况
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
MOSQ_ERR_PROTOCOL 与代理进行通信时是否存在协议错误。
MOSQ_ERR_PAYLOAD_SIZE 如果payloadlen太大。
MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
MOSQ_ERR_QOS_NOT_SUPPORTED 如果QoS大于代理支持的QoS。
MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。
  1. mosquitto_subscribe
int mosquitto_subscribe( struct mosquitto * mosq, int * mid, const char * sub, int qos )
  • 功能:订阅主题函数
  • 参数:
 ①mosq:有效的mosquitto实例,客户端
②mid: 指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与订阅回调一起使用,以确定何时发送消息。;主题的消息ID
③sub: 主题名称,订阅模式。
④qos : 此订阅请求的服务质量。
  • 返回值:
  MOSQ_ERR_SUCCESS 成功。 
  MOSQ_ERR_INVAL 如果输入参数无效。
  MOSQ_ERR_NOMEM 如果发生内存不足的情况。
  MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
  MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
  MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。

参考博客: https://blog.csdn.net/weixin_53361650/article/details/116954595 https://mosquitto.org/api/files/mosquitto-h.html

4.基于DS18B20在本机实现温度的订阅和发布实例

/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_sub.c * Description: This file * * Version: 1.0.0(24/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "24/01/22 07:12:01" * ********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"

#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512

static int running =1;

void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{ 
        
    printf("Call the function: on_connect\n");
    if(rc)
    { 
        
        printf("on_connect error!\n");
        exit(1);
    }
    else
    { 
        
        if(mosquitto_subscribe(mosq, NULL, "topic", 2))
        { 
        
            printf("Set the topic error!\n");
            exit(1);
        }
    }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{ 
        
    printf("Call the function: my_disconnect_callback\n");
    running = 0;
}

void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{ 
        
    printf("Call the function: on_subscribe\n");
}

void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{ 
        
    printf("Call the function: on_message\n");
    printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload);
    if(0 == strcmp(msg->payload, "quit"))
    { 
        
        mosquitto_disconnect(mosq);
    }
}

int main()
{ 
        
    int ret;
    struct mosquitto *mosq;

    ret = mosquitto_lib_init();
    if(ret)
    { 
        
        printf("Init lib error!\n");
        return -1;
    }

    mosq =  mosquitto_new("sub_test", true, NULL);
    if(mosq == NULL)
    { 
        
        printf("New sub_test error!\n");
        mosquitto_lib_cleanup();
        return -1;
    }
    printf("creat a sub_er success!\n");

    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    
    ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
    if(ret)
    { 
        
                printf("Connect server error!\n");
                mosquitto_destroy(mosq);
                mosquitto_lib_cleanup();
                return -1;
    }
    printf("Start!\n");
    while(running)
    { 
        
        mosquitto_loop(mosq, -1, 1);
    }
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    printf("End!\n");
}

/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_temp.c * Description: This file * * Version: 1.0.0(23/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "23/01/22 07:05:45" * ********************************************************************************/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <netdb.h>
#include <stdlib.h>
#include <fcntl.h>
#include <libgen.h>
#include <netinet/in.h>
#include <mosquitto.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <mosquitto.h>

#define BUF_SIZE 1024
#define ALIVE_Time 60
#define hostname "localhost"

int running=1;

void printf_usage(char *program);
int ds18b20_get_temper(float * temp);
void sig_handler(int SIG_NUM);
int get_time(char *tim);

int main(int argc,char **argv)
{ 
        
    int     daemon_run=0;
    int     port;
    int     opt;
    char    *topic = NULL;
    char    tim[32];
    float temper;
    char buf[512];
    char tem[32];
    char *user="yanpan";
    struct mosquitto    *mosq = NULL;
    int mid;

    char *program = basename(argv[0]);
    
    struct option long_options[] =
    { 
        
        { 
        "port",required_argument, NULL, 'p'},
        { 
        "help",no_argument, NULL,'h'},
        { 
        NULL, 0, NULL, 0}
    };
    while ((opt = getopt_long(argc, argv, "p:h", long_options, NULL)) != -1)
    { 
        
         switch (opt)
        { 
        
             case 'p':
                port = atoi(optarg);
                break;
             case 'h':
                printf_usage(argv[0]);
                break;
            default:
                break;
        }
    }
    if(!port)
    { 
        
        printf_usage(program) ;
        return 0 ;
    }

    signal(SIGUSR1, sig_handler);
    while(running)
    { 
        
        if(ds18b20_get_temper(&temper)<0)
        { 
        
            printf("ds18b20_get_temper() failed\n") ;
            return -1;
        }
        sprintf(tem,"\ttemperature:%5.3f",temper);
        get_time(tim);
        memset(buf,0,sizeof(buf));
        snprintf(buf,sizeof(buf),"%s%s%s",user,tim,tem);
        printf("%s\n",buf);


        mosquitto_lib_init();
        mosq = mosquitto_new("pub_test", true, NULL) ;
        if(mosq == NULL)
        { 
        
            printf("New sub_test error!\n");
            mosquitto_lib_cleanup();
            return -1;
        }
        printf("Create mosquitto sucessfully!\n");
        
        if(mosquitto_connect(mosq,hostname,port,ALIVE_Time)!= MOSQ_ERR_SUCCESS)
        { 
        
            printf("Mosq_Connect() failed: %s\n", strerror(errno) );
            mosquitto_destroy(mosq);
            mosquitto_lib_cleanup();
            return -2;
        }
        printf("Connect %s:%d Sucessfully!\n", hostname, port);
        int loop = mosquitto_loop_start(mosq);
        if(loop != MOSQ_ERR_SUCCESS)
        { 
        
            printf("mosquitto loop error\n");
            mosquitto_destroy(mosq) ;
            mosquitto_lib_cleanup() ;
            return -3;
        }   
        if( mosquitto_publish(mosq,&mid,"topic",strlen(buf),buf,0,0) != MOSQ_ERR_SUCCESS ) 
        { 
        
            printf("Mosq_Publish() error: %s\n", strerror(errno));
            mosquitto_destroy(mosq) ;
            mosquitto_lib_cleanup() ;
            return -4;
        }
        else
             printf("Publish information of temperature Ok!\n") ;
        sleep(5);
    }

}

void printf_usage(char *program)
{ 
        
    printf("使用方法:%s【选项】 \n", program);
    printf("\n传入参数\n");
    printf(" -p[port ] 指定连接的端口号\n");
    printf(" -h[help ] 打印帮助信息\n");
    printf("\n例如: %s -b -p 8900\n", program);
    return;
}

int ds18b20_get_temper(float * temp)
{ 
        
    char w1_path[128]="/sys/bus/w1/devices/";
    char f_name[64];
    char buff[128];
    char *data_p=NULL;
    struct dirent   *file=NULL;
    DIR *dir=NULL;
    int data_fd;
    int found = -1;
    if((dir=opendir(w1_path))<0)
    { 
        
        printf("open w1_path failure:%s\n",strerror(errno));
        return -1; 
    }
    while((file=readdir(dir)

标签: sub连接器78pds流量传感器流量传感器p11231sn

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台