文章目录
-
- 1.MQTT简介
- 2.MQTT安装与搭建
- 3.mosquitto常用库函数
- 4.基于DS18B20订阅并发布本机温度实例
1.MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"该协议构建在通信协议中TCP/IP协议上,由IBM在1999年发布。MQTT最大的优点是为连接远程设备提供实时可靠的信息服务,代码少,带宽有限。它作为一种低成本、低带宽占用的即时通信协议,广泛应用于物联网、小型设备和移动应用。 MQTT机器对机器(M2M)/物联网(IoT)连接协议。它被设计成一个极其轻量级的发布/订阅信息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使协议成为新兴的机器到机器(M2M)或物联网(IoT)世界连接设备,以及带宽和电池功率高的移动应用的理想选择。例如,它已被用于传感器通过卫星链理通信的传感器、与医疗服务提供商的拨号连接、一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,信息可以有效地分配给一个或多个接收器。MQTT实现模型如下。 ?MQTT协议设计为远程传感器和控制设备通信,具有以下主要特点:
(1)利用发布/订阅消息模式提供一对多的消息发布,解除应用程序耦合。
这与此非常相似XMPP,但是MQTT信息冗余远小于XMPP,,因为XMPP使用XML传递数据的格式文本。
(2)屏蔽负载内容的消息传输。
(3)使用TCP/IP提供网络连接。
主流的MQTT是基于TCP数据推送连接,但也是基于UDP版本叫MQTT-SN。由于这两个版本的连接方式不同,优缺点自然也不同。
(4)发布服务质量有三种新闻:
最多一次的消息发布完全依赖于底层TCP/IP网络。消息会丢失或重复。这个级别可以用于以下情况。环境传感器数据丢失一次读取记录并不重要,因为很快就会有第二次发送。这种方法主要是普通的APP如果您的智能设备在新闻推送过程中没有联网,则推送过去没有收到,则无法再次联网。
至少一次确保信息到达,但可能会重复。
只有一次,以确保信息达到一次。这种级别可用于一些严格的计费系统。在计费系统中,重复或丢失消息会导致错误的结果。这种最高质量的新闻发布服务也可用于即时通讯APP推送,确保用户只收到一次。
(5)小传输,费用小(固定长度的头部为2字节),协议交换最小化,减少网络流量。
这就是为什么在介绍中说它非常适合在物联网领域,传感器和服务器的通信和信息收集。我们应该知道,嵌入式设备的计算能力和带宽相对较弱,这是非常适合使用该协议传输信息的。
(6)使用Last Will和Testament特征通知各方客户端异常中断机制。
Last Will:即遗言机制,用于通知同一主题下其他设备发送遗言的设备已断开连接。
Testament:遗嘱机制与功能相似Last Will。 ?实现MQTT在通信过程中,需要完成客户端和服务器端的通信,MQTT协议中有三种身份:出版商:(Publish)、代理(Broker)(服务器),订阅者(Subscribe)。其中,新闻发布者和订阅者都是客户端,新闻代理是服务器,新闻发布者可以同时是订阅者。
MQTT传输的信息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为新闻类型,订阅者订阅(Subscribe)之后,您将收到主题的消息内容(payload);
(2)payload,可以理解为新闻的内容是指订阅者想要使用的内容 ?MQTT底层网络传输将建立:它将建立客户端与服务器的连接,并提供基于字节流的有序、无损的双向传输。
通过应用数据MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。 ?一个使用MQTT该协议的应用程序或设备总是建立在服务器的网络连接上。客户端可以:
(1)发布其他客户可能订阅的信息;
(2)订阅其他客户发布的消息;
(3)退订或删除应用程序消息;
(4)与服务器断开连接。 ?MQTT服务器被称为消息代理(Broker),它可以是应用程序或设备。它位于新闻发布者和订阅者之间,可以:
(1)接受来自客户的网络连接;
(2)接收客户发布的应用信息;
(3)处理客户端的订阅和退订请求;
(4)将应用程序消息转发给订阅客户 一、订阅(Subscription)
订阅包括主题筛选器(Topic Filter)以及最大的服务质量(QoS)。订阅会和会话(Session)关联。一个会话可以包含多个订阅。每个会话中的每个订阅都有不同的主题筛选器。
二、会话(Session)
每个客户端与服务器建立连接后,都是客户端与服务器之间的状态交互。会话存在于网络之间,也可能跨越客户端与服务器之间的多个连续网络连接。
三、主题名(Topic Name)
与服务器订阅相匹配的标签连接到应用程序消息的标签上。服务器将消息发送给订阅匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
在订阅表达式中使用主题名通配符筛选器,表示订阅匹配的多个主题。
五、负载(Payload)
消息订阅者收到的具体内容 ?MQTT该协议定义了一些方法(也称为行动)来表示确定资源的操作。该资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。一般来说,资源是指服务器上的文件或输出。主要方法包括:
(1)Connect。等待与服务器连接。
(2)Disconnect。等待MQTT客户端完成工作,与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。一个或多个等待服务器取消客户端topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送后返回应用程序线程。 ?在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)构成三部分。MQTT数据包结构如下:
(1)固定头(Fixed header)。存在于所有MQTT在数据包中,表示数据包的类型和分组标识。
(2)可变头(Variable header)。存在于部分MQTT在数据包中,数据包的类型决定了可变头是否存在及其具体内容。
(3)消息体(Payload)。存在于部分MQTT在数据包中,表示客户端收到的具体内容。 参考博客: https://blog.csdn.net/qq_28877125/article/details/78325003
2.MQTT安装与搭建
安装服务器端
sudo apt-get install mosquitto
安装完成后,将建立服务器,系统将自动运行mosquitto,默认端口为1883。 2.安装客户端 前面的服务器端已经建成,但是客户端还没有安装。这一步是可选的,如果需要在终端上进行测试MQTT订阅/发布的通信需要执行此步骤,这里我们也安装了这些测试。
sudo apt
install mosquitto-clients
3、查看运行状态
sudo systemctl status mosquitto
4、重启服务器程序 查看运行进程号:ps -aux | grep mosquitto 执行命令杀死进程:kill -9 进程号 启动:mosquitto -v -v 详细模式——启用所有日志记录类型。 关于启动参数:可以通过 --help 查看 5,测试(默认配置) 使用securecrt首先打开三个终端, 1、启动代理服务:mosquitto -v -v 详细模式 打印调试信息 2、订阅主题:mosquitto_sub -v -t hello -t 指定订阅的主题,主题为:hello -v 详细模式 打印调试信息 3、发布内容:mosquitto_pub -t hello -m world -t 指定订阅的主题,主题为:hello -m 指定发布的消息的内容 当发布者推送消息之后,订阅者获得其订阅的主题的内容,而代理服务器控制台中会出现——连接、消息发布和心跳等调试信息。通过代理服务器的调试输出可以对MQTT协议的相关过程有所了解。 1.安装mosquitto所需要依赖
sudo apt-get install libssl-dev
sudo apt-get install uuid-dev
sudo apt-get install cmake
2.下载源码包
wget http://mosquitto.org/files/source/mosquitto-2.0.14.tar.gz
下载地址:https://mosquitto.org/download/ 3、解压源码
tar -zxvf mosquitto-2.0.14.tar.gz
4.进入源码目录:
cd mosquitto-2.0.14/
5.编译与安装源码
make
sudo make install
6.可能遇到的问题: 【1】编译找不到openssl/ssl.h 【解决方法】——安装openssl sudo apt-get install libssl-dev 【2】编译过程g++命令未找到: sudo apt-get install g++ 【3】编译过程找不到ares.h sudo apt-get install libc-ares-dev 【4】编译过程找不到uuid/uuid.h sudo apt-get install uuid-dev 【5】使用过程中找不到libmosquitto.so.1 error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory 【解决方法】——修改libmosquitto.so位置 创建链接 sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1 更新动态链接库 sudo ldconfig 参考博客: https://blog.csdn.net/qq_33406883/article/details/107429946 https://blog.csdn.net/xukai871105/article/details/39252653 https://blog.csdn.net/lu_embedded/article/details/76305105
3.mosquitto常用库函数
1.mosquitto_lib_init
int mosquitto_lib_init(void)
- 功能:使用mosquitto库函数前,要先初始化,使用之后就要清除。清除函数;int mosquitto_lib_cleanup();
- 返回值:MOSQ_ERR_SUCCESS 总是
- mosquitto_new
struct mosquitto *mosquitto_new( const char * id, bool clean_session, void * obj )
- 功能:创建一个新的mosquitto客户端实例,新建客户端 参数:
①id :用作客户端ID的字符串。如果为NULL,将生成一个随机客户端ID。如果id为NULL,clean_session必须为true。
②clean_session:设置为true以指示代理在断开连接时清除所有消息和订阅,设置为false以指示其保留它们,客户端将永远不会在断开连接时丢弃自己的传出消息。调用mosquitto_connect或mosquitto_reconnect将导致重新发送消息。使mosquitto_reinitialise将客户端重置为其原始状态。如果id参数为NULL,则必须将其设置为true。简言之:就是断开后是否保留订阅信息true/false
③obj: 用户指针,将作为参数传递给指定的任何回调,(回调参数)
- 返回值:成功时返回结构mosquitto的指针,失败时返回NULL,询问errno以确定失败的原因:
ENOMEM 内存不足。
EINVAL 输入参数无效。
- mosquitto_connect
int mosquitto_connect( struct mosquitto * mosq, const char * host, int port, int keepalive )
- 功能: 连接到MQTT代理/服务器(主题订阅要在连接服务器之后进行)
- 参数:
①mosq : 有效的mosquitto实例,mosquitto_new()返回的mosq.
②host : 服务器ip地址
③port:服务器的端口号
④keepalive:保持连接的时间间隔, 单位秒。如果在这段时间内没有其他消息交换,则代理应该将PING消息发送到客户端的秒数。
- 返回:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_ERRNO 如果系统调用返回错误。变量errno包含错误代码
- mosquitto_disconnect
int mosquitto_disconnect( struct mosquitto * mosq )
- 功能:断开与代理/服务器的连接。
- 返回:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
- mosquitto_publish
int mosquitto_publish( struct mosquitto * mosq, int * mid, const char * topic, int payloadlen, const void * payload, int qos, bool retain )
- 功能:主题发布的函数
- 参数:
①mosq:有效的mosquitto实例,客户端
②mid:指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与发布回调一起使用,以确定何时发送消息。请注意,尽管MQTT协议不对QoS = 0的消息使用消息ID,但libmosquitto为其分配了消息ID,以便可以使用此参数对其进行跟踪。
③topic:要发布的主题,以null结尾的字符串
④payloadlen:有效负载的大小(字节),有效值在0到268,435,455之间;主题消息的内容长度
⑤payload: 主题消息的内容,指向要发送的数据的指针,如果payloadlen >0,则它必须时有效的存储位置。
⑥qos:整数值0、1、2指示要用于消息的服务质量。
⑦retain:设置为true以保留消息。
- 返回:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NOMEM 如果发生内存不足的情况
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
MOSQ_ERR_PROTOCOL 与代理进行通信时是否存在协议错误。
MOSQ_ERR_PAYLOAD_SIZE 如果payloadlen太大。
MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
MOSQ_ERR_QOS_NOT_SUPPORTED 如果QoS大于代理支持的QoS。
MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。
- mosquitto_subscribe
int mosquitto_subscribe( struct mosquitto * mosq, int * mid, const char * sub, int qos )
- 功能:订阅主题函数
- 参数:
①mosq:有效的mosquitto实例,客户端
②mid: 指向int的指针。如果不为NULL,则函数会将其设置为该特定消息的消息ID。然后可以将其与订阅回调一起使用,以确定何时发送消息。;主题的消息ID
③sub: 主题名称,订阅模式。
④qos : 此订阅请求的服务质量。
- 返回值:
MOSQ_ERR_SUCCESS 成功。
MOSQ_ERR_INVAL 如果输入参数无效。
MOSQ_ERR_NOMEM 如果发生内存不足的情况。
MOSQ_ERR_NO_CONN 如果客户端未连接到代理。
MOSQ_ERR_MALFORMED_UTF8 如果主题无效,则为UTF-8
MOSQ_ERR_OVERSIZE_PACKET 如果结果包大于代理支持的包。
参考博客: https://blog.csdn.net/weixin_53361650/article/details/116954595 https://mosquitto.org/api/files/mosquitto-h.html
4.基于DS18B20在本机实现温度的订阅和发布实例
/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_sub.c * Description: This file * * Version: 1.0.0(24/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "24/01/22 07:12:01" * ********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
static int running =1;
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
printf("Call the function: on_connect\n");
if(rc)
{
printf("on_connect error!\n");
exit(1);
}
else
{
if(mosquitto_subscribe(mosq, NULL, "topic", 2))
{
printf("Set the topic error!\n");
exit(1);
}
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
printf("Call the function: my_disconnect_callback\n");
running = 0;
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
printf("Call the function: on_subscribe\n");
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
printf("Call the function: on_message\n");
printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload);
if(0 == strcmp(msg->payload, "quit"))
{
mosquitto_disconnect(mosq);
}
}
int main()
{
int ret;
struct mosquitto *mosq;
ret = mosquitto_lib_init();
if(ret)
{
printf("Init lib error!\n");
return -1;
}
mosq = mosquitto_new("sub_test", true, NULL);
if(mosq == NULL)
{
printf("New sub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
printf("creat a sub_er success!\n");
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
if(ret)
{
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("Start!\n");
while(running)
{
mosquitto_loop(mosq, -1, 1);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("End!\n");
}
/********************************************************************************* * Copyright: (C) 2022 hubeiwuhan * All rights reserved. * * Filename: mosquitto_temp.c * Description: This file * * Version: 1.0.0(23/01/22) * Author: yanp <2405204881@qq.com> * ChangeLog: 1, Release initial version on "23/01/22 07:05:45" * ********************************************************************************/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <netdb.h>
#include <stdlib.h>
#include <fcntl.h>
#include <libgen.h>
#include <netinet/in.h>
#include <mosquitto.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <mosquitto.h>
#define BUF_SIZE 1024
#define ALIVE_Time 60
#define hostname "localhost"
int running=1;
void printf_usage(char *program);
int ds18b20_get_temper(float * temp);
void sig_handler(int SIG_NUM);
int get_time(char *tim);
int main(int argc,char **argv)
{
int daemon_run=0;
int port;
int opt;
char *topic = NULL;
char tim[32];
float temper;
char buf[512];
char tem[32];
char *user="yanpan";
struct mosquitto *mosq = NULL;
int mid;
char *program = basename(argv[0]);
struct option long_options[] =
{
{
"port",required_argument, NULL, 'p'},
{
"help",no_argument, NULL,'h'},
{
NULL, 0, NULL, 0}
};
while ((opt = getopt_long(argc, argv, "p:h", long_options, NULL)) != -1)
{
switch (opt)
{
case 'p':
port = atoi(optarg);
break;
case 'h':
printf_usage(argv[0]);
break;
default:
break;
}
}
if(!port)
{
printf_usage(program) ;
return 0 ;
}
signal(SIGUSR1, sig_handler);
while(running)
{
if(ds18b20_get_temper(&temper)<0)
{
printf("ds18b20_get_temper() failed\n") ;
return -1;
}
sprintf(tem,"\ttemperature:%5.3f",temper);
get_time(tim);
memset(buf,0,sizeof(buf));
snprintf(buf,sizeof(buf),"%s%s%s",user,tim,tem);
printf("%s\n",buf);
mosquitto_lib_init();
mosq = mosquitto_new("pub_test", true, NULL) ;
if(mosq == NULL)
{
printf("New sub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
printf("Create mosquitto sucessfully!\n");
if(mosquitto_connect(mosq,hostname,port,ALIVE_Time)!= MOSQ_ERR_SUCCESS)
{
printf("Mosq_Connect() failed: %s\n", strerror(errno) );
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -2;
}
printf("Connect %s:%d Sucessfully!\n", hostname, port);
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
printf("mosquitto loop error\n");
mosquitto_destroy(mosq) ;
mosquitto_lib_cleanup() ;
return -3;
}
if( mosquitto_publish(mosq,&mid,"topic",strlen(buf),buf,0,0) != MOSQ_ERR_SUCCESS )
{
printf("Mosq_Publish() error: %s\n", strerror(errno));
mosquitto_destroy(mosq) ;
mosquitto_lib_cleanup() ;
return -4;
}
else
printf("Publish information of temperature Ok!\n") ;
sleep(5);
}
}
void printf_usage(char *program)
{
printf("使用方法:%s【选项】 \n", program);
printf("\n传入参数\n");
printf(" -p[port ] 指定连接的端口号\n");
printf(" -h[help ] 打印帮助信息\n");
printf("\n例如: %s -b -p 8900\n", program);
return;
}
int ds18b20_get_temper(float * temp)
{
char w1_path[128]="/sys/bus/w1/devices/";
char f_name[64];
char buff[128];
char *data_p=NULL;
struct dirent *file=NULL;
DIR *dir=NULL;
int data_fd;
int found = -1;
if((dir=opendir(w1_path))<0)
{
printf("open w1_path failure:%s\n",strerror(errno));
return -1;
}
while((file=readdir(dir)