用SQL来描述Reactor API. 可用SQL来实现数据处理逻辑,支持实时数据处理,支持聚合,分组,自定义函数等功能,让数据处理更简单.
Reactor + JSqlParser = ReactorQL
select name username from user
.count
,sum
,avg
,max
,min
.select val/100 percent from cpu_usage
.select sum(val) sum from topic group by interval('10s')
. 按时间分组.select count(1) total,productId,deviceId from messages group by productId,deviceId
.select avg(temp) avgTemp from temps group by interval('10s') having avgTemp>10
.select case type when 1 then '警告' when 2 then '故障' else '其他' end type from topic
.select t1.name,t2.detail from t1,t2 where t1.id = t2.id
.引入依赖
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>reactor-ql</artifactId>
<version>{version}</version>
</dependency>
用例:
ReactorQL.builder()
.sql("select avg(this) total from test group by interval('1s') having total > 2") //按每秒分组,并计算流中数据平均值,如果平均值大于2则下游收到数据.
.build()
.start(Flux.range(0, 10).delayElements(Duration.ofMillis(500)))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(4)
.verifyComplete();
更多用法请看 单元测试
解析SQL查询语句,生成SQL抽象语法树。
遍历SQL抽象语法树,使用策略模式,根据不同的语法类型,编译生成针对Flux
的转换函数。
FilterFeature
)进行创建。columnMapper
)使用ValueMapFeature
,ValueFlatMapFeature
,ValueAggMapFeature
进行创建。groupBy
)使用GroupFeature
进行创建。from
)使用FromFeature
进行创建。
Flux
数据流。ReactorQL
对象并执行获取数据源。FromFeature
进行创建。orderBy
)使用ValueMapFeature
编译转换函数,基于转换函数执行结果进行排序。join
),支持多种join源。
Flux
数据流进行数据关联。ReactorQL
对象并执行获取数据源进行数据关联。组合编译后各个片段对应的Flux
转换函数。
limit->offset->distinct->orderBy->columnMapper->groupBy->where->join->from
传入上下文执行。
当内置的特性不满足需求时,可通过自定义的方式进行拓展。
拓展了特性后,在启动时注册到元数据中。
import org.jetlinks.reactor.ql.ReactorQL;
public ReactorQL createQL(String sql) {
return ReactorQL
.builder()
.sql(sql)
//注册自定义的特性
.feature(customFeature1, customFeature2)
.build();
}
可通过拓展转换函数特性(ValueMapFeature
)来自定义数据转换等操作,
如实现 select device.state(t.deviceId) from "/device/**" t
。
import org.jetlinks.reactor.ql.supports.map.FunctionMapFeature;
// FunctionMapFeature针对ValueMapFeature实现了基础操作
public class DeviceStateFunction extends FunctionMapFeature {
public DeviceStateFunction() {
super("device.state",
1,//最大参数数量
1,//最小参数数量
flux -> flux
.collectList()//将响应式参数流中的数据收集为List
.flatMap(args -> {
if (args.size() != 1) {
return Mono.empty();
}
String deviceId = String.valueOf(args.get(0));
return getDeviceState(deviceId);
}));
}
}
可通过拓展数据源函数特性来自定义数据源,
如实现 select * from mysql(....)
。
例:
import net.sf.jsqlparser.statement.select.FromItem;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.FromFeature;
import reactor.core.publisher.Flux;
import java.util.function.Function;
// select * from mysql(....);
public class MysqlFromFeature implements FromFeature {
private static final String ID = FeatureId.From.of("mysql").getId();
@Override
public Function<ReactorQLContext, Flux<ReactorQLRecord>> createFromMapper(
FromItem fromItem,
ReactorQLMetadata metadata) {
TableFunction from = ((TableFunction) fromItem);
net.sf.jsqlparser.expression.Function function = from.getFunction();
//函数参数列表
ExpressionList list = function.getParameters();
//别名
String alias = from.getAlias() != null ? from.getAlias().getName() : null;
Object params = prepareParameter(list);
return ctx -> {
return this
.execute0(params)
.map(val -> {
return ReactorQLRecord.newRecord(alias, val, ctx);
});
};
}
public Flux<Object> execute0(Object args) {
//执行真实逻辑,返回数据流。
}
}