diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java new file mode 100644 index 00000000000..f1ae34ba6b4 --- /dev/null +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.resources.Resource; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class DoubleHistogramBenchmark { + private static final Aggregator aggregator = + AggregatorFactory.histogram(Arrays.asList(10.0, 100.0, 1_000.0), AggregationTemporality.DELTA) + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "1", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)); + private AggregatorHandle aggregatorHandle; + + @Setup(Level.Trial) + public final void setup() { + aggregatorHandle = aggregator.createHandle(); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + @Threads(value = 10) + public void aggregate_10Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + @Threads(value = 5) + public void aggregate_5Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) + @Threads(value = 1) + public void aggregate_1Threads() { + aggregatorHandle.recordDouble(100.0056); + } +} diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountBenchmark.java index a1fb8d575c8..52365ce1e5e 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountBenchmark.java @@ -12,9 +12,11 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -46,7 +48,8 @@ public final void setup() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 10) public void aggregate_10Threads() { aggregatorHandle.recordDouble(100.0056); @@ -56,7 +59,8 @@ public void aggregate_10Threads() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 5) public void aggregate_5Threads() { aggregatorHandle.recordDouble(100.0056); @@ -66,7 +70,8 @@ public void aggregate_5Threads() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 1) public void aggregate_1Threads() { aggregatorHandle.recordDouble(100.0056); diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountBenchmark.java index 895a06d4e25..b15d4704298 100644 --- a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountBenchmark.java +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountBenchmark.java @@ -12,9 +12,11 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -46,7 +48,8 @@ public final void setup() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 10) public void aggregate_10Threads() { aggregatorHandle.recordLong(100); @@ -56,7 +59,8 @@ public void aggregate_10Threads() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 5) public void aggregate_5Threads() { aggregatorHandle.recordLong(100); @@ -66,7 +70,8 @@ public void aggregate_5Threads() { @Fork(1) @Warmup(iterations = 5, time = 1) @Measurement(iterations = 10, time = 1) - @OutputTimeUnit(TimeUnit.MILLISECONDS) + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.NANOSECONDS) @Threads(value = 1) public void aggregate_1Threads() { aggregatorHandle.recordLong(100); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java index 05887a3fd20..777b13e4029 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java @@ -9,6 +9,7 @@ import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.resources.Resource; +import java.util.List; import javax.annotation.concurrent.Immutable; /** Factory class for {@link Aggregator}. */ @@ -77,6 +78,18 @@ static AggregatorFactory minMaxSumCount() { return MinMaxSumCountAggregatorFactory.INSTANCE; } + /** + * Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of + * the measurements taken. + * + * @param temporality configures what temporality to be produced for the Histogram metrics. + * @param boundaries configures the fixed bucket boundaries. + * @return an {@code AggregationFactory} that calculates histogram of recorded measurements. + */ + static AggregatorFactory histogram(List boundaries, AggregationTemporality temporality) { + return new HistogramAggregatorFactory(boundaries, temporality); + } + /** * Returns a new {@link Aggregator}. * diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java new file mode 100644 index 00000000000..35a8f1cd5ad --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java @@ -0,0 +1,158 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.concurrent.GuardedBy; + +final class DoubleHistogramAggregator extends AbstractAggregator { + private final double[] boundaries; + + // a cache for converting to MetricData + private final List boundaryList; + + DoubleHistogramAggregator( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor instrumentDescriptor, + double[] boundaries, + boolean stateful) { + super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful); + this.boundaries = boundaries; + + List boundaryList = new ArrayList<>(this.boundaries.length); + for (double v : this.boundaries) { + boundaryList.add(v); + } + this.boundaryList = Collections.unmodifiableList(boundaryList); + } + + @Override + public AggregatorHandle createHandle() { + return new Handle(this.boundaries); + } + + /** + * Return the result of the merge of two histogram accumulations. As long as one Aggregator + * instance produces all Accumulations with constant boundaries we don't need to worry about + * merging accumulations with different boundaries. + */ + @Override + public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) { + long[] mergedCounts = new long[x.getCounts().length]; + for (int i = 0; i < x.getCounts().length; ++i) { + mergedCounts[i] = x.getCounts()[i] + y.getCounts()[i]; + } + return HistogramAccumulation.create(x.getSum() + y.getSum(), mergedCounts); + } + + @Override + public final MetricData toMetricData( + Map accumulationByLabels, + long startEpochNanos, + long lastCollectionEpoch, + long epochNanos) { + return MetricData.createDoubleHistogram( + getResource(), + getInstrumentationLibraryInfo(), + getInstrumentDescriptor().getName(), + getInstrumentDescriptor().getDescription(), + getInstrumentDescriptor().getUnit(), + DoubleHistogramData.create( + isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA, + MetricDataUtils.toDoubleHistogramPointList( + accumulationByLabels, + isStateful() ? startEpochNanos : lastCollectionEpoch, + epochNanos, + boundaryList))); + } + + @Override + public HistogramAccumulation accumulateDouble(double value) { + long[] counts = new long[this.boundaries.length + 1]; + counts[findBucketIndex(this.boundaries, value)] = 1; + return HistogramAccumulation.create(value, counts); + } + + @Override + public HistogramAccumulation accumulateLong(long value) { + return accumulateDouble((double) value); + } + + // Benchmark shows that linear search performs better than binary search with ordinary + // buckets. + private static int findBucketIndex(double[] boundaries, double value) { + for (int i = 0; i < boundaries.length; ++i) { + if (value <= boundaries[i]) { + return i; + } + } + return boundaries.length; + } + + static final class Handle extends AggregatorHandle { + // read-only + private final double[] boundaries; + + @GuardedBy("lock") + private double sum; + + @GuardedBy("lock") + private final long[] counts; + + private final ReentrantLock lock = new ReentrantLock(); + + Handle(double[] boundaries) { + this.boundaries = boundaries; + this.counts = new long[this.boundaries.length + 1]; + this.sum = 0; + } + + @Override + protected HistogramAccumulation doAccumulateThenReset() { + lock.lock(); + try { + HistogramAccumulation acc = + HistogramAccumulation.create(sum, Arrays.copyOf(counts, counts.length)); + this.sum = 0; + Arrays.fill(this.counts, 0); + return acc; + } finally { + lock.unlock(); + } + } + + @Override + protected void doRecordDouble(double value) { + int bucketIndex = findBucketIndex(this.boundaries, value); + + lock.lock(); + try { + this.sum += value; + this.counts[bucketIndex]++; + } finally { + lock.unlock(); + } + } + + @Override + protected void doRecordLong(long value) { + doRecordDouble((double) value); + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java new file mode 100644 index 00000000000..7a8557f9820 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import com.google.auto.value.AutoValue; +import javax.annotation.concurrent.Immutable; + +@Immutable +@AutoValue +abstract class HistogramAccumulation { + /** + * Creates a new {@link HistogramAccumulation} with the given values. Assume `counts` is read-only + * so we don't need a defensive-copy here. + * + * @return a new {@link HistogramAccumulation} with the given values. + */ + static HistogramAccumulation create(double sum, long[] counts) { + return new AutoValue_HistogramAccumulation(sum, counts); + } + + HistogramAccumulation() {} + + /** + * The sum of all measurements recorded. + * + * @return the sum of recorded measurements. + */ + abstract double getSum(); + + /** + * The counts in each bucket. The returned type is a mutable object, but it should be fine because + * the class is only used internally. + * + * @return the counts in each bucket. do not mutate the returned object. + */ + @SuppressWarnings("mutable") + abstract long[] getCounts(); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java new file mode 100644 index 00000000000..1ce974d081d --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.resources.Resource; +import java.util.List; + +final class HistogramAggregatorFactory implements AggregatorFactory { + private final double[] boundaries; + private final AggregationTemporality temporality; + + HistogramAggregatorFactory(List boundaries, AggregationTemporality temporality) { + this.boundaries = boundaries.stream().mapToDouble(i -> i).toArray(); + this.temporality = temporality; + + for (double v : this.boundaries) { + if (Double.isNaN(v)) { + throw new IllegalArgumentException("invalid bucket boundary: NaN"); + } + } + for (int i = 1; i < this.boundaries.length; ++i) { + if (this.boundaries[i - 1] >= this.boundaries[i]) { + throw new IllegalArgumentException( + "invalid bucket boundary: " + this.boundaries[i - 1] + " >= " + this.boundaries[i]); + } + } + if (this.boundaries.length > 0) { + if (this.boundaries[0] == Double.NEGATIVE_INFINITY) { + throw new IllegalArgumentException("invalid bucket boundary: -Inf"); + } + if (this.boundaries[this.boundaries.length - 1] == Double.POSITIVE_INFINITY) { + throw new IllegalArgumentException("invalid bucket boundary: +Inf"); + } + } + } + + @Override + @SuppressWarnings("unchecked") + public Aggregator create( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor descriptor) { + final boolean stateful = this.temporality == AggregationTemporality.CUMULATIVE; + switch (descriptor.getValueType()) { + case LONG: + case DOUBLE: + return (Aggregator) + new DoubleHistogramAggregator( + resource, instrumentationLibraryInfo, descriptor, this.boundaries, stateful); + } + throw new IllegalArgumentException("Invalid instrument value type"); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java index 9574632271b..dd7a233810f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.aggregator; import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; @@ -44,4 +45,23 @@ static List toDoubleSummaryPointList( points.add(aggregator.toPoint(startEpochNanos, epochNanos, labels))); return points; } + + static List toDoubleHistogramPointList( + Map accumulationMap, + long startEpochNanos, + long epochNanos, + List boundaries) { + List points = new ArrayList<>(accumulationMap.size()); + accumulationMap.forEach( + (labels, aggregator) -> { + List counts = new ArrayList<>(aggregator.getCounts().length); + for (long v : aggregator.getCounts()) { + counts.add(v); + } + points.add( + DoubleHistogramPointData.create( + startEpochNanos, epochNanos, labels, aggregator.getSum(), boundaries, counts)); + }); + return points; + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java index f908ab243d0..d528a553e63 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.aggregator; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; @@ -13,6 +14,8 @@ import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.resources.Resource; +import java.util.Arrays; +import java.util.Collections; import org.junit.jupiter.api.Test; class AggregatorFactoryTest { @@ -123,4 +126,86 @@ void getSumAggregatorFactory() { InstrumentValueType.DOUBLE))) .isInstanceOf(DoubleSumAggregator.class); } + + @Test + void getHistogramAggregatorFactory() { + AggregatorFactory histogram = + AggregatorFactory.histogram(Collections.singletonList(1.0), AggregationTemporality.DELTA); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG))) + .isInstanceOf(DoubleHistogramAggregator.class); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE))) + .isInstanceOf(DoubleHistogramAggregator.class); + + assertThat( + histogram + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG)) + .isStateful()) + .isFalse(); + assertThat( + AggregatorFactory.histogram( + Collections.singletonList(1.0), AggregationTemporality.CUMULATIVE) + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)) + .isStateful()) + .isTrue(); + + assertThatThrownBy( + () -> + AggregatorFactory.histogram( + Collections.singletonList(Double.NEGATIVE_INFINITY), + AggregationTemporality.DELTA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid bucket boundary: -Inf"); + assertThatThrownBy( + () -> + AggregatorFactory.histogram( + Arrays.asList(1.0, Double.POSITIVE_INFINITY), AggregationTemporality.DELTA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid bucket boundary: +Inf"); + assertThatThrownBy( + () -> + AggregatorFactory.histogram( + Arrays.asList(1.0, Double.NaN), AggregationTemporality.DELTA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid bucket boundary: NaN"); + assertThatThrownBy( + () -> + AggregatorFactory.histogram( + Arrays.asList(2.0, 1.0, 3.0), AggregationTemporality.DELTA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("invalid bucket boundary: 2.0 >= 1.0"); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java new file mode 100644 index 00000000000..efbc387adfc --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java @@ -0,0 +1,165 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.resources.Resource; +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; + +public class DoubleHistogramAggregatorTest { + private static final double[] boundaries = new double[] {10.0, 100.0, 1000.0}; + private static final DoubleHistogramAggregator aggregator = + new DoubleHistogramAggregator( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG), + boundaries, + /* stateful= */ false); + + @Test + void createHandle() { + assertThat(aggregator.createHandle()).isInstanceOf(DoubleHistogramAggregator.Handle.class); + } + + @Test + void testRecordings() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(20); + aggregatorHandle.recordLong(5); + aggregatorHandle.recordLong(150); + aggregatorHandle.recordLong(2000); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo(HistogramAccumulation.create(2175, new long[] {1, 1, 1, 1})); + } + + @Test + void toAccumulationAndReset() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(100); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo(HistogramAccumulation.create(100, new long[] {0, 1, 0, 0})); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(0); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo(HistogramAccumulation.create(0, new long[] {1, 0, 0, 0})); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + } + + @Test + void accumulateData() { + assertThat(aggregator.accumulateDouble(11.1)) + .isEqualTo(HistogramAccumulation.create(11.1, new long[] {0, 1, 0, 0})); + assertThat(aggregator.accumulateLong(10)) + .isEqualTo(HistogramAccumulation.create(10.0, new long[] {1, 0, 0, 0})); + } + + @Test + void toMetricData() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(10); + + MetricData metricData = + aggregator.toMetricData( + Collections.singletonMap(Labels.empty(), aggregatorHandle.accumulateThenReset()), + 0, + 10, + 100); + assertThat(metricData).isNotNull(); + assertThat(metricData.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(metricData.getDoubleHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + } + + @Test + void testHistogramCounts() { + assertThat(aggregator.accumulateDouble(1.1).getCounts().length) + .isEqualTo(boundaries.length + 1); + assertThat(aggregator.accumulateLong(1).getCounts().length).isEqualTo(boundaries.length + 1); + + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordDouble(1.1); + HistogramAccumulation histogramAccumulation = aggregatorHandle.accumulateThenReset(); + assertThat(histogramAccumulation).isNotNull(); + assertThat(histogramAccumulation.getCounts().length).isEqualTo(boundaries.length + 1); + } + + @Test + void testMultithreadedUpdates() throws InterruptedException { + final AggregatorHandle aggregatorHandle = aggregator.createHandle(); + final Histogram summarizer = new Histogram(); + final ImmutableList updates = + ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L); + final int numberOfThreads = updates.size(); + final int numberOfUpdates = 10000; + final ThreadPoolExecutor executor = + (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads); + + executor.invokeAll( + updates.stream() + .map( + v -> + Executors.callable( + () -> { + for (int j = 0; j < numberOfUpdates; j++) { + aggregatorHandle.recordLong(v); + if (ThreadLocalRandom.current().nextInt(10) == 0) { + summarizer.process(aggregatorHandle.accumulateThenReset()); + } + } + })) + .collect(Collectors.toList())); + + // make sure everything gets merged when all the aggregation is done. + summarizer.process(aggregatorHandle.accumulateThenReset()); + + assertThat(summarizer.accumulation) + .isEqualTo(HistogramAccumulation.create(1010000, new long[] {50000, 50000, 0, 0})); + } + + private static final class Histogram { + private final Object mutex = new Object(); + + @Nullable private HistogramAccumulation accumulation; + + void process(@Nullable HistogramAccumulation other) { + if (other == null) { + return; + } + + synchronized (mutex) { + if (accumulation == null) { + accumulation = other; + return; + } + accumulation = aggregator.merge(accumulation, other); + } + } + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountAggregatorTest.java index 267fc4e21bf..74d29f97882 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleMinMaxSumCountAggregatorTest.java @@ -97,7 +97,7 @@ void testMultithreadedUpdates() throws Exception { int numberOfThreads = 10; final double[] updates = new double[] {1, 2, 3, 5, 7, 11, 13, 17, 19, 23}; final int numberOfUpdates = 1000; - final CountDownLatch startingGun = new CountDownLatch(numberOfThreads); + final CountDownLatch starter = new CountDownLatch(numberOfThreads); List workers = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { final int index = i; @@ -106,7 +106,7 @@ void testMultithreadedUpdates() throws Exception { () -> { double update = updates[index]; try { - startingGun.await(); + starter.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -121,7 +121,7 @@ void testMultithreadedUpdates() throws Exception { t.start(); } for (int i = 0; i <= numberOfThreads; i++) { - startingGun.countDown(); + starter.countDown(); } for (Thread worker : workers) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountAggregatorTest.java index 6485cec4693..81a76999e67 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongMinMaxSumCountAggregatorTest.java @@ -94,7 +94,7 @@ void testMultithreadedUpdates() throws Exception { int numberOfThreads = 10; final long[] updates = new long[] {1, 2, 3, 5, 7, 11, 13, 17, 19, 23}; final int numberOfUpdates = 1000; - final CountDownLatch startingGun = new CountDownLatch(numberOfThreads); + final CountDownLatch starter = new CountDownLatch(numberOfThreads); List workers = new ArrayList<>(); for (int i = 0; i < numberOfThreads; i++) { final int index = i; @@ -103,7 +103,7 @@ void testMultithreadedUpdates() throws Exception { () -> { long update = updates[index]; try { - startingGun.await(); + starter.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -118,7 +118,7 @@ void testMultithreadedUpdates() throws Exception { t.start(); } for (int i = 0; i <= numberOfThreads; i++) { - startingGun.countDown(); + starter.countDown(); } for (Thread worker : workers) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java index c72fa92d25b..eb44b57053e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.data; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.ImmutableList; import io.opentelemetry.api.metrics.common.Labels; @@ -14,7 +15,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; /** Unit tests for {@link io.opentelemetry.sdk.metrics.data.MetricData}. */ @@ -178,31 +178,31 @@ void metricData_HistogramPoints() { AggregationTemporality.DELTA, Collections.singleton(HISTOGRAM_POINT))); assertThat(metricData.getDoubleHistogramData().getPoints()).containsExactly(HISTOGRAM_POINT); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - DoubleHistogramPointData.create( - 0, 0, Labels.empty(), 0.0, ImmutableList.of(), ImmutableList.of())); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - DoubleHistogramPointData.create( - 0, - 0, - Labels.empty(), - 0.0, - ImmutableList.of(1.0, 1.0), - ImmutableList.of(0L, 0L, 0L))); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - DoubleHistogramPointData.create( - 0, - 0, - Labels.empty(), - 0.0, - ImmutableList.of(Double.NEGATIVE_INFINITY), - ImmutableList.of(0L, 0L))); + assertThatThrownBy( + () -> + DoubleHistogramPointData.create( + 0, 0, Labels.empty(), 0.0, ImmutableList.of(), ImmutableList.of())) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + DoubleHistogramPointData.create( + 0, + 0, + Labels.empty(), + 0.0, + ImmutableList.of(1.0, 1.0), + ImmutableList.of(0L, 0L, 0L))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + DoubleHistogramPointData.create( + 0, + 0, + Labels.empty(), + 0.0, + ImmutableList.of(Double.NEGATIVE_INFINITY), + ImmutableList.of(0L, 0L))) + .isInstanceOf(IllegalArgumentException.class); } @Test