Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
Comment thread
beanliu marked this conversation as resolved.
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(Arrays.asList(10.0, 100.0, 1_000.0), AggregationTemporality.DELTA)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregator.createHandle();
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
Comment thread
bogdandrutu marked this conversation as resolved.
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
Expand Down Expand Up @@ -46,7 +48,8 @@ public final void setup() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
aggregatorHandle.recordDouble(100.0056);
Expand All @@ -56,7 +59,8 @@ public void aggregate_10Threads() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
Expand All @@ -66,7 +70,8 @@ public void aggregate_5Threads() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
Expand Down Expand Up @@ -46,7 +48,8 @@ public final void setup() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
aggregatorHandle.recordLong(100);
Expand All @@ -56,7 +59,8 @@ public void aggregate_10Threads() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordLong(100);
Expand All @@ -66,7 +70,8 @@ public void aggregate_5Threads() {
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordLong(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import javax.annotation.concurrent.Immutable;

/** Factory class for {@link Aggregator}. */
Expand Down Expand Up @@ -77,6 +78,18 @@ static AggregatorFactory minMaxSumCount() {
return MinMaxSumCountAggregatorFactory.INSTANCE;
}

/**
* Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of
* the measurements taken.
*
* @param temporality configures what temporality to be produced for the Histogram metrics.
* @param boundaries configures the fixed bucket boundaries.
* @return an {@code AggregationFactory} that calculates histogram of recorded measurements.
*/
static AggregatorFactory histogram(List<Double> boundaries, AggregationTemporality temporality) {
return new HistogramAggregatorFactory(boundaries, temporality);
}

/**
* Returns a new {@link Aggregator}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;

final class DoubleHistogramAggregator extends AbstractAggregator<HistogramAccumulation> {
private final double[] boundaries;

// a cache for converting to MetricData
private final List<Double> boundaryList;

DoubleHistogramAggregator(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
double[] boundaries,
boolean stateful) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful);
this.boundaries = boundaries;

List<Double> boundaryList = new ArrayList<>(this.boundaries.length);
for (double v : this.boundaries) {
boundaryList.add(v);
}
this.boundaryList = Collections.unmodifiableList(boundaryList);
}

@Override
public AggregatorHandle<HistogramAccumulation> createHandle() {
return new Handle(this.boundaries);
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
Comment thread
beanliu marked this conversation as resolved.
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
@Override
public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) {
long[] mergedCounts = new long[x.getCounts().length];
for (int i = 0; i < x.getCounts().length; ++i) {
mergedCounts[i] = x.getCounts()[i] + y.getCounts()[i];
}
return HistogramAccumulation.create(x.getSum() + y.getSum(), mergedCounts);
}

@Override
public final MetricData toMetricData(
Comment thread
beanliu marked this conversation as resolved.
Map<Labels, HistogramAccumulation> accumulationByLabels,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
return MetricData.createDoubleHistogram(
getResource(),
getInstrumentationLibraryInfo(),
getInstrumentDescriptor().getName(),
getInstrumentDescriptor().getDescription(),
getInstrumentDescriptor().getUnit(),
DoubleHistogramData.create(
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA,
MetricDataUtils.toDoubleHistogramPointList(
accumulationByLabels,
isStateful() ? startEpochNanos : lastCollectionEpoch,
epochNanos,
boundaryList)));
}

@Override
public HistogramAccumulation accumulateDouble(double value) {
long[] counts = new long[this.boundaries.length + 1];
counts[findBucketIndex(this.boundaries, value)] = 1;
return HistogramAccumulation.create(value, counts);
}

@Override
public HistogramAccumulation accumulateLong(long value) {
return accumulateDouble((double) value);
}

// Benchmark shows that linear search performs better than binary search with ordinary
// buckets.
private static int findBucketIndex(double[] boundaries, double value) {
for (int i = 0; i < boundaries.length; ++i) {
if (value <= boundaries[i]) {
return i;
}
}
return boundaries.length;
}

static final class Handle extends AggregatorHandle<HistogramAccumulation> {
// read-only
private final double[] boundaries;

@GuardedBy("lock")
private double sum;

@GuardedBy("lock")
private final long[] counts;

private final ReentrantLock lock = new ReentrantLock();
Comment thread
beanliu marked this conversation as resolved.

Handle(double[] boundaries) {
this.boundaries = boundaries;
this.counts = new long[this.boundaries.length + 1];
this.sum = 0;
}

@Override
protected HistogramAccumulation doAccumulateThenReset() {
lock.lock();
try {
HistogramAccumulation acc =
HistogramAccumulation.create(sum, Arrays.copyOf(counts, counts.length));
this.sum = 0;
Arrays.fill(this.counts, 0);
return acc;
} finally {
lock.unlock();
}
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = findBucketIndex(this.boundaries, value);

lock.lock();
try {
this.sum += value;
this.counts[bucketIndex]++;
} finally {
lock.unlock();
}
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import com.google.auto.value.AutoValue;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
abstract class HistogramAccumulation {
/**
* Creates a new {@link HistogramAccumulation} with the given values. Assume `counts` is read-only
* so we don't need a defensive-copy here.
*
* @return a new {@link HistogramAccumulation} with the given values.
*/
static HistogramAccumulation create(double sum, long[] counts) {
return new AutoValue_HistogramAccumulation(sum, counts);
}

HistogramAccumulation() {}

/**
* The sum of all measurements recorded.
*
* @return the sum of recorded measurements.
*/
abstract double getSum();

/**
* The counts in each bucket. The returned type is a mutable object, but it should be fine because
* the class is only used internally.
*
* @return the counts in each bucket. <b>do not mutate</b> the returned object.
*/
@SuppressWarnings("mutable")
abstract long[] getCounts();
}
Loading