diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/MetricAdapter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/MetricAdapter.java index deb82a6e241..5dc94712e84 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/MetricAdapter.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/MetricAdapter.java @@ -149,6 +149,9 @@ static Metric toProtoMetric(MetricData metricData) { .addAllDataPoints(toDoubleDataPoints(doubleGaugeData.getPoints())) .build()); break; + case HISTOGRAM: + // TODO: no-op, will add in the following PRs + break; } return builder.build(); } diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java index e04cb914a95..1755b294630 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java @@ -85,6 +85,8 @@ static Collector.Type toMetricFamilyType(MetricData metricData) { return Collector.Type.GAUGE; case SUMMARY: return Collector.Type.SUMMARY; + case HISTOGRAM: + return Collector.Type.HISTOGRAM; } return Collector.Type.UNKNOWN; } @@ -122,6 +124,9 @@ static List toSamples( addSummarySamples( (DoubleSummaryPointData) pointData, name, labelNames, labelValues, samples); break; + case HISTOGRAM: + // TODO: no-op, will add in the following PRs + break; } } return samples; @@ -189,6 +194,8 @@ private static Collection getPoints(MetricData metricData) return metricData.getLongSumData().getPoints(); case SUMMARY: return metricData.getDoubleSummaryData().getPoints(); + case HISTOGRAM: + return metricData.getDoubleHistogramData().getPoints(); } return Collections.emptyList(); } diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java new file mode 100644 index 00000000000..f70ce64f656 --- /dev/null +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.resources.Resource; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class DoubleHistogramBenchmark { + private static final Aggregator aggregator = + AggregatorFactory.histogram(new double[] {10, 100, 1_000}, /* stateful= */ false) + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "1", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)); + private AggregatorHandle aggregatorHandle; + + @Setup(Level.Trial) + public final void setup() { + aggregatorHandle = aggregator.createHandle(); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 10) + public void aggregate_10Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 5) + public void aggregate_5Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void aggregate_1Threads() { + aggregatorHandle.recordDouble(100.0056); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java index 05887a3fd20..5ae686e19c8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java @@ -45,6 +45,18 @@ static AggregatorFactory count(AggregationTemporality temporality) { return new CountAggregatorFactory(temporality); } + /** + * Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of + * the measurements taken. + * + * @param stateful configures if the aggregator is stateful. + * @param boundaries configures the fixed bucket boundaries. + * @return an {@code AggregationFactory} that calculates histogram of recorded measurements. + */ + static AggregatorFactory histogram(double[] boundaries, boolean stateful) { + return new HistogramAggregatorFactory(boundaries, stateful); + } + /** * Returns an {@code AggregationFactory} that calculates the last value of all recorded * measurements. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java new file mode 100644 index 00000000000..e0430a7c9f6 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java @@ -0,0 +1,168 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.concurrent.GuardedBy; + +final class DoubleHistogramAggregator extends AbstractAggregator { + private final ImmutableDoubleArray boundaries; + + DoubleHistogramAggregator( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor instrumentDescriptor, + ImmutableDoubleArray boundaries, + boolean stateful) { + super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful); + this.boundaries = boundaries; + } + + @Override + public AggregatorHandle createHandle() { + return new Handle(this.boundaries); + } + + /** + * Return the result of the merge of two histogram accumulations. As long as one Aggregator + * instance produces all Accumulations with constant boundaries we don't need to worry about + * merging accumulations with different boundaries. + */ + @Override + public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) { + long[] mergedCounts = new long[x.getCounts().length()]; + for (int i = 0; i < x.getCounts().length(); ++i) { + mergedCounts[i] = x.getCounts().get(i) + y.getCounts().get(i); + } + return HistogramAccumulation.create( + x.getCount() + y.getCount(), + x.getSum() + y.getSum(), + ImmutableLongArray.copyOf(mergedCounts)); + } + + @Override + public final MetricData toMetricData( + Map accumulationByLabels, + long startEpochNanos, + long lastCollectionEpoch, + long epochNanos) { + return MetricData.createDoubleHistogram( + getResource(), + getInstrumentationLibraryInfo(), + getInstrumentDescriptor().getName(), + getInstrumentDescriptor().getDescription(), + getInstrumentDescriptor().getUnit(), + DoubleHistogramData.create( + isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA, + MetricDataUtils.toDoubleHistogramPointList( + accumulationByLabels, + isStateful() ? startEpochNanos : lastCollectionEpoch, + epochNanos, + boundaries.toList()))); + } + + @Override + public HistogramAccumulation accumulateDouble(double value) { + return HistogramAccumulation.create(1, value, ImmutableLongArray.of(1)); + } + + @Override + public HistogramAccumulation accumulateLong(long value) { + return HistogramAccumulation.create(1, value, ImmutableLongArray.of(1)); + } + + static final class Handle extends AggregatorHandle { + private final ImmutableDoubleArray boundaries; + + private final ReentrantLock lock = new ReentrantLock(); + + @GuardedBy("lock") + private final State current; + + Handle(ImmutableDoubleArray boundaries) { + this.boundaries = boundaries; + this.current = new State(this.boundaries.length() + 1); + } + + // Benchmark shows that linear search performs better than binary search with ordinary + // buckets. + private int findBucketIndex(double value) { + for (int i = 0; i < boundaries.length(); ++i) { + if (value < boundaries.get(i)) { + return i; + } + } + return boundaries.length(); + } + + @Override + protected HistogramAccumulation doAccumulateThenReset() { + double sum; + ImmutableLongArray counts; + lock.lock(); + try { + sum = current.sum; + counts = ImmutableLongArray.copyOf(current.counts); + current.reset(); + } finally { + lock.unlock(); + } + + long totalCount = 0; + for (int i = 0; i < counts.length(); ++i) { + totalCount += counts.get(i); + } + + return HistogramAccumulation.create(totalCount, sum, counts); + } + + @Override + protected void doRecordDouble(double value) { + int bucketIndex = findBucketIndex(value); + + lock.lock(); + try { + current.record(bucketIndex, value); + } finally { + lock.unlock(); + } + } + + @Override + protected void doRecordLong(long value) { + doRecordDouble((double) value); + } + + private static final class State { + private double sum; + private final long[] counts; + + public State(int bucketSize) { + this.counts = new long[bucketSize]; + reset(); + } + + private void reset() { + this.sum = 0; + Arrays.fill(this.counts, 0); + } + + private void record(int bucketIndex, double value) { + this.sum += value; + this.counts[bucketIndex]++; + } + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java new file mode 100644 index 00000000000..e7bbc7de529 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import com.google.auto.value.AutoValue; +import javax.annotation.concurrent.Immutable; + +@Immutable +@AutoValue +abstract class HistogramAccumulation { + /** + * Creates a new {@link HistogramAccumulation} with the given values. + * + * @return a new {@link HistogramAccumulation} with the given values. + */ + static HistogramAccumulation create(long count, double sum, ImmutableLongArray counts) { + return new AutoValue_HistogramAccumulation(count, sum, counts); + } + + HistogramAccumulation() {} + + /** + * The number of measurements taken. + * + * @return the count of recorded measurements. + */ + abstract long getCount(); + + /** + * The sum of all measurements recorded. + * + * @return the sum of recorded measurements. + */ + abstract double getSum(); + + /** + * The counts in each bucket. + * + * @return the counts in each bucket. + */ + abstract ImmutableLongArray getCounts(); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java new file mode 100644 index 00000000000..49c3e40bf88 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.resources.Resource; + +final class HistogramAggregatorFactory implements AggregatorFactory { + private final ImmutableDoubleArray boundaries; + private final boolean stateful; + + HistogramAggregatorFactory(double[] boundaries, boolean stateful) { + this.boundaries = ImmutableDoubleArray.copyOf(boundaries); + this.stateful = stateful; + + for (int i = 1; i < this.boundaries.length(); ++i) { + if (Double.compare(this.boundaries.get(i - 1), this.boundaries.get(i)) >= 0) { + throw new IllegalArgumentException( + "invalid bucket boundary: " + + this.boundaries.get(i - 1) + + " >= " + + this.boundaries.get(i)); + } + } + if (this.boundaries.length() > 0) { + if (this.boundaries.get(0) == Double.NEGATIVE_INFINITY) { + throw new IllegalArgumentException("invalid bucket boundary: -Inf"); + } + if (this.boundaries.get(this.boundaries.length() - 1) == Double.POSITIVE_INFINITY) { + throw new IllegalArgumentException("invalid bucket boundary: +Inf"); + } + } + } + + @Override + @SuppressWarnings("unchecked") + public Aggregator create( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor descriptor) { + switch (descriptor.getValueType()) { + case LONG: + case DOUBLE: + return (Aggregator) + new DoubleHistogramAggregator( + resource, instrumentationLibraryInfo, descriptor, this.boundaries, this.stateful); + } + throw new IllegalArgumentException("Invalid instrument value type"); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableDoubleArray.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableDoubleArray.java new file mode 100644 index 00000000000..22c9f146bbc --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableDoubleArray.java @@ -0,0 +1,100 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +@Immutable +class ImmutableDoubleArray { + private static final ImmutableDoubleArray EMPTY = new ImmutableDoubleArray(new double[0]); + + /** Returns an immutable array containing the given values, in order. */ + public static ImmutableDoubleArray copyOf(double[] values) { + return values.length == 0 + ? EMPTY + : new ImmutableDoubleArray(Arrays.copyOf(values, values.length)); + } + + private final double[] array; + + private ImmutableDoubleArray(double[] array) { + this.array = array; + } + + /** Returns a copy of the underlying data as list. */ + public List toList() { + List result = new ArrayList<>(array.length); + for (double v : array) { + result.add(v); + } + return result; + } + + /** Returns the number of values in this array. */ + public int length() { + return array.length; + } + + /** + * Returns the {@code double} value present at the given index. + * + * @throws IndexOutOfBoundsException if {@code index} is negative, or greater than or equal to + * {@link #length} + */ + public double get(int index) { + return array[index]; + } + + /** + * Returns {@code true} if {@code object} is an {@code ImmutableDoubleArray} containing the same + * values as this one, in the same order. + */ + @Override + public boolean equals(@Nullable Object object) { + if (object == this) { + return true; + } + if (!(object instanceof ImmutableDoubleArray)) { + return false; + } + ImmutableDoubleArray that = (ImmutableDoubleArray) object; + return Arrays.equals(this.array, that.array); + } + + /** Returns an unspecified hash code for the contents of this immutable array. */ + @Override + public int hashCode() { + int hash = 1; + for (double value : array) { + hash *= 31; + hash += ((Double) value).hashCode(); + } + return hash; + } + + /** + * Returns a string representation of this array in the same form as {@link + * Arrays#toString(double[])}, for example {@code "[1, 2, 3]"}. + */ + @Override + public String toString() { + if (length() == 0) { + return "[]"; + } + StringBuilder builder = new StringBuilder(length() * 5); // rough estimate is fine + builder.append('[').append(array[0]); + + for (int i = 1; i < length(); i++) { + builder.append(", ").append(array[i]); + } + builder.append(']'); + return builder.toString(); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableLongArray.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableLongArray.java new file mode 100644 index 00000000000..e1ec8a1db60 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/ImmutableLongArray.java @@ -0,0 +1,113 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +@Immutable +class ImmutableLongArray { + private static final ImmutableLongArray EMPTY = new ImmutableLongArray(new long[0]); + + /** Returns an immutable array containing a single value. */ + public static ImmutableLongArray of(long e0) { + return new ImmutableLongArray(new long[] {e0}); + } + + /** Returns an immutable array containing the given values, in order. */ + public static ImmutableLongArray copyOf(long[] values) { + return values.length == 0 + ? EMPTY + : new ImmutableLongArray(Arrays.copyOf(values, values.length)); + } + + private final long[] array; + + private ImmutableLongArray(long[] array) { + this.array = array; + } + + /** Returns a copy of the underlying data as list. */ + public List toList() { + List result = new ArrayList<>(array.length); + for (long v : array) { + result.add(v); + } + return result; + } + + /** Returns the number of values in this array. */ + public int length() { + return array.length; + } + + /** + * Returns the {@code long} value present at the given index. + * + * @throws IndexOutOfBoundsException if {@code index} is negative, or greater than or equal to + * {@link #length} + */ + public long get(int index) { + return array[index]; + } + + /** + * Returns {@code true} if {@code object} is an {@code ImmutableLongArray} containing the same + * values as this one, in the same order. + */ + @Override + public boolean equals(@Nullable Object object) { + if (object == this) { + return true; + } + if (!(object instanceof ImmutableLongArray)) { + return false; + } + ImmutableLongArray that = (ImmutableLongArray) object; + if (this.length() != that.length()) { + return false; + } + for (int i = 0; i < length(); i++) { + if (this.get(i) != that.get(i)) { + return false; + } + } + return true; + } + + /** Returns an unspecified hash code for the contents of this immutable array. */ + @Override + public int hashCode() { + int hash = 1; + for (long value : array) { + hash *= 31; + hash += (int) (value ^ (value >>> 32)); + } + return hash; + } + + /** + * Returns a string representation of this array in the same form as {@link + * Arrays#toString(long[])}, for example {@code "[1, 2, 3]"}. + */ + @Override + public String toString() { + if (length() == 0) { + return "[]"; + } + StringBuilder builder = new StringBuilder(length() * 5); // rough estimate is fine + builder.append('[').append(array[0]); + + for (int i = 1; i < length(); i++) { + builder.append(", ").append(array[i]); + } + builder.append(']'); + return builder.toString(); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java index 9574632271b..973b5470fbd 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.aggregator; import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; @@ -44,4 +45,24 @@ static List toDoubleSummaryPointList( points.add(aggregator.toPoint(startEpochNanos, epochNanos, labels))); return points; } + + static List toDoubleHistogramPointList( + Map accumulationMap, + long startEpochNanos, + long epochNanos, + List boundaries) { + List points = new ArrayList<>(accumulationMap.size()); + accumulationMap.forEach( + (labels, aggregator) -> + points.add( + DoubleHistogramPointData.create( + startEpochNanos, + epochNanos, + labels, + aggregator.getSum(), + aggregator.getCount(), + boundaries, + aggregator.getCounts().toList()))); + return points; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java new file mode 100644 index 00000000000..a5f90422faf --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.data; + +import com.google.auto.value.AutoValue; +import java.util.Collection; +import javax.annotation.concurrent.Immutable; + +@Immutable +@AutoValue +public abstract class DoubleHistogramData implements Data { + DoubleHistogramData() {} + + public static DoubleHistogramData create( + AggregationTemporality temporality, Collection points) { + return new AutoValue_DoubleHistogramData(temporality, points); + } + + /** + * Returns the {@code AggregationTemporality} of this metric, + * + *

