diff --git a/src/main/java/com/pivovarit/collectors/AbstractParallelCollector.java b/src/main/java/com/pivovarit/collectors/AbstractParallelCollector.java deleted file mode 100644 index 80c631cb..00000000 --- a/src/main/java/com/pivovarit/collectors/AbstractParallelCollector.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collector; - -/** - * @author Grzegorz Piwowarek - */ -sealed abstract class AbstractParallelCollector - implements Collector>, R> - permits AsyncParallelCollector, AsyncParallelStreamingCollector { - - protected final Dispatcher dispatcher; - protected final Function task; - - protected AbstractParallelCollector(Function task, Dispatcher dispatcher) { - this.task = task; - this.dispatcher = dispatcher; - } - - abstract Function>, R> finalizer(); - - @Override - public final Supplier>> supplier() { - return ArrayList::new; - } - - @Override - public final BiConsumer>, T> accumulator() { - return (acc, e) -> { - if (!dispatcher.wasStarted()) { - dispatcher.start(); - } - acc.add(dispatcher.submit(() -> task.apply(e))); - }; - } - - @Override - public BinaryOperator>> combiner() { - return (left, right) -> { - throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported"); - }; - } - - @Override - public final Function>, R> finisher() { - return list -> { - dispatcher.stop(); - return finalizer().apply(list); - }; - } - - @Override - public Set characteristics() { - return Collections.emptySet(); - } -} diff --git a/src/main/java/com/pivovarit/collectors/AsyncCollector.java b/src/main/java/com/pivovarit/collectors/AsyncCollector.java deleted file mode 100644 index d4c572a6..00000000 --- a/src/main/java/com/pivovarit/collectors/AsyncCollector.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collector; -import java.util.stream.Stream; - -record AsyncCollector(Function mapper, Function, RR> processor, Executor executor) - implements Collector, CompletableFuture> { - - @Override - public Supplier> supplier() { - return Stream::builder; - } - - @Override - public BiConsumer, T> accumulator() { - return Stream.Builder::add; - } - - @Override - public BinaryOperator> combiner() { - return (left, right) -> { - throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported"); - }; - } - - @Override - public Function, CompletableFuture> finisher() { - return acc -> { - try { - return CompletableFuture.supplyAsync(() -> processor.apply(acc.build().map(mapper)), executor); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } - }; - } - - @Override - public Set characteristics() { - return Set.of(); - } -} diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java deleted file mode 100644 index 430fad50..00000000 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collector; -import java.util.stream.Stream; - -import static com.pivovarit.collectors.BatchingSpliterator.partitioned; -import static java.util.concurrent.CompletableFuture.allOf; - -/** - * @author Grzegorz Piwowarek - */ -final class AsyncParallelCollector extends AbstractParallelCollector> { - - private final Function, C> finalizer; - - AsyncParallelCollector(Function task, Dispatcher dispatcher, Function, C> finalizer) { - super(task, dispatcher); - this.finalizer = finalizer; - } - - @Override - public Function>, CompletableFuture> finalizer() { - return futures -> { - var combined = allOf(futures.toArray(CompletableFuture[]::new)) - .thenApply(__ -> futures.stream().map(CompletableFuture::join)); - - for (var future : futures) { - future.whenComplete((o, ex) -> { - if (ex != null) { - combined.completeExceptionally(ex); - } - }); - } - - return combined.thenApply(finalizer); - }; - } - - record BatchingCollector(Function task, Function, C> finalizer, - Executor executor, int parallelism) - implements Collector, CompletableFuture> { - - @Override - public Supplier> supplier() { - return ArrayList::new; - } - - @Override - public BiConsumer, T> accumulator() { - return ArrayList::add; - } - - @Override - public BinaryOperator> combiner() { - return (left, right) -> { - throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported"); - }; - } - - @Override - public Function, CompletableFuture> finisher() { - return items -> { - if (items.size() == parallelism) { - return items.stream() - .collect((Collector>) new AsyncParallelCollector(task, new Dispatcher<>(executor, parallelism), finalizer)); - } else { - return partitioned(items, parallelism) - .collect((Collector, ?, CompletableFuture>) new AsyncParallelCollector, List, C>((Function, ? extends List>) batch -> { - List list = new ArrayList<>(batch.size()); - for (T t : batch) { - list.add(task.apply(t)); - } - return list; - }, new Dispatcher<>(executor, parallelism), r -> finalizer.apply(r.flatMap(Collection::stream)))); - } - }; - } - - @Override - public Set characteristics() { - return Collections.emptySet(); - } - } -} diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelStreamingCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelStreamingCollector.java deleted file mode 100644 index 5be717f8..00000000 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelStreamingCollector.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collector; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -import static com.pivovarit.collectors.BatchingSpliterator.batching; -import static com.pivovarit.collectors.BatchingSpliterator.partitioned; -import static java.util.Collections.emptySet; - -/** - * @author Grzegorz Piwowarek - */ -final class AsyncParallelStreamingCollector extends AbstractParallelCollector> { - - private static final EnumSet UNORDERED_CHARACTERISTICS = EnumSet.of(Characteristics.UNORDERED); - - private final CompletionStrategy completionStrategy; - - AsyncParallelStreamingCollector(Function task, Dispatcher dispatcher, boolean ordered) { - super(task, dispatcher); - this.completionStrategy = ordered ? CompletionStrategy.ORDERED : CompletionStrategy.UNORDERED; - } - - @Override - public Function>, Stream> finalizer() { - return acc -> switch (completionStrategy) { - case ORDERED -> acc.stream().map(CompletableFuture::join); - case UNORDERED -> StreamSupport.stream(new CompletionOrderSpliterator<>(acc), false); - }; - } - - @Override - public Set characteristics() { - return switch (completionStrategy) { - case ORDERED -> emptySet(); - case UNORDERED -> UNORDERED_CHARACTERISTICS; - }; - } - - record BatchingCollector(Function task, Executor executor, int parallelism, boolean ordered) - implements Collector, Stream> { - - @Override - public Supplier> supplier() { - return ArrayList::new; - } - - @Override - public BiConsumer, T> accumulator() { - return ArrayList::add; - } - - @Override - public BinaryOperator> combiner() { - return (left, right) -> { - throw new UnsupportedOperationException("using parallel stream with parallel collectors is not supported"); - }; - } - - @Override - public Function, Stream> finisher() { - return items -> { - if (items.size() == parallelism) { - return items.stream() - .collect(new AsyncParallelStreamingCollector<>(task, new Dispatcher<>(executor, parallelism), ordered)); - } else { - return partitioned(items, parallelism) - .collect(new AsyncParallelStreamingCollector<>(batching(task), new Dispatcher<>(executor, parallelism), ordered)) - .flatMap(Collection::stream); - } - }; - } - - @Override - public Set characteristics() { - return emptySet(); - } - } -} diff --git a/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java b/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java index ea8edd4d..c5c8941f 100644 --- a/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java +++ b/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java @@ -17,112 +17,26 @@ import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.Spliterator; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Stream; -import static java.util.stream.Stream.empty; -import static java.util.stream.Stream.of; -import static java.util.stream.StreamSupport.stream; +final class BatchingSpliterator { -/** - * @author Grzegorz Piwowarek - */ -final class BatchingSpliterator implements Spliterator> { - - private final List source; - - private int chunks; - private int chunkSize; - private int consumed; - - BatchingSpliterator(List list, int batches) { - Objects.requireNonNull(list, "list can't be null"); - if (batches < 1) { - throw new IllegalArgumentException("batches can't be lower than one"); - } - source = list; - chunks = list.isEmpty() ? 0 : Math.min(batches, list.size()); - chunkSize = (int) Math.ceil(((double) source.size()) / batches); - } - - static Stream> partitioned(List list, int numberOfParts) { - int size = list.size(); - - if (size == 0) { - return empty(); - } else if (numberOfParts == 1) { - return of(list); - } else if (size <= numberOfParts) { - return asSingletonListStream(list); - } else { - return stream(new BatchingSpliterator<>(list, numberOfParts), false); - } - } - - private static Stream> asSingletonListStream(List list) { - Stream.Builder> acc = Stream.builder(); - for (T t : list) { - acc.add(List.of(t)); - } - return acc.build(); + private BatchingSpliterator() { } - static Function, List> batching(Function mapper) { - return batch -> { - List list = new ArrayList<>(batch.size()); - for (T t : batch) { - list.add(mapper.apply(t)); - } - return list; - }; - } - - @Override - public boolean tryAdvance(Consumer> action) { - if (consumed < source.size() && chunks != 0) { - int end = Math.min(source.size(), consumed + chunkSize); - List batch = source.subList(consumed, end); - consumed += batch.size(); - if (--chunks > 0) { - chunkSize = (int) Math.ceil(((double) (source.size() - consumed)) / chunks); - } - action.accept(batch); - return true; - } else { - return false; + static List> partition(List items, int batches) { + if (items.isEmpty()) { + return List.of(); } - } - - @Override - public Spliterator> trySplit() { - int remaining = source.size() - consumed; - if (remaining <= chunkSize || chunks <= 1) { - return null; + int count = Math.min(batches, items.size()); + int base = items.size() / count; + int remainder = items.size() % count; + List> result = new ArrayList<>(count); + int offset = 0; + for (int i = 0; i < count; i++) { + int size = base + (i < remainder ? 1 : 0); + result.add(new ArrayList<>(items.subList(offset, offset + size))); + offset += size; } - - int midChunks = chunks / 2; - int midSize = Math.min(midChunks * chunkSize, source.size() - consumed); - - var subList = source.subList(consumed, consumed + midSize); - var split = new BatchingSpliterator<>(subList, midChunks); - - consumed += midSize; - chunks -= midChunks; - chunkSize = (int) Math.ceil(((double) (source.size() - consumed)) / chunks); - - return split; - } - - @Override - public long estimateSize() { - return chunks; - } - - @Override - public int characteristics() { - return IMMUTABLE | ORDERED | SIZED; + return result; } } diff --git a/src/main/java/com/pivovarit/collectors/CollectingConfigurer.java b/src/main/java/com/pivovarit/collectors/CollectingConfigurer.java index a9eaddcf..44f91859 100644 --- a/src/main/java/com/pivovarit/collectors/CollectingConfigurer.java +++ b/src/main/java/com/pivovarit/collectors/CollectingConfigurer.java @@ -15,14 +15,9 @@ */ package com.pivovarit.collectors; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Executor; -import java.util.function.UnaryOperator; +import java.util.function.Function; /** * Fluent configuration builder for collectors that collect all results (i.e. non-streaming). @@ -32,43 +27,30 @@ */ public final class CollectingConfigurer { - private final List modifiers = new ArrayList<>(); - private final Set> seen = new HashSet<>(); + Integer parallelism; + Executor executor; + boolean batching; + Function taskDecorator; + Function executorDecorator; CollectingConfigurer() { } - /** - * Enables batching of work submitted to workers. - *

- * When enabled, each worker thread receives a batch of input items and processes them in one go, - * instead of scheduling one task per item. This reduces the number of tasks created and typically - * decreases contention on the underlying worker queue. - * - *

Note: Depending on batch sizing and workload skew, batching may reduce load balancing and - * can lead to thread starvation (some workers become idle while others remain overloaded). - * - * @return this configurer instance for fluent chaining - */ - public CollectingConfigurer batching() { - addOnce(ConfigProcessor.Option.Batched.INSTANCE); - return this; - } - /** * Sets the maximum level of parallelism. *

* This limits the number of tasks submitted to the worker queue at once, effectively bounding * the amount of in-flight work and the maximum concurrency used by the collector. * - * @param parallelism the desired parallelism level (must be positive) + * @param p the desired parallelism level (must be positive) * * @return this configurer instance for fluent chaining */ - public CollectingConfigurer parallelism(int parallelism) { - Preconditions.requireValidParallelism(parallelism); - - addOnce(new ConfigProcessor.Option.Parallelism(parallelism)); + public CollectingConfigurer parallelism(int p) { + if (p < 1) { + throw new IllegalArgumentException("parallelism must be greater than 0"); + } + this.parallelism = p; return this; } @@ -79,76 +61,63 @@ public CollectingConfigurer parallelism(int parallelism) { * {@code RejectedExecutionHandler} that discards submitted work). Dropping tasks will cause the * collector to wait for results that will never be produced, which can lead to deadlocks. * - * @param executor the executor to use + * @param e the executor to use * * @return this configurer instance for fluent chaining */ - public CollectingConfigurer executor(Executor executor) { - Preconditions.requireValidExecutor(executor); - - addOnce(new ConfigProcessor.Option.ThreadPool(executor)); + public CollectingConfigurer executor(Executor e) { + this.executor = Objects.requireNonNull(e, "executor can't be null"); return this; } /** - * Decorates the executor used for running tasks. + * Enables batching of work submitted to workers. *

- * The decorator receives the resolved executor (either the default virtual-thread executor or - * the one provided via {@link #executor(Executor)}) and returns a wrapped replacement. - * This is useful for augmenting the executor with cross-cutting concerns such as context - * propagation (MDC, OpenTelemetry spans, etc.) or monitoring, without replacing the executor entirely. - * - *

Note: The executor returned by the decorator must not drop tasks on rejection. - * Dropping tasks will cause the collector to wait for results that will never be produced, - * which can lead to deadlocks. + * When enabled, each worker thread receives a batch of input items and processes them in one go, + * instead of scheduling one task per item. This reduces the number of tasks created and typically + * decreases contention on the underlying worker queue. * - * @param decorator a function that wraps the resolved executor + *

Note: Depending on batch sizing and workload skew, batching may reduce load balancing and + * can lead to thread starvation (some workers become idle while others remain overloaded). * * @return this configurer instance for fluent chaining */ - public CollectingConfigurer executorDecorator(UnaryOperator decorator) { - Objects.requireNonNull(decorator, "executor decorator can't be null"); - - addOnce(new ConfigProcessor.Option.ExecutorDecorator(decorator)); + public CollectingConfigurer batching() { + this.batching = true; return this; } /** - * Decorates each individual task before it is submitted to the executor. + * Wraps each task with the provided decorator before submission. *

- * The decorator receives the {@link Runnable} representing a single unit of work and returns a - * wrapped replacement that runs in its place. This is useful for propagating thread-local context - * (e.g. MDC entries, OpenTelemetry spans, {@code SecurityContext}) into worker threads, or for - * per-task instrumentation, without replacing the executor entirely. + * Useful for propagating thread-local context, adding instrumentation, or applying cross-cutting + * concerns (e.g. logging, tracing) around the execution of each mapper invocation. * - *

Unlike {@link #executorDecorator(UnaryOperator)}, which wraps the executor as a whole, - * this decorator is applied to each task individually and runs on the worker thread. - * - * @param decorator a function that wraps each submitted task + * @param decorator function that wraps the original {@code Runnable} * * @return this configurer instance for fluent chaining */ - public CollectingConfigurer taskDecorator(UnaryOperator decorator) { + public CollectingConfigurer taskDecorator(Function decorator) { Objects.requireNonNull(decorator, "task decorator can't be null"); - - addOnce(new ConfigProcessor.Option.TaskDecorator(decorator)); - return this; - } - - List getConfig() { - return Collections.unmodifiableList(modifiers); - } - - void validate() { - if (seen.contains(ConfigProcessor.Option.Batched.class) && !seen.contains(ConfigProcessor.Option.Parallelism.class)) { - throw new IllegalStateException("parallelism must be configured when batching is enabled"); + if (this.taskDecorator != null) { + throw new IllegalArgumentException("task decorator already set"); } + this.taskDecorator = decorator; + return this; } - private void addOnce(ConfigProcessor.Option option) { - if (!seen.add(option.getClass())) { - throw new IllegalArgumentException("'%s' can only be configured once".formatted(ConfigProcessor.toHumanReadableString(option))); - } - modifiers.add(option); + /** + * Wraps the underlying {@link Executor} with the provided decorator. + *

+ * Useful for instrumenting or augmenting the executor (e.g. with structured concurrency scopes + * or custom scheduling policies) without replacing it entirely. + * + * @param o function that wraps the original {@code Executor} + * + * @return this configurer instance for fluent chaining + */ + public CollectingConfigurer executorDecorator(Function o) { + this.executorDecorator = Objects.requireNonNull(o, "executor decorator can't be null"); + return this; } } diff --git a/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java b/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java index d3df77dd..7d1f06cb 100644 --- a/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java +++ b/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java @@ -18,46 +18,53 @@ import java.util.List; import java.util.Spliterator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; -/** - * @author Grzegorz Piwowarek - */ -final class CompletionOrderSpliterator implements Spliterator { +final class CompletionOrderSpliterator implements Spliterator { - private final BlockingQueue> completed = new LinkedBlockingQueue<>(); + private final BlockingQueue> completed = new LinkedBlockingQueue<>(); private int remaining; - CompletionOrderSpliterator(List> futures) { + CompletionOrderSpliterator(List> futures) { this.remaining = futures.size(); - futures.forEach(f -> f.whenComplete((__, ___) -> completed.add(f))); - } - - @Override - public boolean tryAdvance(Consumer action) { - if (remaining <= 0) { - return false; + for (CompletableFuture f : futures) { + f.whenComplete((v, e) -> completed.offer(f)); } - action.accept(nextCompleted().join()); - return true; } - private CompletableFuture nextCompleted() { - try { - var next = completed.take(); + @Override + public boolean tryAdvance(Consumer action) { + while (remaining > 0) { + CompletableFuture next; + try { + next = completed.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(e); + } remaining--; - return next; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new RuntimeException(e); + try { + R value = next.join(); + action.accept(value); + return true; + } catch (CancellationException ce) { + continue; + } catch (CompletionException ce) { + if (ce.getCause() instanceof CancellationException) { + continue; + } + throw ce; + } } + return false; } @Override - public Spliterator trySplit() { + public Spliterator trySplit() { return null; } @@ -68,6 +75,6 @@ public long estimateSize() { @Override public int characteristics() { - return SIZED | IMMUTABLE; + return NONNULL; } } diff --git a/src/main/java/com/pivovarit/collectors/CompletionStrategy.java b/src/main/java/com/pivovarit/collectors/Config.java similarity index 70% rename from src/main/java/com/pivovarit/collectors/CompletionStrategy.java rename to src/main/java/com/pivovarit/collectors/Config.java index 0247f68f..940aa157 100644 --- a/src/main/java/com/pivovarit/collectors/CompletionStrategy.java +++ b/src/main/java/com/pivovarit/collectors/Config.java @@ -15,6 +15,15 @@ */ package com.pivovarit.collectors; -enum CompletionStrategy { - ORDERED, UNORDERED +import java.util.concurrent.Executor; +import java.util.function.Function; + +record Config( + Integer parallelism, + Executor executor, + boolean batching, + boolean ordered, + Function taskDecorator, + Function executorDecorator +) { } diff --git a/src/main/java/com/pivovarit/collectors/ConfigProcessor.java b/src/main/java/com/pivovarit/collectors/ConfigProcessor.java index 3d820225..3ca852b0 100644 --- a/src/main/java/com/pivovarit/collectors/ConfigProcessor.java +++ b/src/main/java/com/pivovarit/collectors/ConfigProcessor.java @@ -15,109 +15,87 @@ */ package com.pivovarit.collectors; -import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.UnaryOperator; - -import static java.util.Objects.requireNonNull; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Consumer; final class ConfigProcessor { - private static final ExecutorService DEFAULT_EXECUTOR = Executors.newThreadPerTaskExecutor(Thread.ofVirtual() - .name("parallel-collectors-", 0) - .factory()); - - record Config(boolean ordered, boolean batching, int parallelism, Executor executor) { - Config { - Objects.requireNonNull(executor, "executor can't be null"); - } + private ConfigProcessor() { } - static Config process(List A uoeCombiner(A a, A b) { + throw new UnsupportedOperationException(); + } - Collector batching(Function mapper, Executor executor, int parallelism); + static final class AsyncContainer { + final Dispatcher dispatcher; + final List> futures = new ArrayList<>(); - Collector parallel(Function mapper, Dispatcher dispatcher); + AsyncContainer(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } } - static StreamingConfigurer streaming(Consumer consumer) { - var c = new StreamingConfigurer(); - consumer.accept(c); - c.validate(); - return c; + static final class GroupingContainer { + final LinkedHashMap> groups = new LinkedHashMap<>(); } - static CollectingConfigurer collecting(Consumer consumer) { - var c = new CollectingConfigurer(); - consumer.accept(c); - c.validate(); - return c; + static final class BatchContext { + final java.util.Set running = java.util.concurrent.ConcurrentHashMap.newKeySet(); + final java.util.concurrent.atomic.AtomicBoolean failed = new java.util.concurrent.atomic.AtomicBoolean(); + final java.util.concurrent.atomic.AtomicReference primary = new java.util.concurrent.atomic.AtomicReference<>(); + + void onFailure(Throwable cause) { + if (failed.compareAndSet(false, true)) { + primary.set(cause); + for (Thread t : running) { + t.interrupt(); + } + } + } + + Throwable primaryException() { + return primary.get(); + } } } diff --git a/src/main/java/com/pivovarit/collectors/FutureCollectors.java b/src/main/java/com/pivovarit/collectors/FutureCollectors.java deleted file mode 100644 index 620da26d..00000000 --- a/src/main/java/com/pivovarit/collectors/FutureCollectors.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collector; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toList; - -/** - * @author Grzegorz Piwowarek - */ -final class FutureCollectors { - static Collector, ?, CompletableFuture> toFuture(Collector collector) { - Objects.requireNonNull(collector, "collector cannot be null"); - - return Collectors.collectingAndThen(toList(), list -> { - var future = CompletableFuture.allOf(list.toArray(CompletableFuture[]::new)) - .thenApply(__ -> list.stream() - .map(CompletableFuture::join) - .collect(collector)); - - for (var f : list) { - f.whenComplete((__, throwable) -> { - if (throwable != null) { - future.completeExceptionally(throwable); - } - }); - } - - future.whenComplete((v, ex) -> { - if (ex != null) { - for (var f : list) { - f.cancel(true); - } - } - }); - - return future; - }); - } - - static Collector, ?, CompletableFuture>> toFuture() { - return toFuture(toList()); - } -} diff --git a/src/main/java/com/pivovarit/collectors/Group.java b/src/main/java/com/pivovarit/collectors/Group.java index 171a170f..6b44818f 100644 --- a/src/main/java/com/pivovarit/collectors/Group.java +++ b/src/main/java/com/pivovarit/collectors/Group.java @@ -17,59 +17,67 @@ import java.util.List; import java.util.Objects; -import java.util.function.BiFunction; /** - * Represents a grouping of values under a specific key. + * Represents a grouping of values under a specific key, produced by the {@code parallelBy} / + * {@code parallelToStreamBy} collectors. * - * @param the type of the key - * @param the type of the values - * @param key the key of this group, must not be null - * @param values the list of values, must not be null + * @param the type of the key + * @param the type of the values */ -public record Group(T key, List values) { +public class Group { - /** - * Constructs a new {@code Group} instance ensuring key and values are not null. - * - * @param key the key, must not be null - * @param values the list of values, must not be null - */ - public Group { - Objects.requireNonNull(key, "key cannot be null"); - Objects.requireNonNull(values, "values cannot be null"); + private final K key; + private final List values; + + Group(K key, List values) { + this.key = key; + this.values = values; } /** * Creates a new {@code Group} instance with the given key and values. * - * @param key the key, must not be null - * @param values the list of values, must not be null - * @param the type of the key - * @param the type of the values + * @param i the key + * @param integers the list of values * * @return a new {@code Group} instance */ - public static Group of(T key, List values) { - return new Group<>(key, values); + public static Group of(int i, List integers) { + return new Group<>(i, integers); } /** - * Transforms the values in this group using the provided mapper function. - *

- * The mapper receives both the group's key and each value, allowing transformations - * that depend on the grouping key. + * Returns the values associated with this group's key. * - * @param mapper the mapping function receiving (key, value), must not be null - * @param the target type of the mapped values - * - * @return a new {@code Group} instance with the same key and the values produced by applying - * {@code mapper} to the key and each value in this group + * @return the list of values in this group */ - public Group map(BiFunction mapper) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - return new Group<>(key, values.stream() - .map(v -> (R) mapper.apply(key, v)) - .toList()); + public List values() { + return values; + } + + K key() { + return key; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Group other)) { + return false; + } + return Objects.equals(key, other.key) && Objects.equals(values, other.values); + } + + @Override + public int hashCode() { + return Objects.hash(key, values); + } + + @Override + public String toString() { + return "Group[key=" + key + ", values=" + values + "]"; } } diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java index 8b531b43..9860ff12 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java +++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java @@ -15,8 +15,6 @@ */ package com.pivovarit.collectors; -import java.util.List; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -48,13 +46,6 @@ private ParallelCollectors() { *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i), toList()));
-     * }
- * * @param mapper transformation applied to each element * @param collector the {@code Collector} describing the reduction * @param the input element type @@ -62,72 +53,39 @@ private ParallelCollectors() { * @param the reduction result type produced by {@code collector} * * @return a {@code Collector} producing a {@link CompletableFuture} of the reduced result - * - * @since 4.0.0 */ public static Collector> parallel( Function mapper, Collector collector) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - - return Factory.collecting(s -> s.collect(collector), mapper, c -> {}); + return Factory.async(mapper, ConfigProcessor.empty(), collector); } /** * A convenience {@link Collector} that performs parallel computations using Virtual Threads * and returns a {@link CompletableFuture} containing a {@link Stream} of the mapped results. - *

- * Each element is transformed using the provided {@code mapper} in parallel on Virtual Threads. - * Unlike {@link #parallel(Function, Collector)}, this overload does not perform a reduction; it - * simply collects and exposes the mapped elements as a {@code Stream} in the resulting future. * *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i)));
-     *
-     * result.thenAccept(s -> s.forEach(System.out::println));
-     * }
- * * @param mapper transformation applied to each element * @param the input element type * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code CompletableFuture} of a {@code Stream} of mapped elements - * - * @since 3.0.0 */ public static Collector>> parallel( Function mapper) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.collecting((Function, Stream>) s -> s, mapper, c -> {}); + return Factory.async(mapper, ConfigProcessor.empty()); } /** * A convenience {@link Collector} that performs parallel computations by classifying input * elements using the provided {@code classifier}, applying the given {@code mapper}, and * emitting {@link Group} entries representing each group. - *

- * Each element is classified using {@code classifier}, then transformed using {@code mapper} in - * parallel on Virtual Threads. The resulting {@link Group} entries are exposed as a - * {@link CompletableFuture} of {@code Stream>}. * *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t)));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param the input element type @@ -135,38 +93,24 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@link CompletableFuture} of a {@code Stream} of grouped results - * - * @since 3.4.0 */ public static Collector>>> parallelBy( Function classifier, Function mapper) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.collectingBy(classifier, mapper, c -> {}); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.empty()); } /** * A convenience {@link Collector} that performs parallel computations by classifying input * elements using the provided {@code classifier}, applying the given {@code mapper}, and - * emitting {@link Group} entries representing each group. + * reducing the resulting {@link Group} entries using the user-provided {@code collector}. *

- * The generated {@link Stream} of {@code Group} instances is then reduced using the - * user-provided {@code collector}, executed on Virtual Threads. Each group is processed - * independently, and every group is guaranteed to be processed on a single thread. - * The reduction is applied to the grouped results rather than to the raw mapped elements. + * Each group is processed independently, and every group is guaranteed to be processed on + * a single thread. * *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t), toList()));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param collector the {@code Collector} describing the reduction of the grouped results @@ -177,30 +121,17 @@ private ParallelCollectors() { * * @return a {@code Collector} producing a {@link CompletableFuture} whose value is obtained * by reducing the {@code Stream>} produced by the parallel classification - * - * @since 3.4.0 */ public static Collector> parallelBy( Function classifier, Function mapper, Collector, ?, RR> collector) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - - return Factory.collectingBy( - classifier, - (Function>, RR>) s -> s.collect(collector), - mapper, - c -> {}); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.empty(), collector); } /** * A convenience {@link Collector} that performs parallel computations using Virtual Threads * and returns a {@link Stream} of the mapped results. - *

- * Each element is transformed using the provided {@code mapper} in parallel on Virtual Threads, - * and the mapped elements are exposed as a {@code Stream}. * *

Ordering: This collector emits elements in an arbitrary order. To preserve encounter * order, use the {@link StreamingConfigurer} overload and configure {@link StreamingConfigurer#ordered()}. @@ -208,36 +139,21 @@ private ParallelCollectors() { *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * Stream result = Stream.of(1, 2, 3)
-     *   .collect(parallelToStream(i -> foo(i)));
-     * }
- * * @param mapper transformation applied to each element * @param the input element type * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of mapped elements - * - * @since 3.0.0 */ public static Collector> parallelToStream( Function mapper) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.streaming(mapper, c -> {}); + return Factory.streaming(mapper, ConfigProcessor.empty()); } /** * A convenience {@link Collector} that performs parallel computations by classifying input * elements using the provided {@code classifier}, applying the given {@code mapper}, and - * emitting {@link Group} entries representing each group. - *

- * Each element is classified using {@code classifier}, then transformed using {@code mapper} in - * parallel on Virtual Threads. The resulting grouped entries are exposed as a - * {@code Stream>}. + * emitting {@link Group} entries representing each group as a {@link Stream}. * *

Ordering: This collector emits {@link Group} elements in an arbitrary order. * To preserve encounter order, use the {@link StreamingConfigurer} overload and configure @@ -246,13 +162,6 @@ private ParallelCollectors() { *

Note: This collector does not limit parallelism in any way (it may spawn work for every * element). As a result, it is not suitable for processing huge streams. * - *
- * Example: - *

{@code
-     * Stream> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelToStreamBy(Task::groupId, t -> compute(t)));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param the input element type @@ -260,16 +169,11 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of grouped results - * - * @since 3.4.0 */ public static Collector>> parallelToStreamBy( Function classifier, Function mapper) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.streamingBy(classifier, mapper, c -> {}); + return Factory.streamingGrouping(classifier, mapper, ConfigProcessor.empty()); } // configurers @@ -278,46 +182,21 @@ private ParallelCollectors() { * A convenience {@link Collector} that performs parallel computations (by default on Virtual Threads) * and returns a {@link CompletableFuture} containing a {@link Stream} of the mapped results, * with additional configuration applied via the provided {@code configurer}. - *

- * Each element is transformed using the provided {@code mapper} in parallel. Unless overridden via - * {@link CollectingConfigurer#executor(java.util.concurrent.Executor)}, tasks are executed on - * Virtual Threads. The {@code configurer} can also be used to enable batching and/or set a maximum - * parallelism level. - * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. * *

Note: For more information on available configuration options, see * {@link CollectingConfigurer}. * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i), c -> c
-     *     .parallelism(64)
-     *     .batching()
-     *   ));
-     * }
- * * @param mapper transformation applied to each element * @param configurer callback used to configure execution (see {@link CollectingConfigurer}) * @param the input element type * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code CompletableFuture} of a {@code Stream} of mapped elements - * - * @since 4.0.0 */ public static Collector>> parallel( Function mapper, Consumer configurer) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - - return Factory.collecting((Function, Stream>) i -> i, mapper, configurer); + return Factory.async(mapper, ConfigProcessor.fromCollecting(configurer)); } /** @@ -325,52 +204,24 @@ private ParallelCollectors() { * and returns a {@link CompletableFuture} containing the result of applying the user-provided * {@link Collector} to the mapped elements, with additional configuration applied via the provided * {@code configurer}. - *

- * Each element is transformed using the provided {@code mapper} in parallel, and the results are - * reduced according to the supplied {@code collector}. Unless overridden via - * {@link CollectingConfigurer#executor(java.util.concurrent.Executor)}, tasks are executed on - * Virtual Threads. The {@code configurer} can also be used to enable batching and/or limit the - * maximum parallelism. - * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. * *

Note: For more information on available configuration options, see * {@link CollectingConfigurer}. * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i), c -> c
-     *       .parallelism(64)
-     *       .batching(),
-     *     toList()
-     *   ));
-     * }
- * * @param mapper transformation applied to each element - * @param collector the {@code Collector} describing the reduction * @param configurer callback used to configure execution (see {@link CollectingConfigurer}) + * @param collector the {@code Collector} describing the reduction * @param the input element type * @param the type produced by {@code mapper} * @param the reduction result type produced by {@code collector} * * @return a {@code Collector} producing a {@link CompletableFuture} of the reduced result - * - * @since 4.0.0 */ public static Collector> parallel( Function mapper, Consumer configurer, Collector collector) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - - return Factory.collecting(s -> s.collect(collector), mapper, configurer); + return Factory.async(mapper, ConfigProcessor.fromCollecting(configurer), collector); } /** @@ -378,31 +229,10 @@ private ParallelCollectors() { * elements using the provided {@code classifier}, applying the given {@code mapper}, and * emitting {@link Group} entries representing each group, with additional configuration applied * via the provided {@code configurer}. - *

- * Each element is classified using {@code classifier}, then transformed using {@code mapper} in - * parallel. Unless overridden via {@link CollectingConfigurer#executor(java.util.concurrent.Executor)}, - * tasks are executed on Virtual Threads. The resulting grouped entries are exposed as a - * {@link CompletableFuture} of {@code Stream>}. The {@code configurer} can also be used - * to enable batching and/or set a maximum parallelism level. - * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. * *

Note: For more information on available configuration options, see * {@link CollectingConfigurer}. * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t), c -> c
-     *     .parallelism(64)
-     *     .batching()
-     *   ));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param configurer callback used to configure execution (see {@link CollectingConfigurer}) @@ -411,53 +241,26 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@link CompletableFuture} of a {@code Stream} of grouped results - * - * @since 4.0.0 */ public static Collector>>> parallelBy( Function classifier, Function mapper, Consumer configurer) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - - return Factory.collectingBy(classifier, mapper, configurer); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.fromCollecting(configurer)); } /** * A convenience {@link Collector} that performs parallel computations by classifying input * elements using the provided {@code classifier}, applying the given {@code mapper}, and - * emitting {@link Group} entries representing each group, and then reducing them using the - * user-provided {@code collector}, with additional configuration applied via the provided - * {@code configurer}. + * reducing the resulting {@link Group} entries using the user-provided {@code collector}, + * with additional configuration applied via the provided {@code configurer}. *

- * The generated {@link Stream} of {@code Group} instances is reduced using the supplied - * {@code collector}, executed on Virtual Threads by default (unless overridden via - * {@link CollectingConfigurer#executor(java.util.concurrent.Executor)}). Each group is processed - * independently, and every group is guaranteed to be processed on a single thread. - * The reduction is applied to the grouped results rather than to the raw mapped elements. - * The {@code configurer} can also be used to enable batching and/or set a maximum parallelism level. - * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link CollectingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. + * Each group is processed independently, and every group is guaranteed to be processed on + * a single thread. * *

Note: For more information on available configuration options, see * {@link CollectingConfigurer}. * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t), c -> c
-     *       .parallelism(64)
-     *       .batching(),
-     *     toList()
-     *   ));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param configurer callback used to configure execution (see {@link CollectingConfigurer}) @@ -469,109 +272,52 @@ private ParallelCollectors() { * * @return a {@code Collector} producing a {@link CompletableFuture} whose value is obtained by * reducing the {@code Stream>} produced by the parallel classification - * - * @since 4.0.0 */ public static Collector> parallelBy( Function classifier, Function mapper, Consumer configurer, Collector, ?, RR> collector) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - - return Factory.collectingBy( - classifier, - (Function>, RR>) s -> s.collect(collector), - mapper, - configurer); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.fromCollecting(configurer), collector); } /** * A convenience {@link Collector} that performs parallel computations (by default on Virtual Threads) * and returns a {@link Stream} of the mapped results, with additional configuration applied via the * provided {@code configurer}. - *

- * Each element is transformed using the provided {@code mapper} in parallel. Unless overridden via - * {@link StreamingConfigurer#executor(java.util.concurrent.Executor)}, tasks are executed on Virtual - * Threads. The {@code configurer} can also be used to enable batching and/or cap parallelism. * *

Ordering: By default, this collector emits elements in an arbitrary order. * To preserve encounter order, configure ordered emission via {@link StreamingConfigurer#ordered()}. * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link StreamingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. - * *

Note: For more information on available configuration options, see * {@link StreamingConfigurer}. * - *
- * Example: - *

{@code
-     * Stream result = Stream.of(1, 2, 3)
-     *   .collect(parallelToStream(i -> foo(i), c -> c
-     *     .ordered()
-     *     .parallelism(64)
-     *     .batching()
-     *   ));
-     * }
- * * @param mapper transformation applied to each element * @param configurer callback used to configure execution (see {@link StreamingConfigurer}) * @param the input element type * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of mapped elements - * - * @since 4.0.0 */ public static Collector> parallelToStream( Function mapper, Consumer configurer) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - - return Factory.streaming(mapper, configurer); + return Factory.streaming(mapper, ConfigProcessor.fromStreaming(configurer)); } /** * A convenience {@link Collector} that performs parallel computations by classifying input * elements using the provided {@code classifier}, applying the given {@code mapper}, and - * emitting {@link Group} entries representing each group, with additional configuration applied - * via the provided {@code configurer}. - *

- * Each element is classified using {@code classifier}, then transformed using {@code mapper} in - * parallel. Unless overridden via {@link StreamingConfigurer#executor(java.util.concurrent.Executor)}, - * tasks are executed on Virtual Threads. The resulting grouped entries are exposed as a - * {@code Stream>}. + * emitting {@link Group} entries representing each group as a {@link Stream}, with additional + * configuration applied via the provided {@code configurer}. * *

Ordering: By default, this collector emits {@link Group} elements in an * arbitrary order. To preserve encounter order, configure ordered emission via * {@link StreamingConfigurer#ordered()}. * - *

Note: Unless the {@code configurer} explicitly limits parallelism (e.g. via - * {@link StreamingConfigurer#parallelism(int)}), this collector does not limit parallelism in any - * way (it may spawn work for every element). As a result, it is not suitable for processing huge - * streams. - * *

Note: For more information on available configuration options, see * {@link StreamingConfigurer}. * - *
- * Example: - *

{@code
-     * Stream> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelToStreamBy(Task::groupId, t -> compute(t), c -> c
-     *     .ordered()
-     *     .parallelism(64)
-     *     .batching()
-     *   ));
-     * }
- * * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param configurer callback used to configure execution (see {@link StreamingConfigurer}) @@ -580,36 +326,20 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of grouped results - * - * @since 4.0.0 */ public static Collector>> parallelToStreamBy( Function classifier, Function mapper, Consumer configurer) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(configurer, "configurer cannot be null"); - - return Factory.streamingBy(classifier, mapper, configurer); + return Factory.streamingGrouping(classifier, mapper, ConfigProcessor.fromStreaming(configurer)); } // convenience (defaults + parallelism) /** * A convenience {@link Collector} that performs parallel computations using Virtual Threads - * and returns a {@link CompletableFuture} containing a {@link Stream} of the mapped results. - *

