Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
Comment thread
beanliu marked this conversation as resolved.
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(new double[] {10, 100, 1_000}, AggregationTemporality.DELTA)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregator.createHandle();
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
Comment thread
bogdandrutu marked this conversation as resolved.
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ static AggregatorFactory minMaxSumCount() {
return MinMaxSumCountAggregatorFactory.INSTANCE;
}

/**
* Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of
* the measurements taken.
*
* @param temporality configures what temporality to be produced for the Histogram metrics.
* @param boundaries configures the fixed bucket boundaries.
* @return an {@code AggregationFactory} that calculates histogram of recorded measurements.
*/
static AggregatorFactory histogram(double[] boundaries, AggregationTemporality temporality) {
Comment thread
beanliu marked this conversation as resolved.
Outdated
return new HistogramAggregatorFactory(boundaries, temporality);
}

/**
* Returns a new {@link Aggregator}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;

final class DoubleHistogramAggregator extends AbstractAggregator<HistogramAccumulation> {
private static final long[] countsOfOne = new long[] {1};

private final double[] boundaries;

DoubleHistogramAggregator(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
double[] boundaries,
boolean stateful) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful);
this.boundaries = boundaries;
}

@Override
public AggregatorHandle<HistogramAccumulation> createHandle() {
return new Handle(this.boundaries);
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
Comment thread
beanliu marked this conversation as resolved.
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
@Override
public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) {
long[] mergedCounts = new long[x.getCounts().length];
for (int i = 0; i < x.getCounts().length; ++i) {
mergedCounts[i] = x.getCounts()[i] + y.getCounts()[i];
}
return HistogramAccumulation.create(x.getSum() + y.getSum(), mergedCounts);
}

@Override
public final MetricData toMetricData(
Comment thread
beanliu marked this conversation as resolved.
Map<Labels, HistogramAccumulation> accumulationByLabels,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
List<Double> boundaries = new ArrayList<>(this.boundaries.length);
for (double v : this.boundaries) {
boundaries.add(v);
}
Comment thread
beanliu marked this conversation as resolved.
Outdated
return MetricData.createDoubleHistogram(
getResource(),
getInstrumentationLibraryInfo(),
getInstrumentDescriptor().getName(),
getInstrumentDescriptor().getDescription(),
getInstrumentDescriptor().getUnit(),
DoubleHistogramData.create(
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA,
MetricDataUtils.toDoubleHistogramPointList(
accumulationByLabels,
isStateful() ? startEpochNanos : lastCollectionEpoch,
epochNanos,
boundaries)));
}

@Override
public HistogramAccumulation accumulateDouble(double value) {
return HistogramAccumulation.create(value, countsOfOne);
Comment thread
beanliu marked this conversation as resolved.
Outdated
}

@Override
public HistogramAccumulation accumulateLong(long value) {
return HistogramAccumulation.create(value, countsOfOne);
Comment thread
beanliu marked this conversation as resolved.
Outdated
}

static final class Handle extends AggregatorHandle<HistogramAccumulation> {
private final double[] boundaries;

private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private final State current;

Handle(double[] boundaries) {
this.boundaries = boundaries;
this.current = new State(this.boundaries.length + 1);
}

// Benchmark shows that linear search performs better than binary search with ordinary
// buckets.
private int findBucketIndex(double value) {
for (int i = 0; i < boundaries.length; ++i) {
if (Double.compare(value, boundaries[i]) <= 0) {
return i;
}
}
return boundaries.length;
}

@Override
protected HistogramAccumulation doAccumulateThenReset() {
lock.lock();
try {
HistogramAccumulation acc =
HistogramAccumulation.create(
current.sum, Arrays.copyOf(current.counts, current.counts.length));
current.reset();
return acc;
} finally {
lock.unlock();
}
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = findBucketIndex(value);

lock.lock();
try {
current.record(bucketIndex, value);
} finally {
lock.unlock();
}
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
}

private static final class State {
Comment thread
beanliu marked this conversation as resolved.
Outdated
private double sum;
private final long[] counts;

public State(int bucketSize) {
this.counts = new long[bucketSize];
reset();
}

private void reset() {
this.sum = 0;
Arrays.fill(this.counts, 0);
}

private void record(int bucketIndex, double value) {
this.sum += value;
this.counts[bucketIndex]++;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import com.google.auto.value.AutoValue;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
abstract class HistogramAccumulation {
/**
* Creates a new {@link HistogramAccumulation} with the given values. Assume `counts` is read-only
* so we don't need a defensive-copy here.
*
* @return a new {@link HistogramAccumulation} with the given values.
*/
static HistogramAccumulation create(double sum, long[] counts) {
return new AutoValue_HistogramAccumulation(sum, counts);
}

