Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
22beaee
add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 22, 2021
ed0251b
add no-op histogram support for the OTLP/Prometheus exporters
beanliu Jan 25, 2021
c5697b6
add Histogram aggregator and accumulation
beanliu Jan 22, 2021
264979b
fixup! add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 27, 2021
0859553
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
088f615
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
a7e9142
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
7b8cebd
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
75c9e4b
fixup! add Histogram aggregator and accumulation
beanliu Jan 28, 2021
79c1e0e
use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
e12b6c8
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
d4c2a0d
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
ce83dbc
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 29, 2021
d46ba13
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 31, 2021
09dd32b
remove ImmutableDoubleArray.of()
beanliu Feb 22, 2021
7696284
remove boundaries from HistogramAccumulation
beanliu Feb 22, 2021
df1d459
Merge remote-tracking branch 'upstream/main' into histogram-aggregati…
beanliu Feb 23, 2021
34ef27b
move ImmutableDoubleArray and ImmutableLongArray to sdk.metrics.aggre…
beanliu Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ static Collector.Type toMetricFamilyType(MetricData metricData) {
return Collector.Type.GAUGE;
case SUMMARY:
return Collector.Type.SUMMARY;
case HISTOGRAM:
return Collector.Type.HISTOGRAM;
}
return Collector.Type.UNTYPED;
}
Expand Down Expand Up @@ -122,6 +124,9 @@ static List<Sample> toSamples(
addSummarySamples(
(DoubleSummaryPointData) pointData, name, labelNames, labelValues, samples);
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
}
return samples;
Expand Down Expand Up @@ -189,6 +194,8 @@ private static Collection<? extends PointData> getPoints(MetricData metricData)
return metricData.getLongSumData().getPoints();
case SUMMARY:
return metricData.getDoubleSummaryData().getPoints();
case HISTOGRAM:
return metricData.getDoubleHistogramData().getPoints();
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ static Metric toProtoMetric(MetricData metricData) {
.addAllDataPoints(toDoubleDataPoints(doubleGaugeData.getPoints()))
.build());
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(new double[] {10, 100, 1_000}, /* stateful= */ false)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.getEmpty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregator.createHandle();
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ static AggregatorFactory count(AggregationTemporality temporality) {
return new CountAggregatorFactory(temporality);
}

/**
* Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of
* the measurements taken.
*
* @param stateful configures if the aggregator is stateful.
* @param boundaries configures the fixed bucket boundaries.
* @return an {@code AggregationFactory} that calculates histogram of recorded measurements.
*/
static AggregatorFactory histogram(double[] boundaries, boolean stateful) {
return new HistogramAggregatorFactory(boundaries, stateful);
}

/**
* Returns an {@code AggregationFactory} that calculates the last value of all recorded
* measurements.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

final class DoubleHistogramAggregator extends AbstractAggregator<HistogramAccumulation> {
private static final Logger logger = Logger.getLogger(DoubleHistogramAggregator.class.getName());

private static boolean LoggedMergingInvalidBoundaries = false;
Comment thread
beanliu marked this conversation as resolved.
Outdated

private final double[] boundaries;

DoubleHistogramAggregator(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
double[] boundaries,
boolean stateful) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful);
this.boundaries = boundaries;
}

private static List<Double> asUnmodifiableDoubleList(double[] xs) {
List<Double> result = new ArrayList<>(xs.length);
for (double x : xs) {
result.add(x);
}
return result;
}

private static List<Long> asUnmodifiableLongList(long[] xs) {
List<Long> result = new ArrayList<>(xs.length);
for (long x : xs) {
result.add(x);
}
return result;
}

@Override
public AggregatorHandle<HistogramAccumulation> createHandle() {
return new Handle(this.boundaries);
}

@Override
public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) {
if (!x.getBoundaries().equals(y.getBoundaries())) {
// If this happens, it's a pretty severe bug in the SDK.
if (!LoggedMergingInvalidBoundaries) {
logger.log(
Level.SEVERE,
"can't merge histograms with different boundaries, something's very wrong: "
+ "x.boundaries="
+ x.getBoundaries()
+ " y.boundaries="
+ y.getBoundaries());
LoggedMergingInvalidBoundaries = true;
}
return HistogramAccumulation.create(
0, 0, Collections.emptyList(), Collections.singletonList(0L));
}

long[] mergedCounts = new long[x.getCounts().size()];
for (int i = 0; i < x.getCounts().size(); ++i) {
mergedCounts[i] = x.getCounts().get(i) + y.getCounts().get(i);
}
return HistogramAccumulation.create(
x.getCount() + y.getCount(),
x.getSum() + y.getSum(),
x.getBoundaries(),
asUnmodifiableLongList(mergedCounts));
}

@Override
public final MetricData toMetricData(
Map<Labels, HistogramAccumulation> accumulationByLabels,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
return MetricData.createDoubleHistogram(
getResource(),
getInstrumentationLibraryInfo(),
getInstrumentDescriptor().getName(),
getInstrumentDescriptor().getDescription(),
getInstrumentDescriptor().getUnit(),
DoubleHistogramData.create(
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA,
MetricDataUtils.toDoubleHistogramPointList(
accumulationByLabels,
isStateful() ? startEpochNanos : lastCollectionEpoch,
epochNanos)));
}

@Override
public HistogramAccumulation accumulateDouble(double value) {
return HistogramAccumulation.create(
1, value, Collections.emptyList(), Collections.singletonList(1L));
}

@Override
public HistogramAccumulation accumulateLong(long value) {
return HistogramAccumulation.create(
1, value, Collections.emptyList(), Collections.singletonList(1L));
}

static final class Handle extends AggregatorHandle<HistogramAccumulation> {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Comment thread
beanliu marked this conversation as resolved.
Outdated

@GuardedBy("lock")
private final State current;

Handle(double[] boundaries) {
current = new State(boundaries);
}

@Override
protected HistogramAccumulation doAccumulateThenReset() {
List<Long> counts;
lock.writeLock().lock();
Comment thread
beanliu marked this conversation as resolved.
Outdated
try {
counts = asUnmodifiableLongList(current.counts);
current.reset();
} finally {
lock.writeLock().unlock();
}

return HistogramAccumulation.create(
counts.stream().mapToLong(i -> i).sum(),
current.sum,
asUnmodifiableDoubleList(current.boundaries),
asUnmodifiableLongList(current.counts));
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = current.findBucketIndex(value);

lock.writeLock().lock();
try {
current.record(bucketIndex, value);
} finally {
lock.writeLock().unlock();
}
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
}

private static final class State {
private double sum;
private final double[] boundaries;
private final long[] counts;

public State(double[] boundaries) {
this.boundaries = Arrays.copyOf(boundaries, boundaries.length);
this.counts = new long[this.boundaries.length + 1];
reset();
}

// Benchmark shows that linear search performs better with ordinary buckets.
Comment thread
beanliu marked this conversation as resolved.
Outdated
private int findBucketIndex(double value) {
for (int i = 0; i < this.boundaries.length; ++i) {
if (value < this.boundaries[i]) {
return i;
}
}
return this.boundaries.length;
}

private void reset() {
this.sum = 0;
Arrays.fill(this.counts, 0);
}

private void record(int bucketIndex, double value) {
this.sum += value;
this.counts[bucketIndex]++;
}
}
}
}
Loading