Cronner 是一个分布式定时任务框架,支持作业依赖、分片、失效转移、集中配置和监控。
Cronner 是一个分布式定时任务框架,支持作业依赖、作业分片、失效转移、集中配置和监控。
- JDK 1.7 or above
- 编译工具 Maven
以java -jar的方式启动需要配置插件
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<requiresUnpack>
<dependency>
<groupId>org.jfaster.cronner</groupId>
<artifactId>cronner-executor</artifactId>
</dependency>
</requiresUnpack>
</configuration>
</plugin>
添加依赖.
<dependency>
<groupId>org.jfaster.cronner</groupId>
<artifactId>spring-boot-starter-cronner</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.jfaster.cronner</groupId>
<artifactId>spring-boot-starter-zookeeper</artifactId>
<version>1.0.1</version>
</dependency>
创建一个作业。
src/main/java/cronner/jfaster/org/example/job/listener/JobListenerExample.java
package cronner.jfaster.org.example.job.listener;
import cronner.jfaster.org.executor.ShardingContexts;
import cronner.jfaster.org.job.api.listener.CronnerJobListener;
/**
* @author fangyanpeng
*/
public class JobListenerExample implements CronnerJobListener{
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println(String.format("----SpringBoot Job: %s begin----",shardingContexts.getJobName()));
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println(String.format("----SpringBoot Job: %s end----",shardingContexts.getJobName()));
}
}
src/main/java/cronner/jfaster/org/example/job/SimpeCronnerJobSpringboot.java
package cronner.jfaster.org.example.job;
import cronner.jfaster.org.job.annotation.Job;
import cronner.jfaster.org.example.job.listener.JobListenerExample;
import cronner.jfaster.org.job.api.ShardingContext;
import cronner.jfaster.org.job.api.simple.SimpleJob;
import java.util.concurrent.TimeUnit;
/**
* @author fangyanpeng
*/
@Job(name = "cronner-simple-job",listener = JobListenerExample.class)
public class SimpeCronnerJobSpringboot implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("jobName=%s,jobParameter=%s,shardingItem=%s,shardingParameter=%s",shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingItem(),shardingContext.getShardingParameter()));
}
}
src/main/java/cronner/jfaster/org/example/job/DataflowCronnerJobSpring.java
package cronner.jfaster.org.example.job;
import cronner.jfaster.org.job.annotation.Job;
import cronner.jfaster.org.example.job.listener.JobListenerExample;
import cronner.jfaster.org.job.api.ShardingContext;
import cronner.jfaster.org.job.api.dataflow.DataflowJob;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author fangyanpeng
*/
@Job(name = "cronner-dataflow-job",listener = JobListenerExample.class)
public class DataflowCronnerJobSpring implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList("hello","cronner");
}
@Override
public void processData(ShardingContext shardingContext, List<String> data) {
System.out.println(String.format("jobName=%s,jobParameter=%s,shardingItem=%s,shardingParameter=%s",shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingItem(),shardingContext.getShardingParameter()));
for (String str : data){
System.out.println(str);
}
}
}
配置yml文件,并启动作业。
########### Zookeeper Configuration Start ###########
zookeeper:
serverLists: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
namespace: cronner
baseSleepTimeMilliseconds: 1000
maxSleepTimeMilliseconds: 3000
maxRetries: 3
############ Zookeeper Configuration End ###########
########### cronner packege Configuration Start ###########
cronner:
package: cronner.jfaster.org.example.job
############ cronner packege Configuration End ###########
src/main/java/cronner/jfaster/org/example/CronnerSpringBootMain.java
package cronner.jfaster.org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
/**
* @author fangyanpeng
*/
@SpringBootApplication
public class CronnerSpringBootMain {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(CronnerSpringBootMain.class,args);
}
}
执行main方法,就会启动作业,就是一个执行节点。每个执行节点都会启动一个端口接收调度节点的命令,默认端口9233。如果同一台机器启动多个执行节点,通过java -Dcronner.executor.port=8888指定端口,避免端口冲突。
添加依赖.
<dependency>
<groupId>org.jfaster.cronner</groupId>
<artifactId>cronner-spring-support</artifactId>
<version>1.0.1</version>
</dependency>
创建一个作业。
src/main/java/cronner/jfaster/org/example/job/listener/JobListenerExample.java
package cronner.jfaster.org.example.job.listener;
import cronner.jfaster.org.executor.ShardingContexts;
import cronner.jfaster.org.job.api.listener.CronnerJobListener;
/**
* @author fangyanpeng
*/
public class JobListenerExample implements CronnerJobListener{
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println(String.format("----Spring Job: %s begin----",shardingContexts.getJobName()));
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println(String.format("----Spring Job: %s end----",shardingContexts.getJobName()));
}
}
src/main/java/cronner/jfaster/org/example/job/SimpeCronnerJobSpring.java
package cronner.jfaster.org.example.job;
import cronner.jfaster.org.job.annotation.Job;
import cronner.jfaster.org.example.job.listener.JobListenerExample;
import cronner.jfaster.org.job.api.ShardingContext;
import cronner.jfaster.org.job.api.simple.SimpleJob;
import java.util.concurrent.TimeUnit;
/**
* @author fangyanpeng
*/
@Job(name = "cronner-simple-job",listener = JobListenerExample.class)
public class SimpeCronnerJobSpring implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("jobName=%s,jobParameter=%s,shardingItem=%s,shardingParameter=%s",shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingItem(),shardingContext.getShardingParameter()));
}
}
src/main/java/cronner/jfaster/org/example/job/DataflowCronnerJobSpringboot.java
package cronner.jfaster.org.example.job;
import cronner.jfaster.org.job.annotation.Job;
import cronner.jfaster.org.example.job.listener.JobListenerExample;
import cronner.jfaster.org.job.api.ShardingContext;
import cronner.jfaster.org.job.api.dataflow.DataflowJob;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
*
*/
@Job(name = "cronner-dataflow-job",listener = JobListenerExample.class)
public class DataflowCronnerJobSpringboot implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Arrays.asList("hello","cronner");
}
@Override
public void processData(ShardingContext shardingContext, List<String> data) {
System.out.println(String.format("jobName=%s,jobParameter=%s,shardingItem=%s,shardingParameter=%s",shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingItem(),shardingContext.getShardingParameter()));
for (String str : data){
System.out.println(str);
}
}
}
配置xml文件,并启动作业。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:cronner="http://cronner.jfaster.org/schema/cronner"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://cronner.jfaster.org/schema/cronner
http://cronner.jfaster.org/schema/cronner/cronner.xsd
">
<context:component-scan base-package="cronner.jfaster.org.example" />
<context:annotation-config/>
<!--zookeeper配置-->
<cronner:zookeeper server-lists="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183" namespace="cronner" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/>
<!--cronner扫描包配置-->
<cronner:job-scan package="cronner.jfaster.org.example.job"/>
</beans>
src/main/java/cronner/jfaster/org/example/CronnerSpringMain.java
package cronner.jfaster.org.example;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.CountDownLatch;
/**
* @author fangyanpeng
*/
public class CronnerSpringMain {
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:job.xml");
ctx.start();
CountDownLatch latch = new CountDownLatch(1);
latch.await();
}
}
执行main方法,就会启动作业,就又启动了一个执行节点。相同的作业名会构成一个执行集群,整个集群节点间进行分片和失效转移等。例如上面启动的两个进程,就构成了两个分别以cronner-simple-job和cronner-dataflow-job为标识的作业执行集群。
以上部分启动的都是执行节点,接下来启动调度节点。调度节点用于作业配置、作业调度、作业分发、收集执行节点的作业执行情况。 为了保证高可用,调度节点至少启动两个,一个down了,其它调度节点可以自动接管任务调度。 调度节点提供管理界面,用于作业的集中配置和管理。
添加作业的名称要与执行节点的作业名称一致才能调度。