Amazingwujun Mqttx Save

MQTTX Project 完整实现 mqttv3.1.1 协议,旨在提供易于使用且性能优异的 mqtt broker

Project README

MQTTX Project

license language

中文 | English

1 介绍

Mqttx 基于 MQTT v3.1.1 协议开发,旨在提供 易于使用性能优越mqtt broker

注意:分支 v1.2 要求 JDK17, 其它分支要求 JDK8

关联项目: Mqttx-Client 实现&使用极为简单的 mqttv3.1.1 客户端.

1.1 快速开始

想通过 docker 快速体验?见 docker 启动

  1. 打包

    • 开发模式:
      1. 启动 redis 实例
      2. 运行 mvnw -P dev -DskipTests=true clean package
  2. 运行

    1. 运行命令:java -jar mqttx-1.0.5.BETA.jar

图例:

快速开始

1.2 项目依赖

  • Redis: 集群消息、消息持久化
  • Kafka:桥接消息支持,集群消息(可选功能)

其它说明:

  1. 项目使用了 lombok,使用 ide 请安装对应的插件

开发工具建议使用 Intellij IDEA :blush:

举例:idea 需要安装插件 Lombok, settings > Build,Execution,Deployment > Compiler > Annotation Processor 开启 Enable annotation processing

1.3 线上实例

云服务到期,实例已经无法访问,有朋友赞助吗/(ㄒoㄒ)/~~

云端部署了一个 mqttx 单例服务,可供功能测试:

  1. 不支持 ssl
  2. 开启了 websocket, 可通过 http://ws.tool.tusk.link/ 测试,仅需将域名修改为:119.45.158.51(端口、地址不变)
  3. 支持共享订阅功能
  4. 部署版本 v1.0.6.RELEASE

websocket

2 架构

mqttx支持客户端认证、topic 发布/订阅鉴权功能,如果需要配套使用,建议的架构如下图:

架构图

客户认证服务由使用者自行实现

内部实现框架关系(仅列出关键项):

ak6mB6.png

2.1 目录结构

├─java
│  └─com
│      └─jun
│          └─mqttx
│              ├─broker         # mqtt 协议实现及处理包
│              │  ├─codec       # 编解码
│              │  └─handler     # 消息处理器(pub, sub, connn, etc)
│              ├─config         # 配置,主要是 bean 声明
│              ├─constants      # 常量
│              ├─consumer       # 集群消息消费者
│              ├─entity         # 实体类
│              ├─exception      # 异常类
│              ├─service        # 业务服务(用户认证, 消息存储等)接口
│              │  └─impl        # 默认实现
│              └─utils          # 工具类
└─resources                     # 资源文件(application.yml 在此文件夹)
    ├─META-INF                  # spring-configuration 辅助配置说明
    └─tls                       # ca 存放地址

3 docker 启动

镜像已上传至 docker-hub , 访问:fantasywujun/mqttx - Docker Hub 全部镜像

docker 环境安装好后,执行 docker-compose -f ./docker-compose.yml up 启动, 效果见下图:

y3R3tI.md.png

Docker Pull Command 说明
docker pull fantasywujun/mqttx:1.2.0 基于 jdk17.0.1mqttx:1.2.0 版本
docker pull fantasywujun/mqttx:1.2.1 基于 jdk17.0.1mqttx:1.2.1 版本
docker pull fantasywujun/mqttx:1.2.2 基于 jdk17.0.1mqttx:1.2.2 版本
docker pull fantasywujun/mqttx:1.2.3 基于 jdk17.0.1mqttx:1.2.3 版本

docker-compose 文件内容:

version: "2"
services:
  redis:
    container_name: redis-for-mqttx
    image: redis
  mqttx:
    container_name: mqttx
    image: fantasywujun/mqttx:1.2.2
    environment:
      mqttx.max-bytes-in-message: 10485760
      mqttx.web-socket.enable: false
    ports:
      - 1883:1883

4 功能说明

4.1 qos 支持

qos0 qos1 qos2
支持 支持 支持

为支持 qos1、qos2,引入 redis 作为持久层,这部分已经封装成接口,可自行替换实现(比如采用 mysql)。

