Parallel Collectors is a toolkit easing parallel collection processing in Java using Stream API... but without limitations imposed by standard Parallel Streams.
list.stream()
  .collect(parallel(i -> blockingOp(i), toList()))
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, executor)
    .thenRun(() -> System.out.println("Finished!"));
They are:
- built on CompletableFutures, allowing for timeout specification, composition with other CompletableFutures, and asynchronous processing
- configurable with custom Executors and parallelism levels
- plain implementations of the Collector interface (no magic inside, zero dependencies, no Stream API internals hacking)
- usable together with the standard Collectors
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>2.6.0</version>
</dependency>
compile 'com.pivovarit:parallel-collectors:3.0.0'
compile 'com.pivovarit:parallel-collectors:2.6.0'
Parallel Collectors are intentionally unopinionated, leaving responsibility to users for:
- Executors and their lifecycle management
Review the API documentation before deploying in production.
The main entrypoint is the com.pivovarit.collectors.ParallelCollectors class, which follows the convention established by java.util.stream.Collectors and features static factory methods returning custom java.util.stream.Collector implementations spiced up with parallel processing capabilities.
By design, it's obligatory to supply a custom Executor instance and manage its lifecycle.
All parallel collectors are one-off and must not be reused.
- CompletableFuture<Stream<T>> parallel(Function) (uses Virtual Threads)
- CompletableFuture<Collection<T>> parallel(Function, Collector) (uses Virtual Threads)
- CompletableFuture<Stream<T>> parallel(Function, Executor, parallelism)
- CompletableFuture<Collection<T>> parallel(Function, Collector, Executor, parallelism)
- Stream<T> parallelToStream(Function) (uses Virtual Threads)
- Stream<T> parallelToOrderedStream(Function) (uses Virtual Threads)
- Stream<T> parallelToStream(Function, Executor, parallelism)
- Stream<T> parallelToOrderedStream(Function, Executor, parallelism)
By default, all ExecutorService threads compete for each task separately, which results in a basic form of work-stealing. This is not free, but it can decrease total processing time when subtasks vary in duration.
However, if the processing time for all subtasks is similar, it might be better to distribute tasks in batches to avoid excessive contention.
Batching alternatives are available under the ParallelCollectors.Batching namespace.
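The difference between the two scheduling strategies can be sketched with plain JDK classes. This is only an illustration of the idea, not the library's implementation; `BatchingSketch`, `perItem`, and `batched` are hypothetical names, and `batches` is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchingSketch {

    // One task per element: pool threads compete for every single item.
    static <T, R> List<R> perItem(List<T> items, Function<T, R> fn, ExecutorService executor) {
        List<CompletableFuture<R>> futures = items.stream()
          .map(item -> CompletableFuture.supplyAsync(() -> fn.apply(item), executor))
          .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // One task per batch: each task receives a contiguous slice up front.
    static <T, R> List<R> batched(List<T> items, Function<T, R> fn, ExecutorService executor, int batches) {
        int size = Math.max(1, (items.size() + batches - 1) / batches); // ceiling division
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            List<T> slice = items.subList(from, Math.min(from + size, items.size()));
            futures.add(CompletableFuture.supplyAsync(
              () -> slice.stream().map(fn).collect(Collectors.toList()), executor));
        }
        // Joining in submission order keeps the original element order.
        return futures.stream()
          .flatMap(f -> f.join().stream())
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            System.out.println(perItem(input, i -> i * 2, executor));
            System.out.println(batched(input, i -> i * 2, executor, 4));
        } finally {
            executor.shutdown();
        }
    }
}
```

With ten items and four batches, the batched variant submits four tasks instead of ten, which is where the reduced contention would come from.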
Parallel Collectors™ expose results wrapped in CompletableFuture instances, which provides great flexibility and makes it possible to work with them in a non-blocking fashion:
CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));
This makes it possible to conveniently apply callbacks and compose with other CompletableFutures:
list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor))
  .thenAcceptAsync(System.out::println, otherExecutor)
  .thenRun(() -> System.out.println("Finished!"));
Or call join() if you just want to block the calling thread and wait for the result:
List<String> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor))
  .join();
What's more, since JDK 9, you can easily provide your own timeout (for example with CompletableFuture#orTimeout).
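As a minimal sketch of the timeout support mentioned above, the following uses only the JDK 9+ methods `CompletableFuture#orTimeout` and `CompletableFuture#completeOnTimeout` on hand-made futures. `TimeoutSketch`, `timesOut`, and `orFallback` are hypothetical names; a future returned by a parallel collector would be handled the same way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Returns true if the future fails with a TimeoutException within the given time.
    static boolean timesOut(CompletableFuture<String> future, long millis) {
        try {
            future.orTimeout(millis, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the original failure in a CompletionException
            return e.getCause() instanceof TimeoutException;
        }
    }

    // Substitutes a fallback value instead of failing when the timeout elapses.
    static String orFallback(CompletableFuture<String> future, String fallback, long millis) {
        return future.completeOnTimeout(fallback, millis, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(timesOut(new CompletableFuture<>(), 100));
        System.out.println(orFallback(new CompletableFuture<>(), "fallback", 100));
    }
}
```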
Apply i -> foo(i) in parallel on a custom Executor and collect to List:
Executor executor = ...
CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toList(), executor));
Apply i -> foo(i) in parallel on a custom Executor with max parallelism of 4 and collect to Set:
Executor executor = ...
CompletableFuture<Set<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toSet(), executor, 4));
Apply i -> foo(i) in parallel on a custom Executor and collect to LinkedList:
Executor executor = ...
CompletableFuture<List<String>> result = list.stream()
  .collect(parallel(i -> foo(i), toCollection(LinkedList::new), executor));
Apply i -> foo(i) in parallel on a custom Executor and stream results in completion order:
Executor executor = ...
list.stream()
  .collect(parallelToStream(i -> foo(i), executor))
  .forEach(i -> ...);
Apply i -> foo(i) in parallel on a custom Executor and stream results in original order:
Executor executor = ...
list.stream()
  .collect(parallelToOrderedStream(i -> foo(i), executor))
  .forEach(i -> ...);
Stream API is a great tool for collection processing, especially if you need to parallelize execution of CPU-intensive tasks, for example:
public static void parallelSetAll(int[] array, IntUnaryOperator generator) {
    Objects.requireNonNull(generator);
    IntStream.range(0, array.length).parallel().forEach(i -> { array[i] = generator.applyAsInt(i); });
}
However, Parallel Streams execute tasks on a shared ForkJoinPool instance.
Unfortunately, it's not the best choice for running blocking operations, even when using ManagedBlocker (as explained by Tagir Valeev): this could easily lead to the saturation of the common pool, and to a performance degradation of everything that uses it.
For example:
List<String> result = list.parallelStream()
  .map(i -> foo(i)) // runs implicitly on ForkJoinPool.commonPool()
  .toList();
In order to avoid such problems, the solution is to isolate blocking tasks and run them on a separate thread pool... but there's a catch.
Sadly, Streams can only run parallel computations on the common ForkJoinPool, which effectively restricts their applicability to CPU-bound jobs.
However, there's a trick that allows running parallel Streams in a custom FJP instance... but it's not considered reliable (and can still induce oversubscription issues while competing with the common pool for resources):
Note, however, that this technique of submitting a task to a fork-join pool to run the parallel stream in that pool is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified. By default, the common fork-join pool is used, but in different environments, different thread pools might end up being used.
Says Stuart Marks on StackOverflow.
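For reference, the trick in question is usually written along these lines. This is a sketch of the commonly cited pattern, with the hypothetical names `CustomPoolTrickSketch` and `squaresInPool`. As the quote says, which threads actually run the stream is unspecified, so the code only relies on the result, never on the pool being honored.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolTrickSketch {

    static List<Integer> squaresInPool(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The parallel stream is started from inside a task submitted to the custom pool.
            // Current JDKs tend to keep the work in that pool, but this is not guaranteed.
            Callable<List<Integer>> task = () -> input.parallelStream()
              .map(i -> i * i)
              .collect(Collectors.toList());
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squaresInPool(List.of(1, 2, 3, 4), 2));
    }
}
```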
Not to mention that this approach was seriously flawed before JDK 10: if a Stream was targeted towards another pool, splitting would still adhere to the parallelism of the common pool, and not that of the targeted pool [JDK8190974].
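Isolating blocking tasks on a dedicated pool, as suggested earlier, can also be done by hand without parallel Streams. The sketch below uses only `CompletableFuture` and a plain `ExecutorService`; `IsolatedPoolSketch`, `fetchAll`, and `blockingOp` are hypothetical names, and `blockingOp` merely simulates a blocking call with a short sleep. It is not how the library is implemented, only the kind of boilerplate it is meant to replace.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class IsolatedPoolSketch {

    // Hypothetical stand-in for a blocking call such as a remote request.
    static String blockingOp(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    static List<String> fetchAll(List<Integer> ids, ExecutorService executor) {
        // Submit everything first so that the tasks can run concurrently...
        List<CompletableFuture<String>> futures = ids.stream()
          .map(id -> CompletableFuture.supplyAsync(() -> blockingOp(id), executor))
          .collect(Collectors.toList());
        // ...then wait for the results in encounter order.
        return futures.stream()
          .map(CompletableFuture::join)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(fetchAll(List.of(1, 2, 3, 4), executor));
        } finally {
            executor.shutdown(); // the common pool is never touched
        }
    }
}
```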
Dependencies: none. The library is implemented using core Java libraries.
Upstream Stream is always evaluated as a whole, even if the following operation is short-circuiting.
This means that none of these should be used for working with infinite streams. This limitation is imposed by the design of the Collector API.
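The effect of this limitation can be observed with any Collector. The sketch below uses the standard `Collectors.toList()` as a stand-in (an assumption made for illustration, not one of the parallel collectors) and counts how many upstream elements are consumed before a short-circuiting `findFirst()` returns. `EagerUpstreamSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerUpstreamSketch {

    // collect() drains the whole upstream before findFirst() gets a chance to short-circuit.
    static int consumedWithCollect(int upstreamSize) {
        AtomicInteger consumed = new AtomicInteger();
        Stream.iterate(1, i -> i + 1)
          .limit(upstreamSize)
          .peek(i -> consumed.incrementAndGet())
          .collect(Collectors.toList())
          .stream()
          .findFirst();
        return consumed.get();
    }

    // Without a collect() step, the sequential pipeline stops after the first element.
    static int consumedWithoutCollect(int upstreamSize) {
        AtomicInteger consumed = new AtomicInteger();
        Stream.iterate(1, i -> i + 1)
          .limit(upstreamSize)
          .peek(i -> consumed.incrementAndGet())
          .findFirst();
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(consumedWithCollect(5));
        System.out.println(consumedWithoutCollect(5));
    }
}
```

Removing `limit(...)` from the first method would make it run forever, which is why infinite streams are off the table.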
Never use Parallel Collectors with Executors configured with a RejectedExecutionHandler that discards tasks: this might result in a deadlock.
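Why a discarding handler is dangerous can be shown with the JDK alone. In this sketch (hypothetical names `DiscardedTaskSketch`, `discardedFutureNeverCompletes`, and `await`), a `ThreadPoolExecutor` with `DiscardPolicy` silently drops a task, so the `CompletableFuture` tied to it never completes, and anything waiting on it without a timeout would hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DiscardedTaskSketch {

    // Returns true if a task dropped by DiscardPolicy leaves its future incomplete.
    static boolean discardedFutureNeverCompletes() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
          1, 1, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1),
          new ThreadPoolExecutor.DiscardPolicy()); // silently drops rejected tasks
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker thread and fill the single queue slot.
            CompletableFuture.runAsync(() -> await(release), executor);
            CompletableFuture.runAsync(() -> await(release), executor);
            // This task is rejected and discarded, so nothing will ever complete the future.
            CompletableFuture<String> lost = CompletableFuture.supplyAsync(() -> "done", executor);
            try {
                lost.get(200, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return !lost.isDone();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(discardedFutureNeverCompletes());
    }
}
```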
- Consider providing reasonable timeouts for CompletableFutures in order not to block for unreasonably long when something goes wrong
- A no-longer-used ExecutorService should be shut down to allow reclamation of its resources
- Keep in mind that CompletableFuture#then(Apply|Combine|Consume|Run|Accept) might be executed by the calling thread. If this is not suitable, use CompletableFuture#then(Apply|Combine|Consume|Run|Accept)Async instead, and provide a custom Executor instance.
While Parallel Collectors and Virtual Threads make parallelization easy, that doesn't always mean it's the best choice. Platform threads are resource-intensive, and parallelism comes with a cost.
Before opting for parallel processing, consider addressing the root cause through alternatives like DB-level JOIN statements, batching, data reorganization, or... simply selecting a more suitable API method.
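The earlier note about CompletableFuture callbacks running on the calling thread can be checked with a small JDK-only sketch. `CallbackThreadSketch`, its methods, and the thread name `callback-pool` are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {

    // On an already completed future, thenApply runs the callback on the calling thread.
    static String syncCallbackThread() {
        return CompletableFuture.completedFuture("value")
          .thenApply(v -> Thread.currentThread().getName())
          .join();
    }

    // thenApplyAsync with an explicit Executor runs the callback on that Executor.
    static String asyncCallbackThread(ExecutorService executor) {
        return CompletableFuture.completedFuture("value")
          .thenApplyAsync(v -> Thread.currentThread().getName(), executor)
          .join();
    }

    public static void main(String[] args) {
        ExecutorService executor =
          Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            System.out.println(syncCallbackThread());
            System.out.println(asyncCallbackThread(executor));
        } finally {
            executor.shutdown(); // release the pool once it is no longer needed
        }
    }
}
```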
See CHANGELOG.MD for a complete version history.