Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
22beaee
add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 22, 2021
ed0251b
add no-op histogram support for the OTLP/Prometheus exporters
beanliu Jan 25, 2021
c5697b6
add Histogram aggregator and accumulation
beanliu Jan 22, 2021
264979b
fixup! add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 27, 2021
0859553
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
088f615
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
a7e9142
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
7b8cebd
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
75c9e4b
fixup! add Histogram aggregator and accumulation
beanliu Jan 28, 2021
79c1e0e
use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
e12b6c8
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
d4c2a0d
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
ce83dbc
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 29, 2021
d46ba13
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 31, 2021
09dd32b
remove ImmutableDoubleArray.of()
beanliu Feb 22, 2021
7696284
remove boundaries from HistogramAccumulation
beanliu Feb 22, 2021
df1d459
Merge remote-tracking branch 'upstream/main' into histogram-aggregati…
beanliu Feb 23, 2021
34ef27b
move ImmutableDoubleArray and ImmutableLongArray to sdk.metrics.aggre…
beanliu Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ static Metric toProtoMetric(MetricData metricData) {
.addAllDataPoints(toDoubleDataPoints(doubleGaugeData.getPoints()))
.build());
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ static Collector.Type toMetricFamilyType(MetricData metricData) {
return Collector.Type.GAUGE;
case SUMMARY:
return Collector.Type.SUMMARY;
case HISTOGRAM:
return Collector.Type.HISTOGRAM;
}
return Collector.Type.UNKNOWN;
}
Expand Down Expand Up @@ -122,6 +124,9 @@ static List<Sample> toSamples(
addSummarySamples(
(DoubleSummaryPointData) pointData, name, labelNames, labelValues, samples);
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
}
return samples;
Expand Down Expand Up @@ -189,6 +194,8 @@ private static Collection<? extends PointData> getPoints(MetricData metricData)
return metricData.getLongSumData().getPoints();
case SUMMARY:
return metricData.getDoubleSummaryData().getPoints();
case HISTOGRAM:
return metricData.getDoubleHistogramData().getPoints();
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(new double[] {10, 100, 1_000}, /* stateful= */ false)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregator.createHandle();
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ static AggregatorFactory count(AggregationTemporality temporality) {
return new CountAggregatorFactory(temporality);
}

/**
* Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of
* the measurements taken.
*
* @param stateful configures if the aggregator is stateful.
* @param boundaries configures the fixed bucket boundaries.
* @return an {@code AggregationFactory} that calculates histogram of recorded measurements.
*/
static AggregatorFactory histogram(double[] boundaries, boolean stateful) {
return new HistogramAggregatorFactory(boundaries, stateful);
}

/**
* Returns an {@code AggregationFactory} that calculates the last value of all recorded
* measurements.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;

final class DoubleHistogramAggregator extends AbstractAggregator<HistogramAccumulation> {
private final ImmutableDoubleArray boundaries;

DoubleHistogramAggregator(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
ImmutableDoubleArray boundaries,
boolean stateful) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful);
this.boundaries = boundaries;
}

@Override
public AggregatorHandle<HistogramAccumulation> createHandle() {
return new Handle(this.boundaries);
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
@Override
public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) {
long[] mergedCounts = new long[x.getCounts().length()];
for (int i = 0; i < x.getCounts().length(); ++i) {
mergedCounts[i] = x.getCounts().get(i) + y.getCounts().get(i);
}
return HistogramAccumulation.create(
x.getCount() + y.getCount(),
x.getSum() + y.getSum(),
ImmutableLongArray.copyOf(mergedCounts));
}

@Override
public final MetricData toMetricData(
Map<Labels, HistogramAccumulation> accumulationByLabels,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
return MetricData.createDoubleHistogram(
getResource(),
getInstrumentationLibraryInfo(),
getInstrumentDescriptor().getName(),
getInstrumentDescriptor().getDescription(),
getInstrumentDescriptor().getUnit(),
DoubleHistogramData.create(
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA,
MetricDataUtils.toDoubleHistogramPointList(
accumulationByLabels,
isStateful() ? startEpochNanos : lastCollectionEpoch,
epochNanos,
boundaries.toList())));
}

@Override
public HistogramAccumulation accumulateDouble(double value) {
return HistogramAccumulation.create(1, value, ImmutableLongArray.of(1));
}

@Override
public HistogramAccumulation accumulateLong(long value) {
return HistogramAccumulation.create(1, value, ImmutableLongArray.of(1));
}

static final class Handle extends AggregatorHandle<HistogramAccumulation> {
private final ImmutableDoubleArray boundaries;

private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private final State current;

Handle(ImmutableDoubleArray boundaries) {
this.boundaries = boundaries;
this.current = new State(this.boundaries.length() + 1);
}

// Benchmark shows that linear search performs better than binary search with ordinary
// buckets.
private int findBucketIndex(double value) {
for (int i = 0; i < boundaries.length(); ++i) {
if (value < boundaries.get(i)) {
return i;
}
}
return boundaries.length();
}

@Override
protected HistogramAccumulation doAccumulateThenReset() {
double sum;
ImmutableLongArray counts;
lock.lock();
try {
sum = current.sum;
counts = ImmutableLongArray.copyOf(current.counts);
current.reset();
} finally {
lock.unlock();
}

long totalCount = 0;
for (int i = 0; i < counts.length(); ++i) {
totalCount += counts.get(i);
}

return HistogramAccumulation.create(totalCount, sum, counts);
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = findBucketIndex(value);

lock.lock();
try {
current.record(bucketIndex, value);
} finally {
lock.unlock();
}
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
}

private static final class State {
private double sum;
private final long[] counts;

public State(int bucketSize) {
this.counts = new long[bucketSize];
reset();
}

private void reset() {
this.sum = 0;
Arrays.fill(this.counts, 0);
}

private void record(int bucketIndex, double value) {
this.sum += value;
this.counts[bucketIndex]++;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import com.google.auto.value.AutoValue;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
abstract class HistogramAccumulation {
/**
* Creates a new {@link HistogramAccumulation} with the given values.
*
* @return a new {@link HistogramAccumulation} with the given values.
*/
static HistogramAccumulation create(long count, double sum, ImmutableLongArray counts) {
return new AutoValue_HistogramAccumulation(count, sum, counts);
}

HistogramAccumulation() {}

/**
* The number of measurements taken.
*
* @return the count of recorded measurements.
*/
abstract long getCount();

/**
* The sum of all measurements recorded.
*
* @return the sum of recorded measurements.
*/
abstract double getSum();

/**
* The counts in each bucket.
*
* @return the counts in each bucket.
*/
abstract ImmutableLongArray getCounts();
}
Loading