4.2 topicFilter 支持

  1. 支持多级通配符 #与单级通配符 +
  2. 不支持以 /结尾的topic,比如 a/b/,请改为 a/b
  3. 其它规则见 mqtt v3.1.1 4.7 Topic Names and Topic Filters

mqttx 仅对订阅 topicFilter 进行校验,publish 的 topic 是没有做合法性检查的,可通过开启 4.5 topic 安全支持 限制客户端可发布的 topic。

举例:

topicFilter match topics
/a/b/+ /a/b/abc,/a/b/test
a/b/# a/b, a/b/abc, a/b/c/def
a/+/b/# a/nani/b/abc
/+/a/b/+/c /aaa/a/b/test/c

校验工具类为:com.jun.mqttx.utils.TopicUtils

4.3 集群支持

mqttx 依赖消息中间件分发消息实现集群功能,目前支持的中间件:

  • Kafka:可选配置。为了更好的性能,推荐 kafka 作为集群消息分发器
  • Redis:默认配置

实现原理如下图:

ak6nHK.png

  1. mqttx.cluster.enable:功能开关,默认 false
  2. mqttx.cluster.type: 消息中间件类型,默认 redis

注意事项:

  1. v1.0.5.RELEASE 之前的版本集群功能存在 bug,无法使用。

  2. 如需使用 kafka 实现集群消息,需要手动修改配置 application-*.yml, 可参考 application-dev.yml 中的配置示例 3. kafka 集群

4.4 ssl 支持

开启 ssl 你首先应该有了 ca(自签名或购买),然后修改 application.yml 文件中几个配置:

  1. mqttx.ssl.enable:功能开关,默认 false,同时控制 websocketsocket
  2. mqttx.ssl.key-store-location:keystore 地址,基于 classpath
  3. mqttx.ssl.key-store-password:keystore 密码
  4. mqttx.ssl.key-store-type:keystore 类别,如 PKCS12
  5. mqttx.ssl.client-auth:服务端是否需要校验客户端证书,默认 NONE

resources/tls 目录中的 mqttx.keystore 仅供测试使用, 密码: 123456

证书加载工具类:com/jun/mqttx/utils/SslUtils.java

4.5 topic 安全支持

为了对 client 订阅 topic 进行限制,加入topic 订阅&发布鉴权机制:

  1. mqttx.enable-topic-sub-pub-secure: 功能开关,默认 false

  2. broker 收到 conn 报文后,会抓取 {clientId, username, password} 发起请求给 mqttx.auth.url , 该接口返回对象中含有 authorizedSub,authorizedPub 存储 client 被授权订阅及发布的 topic 列表。

    详见 4.12 基础认证支持

  3. broker 在消息订阅及发布都会校验客户端权限

支持的主题类型:

  • 普通主题
  • 共享主题
  • 系统主题

4.6 共享主题支持

共享订阅是协议 mqtt5 规定的内容,MQTTX 参考协议标准实现。

  1. 格式: $share/{ShareName}/{filter}, $share 为前缀, ShareName 为共享订阅名, filter 就是非共享订阅主题过滤器。
  2. 支持如下两种消息分发规则
    1. round: 轮询
    2. random: 随机
  3. 支持客户端订阅按 ShareName 分组订阅.
  4. 详细内容请参考协议 MQTT Version 5.0 (oasis-open.org)

下图展示了共享主题与常规主题之间的差异:

share-topic

msg-a 消息分发策略取决于配置项 mqttx.share-topic.share-sub-strategy

可以配合 cleanSession = 1 的会话,共享主题的客户端断开连接后会被服务端移除订阅,这样共享主题的消息只会分发给在线的客户端。

CleanSession 介绍:mqtt3.1.1 协议规定当 cleanSession = 1 时,连接断开后与会话相关联的所有状态(不含 retained 消息)都会被删除(mqtt5 增加了会话超时设置,感兴趣的同学可以了解一下)。 mqttx v1.0.5.BETA 版本后(含),cleanSession = 1 的会话消息保存在内存中,具备极高的性能.

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].

The Session state in the Client consists of:

  • QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
  • QoS 2 messages which have been received from the Server, but have not been completely acknowledged.

The Session state in the Server consists of:

  • The existence of a Session, even if the rest of the Session state is empty.
  • The Client’s subscriptions.
  • QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
  • QoS 1 and QoS 2 messages pending transmission to the Client.
  • QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
  • Optionally, QoS 0 messages pending transmission to the Client.

