基于Akka实现的数据实时流式同步的应用,支持高可用
基于Akka实现的高性能数据实时流式同步的应用
hi,大家好 失踪人口回归
estuary
其实一直都没有停,但是闭源版和业务耦合太深,没法直接放出来,我这次准备慢慢整理出开源版,这次直接放出Mysql2Mysql,之后很快就会放出Mongo,Kafka等Source或者Sink
虽然闭源版的功能已经稳定,但是开源版还是需要测试验证,我会一点点补充测试用例的,绝不弃坑
注:以后专注维护3.x
estuary致力于提供一个数据实时同步的方案,实现从数据源到数据汇的端到端实时同步,旨在解决异构数据同步问题
数据在整个程序运行过程中分为三个阶段
顺序如下图
SOURCE
->TRANSFORM
->SINK
本程序以Akka为核心构建起来的,利用Akka驱动程序的逻辑流程 目前完成了Mysql Binlog 到 Mysql的实现
功能域目前拆解成两个大部分,分别是同步域
和元数据管理域
同步域是整个程序的核心
同步域是由若干个功能各不相同的worker
组合而成,worker对应的具体实现就是一个/一组Actor
每个worker都有其角色
SyncController是整个同步任务的发起者,是其他(除了SyncDaemon以外的)Actor的父Actor
数据/消息的流向如所示
对于整个同步流程来说,事务的等级与同步的效率成负相关
MOD
>= PRIMARY_KEY
>= DATABASE_TABLE
>> TRANSACTION
主要在三个阶段实现:
Estuary自带了功率控制功能,避免Vsource > Vsink产生问题
对于有schema信息的数据源,处理复杂schema变化情况下的相应schema变化
schema读取和更新的三个层次
元数据管理的生命周期
支持简单Mysql2Kafka 现已不维护
支持Mysql2kafka 支持元数据管理域 现已不维护
{
"batchThreshold": 1, //打包大小 默认1
"binlogJournalName":"mysql-bin.005539", //可不填 默认从最后
"binlogPosition":4, //可不填
"costing": true,
"counting": true,
"defaultConnectionTimeoutInSeconds": "3000",
"fetchDelay": 0,
"filterBlackPattern": "",
"filterPattern": "xxx\\..*",
"filterQueryDcl": false,
"filterQueryDdl": false,
"filterQueryDml": false,
"filterRows": false,
"filterTableError": false,
"kafkaAck": "1",
"kafkaBootstrapServers": "xxxx", //必填
"kafkaLingerMs": "0",
"kafkaMaxBlockMs": "2000",
"kafkaRetries": "3",
"kafkaSpecficTopics": {},
"kafkaDdlTopic": "SrcMyDdl",
"kafkaTopic": "estuary1",
"listenRetrytime": 3,
"listenTimeout": 5000,
"powerAdapted": true,
"profiling": true,
"receiveBufferSize": 0,
"sendBufferSize": 0,
"syncTaskId": "xxxx",//必须填
"sync":false,
"taskType": 2,
"zookeeperServers": "", //必须填
"zookeeperTimeout": 20000
}
现在专注维护3.x
在这假定你使用Idea进行开发
将ANTLR的文件夹指定为source folder
mvn compile
cp application.properties.templete application.properties
cp application.conf.templete application.conf
编辑文件来配置你的属性
mvn package
./bin/start
调用接口,详情查看Swagger-ui.html
{
"mysql2MysqlRunningInfoBean": {
"batchThreshold": 1,
"batcherNameToLoad": {}, //选填 batcher动态加载
"batcherNum": 23,//不能小于1
"controllerNameToLoad": {},//选填,controller动态加载
    "costing": true, //是否计算耗时
    "counting": true, //是否计数
"fetcherNameToLoad": {}, //选填 fetcher动态加载
"mappingFormatName":"string",//选择加载的处理模式
"mysqlDatabaseNameList": [
"string"
], //选填,数据库名称
"needExecuteDDL": true, //是否执行ddl
"offsetZkServers": "string", //必填,zk地址
"partitionStrategy": "PRIMARY_KEY",// 分区策略
"powerAdapted": true,//是否功率调节
"profiling": true, //是否计算详细信息
"schemaComponentIsOn": true, //是否开启元数据管理模块
"sinkerNameToLoad": {},//选填 sinker 动态加载
"startPosition": { //可选
"included": true,
"journalName": "string",
"position": 0,
"serverId": 0,
"timestamp": 0
},
"syncStartTime": 0, //同步开始时间
"syncTaskId": "string" //必填,任务id
},
"mysqlSinkBean": {
"autoCommit": true, //是否自动提交
    "connectionTimeout": 0, //选填 超时时间
"credential": { //必填
"address": "string",
"defaultDatabase": "string",
"password": "string",
"port": 0,
"username": "string"
},
"maximumPoolSize": 0 //选填
},
"mysqlSourceBean": {
"concernedDatabase": [ //必填
"string"
],
"filterBlackPattern": "string", //选填,过滤
"filterPattern": "string", //选填,白名单
"filterQueryDcl": true, //选填
"filterQueryDdl": true, //..
"filterQueryDml": true,//..
"filterRows": true,//..
"filterTableError": true,//..
"ignoredDatabase": [
"string"
],
"master": { //必填
"address": "string",
"defaultDatabase": "string",
"password": "string",
"port": 0,
"username": "string"
}
}
}
//一个样例
{
"mysql2MysqlRunningInfoBean": {
"batcherNum": 31,
"offsetZkServers": "nbhd.aka.laplace.zookeeper.com:2181",
"partitionStrategy": "PRIMARY_KEY",
"syncTaskId": "nbhd",
"sinkerNameToLoad": {
"sinker": "com.neighborhood.aka.laplace.estuary.mysql.lifecycle.reborn.sink.MysqlBinlogInOrderMysqlRingBufferSinker" //推荐使用这个
},
"startPosition": {
"timestamp": 1548126793000 //binlog会从这个时间点消费
}
},
"mysqlSinkBean": {
"credential": {
"address": "localhost",
"defaultDatabase": "",
"password": "123456",
"port": 3306,
"username": "root"
}
},
"mysqlSourceBean": {
"concernedDatabase": [
"xxx"
],
"filterPattern": "xxx\\.yyy", //白名单过滤
"master": {
"address": "localhost",
"defaultDatabase": "",
"password": "123456",
"port": 3306,
"username": "root"
}
}
}