Awesome Delay Queue Save

分布式延时队列,支持延时触发和计数触发。基于spi技术实现消息持久方案的扩展性,目前已实现基于redis的持久化方案。

Project README

背景

    延迟队列在互联网项目中比较常用,比如订单系统中的超时30分钟未付款,订单取消;用户打车后,48小时后自动评价为5星等等。
笔者以前做众包抢单项目用的是rabbitmq的死信队列,达到延迟处理效果。但是在把延迟时间变短后,
先过期的消息必须等它之前的消息过期后才能被处理。目前开源的分布式延时队列并不多(一些大公司有自研的),
并且很多都不完善,加上公司业务场景需要支持每个topic只能有一个job的限制,
以及同一个job操作计数达到某个值后就触发转为可执行job。
所以笔者打算依据现有的延时队列实现原理,一步一步实现一个分布式延时队列

常见的实现方案

    1. DelayQueue 
        原理:jdk中提供的使用优先队列实现的BlockingQueue,优先队列比较的是时间,内部存储的是实现Delayed接口的对象。 
              只有在对象过期后才能从队列中获取对象。
        优点:效率高,任务触发时间延迟低
        缺点: 数据是保存在内存,需要自己实现持久化;不具备分布式能力,需要自己实现高可用
    2. RabbitMq 死信队列
        原理:RabbitMQ可以对队列和消息设置x-message-tt、expiration来控制消息的存活时间,如果超时,消息变为死信。
              RabbitMQ可以对队列设置x-dead-letter-exchange和x-dead-letter-routing-key两个参数。
              当消息在一个队列中变成死信后会按这两个参数路由,消息就可以重新被消费。
        优点:
            高效,可以利用RabbitMQ的分布式特性轻易进行横向扩展,且支持持久化
        缺点:
            不支持对已发送的消息进行管理
            一个消息比在同一队列中在它之前的其他消息提前过期,提前过期的消息也不会优先进入死信队列。比如
            
    3. Redis实现
        原理:将延迟任务加到Sorted Set,将延迟时间设为score;启动一个线程不断判断Sorted Set中第一个元素的score是否大于当前时间
            如果大于,从Sorted Set中移除任务并添加到执行队列中;如果小于,进行短暂休眠后重试
        
        优点:实现简单,任务可管理(可删除、修改任务)
        缺点:需要有短轮询线程不断判断第一个元素是否过期,造成CPU空耗
              分布式场景中,容易引起多个节点读取到相同任务
    4. 时间轮
        单表时间轮、分层时间轮
        原理:时间轮是一种环形的数据结构,分成多个格,每个格代表一段时间,时间越短,精度越高。
              每个格上用一个链表保存在该格的过期任务。
              指针随着时间一格一格转动,并执行相应格子中的到期任务。
        优点:
            高性能(插入任务、删除任务的时间复杂度均为O(1),DelayQueue由于涉及到排序,插入和移除的复杂度是O(logn))
        缺点:
            数据是保存在内存,需要自己实现持久化
            不具备分布式能力,需要自己实现高可用
            延迟任务过期时间受时间轮总间隔限制

希望达到的目标是?

    精确性(可在指定时间触发任务处理)
    通用性
    高性能
    高可用(支持多实例部署)
    可伸缩(增加和减少服务时,任务会重新分配)
    可重试(任务失败可重试)
    多协议(支持http\dubbo调用)
    可管理(业务使用方可修改、删除任务)
    能告警(失败次数达到阈值可触发告警)
    统一视图(方便查看任务执行情况,可手动干预任务执行)

当前设计

结构图如下

关键元素

1. topic
    主题,可以包含一个或多个job。其结构为:topic,capacity,subJobLimit.
    topic:主题名称,不同的业务场景应有不同的topic,如订单支付超时、自动评价、作业点等
    capacity: 容量。topic中job数量的限制。默认为Integer.maxValue
    subJobLimit:job的计数触发值,客户端需要调用接口通知job的计数值,job的计数值达到这个数据则触发job转到就绪队列。
1. job
    任务单元,其结构为:id,topic,delayTime,expireTime,triggerType,data
    id: job的唯一标示,使用方应保证不同的job,其id不同
    topic: 主题,区分不同的场景,比如下单支付场景topic可为orderPay,超时自动评价的topic可为autoComment.
    delayTime: 延迟时间,秒为单位。
    expireTime:过期时间,绝对时间。等于接收到job的系统时间+delayTime
    triggerType:触发类型,目前有过期触发和计数触发。计数触发是指计数达到某一个数值而触发操作。
    data: job的业务数据,消费者根据此数据进行业务处理。
    
2. bucket
    bucket只存储job的id,类似链表结构。delay bucket存储触发类型为过期触发的job,countdown bucket存储触发类型为计数的job.
    
3. timer
    调度器,用来将到期和到达计数的job取出,按照topic发送到reday queue.
    
4. reday queue
    reday queue,就绪队列。按照topic区分,一个topic对应一个reday queue.消费者可通过轮询拉取就绪队列中的消息。
    考虑以后实现push方式。

几个关键点

1. 如何持久化job数据?
    为了防止停机造成job数据丢失,应当将job信息和bucket信息持久化存储。第一版使用redis存储,需要redis开启持久化机制,做好主备。
    使用spi,抽象存储功能,方便接入其他持久化方案,比如数据库、自定义文件系统等。
    
2. 如何获取延时结束或者倒数结束的job ?
    Timer定时轮询delay bucket 和 countdown bucket,将已过期和已到达计数控制的bucket中的job取出,放入对应的reday queue.
    抽象调度功能,后续实现更高效的方案。

3. 消费端如何获取就绪数据?
    提供获取数据的http接口,消费端轮询获取数据。后续考虑增加push方式。

road map

1.0 基本功能
1.1 与spring的集成
1.2 持久化、调度、消息消费等的优化
2.x 集群功能
Open Source Agenda is not affiliated with "Awesome Delay Queue" Project. README Source: xuqinghua91/awesome-delay-queue
Stars
32
Open Issues
0
Last Commit
5 years ago
License
MIT

Open Source Agenda Badge

Open Source Agenda Rating