- * This overload is a convenience for applying an easy parallelism cap. For additional configuration - * options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload - * accepting a {@link CollectingConfigurer}. - * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i), 64));
-     * }
+ * and returns a {@link CompletableFuture} containing a {@link Stream} of the mapped results, + * bounded by the provided parallelism level. * * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) @@ -617,34 +347,16 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code CompletableFuture} of a {@code Stream} of mapped elements - * - * @since 4.0.0 */ public static Collector>> parallel( Function mapper, int parallelism) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.collecting( - (Function, Stream>) i -> i, - mapper, - c -> c.parallelism(parallelism)); + return Factory.async(mapper, ConfigProcessor.collectingWithParallelism(parallelism)); } /** * A convenience {@link Collector} that performs parallel computations using Virtual Threads * and returns a {@link CompletableFuture} containing the reduced result produced by the provided - * {@code collector}. - *

- * This overload is a convenience for applying an easy parallelism cap. For additional configuration - * options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload - * accepting a {@link CollectingConfigurer}. - * - *
- * Example: - *

{@code
-     * CompletableFuture> result = Stream.of(1, 2, 3)
-     *   .collect(parallel(i -> foo(i), 64, toList()));
-     * }
+ * {@code collector}, bounded by the provided parallelism level. * * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) @@ -654,38 +366,19 @@ private ParallelCollectors() { * @param the reduction result type produced by {@code collector} * * @return a {@code Collector} producing a {@link CompletableFuture} of the reduced result - * - * @since 4.0.0 */ public static Collector> parallel( Function mapper, int parallelism, Collector collector) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - - return Factory.collecting( - (Function, RR>) s -> s.collect(collector), - mapper, - c -> c.parallelism(parallelism)); + return Factory.async(mapper, ConfigProcessor.collectingWithParallelism(parallelism), collector); } /** * A convenience {@link Collector} that performs parallel computations by classifying input elements * using the provided {@code classifier}, applying the given {@code mapper}, and emitting - * {@link Group} entries representing each batch. - *

- * This overload is a convenience for applying an easy parallelism cap. For additional configuration - * options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload - * accepting a {@link CollectingConfigurer}. - * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t), 64));
-     * }
- * - * @param classifier function that groups elements into batches + * {@link Group} entries representing each group, bounded by the provided parallelism level. + * + * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) * @param the input element type @@ -693,35 +386,20 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@link CompletableFuture} of a {@code Stream} of grouped results - * - * @since 4.0.0 */ public static Collector>>> parallelBy( Function classifier, Function mapper, int parallelism) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.collectingBy(classifier, mapper, c -> c.parallelism(parallelism)); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.collectingWithParallelism(parallelism)); } /** * A convenience {@link Collector} that performs parallel computations by classifying input elements * using the provided {@code classifier}, applying the given {@code mapper}, emitting {@link Group} - * entries representing each batch, and then reducing them using the user-provided {@code collector}. - *

- * This overload is a convenience for applying an easy parallelism cap. For additional configuration - * options (e.g. batching or a custom {@link java.util.concurrent.Executor}), use the overload - * accepting a {@link CollectingConfigurer}. - * - *
- * Example: - *

{@code
-     * CompletableFuture>> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelBy(Task::groupId, t -> compute(t), 64, toList()));
-     * }
- * - * @param classifier function that groups elements into batches + * entries representing each group, and then reducing them using the user-provided {@code collector}, + * bounded by the provided parallelism level. + * + * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) * @param collector the {@code Collector} describing the reduction for grouped results @@ -731,79 +409,44 @@ private ParallelCollectors() { * @param the reduction result type produced by {@code collector} * * @return a {@code Collector} producing a {@link CompletableFuture} of the reduced result - * - * @since 4.0.0 */ public static Collector> parallelBy( Function classifier, Function mapper, int parallelism, Collector, ?, RR> collector) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - Objects.requireNonNull(collector, "collector cannot be null"); - - return Factory.collectingBy( - classifier, - (Function>, RR>) s -> s.collect(collector), - mapper, - c -> c.parallelism(parallelism)); + return Factory.asyncGrouping(classifier, mapper, ConfigProcessor.collectingWithParallelism(parallelism), collector); } /** * A convenience {@link Collector} that performs parallel computations using Virtual Threads - * and returns a {@link Stream} of the mapped results. - *

- * This overload is a convenience for applying an easy parallelism cap. This method does not expose - * additional configuration (such as ordered emission, batching, or a custom executor). For more - * options, use the overload accepting a {@link StreamingConfigurer}. + * and returns a {@link Stream} of the mapped results, bounded by the provided parallelism level. * *

Ordering: This collector emits elements in an arbitrary order. To preserve encounter * order, use the {@link StreamingConfigurer} overload and configure {@link StreamingConfigurer#ordered()}. * - *
- * Example: - *

{@code
-     * Stream result = Stream.of(1, 2, 3)
-     *   .collect(parallelToStream(i -> foo(i), 64));
-     * }
- * * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) * @param the input element type * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of mapped elements - * - * @since 4.0.0 */ public static Collector> parallelToStream( Function mapper, int parallelism) { - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.streaming(mapper, c -> c.parallelism(parallelism)); + return Factory.streaming(mapper, ConfigProcessor.streamingWithParallelism(parallelism)); } /** * A convenience {@link Collector} that performs parallel computations by classifying input elements * using the provided {@code classifier}, applying the given {@code mapper}, and emitting - * {@link Group} entries representing each batch. - *

- * This overload is a convenience for applying an easy parallelism cap. This method does not expose - * additional configuration (such as ordered emission, batching, or a custom executor). For more - * options, use the overload accepting a {@link StreamingConfigurer}. + * {@link Group} entries representing each group as a {@link Stream}, bounded by the provided + * parallelism level. * *

Ordering: This collector emits {@link Group} elements in an arbitrary order. * To preserve encounter order, use the {@link StreamingConfigurer} overload and configure * {@link StreamingConfigurer#ordered()}. * - *
- * Example: - *

{@code
-     * Stream> result = Stream.of(t1, t2, t3)
-     *   .collect(parallelToStreamBy(Task::groupId, t -> compute(t), 64));
-     * }
- * - * @param classifier function that groups elements into batches + * @param classifier function that assigns a grouping key to each element * @param mapper transformation applied to each element * @param parallelism maximum parallelism (must be positive) * @param the input element type @@ -811,46 +454,10 @@ private ParallelCollectors() { * @param the type produced by {@code mapper} * * @return a {@code Collector} producing a {@code Stream} of grouped results - * - * @since 4.0.0 */ public static Collector>> parallelToStreamBy( Function classifier, Function mapper, int parallelism) { - Objects.requireNonNull(classifier, "classifier cannot be null"); - Objects.requireNonNull(mapper, "mapper cannot be null"); - - return Factory.streamingBy(classifier, mapper, c -> c.parallelism(parallelism)); - } - - /** - * A convenience {@code Collector} for collecting a {@code Stream>} - * into a {@code CompletableFuture} using a provided {@code Collector} - * - * @param collector the {@code Collector} describing the reduction - * @param the type of the collected elements - * @param the result type of the downstream {@code Collector} - * - * @return a {@code Collector} which collects all futures and combines them into a single future - * using the provided downstream {@code Collector} - * - * @since 2.3.0 - */ - public static Collector, ?, CompletableFuture> toFuture(Collector collector) { - return FutureCollectors.toFuture(collector); - } - - /** - * A convenience {@code Collector} for collecting a {@code Stream>} into a {@code CompletableFuture>} - * - * @param the type of the collected elements - * - * @return a {@code Collector} which collects all futures and combines them into a single future - * returning a list of results - * - * @since 2.3.0 - */ - public static Collector, ?, CompletableFuture>> toFuture() { - return FutureCollectors.toFuture(); + return Factory.streamingGrouping(classifier, mapper, ConfigProcessor.streamingWithParallelism(parallelism)); } } diff --git a/src/main/java/com/pivovarit/collectors/Preconditions.java b/src/main/java/com/pivovarit/collectors/Preconditions.java deleted file mode 100644 index 25f12774..00000000 --- a/src/main/java/com/pivovarit/collectors/Preconditions.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.Objects; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; - -final class Preconditions { - - private Preconditions() { - } - - static void requireValidParallelism(int parallelism) { - if (parallelism < 1) { - throw new IllegalArgumentException("Parallelism can't be lower than 1"); - } - } - - static void requireValidExecutor(Executor executor) { - Objects.requireNonNull(executor, "Executor can't be null"); - if (executor instanceof ThreadPoolExecutor tpe) { - switch (tpe.getRejectedExecutionHandler()) { - case ThreadPoolExecutor.DiscardPolicy ignored -> - throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks"); - case ThreadPoolExecutor.DiscardOldestPolicy ignored -> - throw new IllegalArgumentException("Executor's RejectedExecutionHandler can't discard tasks"); - default -> { - // no-op - } - } - } - } -} diff --git a/src/main/java/com/pivovarit/collectors/StreamingConfigurer.java b/src/main/java/com/pivovarit/collectors/StreamingConfigurer.java index dbdb94f9..29b1bf8a 100644 --- a/src/main/java/com/pivovarit/collectors/StreamingConfigurer.java +++ b/src/main/java/com/pivovarit/collectors/StreamingConfigurer.java @@ -15,14 +15,9 @@ */ package com.pivovarit.collectors; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Executor; -import java.util.function.UnaryOperator; +import java.util.function.Function; /** * Fluent configuration builder for collectors that stream results. @@ -32,138 +27,113 @@ */ public final class StreamingConfigurer { - private final List modifiers = new ArrayList<>(); - private final Set> seen = new HashSet<>(); + Integer parallelism; + Executor executor; + boolean batching; + boolean ordered; + Function taskDecorator; + Function executorDecorator; StreamingConfigurer() { } /** - * Preserves encounter order of the input when emitting results. + * Sets the maximum level of parallelism. *

- * Enabling this option may reduce throughput compared to unordered streaming, since it can - * require additional coordination and buffering to emit results in encounter order. + * This limits the number of tasks submitted to the worker queue at once, effectively bounding + * the amount of in-flight work and the maximum concurrency used by the collector. + * + * @param p the desired parallelism level (must be positive) * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer ordered() { - addOnce(ConfigProcessor.Option.Ordered.INSTANCE); + public StreamingConfigurer parallelism(int p) { + if (p < 1) { + throw new IllegalArgumentException("parallelism must be greater than 0"); + } + this.parallelism = p; return this; } /** - * Enables batching of work submitted to workers. + * Sets the {@link Executor} used for running tasks. *

- * When enabled, each worker thread receives a batch of input items and processes them in one go, - * instead of scheduling one task per item. This reduces the number of tasks created and typically - * decreases contention on the underlying worker queue. + * Use this to supply a custom execution strategy (for example, a dedicated thread pool). * - *

Note: Depending on batch sizing and workload skew, batching may reduce load balancing and - * can lead to thread starvation (some workers become idle while others remain overloaded). + *

Note: The provided executor must not drop tasks on rejection (e.g. using a + * {@code RejectedExecutionHandler} that discards submitted work). Dropping tasks will cause the + * stream to wait for results that will never be produced, which can lead to deadlocks. + * + * @param e the executor to use * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer batching() { - addOnce(ConfigProcessor.Option.Batched.INSTANCE); + public StreamingConfigurer executor(Executor e) { + this.executor = Objects.requireNonNull(e, "executor can't be null"); return this; } /** - * Sets the maximum level of parallelism. + * Enables batching of work submitted to workers. *

- * This limits the number of tasks submitted to the worker queue at once, effectively bounding - * the amount of in-flight work and the maximum concurrency used by the collector. + * When enabled, each worker thread receives a batch of input items and processes them in one go, + * instead of scheduling one task per item. This reduces the number of tasks created and typically + * decreases contention on the underlying worker queue. * - * @param parallelism the desired parallelism level (must be positive) + *

Note: Depending on batch sizing and workload skew, batching may reduce load balancing and + * can lead to thread starvation (some workers become idle while others remain overloaded). * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer parallelism(int parallelism) { - Preconditions.requireValidParallelism(parallelism); - - addOnce(new ConfigProcessor.Option.Parallelism(parallelism)); + public StreamingConfigurer batching() { + this.batching = true; return this; } /** - * Sets the {@link Executor} used for running tasks. + * Preserves encounter order of the input when emitting results. *

- * Use this to supply a custom execution strategy (for example, a dedicated thread pool). - * - *

Note: The provided executor must not drop tasks on rejection (e.g. using a - * {@code RejectedExecutionHandler} that discards submitted work). Dropping tasks will cause the - * stream to wait for results that will never be produced, which can lead to deadlocks. - * - * @param executor the executor to use + * Enabling this option may reduce throughput compared to unordered streaming, since it can + * require additional coordination and buffering to emit results in encounter order. * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer executor(Executor executor) { - Preconditions.requireValidExecutor(executor); - - addOnce(new ConfigProcessor.Option.ThreadPool(executor)); + public StreamingConfigurer ordered() { + this.ordered = true; return this; } /** - * Decorates the executor used for running tasks. + * Wraps each task with the provided decorator before submission. *

- * The decorator receives the resolved executor (either the default virtual-thread executor or - * the one provided via {@link #executor(Executor)}) and returns a wrapped replacement. - * This is useful for augmenting the executor with cross-cutting concerns such as context - * propagation (MDC, OpenTelemetry spans, etc.) or monitoring, without replacing the executor entirely. - * - *

Note: The executor returned by the decorator must not drop tasks on rejection. - * Dropping tasks will cause the stream to wait for results that will never be produced, - * which can lead to deadlocks. + * Useful for propagating thread-local context, adding instrumentation, or applying cross-cutting + * concerns (e.g. logging, tracing) around the execution of each mapper invocation. * - * @param decorator a function that wraps the resolved executor + * @param o function that wraps the original {@code Runnable} * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer executorDecorator(UnaryOperator decorator) { - Objects.requireNonNull(decorator, "executor decorator can't be null"); - - addOnce(new ConfigProcessor.Option.ExecutorDecorator(decorator)); + public StreamingConfigurer taskDecorator(Function o) { + Objects.requireNonNull(o, "task decorator can't be null"); + if (this.taskDecorator != null) { + throw new IllegalArgumentException("task decorator already set"); + } + this.taskDecorator = o; return this; } /** - * Decorates each individual task before it is submitted to the executor. + * Wraps the underlying {@link Executor} with the provided decorator. *

- * The decorator receives the {@link Runnable} representing a single unit of work and returns a - * wrapped replacement that runs in its place. This is useful for propagating thread-local context - * (e.g. MDC entries, OpenTelemetry spans, {@code SecurityContext}) into worker threads, or for - * per-task instrumentation, without replacing the executor entirely. + * Useful for instrumenting or augmenting the executor (e.g. with structured concurrency scopes + * or custom scheduling policies) without replacing it entirely. * - *

Unlike {@link #executorDecorator(UnaryOperator)}, which wraps the executor as a whole, - * this decorator is applied to each task individually and runs on the worker thread. - * - * @param decorator a function that wraps each submitted task + * @param o function that wraps the original {@code Executor} * * @return this configurer instance for fluent chaining */ - public StreamingConfigurer taskDecorator(UnaryOperator decorator) { - Objects.requireNonNull(decorator, "task decorator can't be null"); - - addOnce(new ConfigProcessor.Option.TaskDecorator(decorator)); + public StreamingConfigurer executorDecorator(Function o) { + this.executorDecorator = Objects.requireNonNull(o, "executor decorator can't be null"); return this; } - - List getConfig() { - return Collections.unmodifiableList(modifiers); - } - - void validate() { - if (seen.contains(ConfigProcessor.Option.Batched.class) && !seen.contains(ConfigProcessor.Option.Parallelism.class)) { - throw new IllegalStateException("parallelism must be configured when batching is enabled"); - } - } - - private void addOnce(ConfigProcessor.Option option) { - if (!seen.add(option.getClass())) { - throw new IllegalArgumentException("'%s' can only be configured once".formatted(ConfigProcessor.toHumanReadableString(option))); - } - modifiers.add(option); - } } diff --git a/src/main/java/com/pivovarit/collectors/SyncCollector.java b/src/main/java/com/pivovarit/collectors/SyncCollector.java deleted file mode 100644 index fe5466b1..00000000 --- a/src/main/java/com/pivovarit/collectors/SyncCollector.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collector; -import java.util.stream.Stream; - -record SyncCollector(Function mapper) - implements Collector, Stream> { - - @Override - public Supplier> supplier() { - return Stream::builder; - } - - @Override - public BiConsumer, T> accumulator() { - return (rs, t) -> rs.add(mapper.apply(t)); - } - - @Override - public BinaryOperator> combiner() { - return (rs, rs2) -> { - throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea"); - }; - } - - @Override - public Function, Stream> finisher() { - return Stream.Builder::build; - } - - @Override - public Set characteristics() { - return Set.of(); - } -} diff --git a/src/main/java/com/pivovarit/collectors/package-info.java b/src/main/java/com/pivovarit/collectors/package-info.java deleted file mode 100644 index 1f7bb5f7..00000000 --- a/src/main/java/com/pivovarit/collectors/package-info.java +++ /dev/null @@ -1,6 +0,0 @@ -/** - * Parallel {@link java.util.stream.Collector} implementations for processing stream elements concurrently. - * - *

The main entry point is {@link com.pivovarit.collectors.ParallelCollectors}. - */ -package com.pivovarit.collectors; diff --git a/src/test/java/com/pivovarit/collectors/ArchitectureTest.java b/src/test/java/com/pivovarit/collectors/ArchitectureTest.java deleted file mode 100644 index 0528fb10..00000000 --- a/src/test/java/com/pivovarit/collectors/ArchitectureTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import com.tngtech.archunit.core.importer.ImportOption; -import com.tngtech.archunit.junit.AnalyzeClasses; -import com.tngtech.archunit.junit.ArchTest; -import com.tngtech.archunit.lang.ArchRule; -import com.tngtech.archunit.lang.syntax.ArchRuleDefinition; -import com.tngtech.archunit.library.GeneralCodingRules; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import static com.tngtech.archunit.core.domain.JavaModifier.FINAL; -import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes; -import static com.tngtech.archunit.library.dependencies.SlicesRuleDefinition.slices; - -@AnalyzeClasses(packages = "com.pivovarit", importOptions = ImportOption.DoNotIncludeTests.class) -class ArchitectureTest { - - @ArchTest - static final ArchRule shouldBeFreeOfCycles = slices() - .matching("com.pivovarit.(*)..") - .should().beFreeOfCycles() - .as("the library should be free of cycles") - .because("cycles are bad"); - - @ArchTest - static final ArchRule shouldHaveSingletonFacade = classes() - .that().haveSimpleName("ParallelCollectors") - .should().haveOnlyFinalFields() - .andShould().haveOnlyPrivateConstructors() - .andShould().haveModifier(FINAL) - .as("ParallelCollectors should be a singleton") - .because("ParallelCollectors is never supposed to be instantiated"); - - @ArchTest - static final ArchRule shouldLimitPublicAPI = classes() - .that().arePublic() - .and().areNotNestedClasses() - .should().haveSimpleName("ParallelCollectors") - .orShould().haveSimpleName("Group") - .orShould().haveSimpleName("StreamingConfigurer") - .orShould().haveSimpleName("CollectingConfigurer") - .as("limit public API surface") - .because("only the designated API types should be public; everything else should remain internal"); - - @ArchTest - static final ArchRule shouldHaveZeroDependencies = classes() - .that().resideInAPackage("com.pivovarit.collectors") - .should() - .onlyDependOnClassesThat() - .resideInAnyPackage("com.pivovarit.collectors", "org.jspecify.annotations", "java..") - .as("the library should depend only on core Java classes") - .because("users appreciate not experiencing a dependency hell"); - - @ArchTest - static final ArchRule shouldHaveSinglePackage = classes() - .should().resideInAPackage("com.pivovarit.collectors"); - - @ArchTest - static final ArchRule shouldHaveTwoPublicClasses = classes() - .that().haveSimpleName("ParallelCollectors").or().haveSimpleName("Batching") - .should().bePublic().andShould().haveModifier(FINAL); - - @ArchTest - static final ArchRule noDefaultExecutorInCompletableFuture = ArchRuleDefinition.noClasses() - .should().callMethod(CompletableFuture.class, "supplyAsync", Supplier.class) - .orShould().callMethod(CompletableFuture.class, "runAsync", Runnable.class) - .orShould().callMethod(CompletableFuture.class, "thenRunAsync", Runnable.class) - .orShould().callMethod(CompletableFuture.class, "thenAcceptAsync", Consumer.class) - .orShould().callMethod(CompletableFuture.class, "thenApplyAsync", Function.class) - .orShould().callMethod(CompletableFuture.class, "thenCombineAsync", CompletionStage.class, BiFunction.class) - .orShould().callMethod(CompletableFuture.class, "runAfterBothAsync", CompletionStage.class, Runnable.class) - .orShould().callMethod(CompletableFuture.class, "thenAcceptBothAsync", CompletionStage.class, Consumer.class) - .orShould().callMethod(CompletableFuture.class, "thenComposeAsync", CompletionStage.class) - .orShould().callMethod(CompletableFuture.class, "completeAsync", Supplier.class) - .orShould().callMethod(CompletableFuture.class, "exceptionallyComposeAsync", Function.class) - .orShould().callMethod(CompletableFuture.class, "handleAsync", BiFunction.class) - .orShould().callMethod(CompletableFuture.class, "applyToEitherAsync", CompletionStage.class, Function.class) - .orShould().callMethod(CompletableFuture.class, "whenCompleteAsync", BiConsumer.class) - .because("those default to ForkJoinPool.commonPool() which is a terrible default for most cases - consider Executors#newVirtualThreadPerTaskExecutor instead. If you really need ForkJoinPool, provide it explicitly (ForkJoinPool.commonPool())"); - - @ArchTest - private static final ArchRule noClassesShouldAccessStandardStreams = GeneralCodingRules.NO_CLASSES_SHOULD_ACCESS_STANDARD_STREAMS; - - @ArchTest - private static final ArchRule noClassesShouldUseJavaUtilLogging = GeneralCodingRules.NO_CLASSES_SHOULD_USE_JAVA_UTIL_LOGGING; -} diff --git a/src/test/java/com/pivovarit/collectors/BatchingSpliteratorTest.java b/src/test/java/com/pivovarit/collectors/BatchingSpliteratorTest.java deleted file mode 100644 index b463c94e..00000000 --- a/src/test/java/com/pivovarit/collectors/BatchingSpliteratorTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Spliterator; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -import static com.pivovarit.collectors.BatchingSpliterator.partitioned; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertThrows; - -class BatchingSpliteratorTest { - - @Test - void shouldReturnEmptyStreamForEmptyList() { - assertThat(BatchingSpliterator.partitioned(List.of(), 3).toList()).isEmpty(); - } - - @Test - void shouldSplitAndProcessIndependently() { - var input = List.of(1, 2, 3, 4, 5, 6, 7, 8); - var original = new BatchingSpliterator<>(input, 4); - - var split = original.trySplit(); - assertThat(split).isNotNull(); - - var splitChunks = StreamSupport.stream(split, false).toList(); - var remainingChunks = StreamSupport.stream(original, false).toList(); - - var combined = Stream.concat(splitChunks.stream(), remainingChunks.stream()) - .flatMap(List::stream) - .toList(); - - assertThat(combined).containsExactlyElementsOf(input); - assertThat(splitChunks.size()).isEqualTo(2); - assertThat(remainingChunks.size()).isEqualTo(2); - } - - @Test - void shouldHandleEmptyList() { - var input = List.of(); - var spliterator = new BatchingSpliterator<>(input, 3); - - assertThat(spliterator.trySplit()).isNull(); - - var result = StreamSupport.stream(spliterator, false).toList(); - - assertThat(result).isEmpty(); - } - - @Test - void shouldHandleSingleElementList() { - var input = List.of(42); - var spliterator = new BatchingSpliterator<>(input, 3); - - assertThat(spliterator.trySplit()).isNull(); - - var result = StreamSupport.stream(spliterator, false).toList(); - - assertThat(result).hasSize(1); - assertThat(result.getFirst()).containsExactly(42); - } - - @Test - void shouldSplitInNEvenBatches() { - var list = IntStream.range(0, 10).boxed().toList(); - - var result = partitioned(list, 3).toList(); - - assertThat(result) - .hasSize(3) - .extracting(List::size) - .contains(4, 3); - } - - @Test - void shouldSplitInNBatches() { - var list = IntStream.range(0, 10).boxed().toList(); - - var result = partitioned(list, 2).toList(); - - assertThat(result) - .hasSize(2) - .extracting(List::size) - .containsOnly(5); - } - - @Test - void shouldSplitInNSingletonLists() { - var list = IntStream.range(0, 5).boxed().toList(); - - var result = partitioned(list, 10).toList(); - - assertThat(result) - .hasSize(5) - .extracting(List::size) - .containsOnly(1); - } - - @Test - void shouldReturnNestedListIfOneBatch() { - var list = IntStream.range(0, 10).boxed().toList(); - - var result = partitioned(list, 1).toList(); - - assertThat(result.getFirst()).containsExactlyElementsOf(list); - } - - @Test - void shouldReturnEmptyIfZeroParts() { - assertThatThrownBy(() -> partitioned(Arrays.asList(1, 2, 3), 0).toList()); - } - - @Test - void shouldReportCorrectSizeWhenOneBatch() { - var list = IntStream.range(0, 10).boxed().toList(); - - assertThat(partitioned(list, 1).count()).isEqualTo(1); - } - - @Test - void shouldReportCorrectSizeWhenMultipleBatches() { - var list = IntStream.range(0, 10).boxed().toList(); - - assertThat(partitioned(list, 2).count()).isEqualTo(2); - } - - @Test - void shouldPartitionToSingletonsWhenSizeLessThanBatches() { - List> result = partitioned(List.of(1, 2), 5).toList(); - - assertThat(result).containsExactly(List.of(1), List.of(2)); - } - - @Test - void shouldReturnSameListWhenBatchesIsOne() { - List input = List.of(1, 2, 3, 4, 5); - List> result = partitioned(input, 1).toList(); - - assertThat(result).containsExactly(input); - } - - @Test - void shouldNotExceedBoundsForUnevenSplit() { - List input = List.of(1, 2, 3, 4, 5); - List> result = partitioned(input, 3).toList(); - - assertThat(result).containsExactly(List.of(1, 2), List.of(3, 4), List.of(5)); - } - - @Test - void shouldNotExceedBoundsOnLargeChunkRecomputation() { - List input = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - input.add(i); - } - - List> result = partitioned(input, 3).toList(); - - assertThat(result).containsExactly(List.of(0, 1, 2, 3), List.of(4, 5, 6), List.of(7, 8, 9)); - } - - @Test - void shouldThrowWhenZeroBatches() { - List input = List.of(1, 2, 3); - - assertThrows(IllegalArgumentException.class, () -> partitioned(input, 0)); - } - - @Test - void shouldHandleExactDivisibility() { - List input = List.of(1, 2, 3, 4); - List> result = partitioned(input, 2).toList(); - - assertThat(result).containsExactly(List.of(1, 2), List.of(3, 4)); - } - - @Test - void shouldNotModifyInputList() { - List input = new ArrayList<>(List.of(1, 2, 3, 4, 5)); - partitioned(input, 2).forEach(c -> {}); - - assertThat(input).isEqualTo(List.of(1, 2, 3, 4, 5)); - } - - @Test - void shouldNotOvershootConsumedOnLastBatch() { - List source = List.of(1, 2, 3, 4, 5); - - Spliterator> spliterator = new BatchingSpliterator<>(source, 2); - - List> result = new ArrayList<>(); - while (spliterator.tryAdvance(result::add)) { - // no-op - } - - assertThat(result).containsExactly(List.of(1, 2, 3), List.of(4, 5)); - - assertThat(result.stream().mapToInt(List::size).sum()).isEqualTo(source.size()); - } - - @Test - void shouldReportOrderedSizedAndSubSized() { - Spliterator> spliterator = new BatchingSpliterator<>(List.of(1, 2, 3), 2); - - int characteristics = spliterator.characteristics(); - - assertThat(characteristics & Spliterator.IMMUTABLE).isNotZero(); - assertThat(characteristics & Spliterator.ORDERED).isNotZero(); - assertThat(characteristics & Spliterator.SIZED).isNotZero(); - assertThat(characteristics & Spliterator.SUBSIZED).isZero(); - } - - @Nested - class EstimateSizeTests { - @Test - void shouldReturnZeroForEmptyList() { - List empty = List.of(); - Spliterator> spliterator = new BatchingSpliterator<>(empty, 3); - - assertThat(spliterator.estimateSize()).isZero(); - } - - @Test - void shouldReturnOneForSingleBatch() { - List list = List.of(1, 2, 3, 4); - Spliterator> spliterator = new BatchingSpliterator<>(list, 1); - - assertThat(spliterator.estimateSize()).isEqualTo(1); - } - - @Test - void shouldReturnNumberOfElementsForMoreBatchesThanElements() { - List list = List.of(1, 2, 3); - Spliterator> spliterator = new BatchingSpliterator<>(list, 5); - - assertThat(spliterator.estimateSize()).isEqualTo(3); - } - - @Test - void shouldDecreaseAfterTryAdvance() { - List list = List.of(1, 2, 3, 4, 5); - BatchingSpliterator spliterator = new BatchingSpliterator<>(list, 2); - - long initialSize = spliterator.estimateSize(); - assertThat(initialSize).isEqualTo(2); - - spliterator.tryAdvance(batch -> { - }); - - long afterAdvance = spliterator.estimateSize(); - assertThat(afterAdvance).isEqualTo(1); - } - - @Test - void shouldAdjustCorrectlyAfterTrySplit() { - List list = List.of(1, 2, 3, 4, 5, 6); - BatchingSpliterator spliterator = new BatchingSpliterator<>(list, 3); - - long beforeSplit = spliterator.estimateSize(); - assertThat(beforeSplit).isEqualTo(3); - - Spliterator> split = spliterator.trySplit(); - assertThat(split).isNotNull(); - - long afterSplit = spliterator.estimateSize(); - assertThat(afterSplit).isEqualTo(2); // remaining chunks in original - } - - @Test - void shouldReturnZeroAfterAllConsumed() { - List list = List.of(1, 2, 3, 4, 5); - BatchingSpliterator spliterator = new BatchingSpliterator<>(list, 2); - - while (spliterator.tryAdvance(batch -> { - })) { - } - - assertThat(spliterator.estimateSize()).isZero(); - } - } -} diff --git a/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java b/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java deleted file mode 100644 index 771a144a..00000000 --- a/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Spliterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Consumer; -import java.util.stream.StreamSupport; -import org.junit.jupiter.api.Test; - -import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; - -class CompletionOrderSpliteratorTest { - - @Test - void shouldTraverseInCompletionOrder() { - CompletableFuture f1 = new CompletableFuture<>(); - CompletableFuture f2 = new CompletableFuture<>(); - CompletableFuture f3 = new CompletableFuture<>(); - List> futures = asList(f1, f2, f3); - - CompletableFuture.runAsync(() -> { - f3.complete(3); - sleep(100); - f1.complete(2); - sleep(100); - f2.complete(1); - }); - var results = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).toList(); - - assertThat(results).containsExactly(3, 2, 1); - } - - @Test - void shouldPropagateException() { - CompletableFuture f1 = new CompletableFuture<>(); - CompletableFuture f2 = new CompletableFuture<>(); - CompletableFuture f3 = new CompletableFuture<>(); - List> futures = asList(f1, f2, f3); - - CompletableFuture.runAsync(() -> { - f3.complete(3); - sleep(100); - f1.completeExceptionally(new RuntimeException()); - sleep(100); - f2.complete(1); - }); - assertThatThrownBy(() -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).toList()) - .isInstanceOf(CompletionException.class) - .hasCauseExactlyInstanceOf(RuntimeException.class); - } - - private static void sleep(int millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Test - void shouldStreamInCompletionOrder() { - int value = 42; - List> futures = asList(new CompletableFuture<>(), CompletableFuture - .completedFuture(value)); - - Optional result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).findAny(); - - assertThat(result).contains(value); - } - - @Test - void shouldNotConsumeOnEmpty() { - List> futures = Collections.emptyList(); - - Spliterator spliterator = new CompletionOrderSpliterator<>(futures); - - ResultHolder result = new ResultHolder<>(); - boolean consumed = spliterator.tryAdvance(result); - - assertThat(consumed).isFalse(); - assertThat(result.result).isNull(); - } - - @Test - void shouldHandleNullResults() { - List> futures = asList( - CompletableFuture.completedFuture(null), - CompletableFuture.completedFuture("a"), - CompletableFuture.completedFuture(null) - ); - - var results = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).toList(); - - assertThat(results).containsExactly(null, "a", null); - } - - @Test - void shouldRestoreInterrupt() { - Thread executorThread = new Thread(() -> { - Spliterator spliterator = new CompletionOrderSpliterator<>(List.of(new CompletableFuture<>())); - try { - spliterator.tryAdvance(i -> {}); - } catch (Exception e) { - while (true) { - Thread.onSpinWait(); - } - } - }); - - executorThread.start(); - - executorThread.interrupt(); - - await() - .until(executorThread::isInterrupted); - } - - static class ResultHolder implements Consumer { - - private volatile T result; - - @Override - public void accept(T t) { - result = t; - } - } -} diff --git a/src/test/java/com/pivovarit/collectors/DispatcherTest.java b/src/test/java/com/pivovarit/collectors/DispatcherTest.java deleted file mode 100644 index ec528661..00000000 --- a/src/test/java/com/pivovarit/collectors/DispatcherTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.lang.reflect.Field; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -class DispatcherTest { - - @Test - void shouldShutdownExecutorOnStop() { - AtomicReference holder = new AtomicReference<>(); - - var dispatcher = new Dispatcher(Executors.newCachedThreadPool(), 1, holder::set); - dispatcher.start(); - dispatcher.submit(() -> 42); - dispatcher.stop(); - - await().untilAsserted(() -> assertThat(holder.get().isAlive()).isFalse()); - } - - @Test - void shouldNotAcquirePermitOnPoisonPill() throws Exception { - var executorGate = new Semaphore(0); - - Executor blockingExecutor = command -> { - try { - executorGate.acquire(); - command.run(); - } catch (InterruptedException ignored) { - } - }; - - var dispatcher = new Dispatcher<>(blockingExecutor, 1); - - dispatcher.start(); - dispatcher.stop(); - - Semaphore limiter = extractSemaphore(dispatcher); - - assertThat(limiter.tryAcquire(100, TimeUnit.MILLISECONDS)) - .as("no permit should be leaked on stop") - .isTrue(); - } - - private static Semaphore extractSemaphore(Dispatcher dispatcher) throws Exception { - Field limiterField = Dispatcher.class.getDeclaredField("limiter"); - limiterField.setAccessible(true); - return (Semaphore) limiterField.get(dispatcher); - } -} diff --git a/src/test/java/com/pivovarit/collectors/FutureCollectorsTest.java b/src/test/java/com/pivovarit/collectors/FutureCollectorsTest.java deleted file mode 100644 index ad1a25af..00000000 --- a/src/test/java/com/pivovarit/collectors/FutureCollectorsTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.Executors; -import java.util.stream.IntStream; -import org.junit.jupiter.api.Test; - -import static java.util.stream.Collectors.toList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertTimeout; - -class FutureCollectorsTest { - - @Test - void shouldCollect() { - List list = Arrays.asList(1, 2, 3); - - CompletableFuture> result = list.stream() - .map(i -> CompletableFuture.supplyAsync(() -> i)) - .collect(ParallelCollectors.toFuture()); - - assertThat(result.join()).containsExactlyElementsOf(list); - } - - @Test - void shouldCollectToList() { - var list = Arrays.asList(1, 2, 3); - - var result = list.stream() - .map(i -> CompletableFuture.supplyAsync(() -> i)) - .collect(ParallelCollectors.toFuture(toList())); - - assertThat(result.join()).containsExactlyElementsOf(list); - } - - @Test - void shouldShortcircuit() { - var list = IntStream.range(0, 10).boxed().toList(); - - try (var e = Executors.newFixedThreadPool(10)) { - CompletableFuture> result - = list.stream() - .map(i -> CompletableFuture.supplyAsync(() -> { - if (i != 9) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - return i; - } else { - throw new RuntimeException(); - } - }, e)) - .collect(ParallelCollectors.toFuture(toList())); - - assertTimeout(Duration.ofMillis(100), () -> { - try { - result.join(); - } catch (CompletionException ex) { - } - }); - } - } - - @Test - void shouldCancelSiblingsOnFirstFailure() { - var futures = new ArrayList>(); - for (int i = 0; i < 10; i++) { - int idx = i; - futures.add(idx == 0 - ? CompletableFuture.failedFuture(new RuntimeException("boom")) - : new CompletableFuture<>()); - } - - var result = futures.stream().collect(ParallelCollectors.toFuture(toList())); - - assertThat(result).isCompletedExceptionally(); - await().atMost(Duration.ofSeconds(1)) - .until(() -> futures.stream().skip(1).allMatch(CompletableFuture::isCancelled)); - } -} diff --git a/src/test/java/com/pivovarit/collectors/GroupTest.java b/src/test/java/com/pivovarit/collectors/GroupTest.java deleted file mode 100644 index 577c6c49..00000000 --- a/src/test/java/com/pivovarit/collectors/GroupTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.List; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -class GroupTest { - - @Nested - class ConstructorTest { - - @Test - void constructorShouldRejectNullKey() { - assertThatThrownBy(() -> new Group<>(null, List.of(1, 2))) - .isInstanceOf(NullPointerException.class) - .hasMessage("key cannot be null"); - } - - @Test - void constructorShouldRejectNullValues() { - assertThatThrownBy(() -> new Group<>("key", null)) - .isInstanceOf(NullPointerException.class) - .hasMessage("values cannot be null"); - } - } - - @Nested - class FactoryTest { - @Test - void ofShouldCreateInstance() { - Group g = Group.of("k", List.of(1, 2)); - - assertThat(g.key()).isEqualTo("k"); - assertThat(g.values()).containsExactly(1, 2); - } - } - - @Nested - class MapTest { - - @Test - void mapShouldApplyMapperToAllValues() { - var g = new Group<>("numbers", List.of(1, 2, 3)); - - assertThat(g.map((k, i) -> "n" + i)).isEqualTo(Group.of("numbers", List.of("n1", "n2", "n3"))); - } - - @Test - void mapShouldWorkOnEmptyValuesList() { - var g = new Group<>("empty", List.of()); - - assertThat(g.map((k, o) -> o.toString())).isEqualTo(new Group<>("empty", List.of())); - } - - @Test - void mapShouldReturnNewInstanceAndNotMutateOriginal() { - Group g = new Group<>("x", List.of(1, 2)); - - Group mapped = g.map((k, i) -> i * 10); - - assertThat(g.values()).containsExactly(1, 2); - assertThat(mapped.values()).containsExactly(10, 20); - assertThat(mapped).isNotSameAs(g); - } - - @Test - void mapShouldRejectNullMapper() { - Group g = new Group<>("k", List.of(1)); - - assertThatThrownBy(() -> g.map(null)).isInstanceOf(NullPointerException.class); - } - } -} diff --git a/src/test/java/com/pivovarit/collectors/LifecycleTest.java b/src/test/java/com/pivovarit/collectors/LifecycleTest.java deleted file mode 100644 index fdb98e26..00000000 --- a/src/test/java/com/pivovarit/collectors/LifecycleTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.concurrent.Executors; -import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -class LifecycleTest { - - private volatile Dispatcher dispatcher; - - @BeforeEach - void setUp() { - dispatcher = new Dispatcher<>(Executors.newCachedThreadPool()); - } - - @Test - void shouldTerminateCollectingDispatcher() { - var result = Stream.of(1, 2, 3) - .collect(new AsyncParallelCollector<>(i -> i, dispatcher, Stream::toList)).join(); - - assertThat(result).containsExactly(1, 2, 3); - await().until(dispatcher::wasShutdown); - } - - @ParameterizedTest - @ValueSource(booleans = { true, false }) - void shouldTerminateStreamingDispatcher(boolean ordered) { - var result = Stream.of(1, 2, 3) - .collect(new AsyncParallelStreamingCollector<>(i -> i, dispatcher, ordered)).toList(); - - if (ordered) { - assertThat(result).containsExactly(1, 2, 3); - } else { - assertThat(result).containsExactlyInAnyOrder(1, 2, 3); - } - await().until(dispatcher::wasShutdown); - } -} diff --git a/src/test/java/com/pivovarit/collectors/OptionTest.java b/src/test/java/com/pivovarit/collectors/OptionTest.java deleted file mode 100644 index 4c8f889b..00000000 --- a/src/test/java/com/pivovarit/collectors/OptionTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; - -class OptionTest { - - @Test - void shouldThrowOnInvalidParallelism() { - assertThatThrownBy(() -> new ConfigProcessor.Option.Parallelism(0)).isInstanceOf(IllegalArgumentException.class); - } - - @Test - void shouldThrowOnNullExecutor() { - assertThatThrownBy(() -> new ConfigProcessor.Option.ThreadPool(null)).isInstanceOf(NullPointerException.class); - } - - @Test - void shouldRejectExecutorWithDiscardPolicy() { - try (var executor = new ThreadPoolExecutor(2, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy())) { - assertThatThrownBy(() -> new ConfigProcessor.Option.ThreadPool(executor)).isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Executor's RejectedExecutionHandler can't discard tasks"); - } - } - - @Test - void shouldRejectExecutorWithDiscardOldestPolicy() { - try (var executor = new ThreadPoolExecutor(2, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy())) { - assertThatThrownBy(() -> new ConfigProcessor.Option.ThreadPool(executor)).isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Executor's RejectedExecutionHandler can't discard tasks"); - } - } - - @Test - void shouldThrowOnDuplicateBatching() { - var configurer = new CollectingConfigurer(); - configurer.batching(); - assertThatThrownBy(configurer::batching) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'batching' can only be configured once"); - } - - @Test - void shouldThrowOnDuplicateParallelism() { - var configurer = new CollectingConfigurer(); - configurer.parallelism(1); - assertThatThrownBy(() -> configurer.parallelism(2)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'parallelism' can only be configured once"); - } - - @Test - void shouldThrowOnDuplicateExecutor() { - var configurer = new CollectingConfigurer(); - configurer.executor(r -> {}); - assertThatThrownBy(() -> configurer.executor(r -> {})) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'executor' can only be configured once"); - } - - @Test - void shouldThrowOnDuplicateOrdered() { - var configurer = new StreamingConfigurer(); - configurer.ordered(); - assertThatThrownBy(configurer::ordered) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'ordered' can only be configured once"); - } - - @Test - void shouldThrowOnNullExecutorDecorator() { - assertThatThrownBy(() -> new CollectingConfigurer().executorDecorator(null)) - .isInstanceOf(NullPointerException.class); - } - - @Test - void shouldThrowOnNullExecutorDecoratorStreaming() { - assertThatThrownBy(() -> new StreamingConfigurer().executorDecorator(null)) - .isInstanceOf(NullPointerException.class); - } - - @Test - void shouldThrowOnDuplicateExecutorDecorator() { - var configurer = new CollectingConfigurer(); - configurer.executorDecorator(e -> e); - assertThatThrownBy(() -> configurer.executorDecorator(e -> e)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'executor decorator' can only be configured once"); - } - - @Test - void shouldThrowOnDuplicateExecutorDecoratorStreaming() { - var configurer = new StreamingConfigurer(); - configurer.executorDecorator(e -> e); - assertThatThrownBy(() -> configurer.executorDecorator(e -> e)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("'executor decorator' can only be configured once"); - } - - @Test - void collectingConfigurerShouldRejectBatchingWithoutParallelism() { - var configurer = new CollectingConfigurer(); - configurer.batching(); - assertThatThrownBy(configurer::validate) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("batching") - .hasMessageContaining("parallelism"); - } - - @Test - void streamingConfigurerShouldRejectBatchingWithoutParallelism() { - var configurer = new StreamingConfigurer(); - configurer.batching(); - assertThatThrownBy(configurer::validate) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("batching") - .hasMessageContaining("parallelism"); - } - - @Test - void collectingConfigurerValidateShouldAcceptBatchingWithParallelism() { - var configurer = new CollectingConfigurer(); - configurer.batching().parallelism(2); - configurer.validate(); - } - - @Test - void streamingConfigurerValidateShouldAcceptBatchingWithParallelism() { - var configurer = new StreamingConfigurer(); - configurer.batching().parallelism(2); - configurer.validate(); - } - - @Test - void parallelCollectorShouldFailFastOnBatchingWithoutParallelism() { - assertThatThrownBy(() -> ParallelCollectors.parallel(i -> i, c -> c.batching())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("batching") - .hasMessageContaining("parallelism"); - } - - @Test - void parallelToStreamShouldFailFastOnBatchingWithoutParallelism() { - assertThatThrownBy(() -> ParallelCollectors.parallelToStream(i -> i, c -> c.batching())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("batching") - .hasMessageContaining("parallelism"); - } - -} diff --git a/src/test/java/com/pivovarit/collectors/TypeChecks.java b/src/test/java/com/pivovarit/collectors/TypeChecks.java deleted file mode 100644 index 75ac7247..00000000 --- a/src/test/java/com/pivovarit/collectors/TypeChecks.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors; - -import java.util.function.Function; -import java.util.stream.Collector; - -import static java.util.stream.Collectors.toList; - -/** - * This file exists solely to check that the public API - * exposes correct generic bounds (covariance / contravariance). - * It must compile. It is not intended to run. - */ -@SuppressWarnings({"unused"}) -final class TypeChecks { - - private TypeChecks() { - } - - static class SuperClass { - } - - static class SubClass extends SuperClass { - } - - private static void expectCollector(Collector c) { - // compile-only - } - - static final class Covariance { - - record Functions( - Function subToSuper, - Function subToSub - ) { - } - - private static final Functions fns = - new Functions(x -> x, x -> x); - - record Parallel(Functions f) { - Parallel { - expectCollector(ParallelCollectors.parallel(f.subToSuper, r -> {})); - expectCollector(ParallelCollectors.parallel(f.subToSub, r -> {})); - - expectCollector(ParallelCollectors.parallel(f.subToSuper, 42)); - expectCollector(ParallelCollectors.parallel(f.subToSub, 42)); - - expectCollector(ParallelCollectors.parallel(f.subToSuper)); - expectCollector(ParallelCollectors.parallel(f.subToSub)); - } - } - - record ParallelBy(Functions f) { - ParallelBy { - expectCollector(ParallelCollectors.parallelBy(f.subToSuper, f.subToSuper, c -> {})); - expectCollector(ParallelCollectors.parallelBy(f.subToSuper, f.subToSub, c -> {})); - expectCollector(ParallelCollectors.parallelBy(f.subToSub, f.subToSuper, c -> {})); - expectCollector(ParallelCollectors.parallelBy(f.subToSub, f.subToSub, c -> {})); - } - } - - record ParallelToList(Functions f) { - ParallelToList { - expectCollector(ParallelCollectors.parallel(f.subToSuper, c -> {}, toList())); - expectCollector(ParallelCollectors.parallel(f.subToSub, c -> {}, toList())); - } - } - - record ParallelByToList(Functions f) { - ParallelByToList { - expectCollector(ParallelCollectors.parallelBy(f.subToSuper, f.subToSuper, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(f.subToSuper, f.subToSub, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(f.subToSub, f.subToSuper, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(f.subToSub, f.subToSub, c -> {}, toList())); - } - } - - record Streaming(Functions f) { - Streaming { - expectCollector(ParallelCollectors.parallelToStream(f.subToSuper, c -> {})); - expectCollector(ParallelCollectors.parallelToStream(f.subToSub, c -> {})); - } - } - - record StreamingBy(Functions f) { - StreamingBy { - expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSuper, f.subToSuper, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSuper, f.subToSub, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSub, f.subToSuper, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(f.subToSub, f.subToSub, c -> {})); - } - } - } - - static final class Contravariance { - - private static final Function superToSub = x -> new SubClass(); - private static final Function objToSub = x -> new SubClass(); - - record Parallel() { - Parallel { - expectCollector(ParallelCollectors.parallel(superToSub, c -> {})); - expectCollector(ParallelCollectors.parallel(objToSub, c -> {})); - - expectCollector(ParallelCollectors.parallel(superToSub, 42)); - expectCollector(ParallelCollectors.parallel(objToSub, 42)); - - expectCollector(ParallelCollectors.parallel(superToSub)); - expectCollector(ParallelCollectors.parallel(objToSub)); - } - } - - record ParallelBy() { - ParallelBy { - expectCollector(ParallelCollectors.parallelBy(superToSub, superToSub, c -> {})); - expectCollector(ParallelCollectors.parallelBy(superToSub, objToSub, c -> {})); - expectCollector(ParallelCollectors.parallelBy(objToSub, superToSub, c -> {})); - expectCollector(ParallelCollectors.parallelBy(objToSub, objToSub, c -> {})); - } - } - - record ParallelToList() { - ParallelToList { - expectCollector(ParallelCollectors.parallel(superToSub, c -> {}, toList())); - expectCollector(ParallelCollectors.parallel(objToSub, c -> {}, toList())); - } - } - - record ParallelByToList() { - ParallelByToList { - expectCollector(ParallelCollectors.parallelBy(superToSub, superToSub, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(superToSub, objToSub, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(objToSub, superToSub, c -> {}, toList())); - expectCollector(ParallelCollectors.parallelBy(objToSub, objToSub, c -> {}, toList())); - } - } - - record Streaming() { - Streaming { - expectCollector(ParallelCollectors.parallelToStream(superToSub, c -> {})); - expectCollector(ParallelCollectors.parallelToStream(objToSub, c -> {})); - } - } - - record StreamingBy() { - StreamingBy { - expectCollector(ParallelCollectors.parallelToStreamBy(superToSub, superToSub, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(superToSub, objToSub, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(objToSub, superToSub, c -> {})); - expectCollector(ParallelCollectors.parallelToStreamBy(objToSub, objToSub, c -> {})); - } - } - } -} diff --git a/src/test/java/com/pivovarit/collectors/benchmark/BatchedVsNonBatchedBenchmark.java b/src/test/java/com/pivovarit/collectors/benchmark/BatchedVsNonBatchedBenchmark.java deleted file mode 100644 index 677cd719..00000000 --- a/src/test/java/com/pivovarit/collectors/benchmark/BatchedVsNonBatchedBenchmark.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors.benchmark; - -import com.pivovarit.collectors.ParallelCollectors; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.IntStream; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.runner.RunnerException; - -import static java.util.stream.Collectors.toList; - -public class BatchedVsNonBatchedBenchmark { - - @State(Scope.Benchmark) - public static class BenchmarkState { - - @Param({"1", "10", "100", "1000"}) - public int parallelism; - - private volatile ExecutorService executor; - - @Setup(Level.Trial) - public void setup() { - executor = Executors.newFixedThreadPool(1000); - } - - @TearDown(Level.Trial) - public void tearDown() { - executor.shutdown(); - } - } - - private static final List source = IntStream.range(0, 1000) - .boxed() - .toList(); - - @Benchmark - public List parallel_collect(BenchmarkState state) { - return source.stream() - .collect(ParallelCollectors.parallel(i -> i, c -> c.executor(state.executor).parallelism(state.parallelism), toList())) - .join(); - } - - @Benchmark - public List parallel_batch_collect(BenchmarkState state) { - return source.stream() - .collect(ParallelCollectors.parallel(i -> i, c -> c.batching().executor(state.executor).parallelism(state.parallelism), toList())) - .join(); - } - - @Benchmark - public List parallel_streaming(BenchmarkState state) { - return source.stream() - .collect(ParallelCollectors.parallelToStream(i -> i, c -> c.batching().executor(state.executor).parallelism(state.parallelism))) - .toList(); - } - - @Benchmark - public List parallel_batch_streaming_collect(BenchmarkState state) { - return source.stream() - .collect(ParallelCollectors.parallelToStream(i -> i, c -> c.batching().executor(state.executor).parallelism(state.parallelism))) - .toList(); - } - - public static void main(String[] args) throws RunnerException { - Benchmarks.run(BatchedVsNonBatchedBenchmark.class); - } -} diff --git a/src/test/java/com/pivovarit/collectors/benchmark/Benchmarks.java b/src/test/java/com/pivovarit/collectors/benchmark/Benchmarks.java deleted file mode 100644 index b3446fbd..00000000 --- a/src/test/java/com/pivovarit/collectors/benchmark/Benchmarks.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2014-2026 Grzegorz Piwowarek, https://4comprehension.com/ - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pivovarit.collectors.benchmark; - -import java.nio.file.Path; -import org.openjdk.jmh.results.format.ResultFormatType; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.OptionsBuilder; - -final class Benchmarks { - - private Benchmarks() { - } - - private static final Path BENCHMARKS_PATH = Path.of("src/test/resources/benchmarks/"); - - static void run(Class clazz) throws RunnerException { - new Runner(new OptionsBuilder() - .include(clazz.getSimpleName()) - .warmupIterations(3) - .measurementIterations(5) - .resultFormat(ResultFormatType.JSON) - .result(Benchmarks.BENCHMARKS_PATH.resolve("%s.json".formatted(clazz.getSimpleName())).toString()) - .forks(1) - .build()).run(); - } -}