1、简述
MQTT(Message Queuing Telemetry Transport,基于发布/订阅的消息队列遥测传输协议)(publish/subscribe)模式的"轻量级"该协议构建在通信协议中TCP/IP协议上,由IBM在1999年发布。MQTT最大的优点是为连接远程设备提供实时可靠的信息服务,代码少,带宽有限。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT基于客户端-服务器的消息发布/订阅传输协议。MQTT协议重量轻、简单、开放、易于实现,使其应用广泛。在许多情况下,包括有限的环境,如机器和机器(M2M)通信和物联网(IoT)。广泛应用于卫星链路通信传感器、偶尔拨号的医疗设备、智能家居和一些小型化设备。
2、设计规范
因为物联网的环境很特殊,所以MQTT遵循以下设计原则:
- (1)简化,不添加可有可无的功能;
- (2)发布/订阅(Pub/Sub)传感器之间传输方便的模式;
- (3)允许用户动态创建主题,零运维成本;
- (4)尽量减少传输量,提高传输效率;
- (5)考虑低带宽、高延迟、不稳定网络等因素;
- (6)支持连续会话控制;
- (7)理解客户端计算能力可能很低;
- (8)提供服务质量管理;
- (9)假设数据未知,不强制传输数据的类型和格式,以保持灵活性。
3、主要特性
MQTT协议设计为远程传感器和控制设备通信,具有以下主要特点:
(1)利用发布/订阅消息模式提供一对多的消息发布,解除应用程序耦合。
?这与此非常相似XMPP,但是MQTT信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。
(2)屏蔽负载内容的消息传输。
(3)使用TCP/IP提供网络连接。
?主流的MQTT是基于TCP数据推送连接,但也是基于UDP版本叫MQTT-SN。由于这两个版本的连接方式不同,优缺点自然也不同。
(4)发布服务质量有三种新闻:
?最多一次的消息发布完全依赖于底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP如果您的智能设备在新闻推送过程中没有联网,则推送过去没有收到,则无法再次联网。
?至少一次确保信息到达,但可能会重复。
?只有一次,以确保信息达到一次。这种级别可用于一些严格的计费系统。在计费系统中,重复或丢失消息会导致错误的结果。这种最高质量的新闻发布服务也可用于即时通讯APP推送,确保用户只收到一次。
(5)小传输,费用小(固定长度的头部为2字节),协议交换最小化,减少网络流量。
?这就是为什么在介绍中说它非常适合在物联网领域,传感器和服务器的通信和信息收集。我们应该知道,嵌入式设备的计算能力和带宽相对较弱,这是非常适合使用该协议传输信息的。
(6)使用Last Will和Testament特征通知各方客户端异常中断机制。
Last Will:即遗言机制,用于通知同一主题下其他设备发送遗言的设备已断开连接。
Testament:遗嘱机制与功能相似Last Will。
5.协议中的订阅、主题和会话
订阅包括主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会和会话(Session)关联。一个会话可以包含多个订阅。每个会话中的每个订阅都有不同的主题筛选器。
每个客户端与服务器建立连接后,都是客户端与服务器之间的状态交互。会话存在于网络之间,也可能跨越客户端与服务器之间的多个连续网络连接。
与服务器订阅相匹配的标签连接到应用程序消息的标签上。服务器将消息发送给订阅匹配标签的每个客户端。
在订阅表达式中使用主题名通配符筛选器,表示订阅匹配的多个主题。
具体内容由消息订阅者接收。
编程实例
利用mqtt.fx软件实现聊天功能,fx订阅
"up"
主题,程序订阅"down"
主题使用如下json通信协议。{ "name": "zhangsan", "age": 16, "msg": "hello world" }
代码实现:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" #include "cJSON.h" #define ADDRESS "tcp://192.168.1.250:1883" #define CLIENTID "ExampleClientSub" #define TOPIC "up" #define TOPICSEND "down" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { int i; char* payloadptr; cJSON *root = NULL; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; for(i=0; i<message->payloadlen; i ) { putchar(*payloadptr ); } putchar('\n'); root = cJSON_Parse(message->payload); if (!root) { printf("Error before: [%s]\n",cJSON_GetErrorPtr()); } else { printf("%s\n", cJSON_Print(root)); } MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; cJSON *cjson_test = NULL; int rc; int ch; char buf1[32] = {0}; char buf2[32] = {0}; char buf3[32] = {0}; char buf[521] = {0}; MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
do
{
if(ch!='Q' && ch != 'q')
{
ch = getchar();
}else
{
break;
}
ch = getchar();
fgets(buf1,sizeof(buf1),stdin);
fgets(buf2,sizeof(buf2),stdin);
fgets(buf3,sizeof(buf3),stdin);
buf1[strlen(buf1)-1] = '\0';
buf2[strlen(buf2)-1] = '\0';
buf3[strlen(buf3)-1] = '\0';
cjson_test = cJSON_CreateObject();
cJSON_AddStringToObject(cjson_test, "name", buf1);
cJSON_AddStringToObject(cjson_test, "age", buf2);
cJSON_AddStringToObject(cjson_test, "msg", buf3);
strcpy(buf,cJSON_Print(cjson_test));
pubmsg.payload = buf;
pubmsg.payloadlen = (int)strlen(buf);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPICSEND, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), buf, TOPICSEND, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
} while(1);
MQTTClient_unsubscribe(client, TOPIC);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
本代码部分取自MQTT代码实例具体安装实现可从官网下载库文件
官方网址:GitHub - eclipse/paho.mqtt.c at v1.3.0