资讯详情

MQTT入门

1、简述

MQTT(Message Queuing Telemetry Transport,基于发布/订阅的消息队列遥测传输协议)(publish/subscribe)模式的"轻量级"该协议构建在通信协议中TCP/IP协议上,由IBM在1999年发布。MQTT最大的优点是为连接远程设备提供实时可靠的信息服务,代码少,带宽有限。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT基于客户端-服务器的消息发布/订阅传输协议。MQTT协议重量轻、简单、开放、易于实现,使其应用广泛。在许多情况下,包括有限的环境,如机器和机器(M2M)通信和物联网(IoT)。广泛应用于卫星链路通信传感器、偶尔拨号的医疗设备、智能家居和一些小型化设备。

2、设计规范

因为物联网的环境很特殊,所以MQTT遵循以下设计原则:

  • (1)简化,不添加可有可无的功能;
  • (2)发布/订阅(Pub/Sub)传感器之间传输方便的模式;
  • (3)允许用户动态创建主题,零运维成本;
  • (4)尽量减少传输量,提高传输效率;
  • (5)考虑低带宽、高延迟、不稳定网络等因素;
  • (6)支持连续会话控制;
  • (7)理解客户端计算能力可能很低;
  • (8)提供服务质量管理;
  • (9)假设数据未知,不强制传输数据的类型和格式,以保持灵活性。

3、主要特性

MQTT协议设计为远程传感器和控制设备通信,具有以下主要特点:

(1)利用发布/订阅消息模式提供一对多的消息发布,解除应用程序耦合。

?这与此非常相似XMPP,但是MQTT信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。

(2)屏蔽负载内容的消息传输。

(3)使用TCP/IP提供网络连接。

?主流的MQTT是基于TCP数据推送连接,但也是基于UDP版本叫MQTT-SN。由于这两个版本的连接方式不同,优缺点自然也不同。

(4)发布服务质量有三种新闻:

?最多一次的消息发布完全依赖于底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP如果您的智能设备在新闻推送过程中没有联网,则推送过去没有收到,则无法再次联网。

?至少一次确保信息到达,但可能会重复。

?只有一次,以确保信息达到一次。这种级别可用于一些严格的计费系统。在计费系统中,重复或丢失消息会导致错误的结果。这种最高质量的新闻发布服务也可用于即时通讯APP推送,确保用户只收到一次。

(5)小传输,费用小(固定长度的头部为2字节),协议交换最小化,减少网络流量。

?这就是为什么在介绍中说它非常适合在物联网领域,传感器和服务器的通信和信息收集。我们应该知道,嵌入式设备的计算能力和带宽相对较弱,这是非常适合使用该协议传输信息的。

(6)使用Last Will和Testament特征通知各方客户端异常中断机制。

Last Will:即遗言机制,用于通知同一主题下其他设备发送遗言的设备已断开连接。

Testament:遗嘱机制与功能相似Last Will。

5.协议中的订阅、主题和会话

订阅包括主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会和会话(Session)关联。一个会话可以包含多个订阅。每个会话中的每个订阅都有不同的主题筛选器。

每个客户端与服务器建立连接后,都是客户端与服务器之间的状态交互。会话存在于网络之间,也可能跨越客户端与服务器之间的多个连续网络连接。

与服务器订阅相匹配的标签连接到应用程序消息的标签上。服务器将消息发送给订阅匹配标签的每个客户端。

在订阅表达式中使用主题名通配符筛选器,表示订阅匹配的多个主题。

具体内容由消息订阅者接收。

编程实例

利用mqtt.fx软件实现聊天功能,fx订阅"up"主题,程序订阅"down"主题使用如下json通信协议。

{ "name": "zhangsan", "age": 16, "msg": "hello world" }

代码实现:

#include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" #include "cJSON.h"  #define ADDRESS     "tcp://192.168.1.250:1883" #define CLIENTID    "ExampleClientSub" #define TOPIC       "up" #define TOPICSEND   "down" #define PAYLOAD     "Hello World!" #define QOS         1 #define TIMEOUT     10000L    volatile MQTTClient_deliveryToken deliveredtoken;  void delivered(void *context, MQTTClient_deliveryToken dt) {     printf("Message with token value %d delivery confirmed\n", dt);     deliveredtoken = dt; }  int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) {     int i;     char* payloadptr;     cJSON *root = NULL;      printf("Message arrived\n");     printf("     topic: %s\n", topicName);     printf("   message: ");      payloadptr = message->payload;     for(i=0; i<message->payloadlen; i  )     {         putchar(*payloadptr  );     }     putchar('\n');      root = cJSON_Parse(message->payload);   if (!root)      {         printf("Error before: [%s]\n",cJSON_GetErrorPtr());     }     else     {         printf("%s\n", cJSON_Print(root));     }     MQTTClient_freeMessage(&message);     MQTTClient_free(topicName);     return 1; }  void connlost(void *context, char *cause) {     printf("\nConnection lost\n");     printf("     cause: %s\n", cause); }  int main(int argc, char* argv[]) {     MQTTClient client;     MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;     MQTTClient_message pubmsg = MQTTClient_message_initializer;     MQTTClient_deliveryToken token;      cJSON *cjson_test = NULL;     int rc;     int ch;     char buf1[32] = {0};     char buf2[32] = {0};     char buf3[32] = {0};     char buf[521] = {0};      MQTTClient_create(&client, ADDRESS, CLIENTID,     MQTTCLIENT_PERSISTENCE_NONE, NULL);     conn_opts.keepAliveInterval = 20;     conn_opts.cleansession = 1;      MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);      if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)     {         printf("Failed to connect, return code %d\n", rc);        exit(EXIT_FAILURE);
    }
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    MQTTClient_subscribe(client, TOPIC, QOS);

   
     do 
    {
        if(ch!='Q' && ch != 'q')
        {
            ch = getchar();
        }else
        {
            break;
        }
        
        ch = getchar();
        fgets(buf1,sizeof(buf1),stdin);
        fgets(buf2,sizeof(buf2),stdin);
        fgets(buf3,sizeof(buf3),stdin);

        buf1[strlen(buf1)-1] = '\0';
        buf2[strlen(buf2)-1] = '\0';
        buf3[strlen(buf3)-1] = '\0';

        cjson_test = cJSON_CreateObject();
        cJSON_AddStringToObject(cjson_test, "name", buf1);
        cJSON_AddStringToObject(cjson_test, "age", buf2);
        cJSON_AddStringToObject(cjson_test, "msg", buf3);
        strcpy(buf,cJSON_Print(cjson_test));

        pubmsg.payload = buf;
        pubmsg.payloadlen = (int)strlen(buf);
        pubmsg.qos = QOS;
        pubmsg.retained = 0;
        MQTTClient_publishMessage(client, TOPICSEND, &pubmsg, &token);
        printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), buf, TOPICSEND, CLIENTID);
        rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
        printf("Message with delivery token %d delivered\n", token);
    } while(1);


    MQTTClient_unsubscribe(client, TOPIC);
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

 本代码部分取自MQTT代码实例具体安装实现可从官网下载库文件

官方网址:GitHub - eclipse/paho.mqtt.c at v1.3.0

 

标签: ch2563流量传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台