• 周二. 10 月 8th, 2024

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

MQTT3.1.1协议阅读笔记2——QoS

admin

11 月 28, 2021

接着上一篇之后 回顾上一篇,我们再来加深对 MQTT 的 QoS 的理解。

QoS 分级

QoS等级 特点 解释
0 最多分发一次 接收者不发送响应,发送者也不重新尝试。接收者只能收到一次消息,或者一次也收不到。
1 最少分发一次 确保消息至少一次抵达接收者。
2 精确一次分发

订阅和发布都可以设置 QoS,我们依次来研究

实战发布 QoS 消息

研究发布 QoS 的时候,我们先简化订阅的 QoS,都采用 QoS=0

打开一个 mqtt-spy.jar 的客户端,订阅一个 chatRoom 的主题:

查看 WireShark 报文,默认就是 至多投递一次 (QoS0):

※ MQTT Java 客户端代码:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Scanner;
import java.util.UUID;

public class Main {

    public static void main(String[] args) throws MqttException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
        client.connect();
        System.out.println(clientId + " has connected.");
        Scanner in = new Scanner(System.in);
        for (;;){
            String msg = null;
            if (in.hasNext()) {
                msg = in.next();
            }
            int qos = -1;
            if (in.hasNextInt()) {
                qos = in.nextInt();
            }
            if (msg != null && qos >= 0 && qos <= 2) {
                client.publish("chatRoom", msg.getBytes(StandardCharsets.UTF_8), qos, false);
            }
        }
    }
}

发布 QoS0 消息

首先,发送 QoS0 的消息:

  • Publish Message

发布 QoS1 消息

接着,发送 QoS1 的消息:

  • Publish Message

  • Publish Ack Message

发布 QoS2 消息

最后,发送 QoS2 的消息:

  • Publish Message

  • Publish Received

  • Publish Release

  • Publish Complete

固定报头的第一个字节 Flag Bits

结合报文,我们就能看出 :

  • MQTT 协议固定报头的第一个字节,前4个bit位用来表示消息的类型,后4个bit位,只有 PUBLISH 类型的报文才用到,其他类型的报文暂时保留这4个bit位。
  • PUBLISH 类型的 MQTT 消息包,后4个bit位
    • 第一个bit位DUP用来表示当前是不是重复发送的报文;
    • 第二三个bit位共同表示QoS,即当前 PULISH 报文的 QoS (服务质量)
    • 第三个bit位表示RETAIN,是否保留该消息,让新订阅的客户端收到该消息

本地缓存与QoS

以 Paho 客户端为例,在我写的 Java 代码示例中,使用 MemoryPersistence 作为本地缓存。

  • put保存消息

  • remove移除消息

QoS0 是用不到 MemoryPersistence 的;

QoS1

如果发布消息 QoS1:

  • 发送 PUBLISH 之前,包唯一标识 Message Identifier 和消息的映射将被保存;

  • 收到 PUBLISH ACK 之后,包唯一标识 Message Identifier 将被移除

QoS2

如果发布消息 QoS2:

  • 发布 PUBLISH 之前,PUBLISH 消息将被保存,比如此时包唯一标识 Message Identifier 为 2

  • 收到 PUBLISH RECEIVED 之后发送 PUBLISH RELEASE 之前,保存该消息,包唯一标识仍然是 2,但是前缀不一样了

  • 收到 PUBLISH COMPLETE 之后,之前缓存的 PUBLISH 消息要删除

  • 收到 PUBLISH COMPLETE 之后,之前缓存的 PUBLISH RELEASE 消息也要删除

一旦已经发送了相应的PUBREL包,就不能再重发PUBLISH包。

重发消息

QoS 涉及到消息的补发,Paho 客户端选择的补发时机是重连成功之后,即接收到 CONNACK 报文之后,执行org.eclipse.paho.client.mqttv3.internal.ClientState#restoreInflightMessages

参考文档

MQTT 3.1.1 协议中文版 阅读

Mqtt Qos 深度解读 阅读

发表回复