Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
22beaee
add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 22, 2021
ed0251b
add no-op histogram support for the OTLP/Prometheus exporters
beanliu Jan 25, 2021
c5697b6
add Histogram aggregator and accumulation
beanliu Jan 22, 2021
264979b
fixup! add MetricDataType.HISTOGRAM and related data container types
beanliu Jan 27, 2021
0859553
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
088f615
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
a7e9142
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
7b8cebd
fixup! add Histogram aggregator and accumulation
beanliu Jan 27, 2021
75c9e4b
fixup! add Histogram aggregator and accumulation
beanliu Jan 28, 2021
79c1e0e
use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
e12b6c8
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
d4c2a0d
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 28, 2021
ce83dbc
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 29, 2021
d46ba13
fixup! use Immutable(Long|Double)Array for histogram aggregation
beanliu Jan 31, 2021
09dd32b
remove ImmutableDoubleArray.of()
beanliu Feb 22, 2021
7696284
remove boundaries from HistogramAccumulation
beanliu Feb 22, 2021
df1d459
Merge remote-tracking branch 'upstream/main' into histogram-aggregati…
beanliu Feb 23, 2021
34ef27b
move ImmutableDoubleArray and ImmutableLongArray to sdk.metrics.aggre…
beanliu Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ static Collector.Type toMetricFamilyType(MetricData metricData) {
return Collector.Type.GAUGE;
case SUMMARY:
return Collector.Type.SUMMARY;
case HISTOGRAM:
return Collector.Type.HISTOGRAM;
}
return Collector.Type.UNTYPED;
}
Expand Down Expand Up @@ -122,6 +124,9 @@ static List<Sample> toSamples(
addSummarySamples(
(DoubleSummaryPointData) pointData, name, labelNames, labelValues, samples);
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
}
return samples;
Expand Down Expand Up @@ -189,6 +194,8 @@ private static Collection<? extends PointData> getPoints(MetricData metricData)
return metricData.getLongSumData().getPoints();
case SUMMARY:
return metricData.getDoubleSummaryData().getPoints();
case HISTOGRAM:
return metricData.getDoubleHistogramData().getPoints();
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ static Metric toProtoMetric(MetricData metricData) {
.addAllDataPoints(toDoubleDataPoints(doubleGaugeData.getPoints()))
.build());
break;
case HISTOGRAM:
// TODO: no-op, will add in the following PRs
break;
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.ImmutableDoubleArray;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(
ImmutableDoubleArray.copyOf(new double[] {10, 100, 1_000}), /* stateful= */ false)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.getEmpty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
public final void setup() {
aggregatorHandle = aggregator.createHandle();
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 10)
public void aggregate_10Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 5)
public void aggregate_5Threads() {
aggregatorHandle.recordDouble(100.0056);
}

