文章目录
- 1. 前言
- 2. 简介
- 3. 实现方式
- 4. 特性
-
- 4.1 Qos 0的工作原理
- 4.2 Qos 1的工作原理
- 4.3 Qos 2的工作原理
- 5. 事件上报
- 6. 代码实现
-
- 6.1 发布者
- 6.2 订阅者
- 7. 实验现象
-
- 7.1 发布者
- 7.2 订阅者
- 参考文献
1. 前言
物联网 (Internet of things(IoT)), 不同物理对象通过各种信息传感器和无线网络技术互联。 物联网设备及PC性能低于服务器。 与局域网带宽相比,传输带宽小,速度低。 实现物联网各设备之间的信息传输,MQTT这是一个非常合适的工具。
2. 简介
物联网 (Internet of things(IoT)), 不同物理对象通过各种信息传感器和无线网络技术互联。 物联网设备及PC性能低于服务器。 与局域网带宽相比,传输带宽小,速度低。 实现物联网各设备之间的信息传输,MQTT这是一个非常合适的工具。
3. 实现方式
实现MQTT在通信过程中,需要完成客户端和服务器端的通信,MQTT协议中有三种身份:出版商:( publisher )、代理(broker)、订阅者(subscriber)。 其中,新闻发布者和订阅者都是客户端,新闻代理是服务器,新闻发布者可以同时是订阅者。 MQTT传输的信息分为:主题(Topic)和负载(payload)两部分: (1)Topic:订阅的主题,即channel,频道; (2)payload:新闻内容,出版商向订阅者发布的具体新闻。 以下场景为例,方便大家理解以上文字: 如上图所示,subscribers1和subscribers订阅的主题是topic1和topic2,publisher1向topic这个主题发送数据,所以只有subscribers1才能收到,subscribers2是收不到topic这个主题的消息。topic也是如此。 现在问题来了,怎么样MQTT如何保证信息的可靠传输?
4. 特性
MQTT支持QOS(quality of service 服务质量),提供可靠的信息传输。 Qos 0:信息最多发送一次。 Qos 1:消息至少发送一次。 Qos 消息只发一次。
4.1 Qos 0的工作原理
从上图可以看出,Qos最多发一次消息,发完信息就不管了。是一种尽力而为的服务类型。
4.2 Qos 1的工作原理
从上图可以看出,Qos 1.工作时会有确认机制,出版商会向代理服务器发送信息。如果收到代理服务器,需要回应PUBACK报纸表示你收到的数据。当出版商收到它时PUBACK数据时,会将本地缓存的信息删除。 当然,如果代理没有收到数据,出版商也没有收到数据PUBACK发布者将在一段时间后重传响应信息。 代理和订阅者也是如此。 在使用Qos 1点,订阅者可能会收到重复的信息,因此Qos 1适用于客户端可以接收和处理重复信息的场景。
4.3 Qos 2的工作原理
如上图所示,Qos 发布者的信息只能发送一次,以避免重复。 同时,虽然信息只发送一次,但会有多种确认机制,以确保可靠性。 综上所述,从0到2,可靠性逐渐提高。
5. 事件上报
当虚拟机的生命周期发生变化时,会发生事件通知。但事件发生时,默认情况下不会传输到前端界面。因此,需要帮助mqtt协议,协助信息传递。
6. 代码实现
6.1 发布者
#include <stdio.h> #include <stdlib.h> #include <libvirt/libvirt.h> #include <pthread.h> #include <time.h> #include <string.h> #include "mosquitto.h" #define HOST "127.0.0.1" #define PORT 1883 #define KEEP_ALIVE 60 #define MSG_MAX_SIZE 512 static int myEventCallback(virConnectPtr conn, virDomainPtr dom, int event, int detail, void *opaque) { const char *name = virDomainGetName(dom); struct tm *currTime; time_t now; time(&now); currTime = localtime(&now); int ret; struct mosquitto *mosq; char buff[MSG_MAX_SIZE]; ret = mosquitto_lib_init(); mosq = mosquitto_new("Pub", true, NULL); char username[] = "username"; char password[] = "123456"; int authRet = mosquitto_username_pw_set(mosq, username, password); ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE); printf("Start!\n"); snprintf(buff, MSG_MAX_SIZE, "%d/%d/%d %d:%d:%d event(%d) occurred in the domain = < %s >.\n", currTime->tm_year 1900, currTime->tm_mon 1, currTime->tm_mday, currTime->tm_hour, currTime->tm_min, currTime->tm_sec, event, name); mosquitto_publish(mosq, NULL, "topic1", strlen(buff) 1, buff, 0, 0); mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); printf("End!\n"); return 0; } static void* eventThreadLoop() { while(1) { if(virEventRunDefaultImpl() < 0) { printf("Run errer.\n"); } } abort(); } int main() { virConnectPtr conn = NULL; int eventid = VIR_DOMAIN_EVENT_ID_LIFECYCLE; virEventRegisterDefaultImpl(); conn = virConnectOpen(NULL); pthread_t eventThread; pthread_create(&eventThread, NULL, eventThreadLoop, NULL); int id = virConnectDomainEventRegisterAny(conn, NULL, eventid, VIR_DOMAIN_EVENT_CALLBACK(myEventCallback), NULL, NULL); while(1) pause(); virConnectDomainEventDeregisterAny(conn, id); virConnectClose(conn); return 0; }
a id="62__161">6.2 订阅者
#include <stdio.h>
#include <stdlib.h>
#include "mosquitto.h"
#define HOST "127.0.0.1"
#define PORT 1883
#define KEEPALIVE 60
void my_connect_callback(struct mosquitto *mosq, void *obj, int rec) {
printf("Call the function: my_connect_callback.\n");
if(rec){
printf("On_connect error.\n");
exit(1);
} else {
if(mosquitto_subscribe(mosq, NULL, "topic1", 0)) {
printf("Set topic error.\n");
exit(1);
}
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rec) {
printf("Call the function: my_disconnect_callback.\n");
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int*granted_qos) {
printf("Call the function: my_subscribe_callback.\n");
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {
printf("Call the function: my_message_callback.\n");
printf("Receive a message %s: %s", (char*)msg->topic, (char*)msg->payload);
}
int main() {
int ret = 0;
struct mosquitto *mosq;
ret = mosquitto_lib_init();
if(ret) {
printf("init lib is failed.\n");
return -1;
}
mosq = mosquitto_new("sub", true, NULL);
if(mosq == NULL) {
printf("create new mosquitto instance failed.\n");
mosquitto_lib_cleanup();
return -1;
}
const char username[] = "username";
const char password[] = "123456";
ret = mosquitto_username_pw_set(mosq, username, password);
if(ret) {
printf("username or password is wrong.\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
ret = mosquitto_connect(mosq, HOST, PORT, KEEPALIVE);
if(ret) {
printf("connect to broker is failed.\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("Subscribe begin.\n");
int loop = mosquitto_loop_start(mosq);
if(loop) {
printf("mosquitto loop error.\n");
return -1;
}
const char cmd[10];
while(1) {
scanf("%s", cmd);
if(0 == strcmp(cmd, "quit")) {
running = 0;
}
}
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("Subscribe end.\n");
return 0;
}
7. 实验现象
7.1 发布者
对虚拟机进行操作 发布者显示信息
7.2 订阅者
接收发布者发送的虚拟机事件信息。
参考文献
[1] https://blog.p2hp.com/archives/4100 [2] https://blog.csdn.net/qq_33406883/article/details/107466430 [3] https://www.cnblogs.com/sxkgeek/p/9140180.html