賊好用,這款精準(zhǔn)定時(shí)任務(wù)和延時(shí)隊(duì)列框架
項(xiàng)目簡(jiǎn)述
Mykit體系中提供的簡(jiǎn)單、穩(wěn)定、可擴(kuò)展的延遲消息隊(duì)列框架,提供精準(zhǔn)的定時(shí)任務(wù)和延遲隊(duì)列處理功能
項(xiàng)目模塊說明
mykit-delay-common: mykit-delay 延遲消息隊(duì)列框架通用工具模塊,提供全局通用的工具類
mykit-delay-config: mykit-delay 延遲消息隊(duì)列框架通用配置模塊,提供全局配置
mykit-delay-queue: mykit-delay 延遲消息隊(duì)列框架核心實(shí)現(xiàn)模塊,目前所有主要的功能都在此模塊實(shí)現(xiàn)
mykit-delay-controller: mykit-delay 延遲消息隊(duì)列框架Restful接口實(shí)現(xiàn)模塊,對(duì)外提供Restful接口訪問,兼容各種語言調(diào)用
mykit-delay-core: mykit-delay 延遲消息隊(duì)列框架的入口,整個(gè)框架的啟動(dòng)程序在此模塊實(shí)現(xiàn)
mykit-delay-rpc:mykit-delay延時(shí)消息隊(duì)列的RPC模塊,支持Dubbo、brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等主流RPC的實(shí)現(xiàn)
mykit-delay-test: mykit-delay 延遲消息隊(duì)列框架通用測(cè)試模塊,主要提供Junit單元測(cè)試用例
需求背景
用戶下訂單后未支付,30分鐘后支付超時(shí)
在某個(gè)時(shí)間點(diǎn)通知用戶參加系統(tǒng)活動(dòng)
業(yè)務(wù)執(zhí)行失敗之后隔10分鐘重試一次
類似的場(chǎng)景比較多 簡(jiǎn)單的處理方式就是使用定時(shí)任務(wù) 假如數(shù)據(jù)比較多的時(shí)候 有的數(shù)據(jù)可能延遲比較嚴(yán)重,而且越來越多的定時(shí)業(yè)務(wù)導(dǎo)致任務(wù)調(diào)度很繁瑣不好管理。
隊(duì)列設(shè)計(jì)
整體架構(gòu)設(shè)計(jì)如下圖所示。
開發(fā)前需要考慮的問題
及時(shí)性 消費(fèi)端能按時(shí)收到
同一時(shí)間消息的消費(fèi)權(quán)重
可靠性 消息不能出現(xiàn)沒有被消費(fèi)掉的情況
可恢復(fù) 假如有其他情況 導(dǎo)致消息系統(tǒng)不可用了 至少能保證數(shù)據(jù)可以恢復(fù)
可撤回 因?yàn)槭茄舆t消息 沒有到執(zhí)行時(shí)間的消息支持可以取消消費(fèi)
高可用 多實(shí)例 這里指HA/主備模式并不是多實(shí)例同時(shí)一起工作
消費(fèi)端如何消費(fèi)
當(dāng)然初步選用Redis作為數(shù)據(jù)緩存的主要原因是因?yàn)閞edis自身支持zset的數(shù)據(jù)結(jié)構(gòu)(score 延遲時(shí)間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時(shí)間維度去判定執(zhí)行的順序 同時(shí)也支持Map list數(shù)據(jù)結(jié)構(gòu)。
簡(jiǎn)單定義一個(gè)消息數(shù)據(jù)結(jié)構(gòu)
private String topic;/***topic**/private String id;/***自動(dòng)生成 全局惟一 snowflake**/private String bizKey;private long delay;/***延時(shí)毫秒數(shù)**/private int priority;//優(yōu)先級(jí)private long ttl;/**消費(fèi)端消費(fèi)的ttl**/private String body;/***消息體**/private long createTime=System.currentTimeMillis();private int status= Status.WaitPut.ordinal();
運(yùn)行原理
用Map來存儲(chǔ)元數(shù)據(jù)。id作為key,整個(gè)消息結(jié)構(gòu)序列化(JSON/…)之后作為value,放入元消息池中。
將id放入其中(有N個(gè))一個(gè)zset有序列表中,以createTime delay priority作為score。修改狀態(tài)為正在延遲中
使用timer實(shí)時(shí)監(jiān)控zset有序列表中top 10的數(shù)據(jù) 。 如果數(shù)據(jù)score<=當(dāng)前時(shí)間毫秒就取出來,根據(jù)topic重新放入一個(gè)新的可消費(fèi)列表(list)中,在zset中刪除已經(jīng)取出來的數(shù)據(jù),并修改狀態(tài)為待消費(fèi)
客戶端獲取數(shù)據(jù)只需要從可消費(fèi)隊(duì)列中獲取就可以了。并且狀態(tài)必須為待消費(fèi) 運(yùn)行時(shí)間需要<=當(dāng)前時(shí)間的 如果不滿足 重新放入zset列表中,修改狀態(tài)為正在延遲。如果滿足修改狀態(tài)為已消費(fèi)。或者直接刪除元數(shù)據(jù)。
客戶端
因?yàn)樯婕暗讲煌绦蛘Z言的問題,所以當(dāng)前默認(rèn)支持http訪問方式。
添加延時(shí)消息添加成功之后返回消費(fèi)唯一ID POST /push {……消息體}
刪除延時(shí)消息 需要傳遞消息ID GET /delete?id=
恢復(fù)延時(shí)消息 GET /reStore?expire=true|false expire是否恢復(fù)已過期未執(zhí)行的消息。
恢復(fù)單個(gè)延時(shí)消息 需要傳遞消息ID GET /reStore/id
獲取消息 需要長(zhǎng)連接 GET /get/topic
用Nginx暴露服務(wù),配置為輪詢 在添加延遲消息的時(shí)候就可以流量平均分配。
目前系統(tǒng)中客戶端并沒有采用HTTP長(zhǎng)連接的方式來消費(fèi)消息,而是采用MQ的方式來消費(fèi)數(shù)據(jù)這樣客戶端就可以不用關(guān)心延遲消息隊(duì)列。只需要在發(fā)送MQ的時(shí)候攔截一下 如果是延遲消息就用延遲消息系統(tǒng)處理。
消息可恢復(fù)
實(shí)現(xiàn)恢復(fù)的原理 正常情況下一般都是記錄日志,比如mysql的binlog等。
這里我們直接采用mysql數(shù)據(jù)庫(kù)作為記錄日志。
目前創(chuàng)建以下2張表:
消息表 字段包括整個(gè)消息體
消息流轉(zhuǎn)表 字段包括消息ID、變更狀態(tài)、變更時(shí)間、zset掃描線程Name、host/ip
定義zset掃描線程N(yùn)ame是為了更清楚的看到消息被分發(fā)到具體哪個(gè)zset中。前提是zset的key和監(jiān)控zset的線程名稱要有點(diǎn)關(guān)系 這里也可以是zset key。
支持消息恢復(fù)
假如redis服務(wù)器宕機(jī)了,重啟之后發(fā)現(xiàn)數(shù)據(jù)也沒有了。所以這個(gè)恢復(fù)是很有必要的,只需要從表1也就是消息表中把消息狀態(tài)不等于已消費(fèi)的數(shù)據(jù)全部重新分發(fā)到延遲隊(duì)列中去,然后同步一下狀態(tài)就可以了。
當(dāng)然恢復(fù)單個(gè)任務(wù)也可以這么干。
數(shù)據(jù)表設(shè)計(jì)
這里,我就直接給出創(chuàng)建數(shù)據(jù)表的SQL語句。
DROP TABLE IF EXISTS `mykit_delay_queue_job`;CREATE TABLE `mykit_delay_queue_job` ( `id` varchar(128) NOT NULL, `bizkey` varchar(128) DEFAULT NULL, `topic` varchar(128) DEFAULT NULL, `subtopic` varchar(250) DEFAULT NULL, `delay` bigint(20) DEFAULT NULL, `create_time` bigint(20) DEFAULT NULL, `body` text, `status` int(11) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `update_time` datetime(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`), KEY `mykit_delay_queue_job_STATUS` (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for mykit_delay_queue_job_log-- ----------------------------DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;CREATE TABLE `mykit_delay_queue_job_log` ( `id` varchar(128) NOT NULL, `status` int(11) DEFAULT NULL, `thread` varchar(60) DEFAULT NULL, `update_time` datetime(3) DEFAULT NULL, `host` varchar(128) DEFAULT NULL, KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
關(guān)于高可用
分布式協(xié)調(diào)還是選用zookeeper。
如果有多個(gè)實(shí)例最多同時(shí)只能有1個(gè)實(shí)例工作 這樣就避免了分布式競(jìng)爭(zhēng)鎖帶來的壞處,當(dāng)然如果業(yè)務(wù)需要多個(gè)實(shí)例同時(shí)工作也是支持的,也就是一個(gè)消息最多只能有1個(gè)實(shí)例處理,可以選用zookeeper或者redis就能實(shí)現(xiàn)分布式鎖了。
最終做了一下測(cè)試多實(shí)例同時(shí)運(yùn)行,可能因?yàn)闀?huì)涉及到鎖的問題性能有所下降,反而單機(jī)效果很好。所以比較推薦基于docker的主備部署模式。
運(yùn)行模式
支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk集群地址列表
支持 cluster 會(huì)涉及到分布式鎖競(jìng)爭(zhēng) 效果不是很明顯 分布式鎖采用redis的 setNx實(shí)現(xiàn)
StandAlone
目前,經(jīng)過測(cè)試,推薦使用master slave的模式,并且,在升級(jí)版本中,進(jìn)一步增強(qiáng)了Master Slave模式。后期會(huì)優(yōu)化Cluster模式。
如何接入
為了提供一個(gè)統(tǒng)一的精準(zhǔn)定時(shí)任務(wù)和延時(shí)隊(duì)列框架,mykit-delay提供了HTTP Rest接口和RPC方式供其他業(yè)務(wù)系統(tǒng)調(diào)用,接口使用簡(jiǎn)單方便,只需要簡(jiǎn)單的調(diào)用接口,傳遞相應(yīng)的參數(shù)即可。
RPC方式調(diào)用,后續(xù)支持的方式有:
Dubbo(已實(shí)現(xiàn))
brpc(預(yù)留支持)
grpc(預(yù)留支持)
Motan(預(yù)留支持)
Sofa(預(yù)留支持)
SpringCloud(預(yù)留支持)
SpringCloud Alibaba(預(yù)留支持)
HTTP方式接入
消息體
以JSON數(shù)據(jù)格式參數(shù) 目前提供了http 協(xié)議。
body 業(yè)務(wù)消息體
delay 延時(shí)毫秒 距createTime的間隔毫秒數(shù)
id 任務(wù)ID 系統(tǒng)自動(dòng)生成 任務(wù)創(chuàng)建成功返回
status 狀態(tài) 默認(rèn)不填寫
topic 標(biāo)題
subtopic 保留字段
ttl 保留字段
createTime 創(chuàng)建任務(wù)時(shí)間 非必填 系統(tǒng)默認(rèn)
啟動(dòng)HTTP Rest服務(wù)
首先,從gitHub Clone項(xiàng)目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進(jìn)入mykit-delay框架目錄。
cd mykit-delay
執(zhí)行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進(jìn)入 mykit-delay-core 的 target 目錄下,運(yùn)行如下命令。
java -jar mykit-delay-core-xxx.jar
其中,xxx是版本號(hào),以實(shí)際下載的版本號(hào)為準(zhǔn)。
接下來,就可以調(diào)用HTTP Restful接口來使用mykit-delay框架了。
添加任務(wù)
/push POST application/json{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}
刪除任務(wù)
刪除任務(wù) 需要記錄一個(gè)JobId
/delete?jobId=xxx GET
恢復(fù)單個(gè)任務(wù)
用于任務(wù)錯(cuò)亂 腦裂情況 根據(jù)日志恢復(fù)任務(wù)
/reStore?expire=true GET
參數(shù)expire 表示是否需要恢復(fù)已過期還未執(zhí)行的數(shù)據(jù)
清空隊(duì)列數(shù)據(jù)
根據(jù)日志中未完成的數(shù)據(jù)清空隊(duì)列中全部數(shù)據(jù)。清空之后 會(huì)刪除緩存中的所有任務(wù)
/clearAll GET
Dubbo方式接入
消息體
以JSON數(shù)據(jù)格式參數(shù) 目前提供了http 協(xié)議。
body 業(yè)務(wù)消息體
delay 延時(shí)毫秒 距createTime的間隔毫秒數(shù)
id 任務(wù)ID 系統(tǒng)自動(dòng)生成 任務(wù)創(chuàng)建成功返回
status 狀態(tài) 默認(rèn)不填寫
topic 標(biāo)題
subtopic 保留字段
ttl 保留字段
createTime 創(chuàng)建任務(wù)時(shí)間 非必填 系統(tǒng)默認(rèn)
啟動(dòng)Dubbo服務(wù)
首先,從GitHub Clone項(xiàng)目到本地
git clone https://github.com/sunshinelyz/mykit-delay.git
然后進(jìn)入mykit-delay框架目錄。
cd mykit-delay
執(zhí)行Maven命令
mvn clean package -Dmaven.test.skip=true
接下來,進(jìn)入 mykit-rpc-dubbo模塊下的 mykit-rpc-dubbo-server服務(wù) 的 target 目錄下,運(yùn)行如下命令。
mykit-rpc-dubbo-server-xxx.jar
其中,xxx是版本號(hào),以實(shí)際下載的版本號(hào)為準(zhǔn)。
引入mykit-delay依賴
以Dubbo方式接入mykit-delay,需要引入mykit-delay的依賴,如下所示。
<dependency> <groupId>io.mykit.delay</groupId> <artifactId>mykit-rpc-dubbo-common</artifactId> <version>1.0-SNAPSHOT</version></dependency>
然后,在需要調(diào)用Dubbo服務(wù)的類中以如下方式注入MykitDelayDubboInterface。
@DubboReference(version = "1.0.0")private MykitDelayDubboInterface mykitDelayDubboInterface;
其中,MykitDelayDubboInterface接口的定義如下所示。
/** * @author binghe * @version 1.0.0 * @description 發(fā)布的Dubbo接口 */public interface MykitDelayDubboInterface { /** * 推送消息 */ ResponseMessage push(JobWrapp jobMsg); /** * 刪除任務(wù) */ ResponseMessage delete(String jobId); /** * 完成任務(wù) */ ResponseMessage finish(String jobId); /** * 恢復(fù)單個(gè)任務(wù) */ ResponseMessage reStoreJob(String jobId); /** * 提供一個(gè)方法 假設(shè)緩存中間件出現(xiàn)異常 以及數(shù)據(jù)錯(cuò)亂的情況 提供恢復(fù)功能 * @param expire 過期的數(shù)據(jù)是否需要重發(fā) true需要, false不需要 默認(rèn)為true */ ResponseMessage reStore(Boolean expire); /** * 清除所有的任務(wù) */ ResponseMessage clearAll();}
接下來,就可以以Dubbo方式接入mykit-delay框架了。
注意:無論是以HTTP方式,還是以RPC方式啟動(dòng)mykit-delay服務(wù),都需要通過如下方式加載基本配置信息。
StartGetReady.ready(ConsumeQueueProvider.class.getName());
客戶端獲取隊(duì)列方式
目前默認(rèn)實(shí)現(xiàn)了RocketMQ與ActiveMQ的推送方式。依賴MQ的方式來實(shí)現(xiàn)延時(shí)框架與具體業(yè)務(wù)系統(tǒng)的解耦。同時(shí),框架已SPI的形式加載相應(yīng)的MQ,也就是說,集成MQ的方式是可擴(kuò)展的。
消息體中消息與RocketMQ和 ActiveMQ 消息字段對(duì)應(yīng)關(guān)系
mykit-delay RocketMQ ActiveMQ 備注
topic topic topic 點(diǎn)對(duì)點(diǎn)發(fā)送隊(duì)列名稱或者主題名稱
subtopic subtopic subtopic 點(diǎn)對(duì)點(diǎn)發(fā)送隊(duì)列子名稱或者主題子名稱
body 消息內(nèi)容 消息內(nèi)容 消息內(nèi)容
關(guān)于系統(tǒng)配置
延遲框架與具體執(zhí)行業(yè)務(wù)系統(tǒng)的交互方式通過延遲框架配置實(shí)現(xiàn),具體配置文件位置為mykit-delay-config項(xiàng)目下的resources/properties/starter.properties文件中。
測(cè)試
需要配置好數(shù)據(jù)庫(kù)地址和Redis的地址 如果不是單機(jī)模式 也需要配置好Zookeeper
運(yùn)行mykit-delay-test模塊下的測(cè)試類io.mykit.delay.test.PushTest添加任務(wù)到隊(duì)列中
啟動(dòng)mykit-delay-test模塊下的io.mykit.delay.TestDelayQueue消費(fèi)前面添加數(shù)據(jù) 為了方便查詢效果 默認(rèn)的消費(fèi)方式是consoleCQ 控制臺(tái)輸出
擴(kuò)展
支持zset隊(duì)列個(gè)數(shù)可配置,避免大數(shù)據(jù)帶來高延遲的問題。進(jìn)一步增強(qiáng)框架的高可用。
近期規(guī)劃
brpc、grpc、Motan、Sofa、SpringCloud、SpringCloud Alibaba等RPC擴(kuò)展
支持RabbitMQ、Kafka等消息中間件
分區(qū)(buck)支持動(dòng)態(tài)設(shè)置
redis與數(shù)據(jù)庫(kù)數(shù)據(jù)一致性的問題 (重要)
實(shí)現(xiàn)自己的推拉機(jī)制
支持可切換實(shí)現(xiàn)方式,目前只是依賴Redis實(shí)現(xiàn),后續(xù)待優(yōu)化,支持更多的可配置選項(xiàng)
支持Web控制臺(tái)管理隊(duì)列
實(shí)現(xiàn)消息消費(fèi)TTL機(jī)制
增加對(duì)框架和定時(shí)任務(wù)的監(jiān)控