MQTT

MQTT (Message Queue Telemetry Transport,消息队列遥测传输协议),是一种基于发布-订阅(publish-subscribe)模式的轻量级通讯协议,该协议构建于 TCP/IP 协议上。

MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

网络受限:网络带宽较低且传输不可靠

终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

MQTT 协议概要

发布-订阅模式

发布-订阅模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。

该协议将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的 MQ 有点类似。

发布-订阅模式最重要的方面是消息的发布者与订阅者的解耦。这种解耦有几个维度:

  • 空间解耦:发布者和订阅者不需要相互了解(例如,不交换 IP 地址和端口)。
  • 时间解耦:发布者和订阅者不需要同时运行。
  • 同步解耦:两个组件的操作在发布或接收时不需要中断。

总之,发布-订阅模式消除了传统客户端-服务器之间的直接通信,把通信这个操作交给了 broker 进行代理,并在空间、时间、同步三个维度上进行了解藕。

发布-订阅模式比传统的客户端-服务器模式有了更好的扩展性:

  • broker 高度并行化,基于事件驱动;
  • 消息的缓存和消息的智能路由;
  • 可以通过集群代理来实现数百万的连接,使用负载均衡器将负载分配到更多的单个服务器上。

消息过滤

很明显,broker 在发布-订阅过程中起着举足轻重的作用。但是代理如何过滤所有消息,以便每个订阅者只接收感兴趣的消息?broker 有几个可以过滤的选项:

  • 基于主题的过滤:

    此过滤基于属于每条消息的主题。接收客户端向代理订阅感兴趣的主题,订阅后,broker 就会确保客户端收到发布到 topic 中的消息。

  • 基于内容的过滤:

    在基于内容的过滤中,broker 会根据特定的内容过滤消息,接受客户端会经过过滤他们感兴趣的内容。这种方法的一个显著的缺点就是必须事先知道消息的内容,不能加密或者轻易修改。

  • 基于类型的过滤:

    当使用面向对象的语言时,基于消息(事件)的类型/类进行过滤是一种常见做法。例如,订阅者可以收听所有类型为 Exception 或任何子类型的消息。

MQTT 与消息队列的区别

  • 消息队列存储消息直到消息被消费:

    使用消息队列时,每条传入消息都存储在队列中,直到被客户端(通常称为消费者)接收。如果没有客户端接收到消息,消息将保持在队列中并等待被消费。在消息队列中,不会存在消息没有客户端消费的情况,但是在 MQTT 中,却存在 topic 无 subscriber 订阅的情况。

  • 一条消息只被一个客户端消费:

    在传统的消息队列中,一条消息只能被一个消费者处理。负载分布在队列的所有消费者之间。在 MQTT 中,行为完全相反:订阅主题的每个订阅者都会收到消息,每个订阅者有相同的负载。

  • 队列是命名的,必须显式创建:

    队列比主题严格得多。在使用队列之前,必须使用单独的命令显式创建队列。只有在队列命名和创建之后,才可以发布或消费消息。相比之下,MQTT 主题非常灵活,可以即时创建。

MQTT 重要概念

MQTT Client

publishersubscriber 都属于 MQTT Client。发布者和订阅者是一种相对的概念,指的是当前客户端是在发布消息还是在接收消息。发布和订阅的功能也可以由同一个 MQTT Client 实现。

MQTT Broker

与 MQTT Client 对应的就是 MQTT Broker。 Broker 是任何发布-订阅协议的核心,根据实现的不同,代理可以处理多达数百万连接的 MQTT Client。

Broker 的责任:

  • 接收所有消息,过滤消息,确定是哪个 Client 订阅了消息,并将消息发送给对应的 Client;
  • 保存会话数据,这些数据包括订阅的和错过的消息;
  • 客户端的身份验证和授权。

MQTT Connection

TCP 协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于 TCP/IP 协议上。客户端和代理都需要有一个 TCP/IP 协议支持。

MQTT 连接始终位于一个客户端和代理之间。客户端从不直接相互连接。要发起连接,客户端向代理发送 CONNECT 消息。代理使用 CONNACK 消息和状态代码进行响应。建立连接后,代理将保持打开状态,直到客户端发送断开连接命令或连接中断。

消息列表

