From 5bf7b4c0e30dd3baf9c1ed796bfbd8f84a695075 Mon Sep 17 00:00:00 2001 From: Charly Molter Date: Tue, 28 Oct 2025 13:49:37 +0100 Subject: [PATCH] feat: add possibility to have multiple input paylaods with headers We can now pass an entire folder as input data This will also read headers along with the actual data. This also includes a simple script to generator a set of payloads with some variability in content Signed-off-by: Charly Molter --- .../benchmark/WorkloadGenerator.java | 9 +- .../utils/payload/FilePayloadReader.java | 50 --- .../utils/payload/PayloadReader.java | 104 ++++- .../benchmark/worker/LocalWorker.java | 3 +- .../benchmark/worker/MessageProducer.java | 7 +- .../benchmark/worker/WorkerHandler.java | 2 +- .../benchmark/worker/commands/Payload.java | 76 ++++ .../commands/ProducerWorkAssignment.java | 2 +- bin/generate_payloads.sh | 357 ++++++++++++++++++ .../benchmark/driver/BenchmarkProducer.java | 15 + driver-kafka/kafka-compression-lz4.yaml | 2 +- .../driver/kafka/KafkaBenchmarkProducer.java | 14 +- payload/multiple/base | 1 + .../multiple/base64_with_headers.payload.yaml | 4 + .../base64nonjson_with_headers.payload.yaml | 4 + payload/multiple/simple.payload.yaml | 1 + payload/multiple/with_headers.payload.yaml | 4 + .../with_headers_and_json.payload.yaml | 5 + workloads/multi-payloads.yaml | 29 ++ 19 files changed, 623 insertions(+), 66 deletions(-) delete mode 100644 benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java create mode 100644 benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java create mode 100755 bin/generate_payloads.sh create mode 100644 payload/multiple/base create mode 100644 payload/multiple/base64_with_headers.payload.yaml create mode 100644 payload/multiple/base64nonjson_with_headers.payload.yaml create mode 100644 payload/multiple/simple.payload.yaml create mode 100644 payload/multiple/with_headers.payload.yaml create mode 100644 payload/multiple/with_headers_and_json.payload.yaml create mode 100644 workloads/multi-payloads.yaml diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java index dc91353ee..600619404 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java @@ -19,12 +19,12 @@ import io.openmessaging.benchmark.utils.PaddingDecimalFormat; import io.openmessaging.benchmark.utils.RandomGenerator; import io.openmessaging.benchmark.utils.Timer; -import io.openmessaging.benchmark.utils.payload.FilePayloadReader; import io.openmessaging.benchmark.utils.payload.PayloadReader; import io.openmessaging.benchmark.worker.Worker; import io.openmessaging.benchmark.worker.commands.ConsumerAssignment; import io.openmessaging.benchmark.worker.commands.CountersStats; import io.openmessaging.benchmark.worker.commands.CumulativeLatencies; +import io.openmessaging.benchmark.worker.commands.Payload; import io.openmessaging.benchmark.worker.commands.PeriodStats; import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment; import io.openmessaging.benchmark.worker.commands.TopicSubscription; @@ -95,8 +95,6 @@ public TestResult run() throws Exception { }); } - final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize); - ProducerWorkAssignment producerWorkAssignment = new ProducerWorkAssignment(); producerWorkAssignment.keyDistributorType = workload.keyDistributor; producerWorkAssignment.publishRate = targetPublishRate; @@ -113,10 +111,11 @@ public TestResult run() throws Exception { r.nextBytes(randArray); byte[] zerodArray = new byte[zerodBytes]; byte[] combined = ArrayUtils.addAll(randArray, zerodArray); - producerWorkAssignment.payloadData.add(combined); + producerWorkAssignment.payloadData.add(new Payload(combined)); } } else { - producerWorkAssignment.payloadData.add(payloadReader.load(workload.payloadFile)); + final PayloadReader payloadReader = new PayloadReader(workload.messageSize); + producerWorkAssignment.payloadData = payloadReader.load(workload.payloadFile); } worker.startLoad(producerWorkAssignment); diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java deleted file mode 100644 index 2fe60e052..000000000 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.benchmark.utils.payload; - -import static java.nio.file.Files.readAllBytes; - -import java.io.File; -import java.io.IOException; -import java.text.MessageFormat; - -public class FilePayloadReader implements PayloadReader { - - private final int expectedLength; - - public FilePayloadReader(int expectedLength) { - this.expectedLength = expectedLength; - } - - @Override - public byte[] load(String resourceName) { - byte[] payload; - try { - payload = readAllBytes(new File(resourceName).toPath()); - checkPayloadLength(payload); - return payload; - } catch (IOException e) { - throw new PayloadException(e.getMessage()); - } - } - - private void checkPayloadLength(byte[] payload) { - if (expectedLength != payload.length) { - throw new PayloadException( - MessageFormat.format( - "Payload length mismatch. Actual is: {0}, but expected: {1} ", - payload.length, expectedLength)); - } - } -} diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java index 97d82aa80..e835ec995 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +26,94 @@ */ package io.openmessaging.benchmark.utils.payload; -public interface PayloadReader { +import static java.nio.file.Files.readAllBytes; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.openmessaging.benchmark.worker.commands.Payload; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PayloadReader { + private static final Logger log = LoggerFactory.getLogger(PayloadReader.class); + public final int expectedLength; + private static final ObjectMapper mapper = + new ObjectMapper( + new YAMLFactory().configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + static { + mapper.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE); + } + + public PayloadReader(int expectedLength) { + this.expectedLength = expectedLength; + } + + private void checkPayloadLength(List payload) { + if (payload.size() == 1 && payload.get(0).data.length != expectedLength) { + throw new PayloadException( + MessageFormat.format( + "payload length mismatch. Actual payload size is: {0}, but expected: {1} ", + payload.get(0).data.length, expectedLength)); + } + int total = 0; + for (Payload p : payload) { + total += p.data.length; + } + int avg = total / payload.size(); + int expect10p = (int) (expectedLength * 0.1); + if (expectedLength - expect10p < avg || expectedLength + expect10p > avg) { + log.warn( + "Average payload length {} differs from expected length {} by over 10% " + + "this means that throughput target maybe incorrect", + avg, expectedLength); + } + } - byte[] load(String resourceName); + public List load(String payloadFile) { + List out = new ArrayList<>(); + try { + File f = new File(payloadFile); + if (Files.isDirectory(f.toPath())) { + File[] files = f.listFiles(); + if (files == null) { + throw new PayloadException("list files returned null for file " + payloadFile); + } + for (File file : files) { + if (!file.isFile()) { + continue; + } + if (file.getName().endsWith(".payload.yaml")) { + Payload p = mapper.readValue(file, Payload.class); + log.info( + "Loaded payload from file file='{}', headers='{}', data='{}'", + file.getAbsolutePath(), + p.headers, + new String(p.data, StandardCharsets.UTF_8)); + out.add(p); + } else { + byte[] payload = readAllBytes(file.toPath()); + out.add(new Payload(payload)); + } + } + } else { + byte[] payload = readAllBytes(f.toPath()); + out.add(new Payload(payload)); + } + checkPayloadLength(out); + return out; + } catch (IOException e) { + throw new PayloadException(e.getMessage()); + } + } } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java index b968b5ce7..1faaeb856 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java @@ -36,6 +36,7 @@ import io.openmessaging.benchmark.worker.commands.ConsumerAssignment; import io.openmessaging.benchmark.worker.commands.CountersStats; import io.openmessaging.benchmark.worker.commands.CumulativeLatencies; +import io.openmessaging.benchmark.worker.commands.Payload; import io.openmessaging.benchmark.worker.commands.PeriodStats; import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment; import io.openmessaging.benchmark.worker.commands.TopicsInfo; @@ -194,7 +195,7 @@ public void probeProducers() throws IOException { } private void submitProducersToExecutor( - List producers, KeyDistributor keyDistributor, List payloads) { + List producers, KeyDistributor keyDistributor, List payloads) { ThreadLocalRandom r = ThreadLocalRandom.current(); int payloadCount = payloads.size(); executor.submit( diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java index bf1191ef9..6c208f1d1 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java @@ -17,6 +17,7 @@ import io.openmessaging.benchmark.driver.BenchmarkProducer; import io.openmessaging.benchmark.utils.UniformRateLimiter; +import io.openmessaging.benchmark.worker.commands.Payload; import java.util.Optional; import java.util.function.Supplier; import org.slf4j.Logger; @@ -38,13 +39,13 @@ public class MessageProducer { this.stats = stats; } - public void sendMessage(BenchmarkProducer producer, Optional key, byte[] payload) { + public void sendMessage(BenchmarkProducer producer, Optional key, Payload payload) { final long intendedSendTime = rateLimiter.acquire(); uninterruptibleSleepNs(intendedSendTime); final long sendTime = nanoClock.get(); producer - .sendAsync(key, payload) - .thenRun(() -> success(payload.length, intendedSendTime, sendTime)) + .sendAsync(key, payload.data, payload.headers) + .thenRun(() -> success(payload.data.length, intendedSendTime, sendTime)) .exceptionally(this::failure); } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java index f7142cf9f..352903d60 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java @@ -116,7 +116,7 @@ private void handleStartLoad(Context ctx) throws Exception { log.info( "Start load publish-rate: {} msg/s -- payload-size: {}", producerWorkAssignment.publishRate, - producerWorkAssignment.payloadData.get(0).length); + producerWorkAssignment.payloadData.get(0).data.length); localWorker.startLoad(producerWorkAssignment); } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java new file mode 100644 index 000000000..bb0d1e008 --- /dev/null +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.worker.commands; + + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import javax.annotation.Nullable; + +public class Payload { + @JsonDeserialize(using = FlexibleByteArrayDeserializer.class) + public byte[] data; + + @Nullable public Map headers; + + public Payload() {} + + public Payload(byte[] data) { + this.data = data; + } +} + +class FlexibleByteArrayDeserializer extends JsonDeserializer { + @Override + public byte[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonToken token = p.currentToken(); + + if (token == JsonToken.VALUE_STRING) { + try { + return Base64.getDecoder().decode(p.getValueAsString()); + } catch (IllegalArgumentException e) { + // Not Base64, treat as UTF-8 string + } + return p.getValueAsString().getBytes(StandardCharsets.UTF_8); + } else if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { + // Handle as JSON object/array - serialize it + JsonNode node = p.readValueAsTree(); + return new JsonMapper().writeValueAsBytes(node); + } + + throw new IOException("Cannot deserialize byte[] from " + token); + } +} diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java index 9506ef7cb..725104054 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java @@ -19,7 +19,7 @@ public class ProducerWorkAssignment { - public List payloadData; + public List payloadData; public double publishRate; diff --git a/bin/generate_payloads.sh b/bin/generate_payloads.sh new file mode 100755 index 000000000..314ec523b --- /dev/null +++ b/bin/generate_payloads.sh @@ -0,0 +1,357 @@ +#!/usr/bin/env bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -euo pipefail + +# Generate benchmark payload files with controlled variability and headers +# +# Dependencies: jq + +show_help() { + cat << EOF +Usage: $(basename "$0") [OPTIONS] + +Generate benchmark payload files with controlled variability and headers. + +OPTIONS: + -o, --output-dir DIR Output directory for payload files (required) + -n, --num-files NUM Number of payload files to generate (required) + -s, --size BYTES Target size of each payload in bytes (required) + -H, --headers HEADERS Headers to add to each payload + Format: "header1=value1:percentage1,value2:percentage2;header2=value3:percentage3,value4:percentage4" + Example: "environment=production:70,staging:30;region=us-east:50,us-west:50" + Percentages for each header should sum to 100 + -e, --entropy INT Entropy/variability level (0-100, default: 50) + 0 = identical payloads + 100 = maximum variability + -h, --help Show this help message + +EXAMPLES: + # Generate 100 files with 1KB payloads and medium entropy + $(basename "$0") -o payloads -n 100 -s 1024 -e 50 + + # Generate with header distribution (70% production, 30% staging) + $(basename "$0") -o payloads -n 100 -s 1024 -H "environment=production:70,staging:30" -e 50 + + # Generate with multiple headers + $(basename "$0") -o payloads -n 100 -s 1024 -H "environment=production:70,staging:30;region=us-east:50,us-west:50" -e 50 + + # Generate with low variability (almost identical payloads) + $(basename "$0") -o payloads -n 50 -s 512 -e 10 + + # Generate with high variability + $(basename "$0") -o payloads -n 50 -s 512 -e 90 + +EOF +} + +# Default values +OUTPUT_DIR="" +NUM_FILES="" +SIZE="" +HEADERS="" +ENTROPY="50" + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + -o|--output-dir) + OUTPUT_DIR="$2" + shift 2 + ;; + -n|--num-files) + NUM_FILES="$2" + shift 2 + ;; + -s|--size) + SIZE="$2" + shift 2 + ;; + -H|--headers) + HEADERS="$2" + shift 2 + ;; + -e|--entropy) + ENTROPY="$2" + shift 2 + ;; + -h|--help) + show_help + exit 0 + ;; + *) + echo "Error: Unknown option: $1" >&2 + show_help + exit 1 + ;; + esac +done + +# Validate required arguments +if [[ -z "$OUTPUT_DIR" ]] || [[ -z "$NUM_FILES" ]] || [[ -z "$SIZE" ]]; then + echo "Error: Missing required arguments" >&2 + show_help + exit 1 +fi + +# Validate numeric arguments +if ! [[ "$NUM_FILES" =~ ^[0-9]+$ ]] || [[ "$NUM_FILES" -lt 1 ]]; then + echo "Error: Number of files must be a positive integer" >&2 + exit 1 +fi + +if ! [[ "$SIZE" =~ ^[0-9]+$ ]] || [[ "$SIZE" -lt 1 ]]; then + echo "Error: Size must be a positive integer" >&2 + exit 1 +fi + +# Validate entropy +if ! [[ "$ENTROPY" =~ ^[0-9]+$ ]] || [[ "$ENTROPY" -lt 0 ]] || [[ "$ENTROPY" -gt 100 ]]; then + echo "Error: Entropy must be an integer between 0 and 100" >&2 + exit 1 +fi + +# Check dependencies +if ! command -v jq &> /dev/null; then + echo "Error: jq is required but not installed" >&2 + exit 1 +fi + +# Create output directory +mkdir -p "$OUTPUT_DIR" + +echo "Generating $NUM_FILES payload files in $OUTPUT_DIR" +echo "Target payload size: $SIZE bytes" +echo "Entropy: $ENTROPY" +if [[ -n "$HEADERS" ]]; then + echo "Headers: $HEADERS" +fi +echo + +# Parse headers into parallel arrays +# Format: "header1=value1:percentage1,value2:percentage2;header2=value3:percentage3,value4:percentage4" +declare -a HEADER_NAMES +declare -a HEADER_VALUE_SPECS + +if [[ -n "$HEADERS" ]]; then + IFS=';' read -ra HEADER_SPECS <<< "$HEADERS" + for header_spec in "${HEADER_SPECS[@]}"; do + IFS='=' read -ra HEADER_PARTS <<< "$header_spec" + header_name="${HEADER_PARTS[0]}" + values_spec="${HEADER_PARTS[1]}" + HEADER_NAMES+=("$header_name") + HEADER_VALUE_SPECS+=("$values_spec") + done +fi + +# Function to select weighted random value for a given header's value spec +select_header_value() { + local values_spec="$1" + + declare -a value_list + declare -a percentage_list + + IFS=',' read -ra PAIRS <<< "$values_spec" + for pair in "${PAIRS[@]}"; do + IFS=':' read -ra PARTS <<< "$pair" + value="${PARTS[0]}" + percentage="${PARTS[1]:-50}" + value_list+=("$value") + percentage_list+=("$percentage") + done + + local rand=$((RANDOM % 100)) + local cumulative=0 + + for i in "${!value_list[@]}"; do + cumulative=$((cumulative + percentage_list[i])) + + if [[ $rand -lt $cumulative ]]; then + echo "${value_list[$i]}" + return + fi + done + + # Fallback + echo "${value_list[0]}" +} + +# Function to generate random string +random_string() { + local length=$1 + cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c "$length" +} + +# Function to generate JSON payload +generate_json_payload() { + local target_size=$1 + local entropy=$2 # Now 0-100 + local file_index=$3 + + # Start with base fields + local timestamp + local id + + if [[ $entropy -gt 0 ]]; then + timestamp=$((1000000000 + RANDOM * 10000)) + else + timestamp=1234567890 + fi + + if [[ $entropy -gt 50 ]]; then + id=$(random_string 16) + else + local id_variation=$((entropy * 10)) + id_variation=$((id_variation > 0 ? id_variation : 1)) + id="fixed-id-$((RANDOM % id_variation))" + fi + + # Create base JSON + local json=$(jq -n \ + --argjson ts "$timestamp" \ + --arg id "$id" \ + '{timestamp: $ts, id: $id}') + + # Calculate current size + local current_size=$(echo -n "$json" | wc -c | tr -d ' ') + + if [[ $current_size -ge $target_size ]]; then + echo "$json" + return + fi + + local remaining_size=$((target_size - current_size)) + + # Determine number of fields based on entropy + local num_fields + if [[ $entropy -lt 30 ]]; then + num_fields=1 + elif [[ $entropy -lt 70 ]]; then + num_fields=$((1 + RANDOM % 3)) + else + num_fields=$((2 + RANDOM % 4)) + fi + + # Calculate chars per field + local chars_per_field=$((remaining_size / (num_fields + 1))) + + # Add data fields + for ((i=0; i 1 ? string_length : 1)) + fi + + local field_value=$(random_string "$string_length") + json=$(echo "$json" | jq --arg key "$field_name" --arg val "$field_value" '. + {($key): $val}') + done + + # Fine-tune size with padding + current_size=$(echo -n "$json" | wc -c | tr -d ' ') + if [[ $current_size -lt $target_size ]]; then + local padding_size=$((target_size - current_size - 15)) + if [[ $padding_size -gt 0 ]]; then + local padding=$(random_string "$padding_size") + json=$(echo "$json" | jq --arg pad "$padding" '. + {_padding: $pad}') + fi + fi + + echo "$json" +} + +# Track sizes for statistics +declare -a SIZES + +# Generate payload files +for ((i=0; i "$filepath" + else + jq -n \ + --argjson data "$payload" \ + '{value: $data}' > "$filepath" + fi + + # Calculate actual size + actual_size=$(echo -n "$payload" | wc -c | tr -d ' ') + SIZES+=("$actual_size") + + # Progress update + if [[ $((i + 1)) -eq 1 ]] || [[ $(((i + 1) % 100)) -eq 0 ]] || [[ $((i + 1)) -eq $NUM_FILES ]]; then + echo "Generated $((i + 1))/$NUM_FILES files... (last file data size: $actual_size bytes)" + fi +done + +# Calculate statistics +total=0 +min_size=${SIZES[0]} +max_size=${SIZES[0]} + +for size in "${SIZES[@]}"; do + total=$((total + size)) + if [[ $size -lt $min_size ]]; then + min_size=$size + fi + if [[ $size -gt $max_size ]]; then + max_size=$size + fi +done + +avg_size=$(echo "scale=1; $total / $NUM_FILES" | bc) +size_variance=$((max_size - min_size)) +variance_percent=$(echo "scale=1; ($size_variance / $avg_size) * 100" | bc) + +echo +echo "Generation complete!" +echo "Average payload size: $avg_size bytes" +echo "Min payload size: $min_size bytes" +echo "Max payload size: $max_size bytes" +echo "Size variance: $size_variance bytes ($variance_percent%)" \ No newline at end of file diff --git a/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java b/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java index 1662cf1c6..1681cec71 100644 --- a/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java +++ b/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java @@ -14,6 +14,7 @@ package io.openmessaging.benchmark.driver; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -27,4 +28,18 @@ public interface BenchmarkProducer extends AutoCloseable { * @return a future that will be triggered when the message is successfully published */ CompletableFuture sendAsync(Optional key, byte[] payload); + + /** + * Same as sendAsync but can add headers to the message when supported by the driver. When not + * supported, the default implementation ignores the headers. + * + * @param key the key associated with this message + * @param payload the message payload + * @param headers the message headers + * @return a future that will be triggered when the message is successfully published + */ + default CompletableFuture sendAsync( + Optional key, byte[] payload, Map headers) { + return sendAsync(key, payload); + } } diff --git a/driver-kafka/kafka-compression-lz4.yaml b/driver-kafka/kafka-compression-lz4.yaml index 8fdccc982..60923d3d9 100644 --- a/driver-kafka/kafka-compression-lz4.yaml +++ b/driver-kafka/kafka-compression-lz4.yaml @@ -23,7 +23,7 @@ topicConfig: | min.insync.replicas=2 commonConfig: | - bootstrap.servers=localhost:9092 + bootstrap.servers=localhost:19092 producerConfig: | acks=all diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java index 6c62d9fbc..0cb162bd8 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java @@ -28,6 +28,8 @@ import io.openmessaging.benchmark.driver.BenchmarkProducer; +import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.producer.Producer; @@ -44,9 +46,12 @@ public KafkaBenchmarkProducer(Producer producer, String topic) { } @Override - public CompletableFuture sendAsync(Optional key, byte[] payload) { + public CompletableFuture sendAsync( + Optional key, byte[] payload, Map headers) { ProducerRecord record = new ProducerRecord<>(topic, key.orElse(null), payload); - + if (headers != null) { + headers.forEach((k, v) -> record.headers().add(k, v.getBytes(StandardCharsets.UTF_8))); + } CompletableFuture future = new CompletableFuture<>(); producer.send( @@ -62,6 +67,11 @@ public CompletableFuture sendAsync(Optional key, byte[] payload) { return future; } + @Override + public CompletableFuture sendAsync(Optional key, byte[] payload) { + return sendAsync(key, payload, null); + } + @Override public void close() throws Exception { producer.close(); diff --git a/payload/multiple/base b/payload/multiple/base new file mode 100644 index 000000000..b95490766 --- /dev/null +++ b/payload/multiple/base @@ -0,0 +1 @@ +fewhifewhufewhiufhew \ No newline at end of file diff --git a/payload/multiple/base64_with_headers.payload.yaml b/payload/multiple/base64_with_headers.payload.yaml new file mode 100644 index 000000000..359137f77 --- /dev/null +++ b/payload/multiple/base64_with_headers.payload.yaml @@ -0,0 +1,4 @@ +# Pass in a base64-encoded if you want JSON data along with custom headers. +data: eyJiaW0iOiJiYXIiLCAiYmFtIjoiYm9tIn0K +headers: + cool: beans \ No newline at end of file diff --git a/payload/multiple/base64nonjson_with_headers.payload.yaml b/payload/multiple/base64nonjson_with_headers.payload.yaml new file mode 100644 index 000000000..943afec51 --- /dev/null +++ b/payload/multiple/base64nonjson_with_headers.payload.yaml @@ -0,0 +1,4 @@ +# Pass in a base64-encoded if you want to send non-JSON data along with custom headers. +data: b2JzY3VyZV9zdHJpbmcK +headers: + cool: beans \ No newline at end of file diff --git a/payload/multiple/simple.payload.yaml b/payload/multiple/simple.payload.yaml new file mode 100644 index 000000000..4a33ba9af --- /dev/null +++ b/payload/multiple/simple.payload.yaml @@ -0,0 +1 @@ +data: "fooz_baz" \ No newline at end of file diff --git a/payload/multiple/with_headers.payload.yaml b/payload/multiple/with_headers.payload.yaml new file mode 100644 index 000000000..865b4b92e --- /dev/null +++ b/payload/multiple/with_headers.payload.yaml @@ -0,0 +1,4 @@ +data: "foo_bar" +headers: # Add your headers + header1: "value1" + header2: "value2" \ No newline at end of file diff --git a/payload/multiple/with_headers_and_json.payload.yaml b/payload/multiple/with_headers_and_json.payload.yaml new file mode 100644 index 000000000..d0210c972 --- /dev/null +++ b/payload/multiple/with_headers_and_json.payload.yaml @@ -0,0 +1,5 @@ +data: # You can send in structured data as well + fooz: baz +headers: + header1: "value1" + header2: "value2" \ No newline at end of file diff --git a/workloads/multi-payloads.yaml b/workloads/multi-payloads.yaml new file mode 100644 index 000000000..5a0860229 --- /dev/null +++ b/workloads/multi-payloads.yaml @@ -0,0 +1,29 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +name: Simple Workload 1 producer on 1 topic + +topics: 1 +partitionsPerTopic: 10 +messageSize: 16 +payloadFile: "payload/multiple" +subscriptionsPerTopic: 1 +producersPerTopic: 1 +consumerPerSubscription: 1 +producerRate: 10000 +consumerBacklogSizeGB: 0 +testDurationMinutes: 5 + +