懒松鼠Flink-Boot 脚手架让Flink全面拥抱Spring生态体系,使得开发者可以以Java WEB开发模式开发出分布式运行的流处理程序,懒松鼠让跨界变得更加简单。懒松鼠旨在让开发者以更底上手成本(不需要理解分布式计算的理论知识和Flink框架的细节)便可以快速编写业务代码实现。为了进一步提升开发者使用懒松鼠脚手架开发大型项目的敏捷的度,该脚手架默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中经常用到的框架集成进来,进一步提升开发速度。比如集成Mybatis ORM框架,Hibernate Validator校验框架,Spring Retry重试框架等,具体见下面的脚手架特性。
技术 | 名称 | 状态 |
---|---|---|
Spring Framework | 容器 | 已集成 |
Spring 基于XML方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解声明方法重试机制 | Retry注解 | 已集成 |
Spring 基于注解声明方法缓存 | Cache注解 | 已集成 |
Hibernate Validator | 校验框架 | 已集成 |
Dubbole消费者 | 服务消费者 | 已集成 |
Dubbo服务提供者 | 分布式服务框架 | 已集成 |
DRPC接口 | flink对外暴露dprc接口提供接口服务 | 已集成 |
Apollo配置中心 | 携程阿波罗配置中心 | 已集成 |
Druid | 数据库连接池 | 已集成 |
MyBatis | ORM框架 | 已集成 |
Kafka | 消息队列 | 已集成 |
Redis | 分布式缓存数据库 | 已集成 |
分布式锁 | 基于Redis的分布式锁实现 | 已集成 |
HDFS | 分布式文件系统 | 已集成 |
Hbase | No-Sql数据库 | 已集成 |
groovy | 规则表达式 | 已集成 |
Junit | 单元测试 | 已集成 |
MDC | 分布式日志链路追踪 | 已集成 |
ZooKeeper | 分布式协调服务 | 进行中 |
Solr & Elasticsearch | 分布式全文搜索引擎 | 进行中 |
Ehcache | 进程内缓存框架 | 进行中 |
sequence | 分布式高效ID生产 | 进行中 |
Spring eurake消费者 | 服务消费者 | 进行中 |
Spring Config配置中心 | Spring Cloud Config配置中心 | 进行中 |
你的现状
static Map<String,String> cache=new HashMap<String,String>();
public String findUUID(FlowData flowData) {
String value=cache.get(flowData.getSubTestItem());
if(value==null)
{
String uuid=userMapper.findUUID(flowData);
cache.put(uuid,value);
return uuid;
}
return value;
}
你想要的是这样
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")
public String findUUID(FlowData flowData) {
return userMapper.findUUID(flowData);
}
你的现状
public void insertFlow(FlowData flowData) {
try{
userMapper.insertFlow(flowData);
}Cache(Exception e)
{
Thread.sleep(10000);
userMapper.insertFlow(flowData);
}
}
你想要的是这样
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
@Override
public void insertFlow(FlowData flowData) {
userMapper.insertFlow(flowData);
}
你的现状
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7)
{
return null;
}
if(flowData.getBillNumber()==null)
{
return null;
}
你想要的是这样
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);
if (validate != null) {
System.out.println(validate);
return null;
}
public class FlowData {
private String uuid;
//声明该参数的校验规则字符串长度必须在7到20之间
@Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间")
private String subTestItem;
//声明该参数的校验规则字符串不能为空
@NotBlank(message = "billNumber不能为空")
private String billNumber;
}
Flink-Boot
├── Flink-Base -- Flink-Boot工程基础模块/Engineering basic module
├── Flink-Client -- Flink-Boot 客户端模块/Client module
├── flink-annotation -- 注解生效模块/Annotation effective module
├── flink-mybatis -- mybatis orm模块/mybatis orm module
├── flink-retry -- 注解重试机制模块/Annotation retry mechanism module
├── flink-validate -- 校验模块/validate module
├── flink-sql -- Flink SQL解耦至XML配置模块/SQL decoupling to XML configuration module
├── flink-cache-annotation -- 接口缓冲模块/Interface buffer module
├── flink-dubbo-comsumer -- Dubbo 消费组模块/Dubbo comsumer module
├── flink-hbase -- Hbase 模块/Hbase module
├── flink-redis -- redis 模块/redis module
├── flink-drpc -- Drpc 模块/DRPC module
├── flink-other-service -- 组件原生运行模块
├── flink-junit -- 单元测试模块/Unit test module
├── flink-apollo -- 阿波罗配置客户端模块/Apollo configuration client module
下面是集成Spring生态的基础手册,通过目录中的联系我标题可以找到我已获取更详细的开发手册,当然技术过硬自己摸索也只需3小时即可上手所有模块。
该容器模式配置了JdbcTemplate实例,数据库连接池采用Druid,在业务方法中只需要获取容器中的JdbcTemplate实例便可以快速与关系型数据库进行交互,dataService实例封装了一些访问数据库表的方法。
<beans ......
default-lazy-init="true" default-init-method="init">
<context:property-placeholder location="classpath:config.properties"/>
<bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"></property>
<property name="url"
value="${jdbc.url}"></property>
<property name="username" value="${jdbc.user}"></property>
<property name="password" value="${jdbc.password}"></property>
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg ref="druidDataSource"></constructor-arg>
</bean>
<bean id="dataService" class="com.intsmaze.flink.base.service.DataService">
<property name="jdbcTemplate" ref="jdbcTemplate"></property>
</bean>
</beans>
jdbc.user = intsmaze
jdbc.password = intsmaze
jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8
如下是SimpleClient(com.intsmaze.flink.client.SimpleClient)类的示例代码,该类继承了BaseFlink,可以看到对应实现的方法中分别设置如下:
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版书籍《深入理解Flink核心设计与实践原理》
*
* @auther: intsmaze(刘洋)
* @date: 2020/10/15 18:33
*/
public class SimpleClient extends BaseFlink {
public static void main(String[] args) throws Exception {
SimpleClient topo = new SimpleClient();
topo.run(ParameterTool.fromArgs(args));
}
@Override
public String getTopoName() {
return "SimpleClient";
}
@Override
public String getConfigName() {
return "topology-base.xml";
}
@Override
public String getPropertiesName() {
return "config.properties";
}
@Override
public void createTopology(StreamExecutionEnvironment builder) {
DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource());
DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction());
processDataStream.print("输出结果");
}
}
采用自定义数据源,用户需要编写自定义DataSource类,该类需要继承XXX抽象类,实现如下方法。
public class SimpleDataSource extends CommonDataSource {
private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
......
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
...//构造读取各类外部系统数据的连接实例
}
@Override
public String sendMess() throws InterruptedException {
Thread.sleep(1000);
......
MainData mainData = new MainData();
......//通过外部系统数据的连接实例读取外部系统数据,封装进MainData对象中,然后返回即可。
return gson.toJson(mainData);
}
}
本作业计算的业务逻辑在Flink转换操作符中进行实现,一般来说开发者只需要实现flatMap算子即可以满足大部分算子的使用。
用户编写的自定义类需要继承com.intsmaze.flink.base.transform.CommonFunction抽象类,均需实现如下方法。
public class SimpleFunction extends CommonFunction {
private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
@Override
public String execute(String message) throws Exception {
FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {
}.getType());
String flowUUID = dataService.findUUID(flowData);
if (StringUtils.isBlank(flowUUID)) {
flowUUID = UUID.randomUUID().toString();
flowData.setUuid(flowUUID);
dataService.insertFlow(flowData);
}
return gson.toJson(flowData);
}
}
CommonFunction抽象类中默认在open方法中通过BeanFactory对象获取到了Spring容器中对于的dataService实例,对于Spring中的其他实例同理在SimpleFunction类中的open方法中获取即可。
public abstract class CommonFunction extends RichFlatMapFunction<String, String> {
private IntCounter numLines = new IntCounter();
protected DataService dataService;
protected ApplicationContext beanFactory;
@Override
public void open(Configuration parameters) {
getRuntimeContext().addAccumulator("num-FlatMap", this.numLines);
ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);
dataService = beanFactory.getBean(DataService.class);
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
this.numLines.add(1);
String execute = execute(value);
if (StringUtils.isNotBlank(execute)) {
out.collect(execute);
}
}
public abstract String execute(String message) throws Exception;
}
可以根据情况选择重写open(Configuration parameters)方法,同时重写的open(Configuration parameters)方法的第一行要调用父类的open(Configuration parameters)方法。
public void open(Configuration parameters){
super.open(parameters);
......
//获取在Spring配置文件中配置的实例
XXX xxx=beanFactory.getBean(XXX.class);
}
在自定义的Topology类编写Main方法,创建自定义的Topology对象后,调用对象的run(...)方法。 public class SimpleClient extends BaseFlink {
/**
* 本地启动参数 -isLocal local
* 集群启动参数 -isIncremental isIncremental
*/
public static void main(String[] args) throws Exception {
SimpleClient topo = new SimpleClient();
topo.run(ParameterTool.fromArgs(args));
}
.......
演示地址: 框架快速演示视频
3.Flink集成Spring,创建对象一个注解搞定(再也不用new)
4.Flink集成Spring Retry注解,方式重试一个注解搞定
5. Flink集成Spring Cache注解,一行代码实现方法缓存
6.Flink集成mybatis框架,提效sql维表查询开发能力
8.Flink 计算服接口化--DRPC 对外提供接口化服务
9.Flink 借助JAVA语言三大特性实现动态规则引擎实现
在开发cep系统中,遇到某些规则需要用http的协议向远程服务发送请求获取某些结果后,在运用EL表达式进行校验。这个时候,我么需要编写新的java类来支持这一功能,但是编写java类需要重新停机发布,如何解决停机发布的问题就本架构解决方案。
一致性hash算法的实现,没有找到比较权威的官方API,百度网友博客的算法应用生产系统毕竟有风险,我发现jedis客户端封装了一致性hash算法的实现。所以我将jeds的一致性hash算法提取出来,进行保障以供生产系统使用。 关于一致性hash算法没有热点问题,我通过执行代码发现这个观点有待商榷。 把jedis的源码提取出来后,跑了一下,发现没有热点问题。原因不是采用的算法问题,而是一个物理节点对应的虚拟节点的数量的问题导致使用hash算法后是否存在有热点问题。 jedis源码物理节点对应虚拟节点时160,而网上大部分代码的虚拟节点都是10以下,所以导致了热点问题,这也告诉我们,实现一致性Hash算法时,不要太吝啬,虚拟节点设置的大点,热点问题就不会再有。
本地IDEA运行出现NoClassDefFoundError,在idea的run configurations中勾选 include dependenencies with "Provided" scope.具体见下图箭筒1所指。
java.lang.NoClassDefFoundError: org/apache/flink/runtime/state/StateBackend
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.runtime.state.StateBackend
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main"
本地IDEA运行出现如下异常,在idea的run configurations中program arguments文本框中输入-isLocal local。具体见上图箭筒2所指。
Exception in thread "main" java.lang.NullPointerException: isIncremental is null
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at com.intsmaze.flink.base.env.BaseFlink.init(BaseFlink.java:97)
at com.intsmaze.flink.base.env.BaseFlink.run(BaseFlink.java:213)
at com.intsmaze.flink.client.RetryClient.main(RetryClient.java:29)
本地和集群运行出现 No qualifying bean异常,缺少spring核心配置参数,找我索要配置参数即可解决。
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.intsmaze.flink.base.service.DataService' available
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:348)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:335)
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1101)
at com.intsmaze.flink.base.transform.BuiltinRichFlatMapFunction.open(BuiltinRichFlatMapFunction.java:50)
at com.intsmaze.flink.retry.task.RetryFlatMap.open(RetryFlatMap.java:39)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
首先谢谢大家支持,如果你希望参与开发,欢迎通过Github上fork本项目,并Pull Request您的commit。
微信号:intsmaze 微信二维码无法显示可跳转该页面扫码,微信转账即可 88.8元会员你可以获得什么?
这个项目已经卖了200份了,出售的安装手册已经很完善了,你按照安装手册的步骤,这个项目绝对跑的起来,如果还报错就看《常见问题汇总》,基本是使用姿势不对。 这一套项目没有卖几千几百,就卖几十块钱,也没指望靠它发财,所以拿到安装手册,请安装步骤让项目的所有功能run起来后再加入自己主观意识的逻辑,否则因为你自己 原因跑失败来频繁问我,耽误我精力。 一个普通程序员1W6工资,一个小时工资也是100块,为你这几十块以及你自己的失误(不是我项目原因)和你频繁沟通浪费我半个小时精力我真的做不到,希望你能换位思考,不要用PUA的思想。 如果你是下面这种人,也请你不要加我微信购买,不想为了几十块钱影响我几个小时的心情。 图片如果渲染不出来,见image目录下消息X的四个图片(聊天记录)
目前一些在一些互联网公司的开发小哥都加我微信买了配置文件,BAT等的开发小哥都有购买配置文件进行学习。
详细文档目录结构如下: