diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java index 7b877a197..71ba22d62 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java @@ -83,4 +83,12 @@ public boolean usesDistribution() { public int testDurationMinutes; public int warmupDurationMinutes = 1; + + /** + * If > 0, after warmup the consumers are paused for exactly this many seconds before being + * automatically resumed and the measurement window starts. Use this when you want time-based + * backlog accumulation (e.g. to ensure data is offloaded to tiered storage before consumers + * start draining), as an alternative to size-based {@link #consumerBacklogSizeGB}. + */ + public int consumeDelaySeconds = 0; } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java index 86a2b1a73..97b8d1aea 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java @@ -164,6 +164,7 @@ public TestResult run() throws Exception { } worker.resetStats(); + applyConsumeDelayIfConfigured(); log.info("----- Starting benchmark traffic ({}m)------", workload.testDurationMinutes); TestResult result = printAndCollectStats(workload.testDurationMinutes, TimeUnit.MINUTES); @@ -253,6 +254,23 @@ public void close() throws Exception { executor.shutdownNow(); } + private void applyConsumeDelayIfConfigured() throws IOException { + if (workload.consumeDelaySeconds <= 0) { + return; + } + worker.pauseConsumers(); + log.info("----- Delaying consumption for {}s before measurement window -----", + workload.consumeDelaySeconds); + try { + Thread.sleep(workload.consumeDelaySeconds * 1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + worker.resumeConsumers(); + log.info("----- Consumers resumed after {}s delay -----", workload.consumeDelaySeconds); + } + private void createConsumers(List topics) throws IOException { ConsumerAssignment consumerAssignment = new ConsumerAssignment();