Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ endif::[]

== 0.5.3.4

=== Features

* feature: add options to enable virtual threads

=== Improvements

* refactor: migrate synchronized into ReentrantLock

=== Fixes

* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
Expand Down
8 changes: 8 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,14 @@ endif::[]

== 0.5.3.4

=== Features

* feature: add options to enable virtual threads

=== Improvements

* refactor: migrate synchronized into ReentrantLock

=== Fixes

* fix: keep instantiated OffsetMapCodecManager so that metrics will not be recreated every commit (#859)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

@UtilityClass
Expand All @@ -16,15 +17,19 @@ public class SupplierUtils {
public static <T> Supplier<T> memoize(Supplier<T> delegate) {
Objects.requireNonNull(delegate);
AtomicReference<T> value = new AtomicReference<>();
ReentrantLock lock = new ReentrantLock();
return () -> {
T val = value.get();
if (val == null) {
synchronized (value) {
lock.lock();
try {
val = value.get();
if (val == null) {
val = Objects.requireNonNull(delegate.get());
value.set(val);
}
} finally {
lock.unlock();
}
}
return val;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,21 @@ public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

transactionsValidation();
validateVirtualThreadsSupport();
}

private void validateVirtualThreadsSupport() {
if (useVirtualThreads) {
try {
// Thread.ofVirtual(), Executors.newThreadPerTaskExecutor(ThreadFactory)
Class.forName("java.lang.Thread").getMethod("ofVirtual");
java.util.concurrent.Executors.class.getMethod("newThreadPerTaskExecutor", java.util.concurrent.ThreadFactory.class);
} catch (NoSuchMethodException | ClassNotFoundException e) {
throw new UnsupportedOperationException(
"useVirtualThreads is enabled, but the current JVM does not support Virtual Threads or the required ExecutorService."
);
}
}
}

private void transactionsValidation() {
Expand Down Expand Up @@ -570,4 +585,14 @@ public boolean isProducerSupplied() {
*/
@Builder.Default
public final boolean ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck = false;

/**
* Whether to use Virtual Threads (Project Loom) for the worker pool. This feature requires running on JDK 21 or
* higher. If enabled on an older JDK the system will throw an exception during initialization.
* <p>
* Note: When enabled, this overrides any custom {@link #managedExecutorService} or {@link #managedThreadFactory}
* settings, as it creates a specific ExecutorService for virtual threads.
*/
@Builder.Default
private final boolean useVirtualThreads = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -116,7 +117,7 @@ public Duration getTimeBetweenCommits() {
* The pool which is used for running the users' supplied function
*/
@Getter(PROTECTED)
protected final Supplier<ThreadPoolExecutor> workerThreadPool;
protected final Supplier<ExecutorService> workerThreadPool;

private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();

Expand Down Expand Up @@ -196,6 +197,8 @@ public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> work) {
*/
private final AtomicBoolean commitCommand = new AtomicBoolean(false);

private final ReentrantLock commitLock = new ReentrantLock();

/**
* Multiple of {@link ParallelConsumerOptions#getMaxConcurrency()} to have in our processing queue, in order to make
* sure threads always have work to do.
Expand Down Expand Up @@ -298,7 +301,17 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp

this.dynamicExtraLoadFactor = module.dynamicExtraLoadFactor();

workerThreadPool = SupplierUtils.memoize(() -> setupWorkerPool(newOptions.getMaxConcurrency()));
workerThreadPool = SupplierUtils.memoize(() -> {
ExecutorService executor = setupWorkerPool(newOptions.getMaxConcurrency());

if (!options.isUseVirtualThreads() && !(executor instanceof ThreadPoolExecutor)) {
throw new IllegalStateException(
"When Virtual Threads are disabled, the worker pool must be an instance of ThreadPoolExecutor. " +
"If you are using a custom ExecutorService, make sure it extends ThreadPoolExecutor."
);
}
return executor;
});

this.wm = module.workManager();

Expand Down Expand Up @@ -345,7 +358,24 @@ private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Cons
}
}

protected ThreadPoolExecutor setupWorkerPool(int poolSize) {
protected ExecutorService setupWorkerPool(int poolSize) {
if (options.isUseVirtualThreads()) {
try {
// Thread.ofVirtual().name("pc-vt-").factory()
Object builder = Class.forName("java.lang.Thread").getMethod("ofVirtual").invoke(null);
Class<?> builderClass = Class.forName("java.lang.Thread$Builder");
builderClass.getMethod("name", String.class, long.class).invoke(builder, "pc-vt-", 0);
ThreadFactory factory = (ThreadFactory) builderClass.getMethod("factory").invoke(builder);

// Executors.newThreadPerTaskExecutor(factory)
return (ExecutorService) java.util.concurrent.Executors.class
.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(null, factory);
} catch (Exception e) {
throw new IllegalStateException("Virtual threads not supported on this JVM", e);
}
}

ThreadFactory defaultFactory;
try {
defaultFactory = InitialContext.doLookup(options.getManagedThreadFactory());
Expand Down Expand Up @@ -679,11 +709,11 @@ private void innerDoClose(Duration timeout) throws TimeoutException, ExecutionEx

log.debug("Shutting down execution pool...");
//Clear scheduled but not started work in execution pool
workerThreadPool.get().getQueue().clear();
clearWorkerQueue();
//request graceful shutdown
workerThreadPool.get().shutdown();
if (workerThreadPool.get().getActiveCount() > 0) {
log.info("Inflight work in execution pool: {}, letting to finish on shutdown with timeout: {}", workerThreadPool.get().getActiveCount(), timeout);
if (getPoolActiveCount() > 0) {
log.info("Inflight work in execution pool: {}, letting to finish on shutdown with timeout: {}", getPoolActiveCount(), timeout);
}

log.debug("Awaiting worker pool termination...");
Expand All @@ -708,8 +738,8 @@ private void innerDoClose(Duration timeout) throws TimeoutException, ExecutionEx
}
awaitingInflightProcessingCompletionOnShutdown.getAndSet(false);

if (workerThreadPool.get().getActiveCount() > 0) {
log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", workerThreadPool.get().getActiveCount());
if (getPoolActiveCount() > 0) {
log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", getPoolActiveCount());
}
log.debug("Worker pool terminated.");

Expand Down Expand Up @@ -986,7 +1016,7 @@ private <R> int retrieveAndDistributeNewWork(final Function<PollContextInternal<
queueStatsLimiter.performIfNotLimited(() -> {
int queueSize = getNumberOfUserFunctionsQueued();
log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}",
workerThreadPool.get().getActiveCount(), queueSize, queueSize, getPoolLoadTarget(), dynamicExtraLoadFactor.getCurrentFactor());
getPoolActiveCount(), queueSize, queueSize, getPoolLoadTarget(), dynamicExtraLoadFactor.getCurrentFactor());
});

return gotWorkCount;
Expand Down Expand Up @@ -1105,7 +1135,12 @@ protected int getTargetOutForProcessing() {

protected int getQueueTargetLoaded() {
//noinspection unchecked
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor();
ExecutorService executor = workerThreadPool.get();
if (executor instanceof ThreadPoolExecutor) {
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor();
} else {
return getPoolLoadTarget();
}
}

/**
Expand Down Expand Up @@ -1140,12 +1175,22 @@ private int getPoolLoadTarget() {
}

private boolean isPoolQueueLow() {
int queueSize = getNumberOfUserFunctionsQueued();
int queueTarget = getPoolLoadTarget();
boolean workAmountBelowTarget = queueSize <= queueTarget;
log.debug("isPoolQueueLow()? workAmountBelowTarget {} {} vs {};",
workAmountBelowTarget, queueSize, queueTarget);
return workAmountBelowTarget;
ExecutorService executor = workerThreadPool.get();

// Platform Threads
if (executor instanceof ThreadPoolExecutor) {
int queueSize = getNumberOfUserFunctionsQueued();
int queueTarget = getPoolLoadTarget();
boolean workAmountBelowTarget = queueSize <= queueTarget;
log.debug("isPoolQueueLow()? workAmountBelowTarget {} {} vs {};",
workAmountBelowTarget, queueSize, queueTarget);
return workAmountBelowTarget;
}

// Virtual Threads
int activeCount = wm.getNumberRecordsOutForProcessing();
int maxConcurrency = options.getTargetAmountOfRecordsInFlight();
return activeCount <= maxConcurrency;
}

private void drain() {
Expand Down Expand Up @@ -1183,7 +1228,7 @@ protected void processWorkCompleteMailBox(final Duration timeToBlockFor) {
currentlyPollingWorkCompleteMailBox.getAndSet(true);
if (log.isDebugEnabled()) {
log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}",
timeToBlockFor, workerThreadPool.get().getActiveCount(), getNumberOfUserFunctionsQueued());
timeToBlockFor, getPoolActiveCount(), getNumberOfUserFunctionsQueued());
}
// wait for work, with a timeToBlockFor for sanity
log.trace("Blocking poll {}", timeToBlockFor);
Expand Down Expand Up @@ -1278,7 +1323,14 @@ protected boolean isTimeToCommitNow() {
}

private int getNumberOfUserFunctionsQueued() {
return workerThreadPool.get().getQueue().size();
// Platform Threads
ExecutorService executor = workerThreadPool.get();
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getQueue().size();
}

// Virtual Threads does not queue.
return 0;
}


Expand All @@ -1305,12 +1357,15 @@ private Duration getTimeSinceLastCheck() {
* Visible for testing
*/
protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException {
log.trace("Synchronizing on commitCommand...");
synchronized (commitCommand) {
log.trace("Synchronizing on commitLock...");
commitLock.lock();
try {
log.debug("Committing offsets that are ready...");
committer.retrieveOffsetsAndCommit();
clearCommitCommand();
this.lastCommitTime = Instant.now();
} finally {
commitLock.unlock();
}
}

Expand Down Expand Up @@ -1483,8 +1538,11 @@ public void setLongPollTimeout(Duration ofMillis) {
*/
public void requestCommitAsap() {
log.debug("Registering command to commit next chance");
synchronized (commitCommand) {
commitLock.lock();
try {
this.commitCommand.set(true);
} finally {
commitLock.unlock();
}
notifySomethingToDo();
}
Expand Down Expand Up @@ -1517,18 +1575,45 @@ public void resumeIfPaused() {
}

private boolean isCommandedToCommit() {
synchronized (commitCommand) {
commitLock.lock();
try {
return this.commitCommand.get();
} finally {
commitLock.unlock();
}
}

private void clearCommitCommand() {
synchronized (commitCommand) {
commitLock.lock();
try {
if (commitCommand.get()) {
log.debug("Command to commit asap received, clearing");
this.commitCommand.set(false);
}
} finally {
commitLock.unlock();
}
}

private int getPoolActiveCount() {
ExecutorService executor = workerThreadPool.get();

// Platform Threads
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getActiveCount();
}

// Virtual Threads
return wm.getNumberRecordsOutForProcessing();
}

private void clearWorkerQueue() {
// Platform Threads
ExecutorService executor = workerThreadPool.get();
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).getQueue().clear();
}
// Virtual Threads do not have a task queue that can be cleared.
}

}
Loading