HistogramAccumulation() {}

/**
* The sum of all measurements recorded.
*
* @return the sum of recorded measurements.
*/
abstract double getSum();

/**
* The counts in each bucket. The returned type is a mutable object, but it should be fine because
* the class is only used internally.
*
* @return the counts in each bucket. <b>do not mutate</b> the returned object.
*/
@SuppressWarnings("mutable")
abstract long[] getCounts();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;

final class HistogramAggregatorFactory implements AggregatorFactory {
private final double[] boundaries;
private final AggregationTemporality temporality;

HistogramAggregatorFactory(double[] boundaries, AggregationTemporality temporality) {
this.boundaries = boundaries;
Comment thread
beanliu marked this conversation as resolved.
Outdated
this.temporality = temporality;

Comment thread
beanliu marked this conversation as resolved.
for (int i = 1; i < this.boundaries.length; ++i) {
if (Double.compare(this.boundaries[i - 1], this.boundaries[i]) >= 0) {
Comment thread
beanliu marked this conversation as resolved.
Outdated
throw new IllegalArgumentException(
"invalid bucket boundary: " + this.boundaries[i - 1] + " >= " + this.boundaries[i]);
}
}
if (this.boundaries.length > 0) {
if (this.boundaries[0] == Double.NEGATIVE_INFINITY) {
throw new IllegalArgumentException("invalid bucket boundary: -Inf");
}
if (this.boundaries[this.boundaries.length - 1] == Double.POSITIVE_INFINITY) {
throw new IllegalArgumentException("invalid bucket boundary: +Inf");
}
}
}

@Override
@SuppressWarnings("unchecked")
public <T> Aggregator<T> create(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor descriptor) {
final boolean stateful = this.temporality == AggregationTemporality.CUMULATIVE;
switch (descriptor.getValueType()) {
case LONG:
case DOUBLE:
return (Aggregator<T>)
new DoubleHistogramAggregator(
resource, instrumentationLibraryInfo, descriptor, this.boundaries, stateful);
}
throw new IllegalArgumentException("Invalid instrument value type");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
Expand Down Expand Up @@ -44,4 +45,23 @@ static List<DoubleSummaryPointData> toDoubleSummaryPointList(
points.add(aggregator.toPoint(startEpochNanos, epochNanos, labels)));
return points;
}

static List<DoubleHistogramPointData> toDoubleHistogramPointList(
Map<Labels, HistogramAccumulation> accumulationMap,
long startEpochNanos,
long epochNanos,
List<Double> boundaries) {
List<DoubleHistogramPointData> points = new ArrayList<>(accumulationMap.size());
accumulationMap.forEach(
Comment thread
beanliu marked this conversation as resolved.
(labels, aggregator) -> {
List<Long> counts = new ArrayList<>(aggregator.getCounts().length);
for (long v : aggregator.getCounts()) {
counts.add(v);
}
points.add(
DoubleHistogramPointData.create(
startEpochNanos, epochNanos, labels, aggregator.getSum(), boundaries, counts));
});
return points;
}
}
Loading