CONNECT

为了创建连接,客户端向代理发送此命令消息。如果此 CONNECT 消息格式错误或打开网络套接字和发送连接消息之间的时间过长,代理将关闭连接。

一个 MQTT 客户端发送一条 CONNECT 连接,这条 CONNECT 连接可能会包含下面这些信息:

  • clientId

    ClientId 的长度可以是 1-23 个字符,在一个服务器上 ClientId 不能重复。如果超过 23 个字符,则服务器返回 CONNACK 消息中的返回码为 Identifier Rejected。在 MQTT 3.1.1 中,如果不需要代理持有状态,可以发送一个空的 ClientId。空的 ClientId 导致连接没有任何状态。在这种情况下,clean session 标志必须设置为 true,否则代理将拒绝连接。

  • cleanSession

    Clean Session 标志告诉代理客户端是否要建立持久会话。在持久会话 (CleanSession = false) 中,代理存储客户端的所有订阅以及以服务质量(QoS)级别 1 或 2 订阅的客户端的所有丢失消息。 如果会话不是持久的 (CleanSession = true ),代理不为客户端存储任何内容,并清除任何先前持久会话中的所有信息。

  • username/password

    MQTT 可以发送用户名和密码进行客户端认证和授权。但是,如果此信息未加密或散列,则密码将以纯文本形式发送。我们强烈建议将用户名和密码与安全传输一起使用。像 HiveMQ 这样的代理可以使用 SSL 证书对客户端进行身份验证,因此不需要用户名和密码。

  • willMessage

    LastWillxxx 表示的是遗愿,client 在连接 broker 的时候将会设立一个遗愿,这个遗愿会保存在 broker 中,当 client 因为非正常原因断开与 broker 的连接时,broker 会将遗愿发送给订阅了这个 topic(订阅遗愿的 topic)的 client。

  • keepAlive

    keepAlive 是 client 在连接建立时与 broker 通信的时间间隔,通常以秒为单位。这个时间指的是 client 与 broker 在不发送消息下所能承受的最大时长。

CONNACK

当 broker 收到 CONNECT 消息时,它有义务回复 CONNACK 消息进行响应。CONNACK 消息包括两部分内容:

  • sessionPresent

    会话当前标志,这个标志会告诉 client 当前 broker 是否有一个持久性会话与 client 进行交互。SessionPresent 标志和 CleanSession 标志有关,当 client 在 CleanSession 设置为 true 的情况下连接时,SessionPresent 始终为 false,因为没有持久性会话可以使用。如果 CleanSession 设置为 false,则有两种可能性:

    • 如果 ClientId 的会话信息可用,并且 broker 已经存储了会话信息,那么 SessionPresent 为 true;
    • 否则如果没有 ClientId 的任何会话信息,那么 SessionPresent 为 false。

  • returnCode

    连接返回码,告诉客户端连接尝试是否成功。连接确认标志有下面这些选项:

PUBLISH

MQTT 客户端可以在连接到 broker 后立即发布消息。MQTT 使用的是基于 topic 主题的过滤。每条消息都必须包含一个主题,broker 可以使用该主题将消息转发给感兴趣的客户端。通常,每条消息都有一个负载(Payload),其中包含要以字节格式传输的数据。MQTT 是数据无关性的,也就是说数据是由发布者决定要发送的是 XML 、JSON 还是二进制数据、文本数据。

  • packetId

    这个 packetId 标识在 client 和 broker 之间唯一的消息标识。packetId 仅与大于零的 QoS 级别相关。

  • topicName

    主题名称是一个简单的字符串,它以正斜杠作为分隔符进行分层结构。例如,“我的家/客厅/温度”或“德国/慕尼黑/十月节/人”。

  • qos

    此数字表示消息的服务质量 (QoS)。有三个级别:0、1 和 2。服务级别决定了消息到达预期接收者(客户端或代理)的保证类型。

  • retainFlag

    此标志表示 broker 将最近收到的一条 RETAIN 标志位为 true 的消息保存在服务器端(内存或者文件)。

  • payload

    这个是每条消息的实际内容。MQTT 是数据无关性的。可以发送任何文本、图像、加密数据以及二进制数据。

  • dupFlag

    该标志表明该消息是重复的并且由于预期的接收者(客户端或代理)没有确认原始消息而被重新发送。仅与 QoS 大于 0 的级别相关。