4.7 websocket 支持

支持

4.8 系统主题

mqttx broker 内置部分系统主题,用户可酌情使用。

系统主题不支持如下特性:

  • 集群:系统主题不支持集群,包括消息及订阅
  • 持久化:系统主题消息不支持持久化,包括订阅关系
  • QoS: 不支持 QoS 1,2 仅支持 QoS 0

注意topic 安全机制 同样会影响客户端订阅系统主题, 未授权客户端将无法订阅系统主题

系统主题可分两种:

  1. 状态主题:反应 broker 自身状态的主题
  2. 功能主题:对外提供功能性支持的主题
4.8.1 状态主题

客户端可通过订阅系统主题获取 broker 状态,目前系统支持如下状态主题:

主题 描述
$SYS/broker/{brokerId}/status 触发方式:订阅此主题的客户端会定期(mqttx.sys-topic.interval)收到 broker 的状态,该状态涵盖下面所有主题的状态值.
注意:客户端连接断开后,订阅取消
$SYS/broker/activeConnectCount 立即返回当前的活动连接数量
触发:订阅一次触发一次
$SYS/broker/time 立即返回当前时间戳
触发:订阅一次触发一次
$SYS/broker/version 立即返回 broker 版本
触发:订阅一次触发一次
$SYS/broker/receivedMsg 立即返回 broker 启动到现在收到的 MqttMessage, 不含 ping
触发:订阅一次触发一次
$SYS/broker/sendMsg 立即返回 broker 启动到现在发送的 MqttMessage, 不含 pingAck
触发:订阅一次触发一次
$SYS/broker/uptime 立即返回 broker 运行时长,单位
触发:订阅一次触发一次
$SYS/broker/maxActiveConnectCount 立即返回 broker 运行至今的最大 tcp 连接数
触发:订阅一次触发一次

系统主题 $SYS/broker/{brokerId}/status 中的 brokerId 为配置项参数(见 6.1 配置项),可通过携带通配符的主题 $SYS/broker/+/status 订阅。

响应对象格式为 json 字符串:

{
    "activeConnectCount": 1,
    "maxActiveConnectCount": 2,
    "receivedMsg": 6,
    "sendMsg": 77,
    "timestamp": "2021-03-23T23:05:37.035",
    "uptime": 149,
    "version": "1.0.7.RELEASE"
}
field 说明
activeConnectCount 当前活跃连接数量
maxActiveConnectCount 最大活跃连接数量
receiveMsg 收到消息数量,不含 ping
sendMsg 发送消息数量,不含 pingAck
timestamp 时间戳;(yyyy-MM-dd HH:mm:ss)
uptime broker 上线时长,单位秒
version mqttx 版本
4.8.2 功能主题

此功能需求源自 issue: 监听MQTT客户端状态(在线、离线) · Issue #8 · Amazingwujun/mqttx (github.com)

主题 描述
$SYS/broker/{borkerId}/clients/{clientId}/connected 客户端上线通知主题
触发:当某个客户端上线后,broker 会发送消息给该主题
$SYS/broker/{borkerId}/clients/{clientId}/disconnected 客户端下线通知主题
触发:当某个客户端掉线后,broker 会发送消息给该主题

这两个系统主题支持通配符,举例:

  1. $SYS/broker/+/clients/#: 匹配客户端上下线通知主题
  2. $SYS/broker/+/clients/+/connected: 匹配客户端上线通知主题
  3. $SYS/broker/+/clients/+/disconnected: 匹配客户端下线通知主题

4.9 消息桥接支持

支持消息中间件:

  • kafka

消息桥接功能可方便的对接消息队列中间。

  1. mqttx.message-bridge.enable:开启消息桥接功能
  2. mqttx.bridge-topics:需要桥接消息的主题,主题必须符合 kafkatopic 的要求

mqttx 收到客户端 发布 的消息后,先判断桥接功能是否开启,然后再判断主题是否是需要桥接消息的主题,最后发布消息到 MQ

仅支持单向桥接:device(client) => mqttx => MQ

4.10 主题限流支持

使用基于令牌桶算法的 com.jun.mqttx.utils.RateLimiter 对指定主题进行流量限制。

