From 35dd2ea5c73a0f7dcdc8ee4d632afff0e8f3c043 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 28 Apr 2026 11:25:56 +0800 Subject: [PATCH] feat: add consumeDelaySeconds for time-based consumer delay Introduces a workload-level consumeDelaySeconds option. When set, after warmup the framework pauses all consumers for the configured number of seconds and then automatically resumes them before the measurement window starts. This enables tiered-storage style scenarios where backlog must accumulate and be offloaded before consumers begin draining, without requiring an external coordinator. --- .../io/openmessaging/benchmark/Workload.java | 8 ++++++++ .../benchmark/WorkloadGenerator.java | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java index 7b877a197..71ba22d62 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java @@ -83,4 +83,12 @@ public boolean usesDistribution() { public int testDurationMinutes; public int warmupDurationMinutes = 1; + + /** + * If > 0, after warmup the consumers are paused for exactly this many seconds before being + * automatically resumed and the measurement window starts. Use this when you want time-based + * backlog accumulation (e.g. to ensure data is offloaded to tiered storage before consumers + * start draining), as an alternative to size-based {@link #consumerBacklogSizeGB}. + */ + public int consumeDelaySeconds = 0; } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java index 86a2b1a73..97b8d1aea 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java @@ -164,6 +164,7 @@ public TestResult run() throws Exception { } worker.resetStats(); + applyConsumeDelayIfConfigured(); log.info("----- Starting benchmark traffic ({}m)------", workload.testDurationMinutes); TestResult result = printAndCollectStats(workload.testDurationMinutes, TimeUnit.MINUTES); @@ -253,6 +254,23 @@ public void close() throws Exception { executor.shutdownNow(); } + private void applyConsumeDelayIfConfigured() throws IOException { + if (workload.consumeDelaySeconds <= 0) { + return; + } + worker.pauseConsumers(); + log.info("----- Delaying consumption for {}s before measurement window -----", + workload.consumeDelaySeconds); + try { + Thread.sleep(workload.consumeDelaySeconds * 1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + worker.resumeConsumers(); + log.info("----- Consumers resumed after {}s delay -----", workload.consumeDelaySeconds); + } + private void createConsumers(List topics) throws IOException { ConsumerAssignment consumerAssignment = new ConsumerAssignment();