Fountain is a Java based toolkit for syncing MySQL binlog and provide an easy API to process/publish events.
Fountain是监查、捕捉MySQL数据库的增量变化,分发数据变化给消费者处理的一套解决方案。
Fountain,英[ˈfaʊntən],是”源泉“的意思,MySQL数据库源源不断的下发增量,因此而得名。
任何需要快速、准确接收MySQL数据变化增量的场景均适用,例如
Fountain支持MySQL的row base binlog协议,稳定测试版本支持MySQL5.1-5.6。
Fountain是一个Java类库,不是一套服务或平台,通过模块依赖方式使用。
Fountain将自己伪装为MySQL的从库,通过binlog dump命令请求MySQL Server,通过主从复制协议获取到MySQL Server不断推送过来的row based binlog日志,解析出数据变化增量,封装为Java的POJO对象,供消费者消费使用。
整个生命周期分为三个阶段:1、握手阶段,2、验证阶段,3、接收阶段。如下图所示。
这里注意,Fountain要求MySQL的日志格式必须是row based格式,不能是statement或者mixed。
Fountain的处理流程进行了规范抽象化,如下架构图所示,其中每一步均是可定制扩展的、可组装的,符合设计模式中的开闭原则。
整体流程是:
JDK版本、Maven/Gradle依赖、日志配置以及使用说明请见参考链接
下面的所有例子请参考SingleShardBinlogSyncerTest和SingleShardBinlogSyncerV51Test
由于Fountain使用后台线程做同步,因此下面的例子都使用了CountDownLatch来做延迟,生产环境请使用守护线程,或者容器中运行。
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769").username("beidou").password("u7i8o9p0"))
.binlogDumpStrategy(dumpStrategy)
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
可以配置多个MySQL,当一个不可用时,默认切换为另外一个。
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769,10.94.37.23:8769")
.username("beidou,beidou")
.password("u7i8o9p0,beidou")
.slaveId("123,124"))
.binlogDumpStrategy(dumpStrategy)
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.0.0.1:3306,10.0.0.2:3306")
.username("beidou,beidou")
.password("u7i8o9p0,u7i8o9p0")
.slaveId("123,124"))
.binlogDumpStrategy(dumpStrategy)
.whiteTables("fountain_test.*")
.blackTables("abc.*")
.soTimeout(60000) //一旦超时,会自动切换另外一个数据源,fountain保证多数据源之间的HA
.transactionPolicy(new MiniTransactionPolicy()) //max 30000 row changes
.messageQueueSize(30000) // max 20000 events
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
上述的例子都是使用默认的EventConsumer,只打印一些简单信息,可以自定义EventConsumer。
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769").username("beidou").password("u7i8o9p0"))
.binlogDumpStrategy(dumpStrategy)
.consumer(new EventConsumer() {
@Override
public void onEvent(ChangeDataSet changeDataSet) {
printTableData(changeDataSet);
}
@Override
public void onSuccess(ChangeDataSet changeDataSet, DisposeEventPositionBridge positionBridge) {
//do nothing
}
@Override
public void onFail(ChangeDataSet changeDataSet, Throwable t) {
logger.info("Some problem occurred..." + t.getMessage());
}
})
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
// 自定义逻辑的方法
void printTableData(ChangeDataSet changeDataSet) {
Map<String, List<RowData>> tableData = changeDataSet.getTableData();
for (String tableName : tableData.keySet()) {
logger.info("{}->size={}", tableName, tableData.get(tableName).size());
for (RowData rowData : tableData.get(tableName)) {
logger.info("before:" + rowData.getBeforeColumnList());
logger.info("after:" + rowData.getAfterColumnList());
}
}
}
处理完增量如果成功,一般推荐在onSuccess中存储同步点,下面的例子存储同步点在本地。
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769").username("beidou").password("u7i8o9p0"))
.binlogDumpStrategy(dumpStrategy)
.disposeEventPosition(new LocalFileGtIdSetDisposeEventPosition("/Users/baidu/work/fountain-git/test"))
.consumer(new EventConsumer() {
@Override
public void onEvent(ChangeDataSet changeDataSet) {
printTableData(changeDataSet);
}
@Override
public void onSuccess(ChangeDataSet changeDataSet, DisposeEventPositionBridge positionBridge) {
positionBridge.getDisposeEventPosition(changeDataSet.getInstanceName())
.saveSyncPoint(changeDataSet.getSyncPoint());
}
@Override
public void onFail(ChangeDataSet changeDataSet, Throwable t) {
logger.info("Some problem occurred..." + t.getMessage());
}
})
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
AsyncFixedRateDisposeEventPosition eventPosition = new AsyncFixedRateDisposeEventPosition();
eventPosition.setInitDelayMs(10000);
eventPosition.setPeriodMs(15000);
eventPosition.setDelegate(new LocalFileGtIdSetDisposeEventPosition("/Users/baidu/work/fountain-git/test"));
eventPosition.init();
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769").username("beidou").password("u7i8o9p0"))
.binlogDumpStrategy(dumpStrategy)
.disposeEventPosition(eventPosition)
.consumer(new EventConsumer() {
@Override
public void onEvent(ChangeDataSet changeDataSet) {
printTableData(changeDataSet);
}
@Override
public void onSuccess(ChangeDataSet changeDataSet, DisposeEventPositionBridge positionBridge) {
positionBridge.getDisposeEventPosition(changeDataSet.getInstanceName())
.saveSyncPoint(changeDataSet.getSyncPoint());
}
@Override
public void onFail(ChangeDataSet changeDataSet, Throwable t) {
logger.info("Some problem occurred..." + t.getMessage());
}
})
.build();
syncer.start();
latch.await(120L, TimeUnit.SECONDS);
详细参考【高可用】使用Zookeeper做多实例热备以及存储同步点
CountDownLatch latch = new CountDownLatch(1);
BinlogGtIdV56DumpStrategy dumpStrategy = new BinlogGtIdV56DumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
SingletonZkClientProvider zkClient = new SingletonZkClientProvider();
zkClient.setZookeeperConnectionString("127.0.0.1:2181");
zkClient.setConnectionTimeoutMs(30000);
zkClient.setSessionTimeoutMs(30000);
ZkDisposeEventPosition zkEventPosition = new ZkDisposeEventPosition();
zkEventPosition.setZkRootPath("/fountain/eventposition/testha");
zkEventPosition.setZkClientProvider(zkClient);
zkEventPosition.setSyncPointFactory(new GtIdSyncPointFactory());
AsyncFixedRateDisposeEventPosition eventPosition = new AsyncFixedRateDisposeEventPosition();
eventPosition.setInitDelayMs(10000);
eventPosition.setPeriodMs(15000);
eventPosition.setDelegate(zkEventPosition);
eventPosition.init();
ZkHaGuard haGuard = new ZkHaGuard();
haGuard.setZkClientProvider(zkClient);
haGuard.setLatchPath("/fountain/leader/testha");
BinlogSyncer syncer = BinlogSyncBuilder.newBuilder()
.producerName("producer00")
.dataSource(DataSource.of("10.94.37.23:8769").username("beidou").password("u7i8o9p0"))
.binlogDumpStrategy(dumpStrategy)
.disposeEventPosition(eventPosition)
.haGuard(haGuard)
.consumer(new EventConsumer() {
@Override
public void onEvent(ChangeDataSet changeDataSet) {
printTableData(changeDataSet);
}
@Override
public void onSuccess(ChangeDataSet changeDataSet, DisposeEventPositionBridge positionBridge) {
positionBridge.getDisposeEventPosition(changeDataSet.getInstanceName())
.saveSyncPoint(changeDataSet.getSyncPoint());
}
@Override
public void onFail(ChangeDataSet changeDataSet, Throwable t) {
logger.info("Some problem occurred..." + t.getMessage());
}
})
.build();
syncer.start();
latch.await(300L, TimeUnit.SECONDS);
修改BinlogPositionDumpStrategy实现如下,
BinlogFileNamePositionDumpStrategy dumpStrategy = new BinlogFileNamePositionDumpStrategy();
dumpStrategy.setIsChecksumSupport(true);
如果使用本地同步点存储使用
LocalFileBinlogAndOffsetDisposeEventPosition
现在你已经具备了fountain的基本概念已经看过了Quick Start,更多内容内容索引见下。
MySQL 5.X对接-使用binlogname+position