AggregationTemporality describes if the aggregator reports delta changes since last report + * time, or cumulative changes since a fixed start time. + * + * @return the {@code AggregationTemporality} of this metric + */ + public abstract AggregationTemporality getAggregationTemporality(); + + @Override + public abstract Collection getPoints(); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java new file mode 100644 index 00000000000..4b6d16f8c3e --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.data; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.api.metrics.common.Labels; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.annotation.concurrent.Immutable; + +/** + * DoubleHistogramPointData represents an approximate representation of the distribution of + * measurements. + */ +@Immutable +@AutoValue +public abstract class DoubleHistogramPointData implements PointData { + /** + * Functional interface for consuming bucket boundaries and counts as a sequence of pair values. + */ + public interface BucketConsumer { + void accept(double upperBound, long count); + } + + /** + * Creates a DoubleHistogramPointData. + * + * @return a DoubleHistogramPointData. + */ + public static DoubleHistogramPointData create( + long startEpochNanos, + long epochNanos, + Labels labels, + double sum, + long count, + List boundaries, + List counts) { + return new AutoValue_DoubleHistogramPointData( + startEpochNanos, + epochNanos, + labels, + sum, + count, + Collections.unmodifiableList(new ArrayList<>(boundaries)), + Collections.unmodifiableList(new ArrayList<>(counts))); + } + + DoubleHistogramPointData() {} + + /** + * The sum of all measurements recorded. + * + * @return the sum of recorded measurements. + */ + public abstract double getSum(); + + /** + * The number of measurements taken. + * + * @return the count of recorded measurements. + */ + public abstract long getCount(); + + /** + * The bucket boundaries. For a Histogram with N defined boundaries, e.g, [x, y, z]. There are N+1 + * counts: [-inf, x), [x, y), [y, z), [z, +inf]. + * + * @return the read-only bucket boundaries in increasing order. do not mutate the returned + * object. + */ + public abstract List getBoundaries(); + + /** + * The counts in each bucket. + * + * @return the read-only counts in each bucket. do not mutate the returned object. + */ + public abstract List getCounts(); + + /** Iterates over all the bucket boundaries and counts in this histogram. */ + public void forEach(BucketConsumer action) { + List boundaries = getBoundaries(); + List counts = getCounts(); + for (int i = 0; i < boundaries.size(); ++i) { + action.accept(boundaries.get(i), counts.get(i)); + } + action.accept(Double.POSITIVE_INFINITY, counts.get(boundaries.size())); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java index d7e2939635a..61df82f5db5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java @@ -29,6 +29,8 @@ public abstract class MetricData { /* isMonotonic= */ false, AggregationTemporality.CUMULATIVE, Collections.emptyList()); private static final DoubleSummaryData DEFAULT_DOUBLE_SUMMARY_DATA = DoubleSummaryData.create(Collections.emptyList()); + private static final DoubleHistogramData DEFAULT_DOUBLE_HISTOGRAM_DATA = + DoubleHistogramData.create(AggregationTemporality.CUMULATIVE, Collections.emptyList()); /** * Returns a new MetricData wih a {@link MetricDataType#DOUBLE_GAUGE} type. @@ -140,6 +142,28 @@ public static MetricData createDoubleSummary( data); } + /** + * Returns a new MetricData with a {@link MetricDataType#HISTOGRAM} type. + * + * @return a new MetricData wih a {@link MetricDataType#HISTOGRAM} type. + */ + public static MetricData createDoubleHistogram( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + String name, + String description, + String unit, + DoubleHistogramData data) { + return new AutoValue_MetricData( + resource, + instrumentationLibraryInfo, + name, + description, + unit, + MetricDataType.HISTOGRAM, + data); + } + MetricData() {} /** @@ -265,4 +289,18 @@ public final DoubleSummaryData getDoubleSummaryData() { } return DEFAULT_DOUBLE_SUMMARY_DATA; } + + /** + * Returns the {@code DoubleHistogramData} if type is {@link MetricDataType#HISTOGRAM}, otherwise + * a default empty data. + * + * @return the {@code DoubleHistogramData} if type is {@link MetricDataType#HISTOGRAM}, otherwise + * a default empty data. + */ + public final DoubleHistogramData getDoubleHistogramData() { + if (getType() == MetricDataType.HISTOGRAM) { + return (DoubleHistogramData) getData(); + } + return DEFAULT_DOUBLE_HISTOGRAM_DATA; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java index 50adc159fbd..0749807848f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java @@ -30,4 +30,10 @@ public enum MetricDataType { * value recorded, the sum of all measurements and the total number of measurements recorded. */ SUMMARY, + + /** + * A Histogram represents an approximate representation of the distribution of measurements + * recorded. + */ + HISTOGRAM, } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java index f908ab243d0..b0a3cc12c1c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.resources.Resource; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; class AggregatorFactoryTest { @@ -123,4 +124,73 @@ void getSumAggregatorFactory() { InstrumentValueType.DOUBLE))) .isInstanceOf(DoubleSumAggregator.class); } + + @Test + void getHistogramAggregatorFactory() { + AggregatorFactory histogram = + AggregatorFactory.histogram(new double[] {1.0}, /* stateful= */ false); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG))) + .isInstanceOf(DoubleHistogramAggregator.class); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE))) + .isInstanceOf(DoubleHistogramAggregator.class); + + assertThat( + histogram + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG)) + .isStateful()) + .isFalse(); + assertThat( + AggregatorFactory.histogram(new double[] {1.0}, /* stateful= */ true) + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)) + .isStateful()) + .isTrue(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + AggregatorFactory.histogram( + new double[] {Double.NEGATIVE_INFINITY}, /* stateful= */ false)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + AggregatorFactory.histogram( + new double[] {1, Double.POSITIVE_INFINITY}, /* stateful= */ false)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> AggregatorFactory.histogram(new double[] {2, 1, 3}, /* stateful= */ false)); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java new file mode 100644 index 00000000000..9a2bda5e57c --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java @@ -0,0 +1,176 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; + +public class DoubleHistogramAggregatorTest { + private static final DoubleHistogramAggregator aggregator = + new DoubleHistogramAggregator( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG), + ImmutableDoubleArray.copyOf(new double[] {10.0, 100.0, 1000.0}), + /* stateful= */ false); + + @Test + void createHandle() { + assertThat(aggregator.createHandle()).isInstanceOf(DoubleHistogramAggregator.Handle.class); + } + + @Test + void testRecordings() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(20); + aggregatorHandle.recordLong(5); + aggregatorHandle.recordLong(150); + aggregatorHandle.recordLong(2000); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create( + 4, 2175, ImmutableLongArray.copyOf(new long[] {1, 1, 1, 1}))); + } + + @Test + void toAccumulationAndReset() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(100); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create( + 1, 100, ImmutableLongArray.copyOf(new long[] {0, 0, 1, 0}))); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(0); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create(1, 0, ImmutableLongArray.copyOf(new long[] {1, 0, 0, 0}))); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + } + + @Test + void toMetricData() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(10); + + MetricData metricData = + aggregator.toMetricData( + Collections.singletonMap(Labels.empty(), aggregatorHandle.accumulateThenReset()), + 0, + 10, + 100); + assertThat(metricData).isNotNull(); + assertThat(metricData.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(metricData.getDoubleHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + } + + @Test + void accumulateData() { + assertThat(aggregator.accumulateDouble(2.0)) + .isEqualTo(HistogramAccumulation.create(1, 2.0, ImmutableLongArray.of(1))); + assertThat(aggregator.accumulateLong(10)) + .isEqualTo(HistogramAccumulation.create(1, 10.0, ImmutableLongArray.of(1))); + } + + @Test + void testMultithreadedUpdates() throws Exception { + final AggregatorHandle aggregatorHandle = aggregator.createHandle(); + final Histogram summarizer = new Histogram(); + int numberOfThreads = 10; + final long[] updates = new long[] {1, 2, 3, 5, 7, 11, 13, 17, 19, 23}; + final int numberOfUpdates = 1000; + final CountDownLatch startingGun = new CountDownLatch(numberOfThreads); + List workers = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread t = + new Thread( + () -> { + long update = updates[index]; + try { + startingGun.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numberOfUpdates; j++) { + aggregatorHandle.recordLong(update); + if (ThreadLocalRandom.current().nextInt(10) == 0) { + summarizer.process(aggregatorHandle.accumulateThenReset()); + } + } + }); + workers.add(t); + t.start(); + } + for (int i = 0; i <= numberOfThreads; i++) { + startingGun.countDown(); + } + + for (Thread worker : workers) { + worker.join(); + } + // make sure everything gets merged when all the aggregation is done. + summarizer.process(aggregatorHandle.accumulateThenReset()); + + assertThat(summarizer.accumulation) + .isEqualTo( + HistogramAccumulation.create( + numberOfThreads * numberOfUpdates, + 101000, + ImmutableLongArray.copyOf(new long[] {5000, 5000, 0, 0}))); + } + + private static final class Histogram { + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + @GuardedBy("lock") + @Nullable + private HistogramAccumulation accumulation; + + void process(@Nullable HistogramAccumulation other) { + if (other == null) { + return; + } + lock.writeLock().lock(); + try { + if (accumulation == null) { + accumulation = other; + return; + } + accumulation = aggregator.merge(accumulation, other); + } finally { + lock.writeLock().unlock(); + } + } + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java index 01eae936c1e..e89013a2f30 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java @@ -7,11 +7,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableList; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -40,6 +43,15 @@ class MetricDataTest { Arrays.asList( ValueAtPercentile.create(0.0, DOUBLE_VALUE), ValueAtPercentile.create(100, DOUBLE_VALUE))); + private static final DoubleHistogramPointData HISTOGRAM_POINT = + DoubleHistogramPointData.create( + START_EPOCH_NANOS, + EPOCH_NANOS, + Labels.of("key", "value"), + DOUBLE_VALUE, + LONG_VALUE, + ImmutableList.of(1.0), + ImmutableList.of(1L, 1L)); @Test void metricData_Getters() { @@ -146,6 +158,39 @@ void metricData_SummaryPoints() { assertThat(metricData.getDoubleSummaryData().getPoints()).containsExactly(SUMMARY_POINT); } + @Test + void metricData_HistogramPoints() { + assertThat(HISTOGRAM_POINT.getStartEpochNanos()).isEqualTo(START_EPOCH_NANOS); + assertThat(HISTOGRAM_POINT.getEpochNanos()).isEqualTo(EPOCH_NANOS); + assertThat(HISTOGRAM_POINT.getLabels().size()).isEqualTo(1); + assertThat(HISTOGRAM_POINT.getLabels().get("key")).isEqualTo("value"); + assertThat(HISTOGRAM_POINT.getCount()).isEqualTo(LONG_VALUE); + assertThat(HISTOGRAM_POINT.getSum()).isEqualTo(DOUBLE_VALUE); + assertThat(HISTOGRAM_POINT.getBoundaries()).isEqualTo(ImmutableList.of(1.0)); + assertThat(HISTOGRAM_POINT.getCounts()).isEqualTo(ImmutableList.of(1L, 1L)); + + List boundaries = new ArrayList<>(); + List counts = new ArrayList<>(); + HISTOGRAM_POINT.forEach( + (b, c) -> { + boundaries.add(b); + counts.add(c); + }); + assertThat(boundaries).isEqualTo(Arrays.asList(1.0, Double.POSITIVE_INFINITY)); + assertThat(counts).isEqualTo(ImmutableList.of(1L, 1L)); + + MetricData metricData = + MetricData.createDoubleHistogram( + Resource.empty(), + InstrumentationLibraryInfo.empty(), + "metric_name", + "metric_description", + "ms", + DoubleHistogramData.create( + AggregationTemporality.DELTA, Collections.singleton(HISTOGRAM_POINT))); + assertThat(metricData.getDoubleHistogramData().getPoints()).containsExactly(HISTOGRAM_POINT); + } + @Test void metricData_GetDefault() { MetricData metricData = @@ -160,6 +205,7 @@ void metricData_GetDefault() { assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSumData().getPoints()).isEmpty(); assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); + assertThat(metricData.getDoubleHistogramData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSummaryData().getPoints()).containsExactly(SUMMARY_POINT); metricData = @@ -174,6 +220,7 @@ void metricData_GetDefault() { assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSumData().getPoints()).isEmpty(); assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); + assertThat(metricData.getDoubleHistogramData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSummaryData().getPoints()).isEmpty(); } }