java mqtt-server/broker 使用简单 扩展方便 集群稳定 轻松支持10万并发 已用于生产环境
轻量级物联网MQTT服务器, 快速部署, 支持集群.
基于java8+netty+springboot+redis+hazelcast技术栈实现
不支持topic如下
* 不支持为空
* 不支持以'/'开始或结束
* 不支持命名为'joRootTopic'
* 不支持分隔符之间无字符 例如:ab///c
* 不支持包含禁用字符 例如:ab/+11 或者c/12#1
jo-mqtt
├── mqtt-broker -- MQTT服务器功能的核心实现
├── mqtt-springboot -- springboot集成mqtt启动
├── mqtt-test -- MQTT服务器测试用例
- 参考MQTT3.1.1规范实现
- 完整的QoS服务质量等级实现
- 遗嘱消息, 保留消息及消息分发重试
- 心跳机制
- 集群功能
采用logback框架
集群默认使用RedisExtendProvider实现扩展,则集群间通信依赖redis的pubsub功能
#server config
#tcp端口配置
#-1表示不开启
mqtt.serverConfig.tcpPort=1883
#-1表示不开启
mqtt.serverConfig.tcpSslPort=1888
#webSocket配置
mqtt.serverConfig.webSocketPath=/joMqtt
#-1表示不开启
mqtt.serverConfig.webSocketPort=2883
#-1表示不开启
mqtt.serverConfig.webSocketSslPort=2888
mqtt.serverConfig.enableClientCA=false
mqtt.serverConfig.hostname=
#provider配置 默认有如下3中实现
#不支持集群间通信 不支持消息持久化
mqtt.serverConfig.extendProviderClass=joey.mqtt.broker.provider.MemoryExtendProvider
#支持集群间通信 支持消息持久化
#mqtt.serverConfig.extendProviderClass=joey.mqtt.broker.provider.RedisExtendProvider
#hazelcastProvider相关配置 支持集群间通信 不支持消息持久化
#mqtt.serverConfig.extendProviderClass=joey.mqtt.broker.provider.HazelcastExtendProvider
#mqtt.customConfig.hazelcastConfigFile=classpath:hazelcast/hazelcast-local.xml
#mqtt.customConfig.hazelcastConfigFile=file:/home/hazelcast-local.xml
#password 采用sha256hex加密 例子中密码明文和用户名一致
mqtt.serverConfig.enableUserAuth=true
mqtt.serverConfig.authUsers[0].userName=local
mqtt.serverConfig.authUsers[0].password=25bf8e1a2393f1108d37029b3df5593236c755742ec93465bbafa9b290bddcf6
mqtt.serverConfig.authUsers[1].userName=admin
mqtt.serverConfig.authUsers[1].password=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
#netty config
mqtt.nettyConfig.bossThreads=0
mqtt.nettyConfig.workerThreads=0
mqtt.nettyConfig.epoll=false
mqtt.nettyConfig.soBacklog=1024
mqtt.nettyConfig.soReuseAddress=true
mqtt.nettyConfig.tcpNoDelay=true
mqtt.nettyConfig.soSndBuf=65536
mqtt.nettyConfig.soRcvBuf=65536
mqtt.nettyConfig.soKeepAlive=true
mqtt.nettyConfig.channelTimeoutSeconds=200
#如果使用了RedisExtendProvider 则必须配置redisConfig
mqtt.customConfig.redisConfig.host=localhost
mqtt.customConfig.redisConfig.password=
mqtt.customConfig.redisConfig.port=6379
mqtt.customConfig.redisConfig.database=0
mqtt.customConfig.redisConfig.timeout=3000
mqtt.customConfig.redisConfig.pool.maxActive=50
mqtt.customConfig.redisConfig.pool.maxWait=1000
mqtt.customConfig.redisConfig.pool.maxIdle=50
mqtt.customConfig.redisConfig.pool.minIdle=20
#如果开启ssl 则必须配置如下信息
mqtt.customConfig.sslContextConfig.sslKeyFilePath=ssl/jomqtt-server.pfx
mqtt.customConfig.sslContextConfig.sslKeyStoreType=PKCS12
mqtt.customConfig.sslContextConfig.sslManagerPwd=jo_mqtt
mqtt.customConfig.sslContextConfig.sslStorePwd=jo_mqtt
#自定义节点名称 可以不配置 默认是UUID
#mqtt.customConfig.nodeName=jo_mqtt_1
#用户自定义扩展配置
mqtt.customConfig.extConfig.k1=v1
mqtt.customConfig.extConfig.k2=v2
mqtt.customConfig.extConfig.k3.k31=v31
mqtt.customConfig.extConfig.k3.k32=v32
修改配置文件mqtt.serverConfig.extendProviderClass=joey.mqtt.broker.provider.XXXXProvider
- 获取messageId存储实现: IMessageIdStore initMessageIdStore();
- 获取session存储实现: ISessionStore initSessionStore();
- 获取主题订阅存储实现: ISubscriptionStore initSubscriptionStore(ISessionStore sessionStore);
- 获取retain消息存储实现: IRetainMessageStore initRetainMessageStore();
- 获取pubMessage消息存储实现: IDupPubMessageStore initDupPubMessageStore();
- 获取pubRelMessage消息存储实现: IDupPubRelMessageStore initDupPubRelMessageStore();
- 获取授权管理实现: IAuth initAuthManager(List<AuthUser> userList);
- 获取集群间通信实现: IInnerTraffic initInnerTraffic(InnerPublishEventProcessor innerPublishEventProcessor);
- 获取事件监听器列表: List<IEventListener> initEventListeners();
- 获取当前实例节点名称 String getNodeName();
- 初始化sslContext: SslContext initSslContext(boolean enableClientCA) throws Exception;