令牌桶算法参见:https://stripe.com/blog/rate-limiters

简单解释一下令牌桶概念:有一个最大容量为 capacity 的令牌桶,该桶以一定的速率补充令牌(replenish-rate),每次调用接口时消耗一定量(token-consumed-per-acquire)的令牌,令牌数目足够则请求通过。

主题限流仅适用于 qos 等于 0 的消息

配置举例:

mqttx:
  rate-limiter:
    enable: true
    topic-rate-limits:
      # 例一
      - topic: "/test/a"
        capacity: 9
        replenish-rate: 4
        token-consumed-per-acquire: 3
      # 例二
      - topic: "/test/b"
        capacity: 5
        replenish-rate: 5
        token-consumed-per-acquire: 2
  • capacity: 桶容量
  • replenish-rate: 令牌填充速率
  • token-consumed-per-acquire: 每次请求消耗令牌数量

QPS 计算公式:

  1. 最大并发数:公式为 QPS = capacity ÷ token-consumed-per-acquire
    1. 示例一:9 ÷ 3 = 3
    2. 示例二:5 ÷ 2 = 2.5
  2. 最大持续并发数:公式 QPS = replenish-rate ÷ token-consumed-per-acquire
    1. 示例一:4 ÷ 3 ≈ 1.3
    2. 示例二:5 ÷ 2 = 2.5

4.11 消息持久化支持

mqttx 的持久化依赖 redis , mqttx 会持久化 cleanSession = false & qos > 0 的消息, 消息被 Serializer 序列化为字节数组后存储在 redis

目前 mqttx 提供了两种序列化实现:

  1. JsonSerializer
  2. KryoSerializer

默认使用 JsonSerializer, 这是为了和之前的项目兼容;v1.0.6.release 版本后 KryoSerializer 将成为默认序列化实现。

可通过配置 mqttx.serialize-strategy 修改序列化实现。

4.12 基础认证支持

mqttx 提供基础客户端认证服务。

配置项:

  1. mqttx.auth.url: 提供认证服务的接口地址。
  2. mqttx.auth.timeout: HttpClient 请求超时
  3. mqttx.auth.is-mandatory: 是否强制要求校验用户名与密码

用户在配置文件中声明 mqtt.auth.url 后,对象 com.jun.mqttx.service.impl.DefaultAuthenticationServiceImpl 使用 HttpClient 发出 POST 请求给 mqttx.auth.url

请求内容为 mqtt conn 报文中的 username, password.

POST / HTTP/1.1
Host: mqttx.auth.url
Content-Type: application/json
Content-Length: 91

{
    "clientId": "device_id_test",
    "username": "mqttx",
    "password": "123456"
}

认证成功后响应对象为 json 格式字符串:

{
    "authorizedSub": [
        "subTopic1",
        "subTopic2"
    ],
    "authorizedPub": [
        "pubTopic1",
        "pubTopic2"
    ]
}

认证成功返回响应可配合 4.5 topic 安全支持 使用。

注意:

  • 接口返回 http status = 200 即表明认证成功, 其它状态值一律为认证失败

5 开发者说

  1. 感谢 Jetbrains 为开源项目提供的 License

    Jetbrains
  2. 长期更新维护的分支

    1. v1.0: 基于 jdk8 且 redis io 为 blocking 模式.
    2. v1.2: 基于 jdk17 且 redis io 为 non-blocking 模式.
  3. 为使 mqttx 项目变得更好,请使用及学习该项目的同学主动反馈使用情况给我(提 issue 或加群反馈)

  4. 后续工作

    • v1.0.8.RELEASE 版本开发
    • v1.1.0.RELEASE 版本开发
    • v1.2 版本开发
    • v2.0 版本开发
    • bug 修复
  5. v1.2 版本由 JDK8 升级至 JDK17

  6. v2.0 版本分支将作为 mqttv5 协议版本开始迭代

  7. 考虑使用 gossip 协议实现集群功能,集群功能不再依赖 redis or kafka

  8. 请作者喝杯 丝绒拿铁 😊

    coffee
  9. 交流群

    群二维码

6 附表

6.1 配置项

src/main/resources 目录下有三个配置文件:

  1. application.yml
  2. application-dev.yml
  3. application-prod.yml