@Benchmark
@Fork(1)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(value = 1)
public void aggregate_1Threads() {
aggregatorHandle.recordDouble(100.0056);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.ImmutableDoubleArray;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;
Expand Down Expand Up @@ -45,6 +46,18 @@ static AggregatorFactory count(AggregationTemporality temporality) {
return new CountAggregatorFactory(temporality);
}

/**
* Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of
* the measurements taken.
*
* @param stateful configures if the aggregator is stateful.
* @param boundaries configures the fixed bucket boundaries.
* @return an {@code AggregationFactory} that calculates histogram of recorded measurements.
*/
static AggregatorFactory histogram(ImmutableDoubleArray boundaries, boolean stateful) {
return new HistogramAggregatorFactory(boundaries, stateful);
}

/**
* Returns an {@code AggregationFactory} that calculates the last value of all recorded
* measurements.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.ImmutableDoubleArray;
import io.opentelemetry.sdk.metrics.common.ImmutableLongArray;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

final class DoubleHistogramAggregator extends AbstractAggregator<HistogramAccumulation> {
private static final Logger logger = Logger.getLogger(DoubleHistogramAggregator.class.getName());

private static volatile boolean loggedMergingInvalidBoundaries = false;

private final ImmutableDoubleArray boundaries;

DoubleHistogramAggregator(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor,
ImmutableDoubleArray boundaries,
boolean stateful) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful);
this.boundaries = boundaries;
}

@Override
public AggregatorHandle<HistogramAccumulation> createHandle() {
return new Handle(this.boundaries);
}

@Override
public final HistogramAccumulation merge(HistogramAccumulation x, HistogramAccumulation y) {
if (!x.getBoundaries().equals(y.getBoundaries())) {
// If this happens, it's a pretty severe bug in the SDK.
if (!loggedMergingInvalidBoundaries) {
logger.log(
Level.SEVERE,
"can't merge histograms with different boundaries, something's very wrong: "
+ "x.boundaries="
+ x.getBoundaries()
+ " y.boundaries="
+ y.getBoundaries());
loggedMergingInvalidBoundaries = true;
}
return HistogramAccumulation.create(
0, 0, ImmutableDoubleArray.of(), ImmutableLongArray.of(0));
}

long[] mergedCounts = new long[x.getCounts().length()];
for (int i = 0; i < x.getCounts().length(); ++i) {
mergedCounts[i] = x.getCounts().get(i) + y.getCounts().get(i);
}
return HistogramAccumulation.create(
x.getCount() + y.getCount(),
x.getSum() + y.getSum(),
x.getBoundaries(),
ImmutableLongArray.copyOf(mergedCounts));
}

@Override
public final MetricData toMetricData(
Map<Labels, HistogramAccumulation> accumulationByLabels,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
return MetricData.createDoubleHistogram(
getResource(),
getInstrumentationLibraryInfo(),
getInstrumentDescriptor().getName(),
getInstrumentDescriptor().getDescription(),
getInstrumentDescriptor().getUnit(),
DoubleHistogramData.create(
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA,
MetricDataUtils.toDoubleHistogramPointList(
accumulationByLabels,
isStateful() ? startEpochNanos : lastCollectionEpoch,
epochNanos)));
}

@Override
public HistogramAccumulation accumulateDouble(double value) {
return HistogramAccumulation.create(
1, value, ImmutableDoubleArray.of(), ImmutableLongArray.of(1));
}

@Override
public HistogramAccumulation accumulateLong(long value) {
return HistogramAccumulation.create(
1, value, ImmutableDoubleArray.of(), ImmutableLongArray.of(1));
}

static final class Handle extends AggregatorHandle<HistogramAccumulation> {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Comment thread
beanliu marked this conversation as resolved.
Outdated

@GuardedBy("lock")
private final State current;

Handle(ImmutableDoubleArray boundaries) {
current = new State(boundaries);
}

@Override
protected HistogramAccumulation doAccumulateThenReset() {
double sum;
ImmutableLongArray counts;
lock.writeLock().lock();
Comment thread
beanliu marked this conversation as resolved.
Outdated
try {
sum = current.sum;
counts = ImmutableLongArray.copyOf(current.counts);
current.reset();
} finally {
lock.writeLock().unlock();
}

long totalCount = 0;
for (int i = 0; i < counts.length(); ++i) {
totalCount += counts.get(i);
}

return HistogramAccumulation.create(totalCount, sum, current.boundaries, counts);
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = current.findBucketIndex(value);

lock.writeLock().lock();
try {
current.record(bucketIndex, value);
} finally {
lock.writeLock().unlock();
}
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
}

private static final class State {
private double sum;
private final ImmutableDoubleArray boundaries;
private final long[] counts;

public State(ImmutableDoubleArray boundaries) {
this.boundaries = boundaries;
this.counts = new long[this.boundaries.length() + 1];
reset();
}

// Benchmark shows that linear search performs better than binary search with ordinary
// buckets.
private int findBucketIndex(double value) {
for (int i = 0; i < this.boundaries.length(); ++i) {
if (value < this.boundaries.get(i)) {
return i;
}
}
return this.boundaries.length();
}

private void reset() {
this.sum = 0;
Arrays.fill(this.counts, 0);
}

private void record(int bucketIndex, double value) {
this.sum += value;
this.counts[bucketIndex]++;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.aggregator;

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.metrics.common.ImmutableDoubleArray;
import io.opentelemetry.sdk.metrics.common.ImmutableLongArray;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
public abstract class HistogramAccumulation {
Comment thread
beanliu marked this conversation as resolved.
Outdated
Comment thread
beanliu marked this conversation as resolved.
Outdated
/**
* Creates a new {@link HistogramAccumulation} with the given values.
*
* @return a new {@link HistogramAccumulation} with the given values.
*/
static HistogramAccumulation create(
long count, double sum, ImmutableDoubleArray boundaries, ImmutableLongArray counts) {
return new AutoValue_HistogramAccumulation(count, sum, boundaries, counts);
}

HistogramAccumulation() {}

/**
* The number of measurements taken.
*
* @return the count of recorded measurements.
*/
abstract long getCount();

/**
* The sum of all measurements recorded.
*
* @return the sum of recorded measurements.
*/
abstract double getSum();

/**
* The bucket boundaries. For a Histogram with N defined boundaries, e.g, [x, y, z]. There are N+1
* counts: [-inf, x), [x, y), [y, z), [z, +inf].
*
* @return the bucket boundaries in increasing order.
*/
abstract ImmutableDoubleArray getBoundaries();

/**
* The counts in each bucket.
*
* @return the counts in each bucket.
*/
abstract ImmutableLongArray getCounts();

final DoubleHistogramPointData toPoint(long startEpochNanos, long epochNanos, Labels labels) {
return DoubleHistogramPointData.create(
startEpochNanos, epochNanos, labels, getSum(), getCount(), getBoundaries(), getCounts());
}
}
Loading