当客户端向 MQTT broker 发送消息进行发布时,broker 读取消息、确认消息(根据 QoS 级别)并处理消息。broker 的处理包括确定哪些客户端订阅了主题并将消息发送给他们。

最初发布消息的客户端只关心将 PUBLISH 消息传递给 broker。一旦 broker 收到 PUBLISH 消息,broker 就有责任将消息传递给所有订阅者。发布客户端不会得到关于是否有人对发布的消息感兴趣或有多少客户端从 broker 收到消息的任何反馈。

SUBSCRIBE

client 会向 broker 发送 SUBSCRIBE 消息来接收有关感兴趣的 topic。

  • packetId

    消息的唯一标识符。与 PUBLISH 消息的 packetId 一样。

  • List of Subscriptions

    订阅列表。一个 SUBSCRIBE 消息可以包含一个客户端的多个订阅。每个订阅由一个主题和一个 QoS 级别组成。订阅消息中的主题可以包含通配符,使订阅主题模式而不是特定主题成为可能。如果一个客户端存在重叠订阅,则代理会传送该主题具有最高 QoS 级别的消息。

SUBACK

为了确认每个订阅,broker 向客户端发送一个 SUBACK 确认消息。

  • packetId

    消息的唯一标识符。与 SUBSCRIBE 消息的一样。

  • returnCode

    broker 为它在 SUBSCRIBE 消息中收到的每个主题/QoS 对发送一个返回代码。例如,如果 SUBSCRIBE 消息有五个订阅,则 SUBACK 消息包含五个返回码。返回码确认每个主题并显示 broker 授予的 QoS 级别。如果 broker 拒绝订阅,则 SUBACK 消息包含该特定主题的失败返回代码。例如,如果客户端没有足够的权限订阅主题或主题格式错误。

客户端成功发送 SUBSCRIBE 消息并收到 SUBACK 消息后,它会获取与 SUBSCRIBE 消息包含的订阅中的主题匹配的每条已发布消息。

UNSUBSCRIBE

SUBSCRIBE 消息的对应是 UNSUBSCRIBE 消息。此消息删除 broker 上客户端的现有订阅。

  • packetId

    消息的唯一标识符。与 PUBLISH 消息的 packetId 一样。

  • List of Topics

    主题列表。需要取消订阅的主题。

UNSUBACK

为了确认取消订阅,broker 向客户端发送一个 UNSUBACK 确认消息。

  • packetId

    消息的唯一标识符。与 UNSUBSCRIBE 消息的 packetId 一样。

客户端收到来自 broker 的 UNSUBACK 后,可以认为 UNSUBSCRIBE 消息中的订阅被删除了。

主题(Topic)

在 MQTT 中,主题指的是 broker 用于为每个连接的客户端过滤消息的 UTF-8 字符串。主题由一个或多个主题级别组成。每个主题级别由正斜杠(主题级别分隔符)分隔。

与消息队列相比,MQTT 主题非常轻量级。客户端在发布或订阅它之前不需要创建所需的主题。broker 接受每个有效主题而无需任何事先初始化。

通配符

当客户端订阅主题时,它可以订阅已发布消息的确切主题,也可以使用通配符同时订阅多个主题。通配符只能用于订阅主题,不能用于发布消息。有两种不同类型的通配符:单级和多级。

  • 单级:+

    单级通配符只替换一个主题级别。

    上述的主题匹配可以产生以下结果

  • 多级:#

    多级通配符涵盖多个主题级别。多级通配符必须作为主题中的最后一个字符放置,并以正斜杠开头。

    上述的主题匹配可以产生以下结果

以 $ 开头的主题

$ 主题保留用于 MQTT 代理的内部统计信息。客户端无法向这些主题发布消息。目前,此类主题尚无官方标准化,代理实现各不相同,但通常用 $SYS/ 来表示这类信息。MQTT GitHub wiki 中提供了对 $SYS-topics 的一项建议。

$SYS/broker/clients/connected

$SYS/broker/clients/disconnected

$SYS/broker/clients/total

$SYS/broker/messages/sent

$SYS/broker/uptime

参考资料