后两个配置文件目的是区分不同环境下的配置,便于管理。

配置项说明:

配置 默认值 说明
mqttx.version 取自 pom.xml 版本
mqttx.broker-id 取自 pom.xml 应用标志, 唯一
mqttx.heartbeat 60s 初始心跳,会被 conn 消息中的 keepalive 重置
mqttx.host 0.0.0.0 监听地址
mqttx.so-backlog 512 tcp 连接处理队列
mqttx.enable-topic-sub-pub-secure false 客户订阅/发布主题安全功能,开启后将限制客户端发布/订阅的主题
mqttx.ignore-client-self-pub true 忽略 client 发送给自己的消息(当 client 发送消息给自己订阅的主题)
mqttx.max-bytes-in-message 8092 mqttx 允许接收的最大报文载荷,单位 byte.
mqttx.serialize-strategy json broker 采用的序列化策略,集群策略必须一致
mqttx.redis.cluster-session-hash-key mqttx.session.key redis map key;用于集群的会话存储
mqttx.redis.topic-prefix mqttx:topic: 主题前缀; topic <==> client 映射关系保存
mqttx.redis.retain-message-prefix mqttx:retain: 保留消息前缀, 保存 retain 消息
mqttx.redis.pub-msg-set-prefix mqttx:client:pubmsg: client pub消息 redis set 前缀; 保存 pubmsg,当收到 puback 获取 pubrec 后删除
mqttx.redis.pub-rel-msg-set-prefix mqttx:client:pubrelmsg: client pubRel 消息 redis set 前缀;保存 pubrel 消息 flag,收到 pubcom 消息删除
mqttx.redis.topic-set-key mqttx:alltopic topic 集合,redis set key 值;保存全部主题
mqttx.redis.message-id-prefix mqttx:messageId: cleanSession client 的 messageId, 使用 redis INCR 指令
mqttx.redis.client-topic-set-prefix mqttx:client:topicset: client 订阅的主题 redis set 前缀; 保存 client 订阅的全部主题
mqttx.cluster.enable false 集群开关
mqttx.cluster.inner-cache-consistancy-key mqttx:cache_consistence 应用启动后,先查询 redis 中无此 key 值,然后在检查一致性
mqttx.cluster.type redis 集群消息中间件类型
mqttx.ssl.enable false ssl 开关
mqttx.ssl.client-auth NONE 客户端证书校验
mqttx.ssl.key-store-location classpath: tls/mqttx.keystore keyStore 位置
mqttx.ssl.key-store-password 123456 keyStore 密码
mqttx.ssl.key-store-type pkcs12 keyStore 类别
mqttx.socket.enable true socket 开关
mqttx.socket.port 1883 socket 监听端口
mqttx.websocket.enable false websocket 开关
mqttx.websocket.port 8083 websocket 监听端口
mqttx.websocket.path /mqtt websocket path
mqttx.share-topic.share-sub-strategy round 负载均衡策略, 目前支持随机、轮询
mqttx.sys-topic.enable false 系统主题功能开关
mqttx.sys-topic.interval 60s 定时发布间隔
mqttx.message-bridge.enable false 消息桥接功能开关
mqttx.message-bridge.topics null 需要桥接消息的主题列表
mqttx.rate-limiter.enable false 主题限流开关
mqttx.rate-limiter.token-rate-limit 参见 主题限流支持 配置举例说明
mqttx.auth.url null mqtt conn username/password 认证服务接口地址
mqttx.auth.timeout 3s readTimeout
mqttx.auth.is-mandatory false 是否必须验证 conn 报文中的用户名与密码
mqttx.sharable-payload.payload-key-prefix mqttx:sharable-payload: 共享载荷存储 redis key prefix
mqttx.sharable-payload.unique-id-client-ids-set-prefix mqttx:unique-id:client-ids: 共享载荷关联的客户端 id 列表
mqttx.sharable-payload.clean-work-interval 1m 清洗定时间隔。共享载荷清理任务之间的间隔
mqttx.sharable-payload.threshould-in-message 128 共享载荷生效阈值;大于配置项阈值时,载荷共享。
Open Source Agenda is not affiliated with "Amazingwujun Mqttx" Project. README Source: Amazingwujun/mqttx

Open Source Agenda Badge

Open Source Agenda Rating