响应式并发批处理
假设 DataProcessor 接口定义了方法 batchProcess 能够对一批数据进行处理,一批处理 500 个数据。现在我们需要对一个响应式数据流 Flux
public interface DataProcessor {
Mono<String> batchProcess(List<DataItem> dataItems);
... ...
}
DataProcessor dataProcessor = ...;
int batchSize = 500;
Flux<DataItem> dataItems = ...
下面分别以串行和并行的方式展示一下 Reactor API 的使用。
1)攒够 batchSize 个数据后进行处理。
Mono<List<String>> result = dataItems.buffer(batchSize)
.flatMap(dataProcessor::batchProcess)
.collectList();
2)以并行的方式,把流分成 10 股,每股攒够 batchSize 个数据后进行处理。
Mono<List<String>> result = dataItems
.parallel(10)
.runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(10)))
.groups()
.flatMap(g -> g.buffer(batchSize).flatMap(dataProcessor::batchProcess))
.collectList();
这里 runOn 接收的参数可以是 Schedulers 不同策略的实现,具有不同适用范围,比如适合计算密集型的 ParallelScheduler、单线程的 SingleScheduler。这里使用的是 Executors FixedThreadPool。
可以想象如果我们自己实现这样一个处理逻辑的复杂度,而通过 reactor api,仅仅几行代码就完成了这么复杂高效的处理。
3)使用 reactive mongo driver 需要的线程。
Spring 默认到 monog 的链接池最大为 100,但是实际上在使用 reactive 方式访问时使用 20 ~ 10 个左右的线程就足够了。因此对 mongog 的连接串最好明确使用适合自己情况的连接数以避免连接浪费或不够。
测试了一个 70 万条、大概 250M 数据的批量插入,发现无论使用串行还是并行,数据库插入时间都差不多(36s ~ 26s)。而连接池最大连接设为 200、100、50、20、10 对数据库插入的性能也没有太大影响,200 个线程时反而有一点下降。这个情况从 mongo 响应式驱动的角度去解释是完全可以理解的,如果使用传统驱动,恐怕所需的线程就不是这个量级的了。