Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TemporalMetricStorage
// Lock while building metrics
mutable opentelemetry::common::SpinLockMutex lock_;
const AggregationConfig *aggregation_config_;
opentelemetry::common::SystemTimestamp last_delta_collection_ts_;
bool has_last_delta_collection_ts_ = false;
};
} // namespace metrics
} // namespace sdk
Expand Down
10 changes: 8 additions & 2 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,24 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
// no other reader configured to collect those data.
if (collectors.size() == 1 && aggregation_temporarily == AggregationTemporality::kDelta)
{
if (!has_last_delta_collection_ts_)
{
last_delta_collection_ts_ = sdk_start_ts;
has_last_delta_collection_ts_ = true;
}
// If no metrics, early return
if (delta_metrics->Size() == 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the empty-collection path, the function early-returns when delta_metrics->Size() == 0 before updating last_delta_collection_ts_. If a cycle at t2 is empty and the next cycle at t3 has data, the emitted point will carry start_ts = t1 (the last non-empty collection) rather than t2, even though the delta map only contains activity from (t2, t3]. Successive delta windows then overlap instead of abutting, which violates the OTel data model's requirement that start_ts(n) == end_ts(n-1) and causes rate computations (value / (end - start)) to under-report. The slow path doesn't have this issue because last_reported_metrics_[collector].collection_ts is updated on every call, including empty ones.

{
last_delta_collection_ts_ = collection_ts;
return true;
}
// Create MetricData directly
MetricData metric_data;
metric_data.instrument_descriptor = instrument_descriptor_;
metric_data.aggregation_temporality = AggregationTemporality::kDelta;
metric_data.start_ts = sdk_start_ts;
metric_data.start_ts = last_delta_collection_ts_;
metric_data.end_ts = collection_ts;
last_delta_collection_ts_ = collection_ts;

// Direct conversion of delta metrics to point data
delta_metrics->GetAllEntries(
Expand Down Expand Up @@ -92,7 +99,6 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
auto present = unreported_metrics_.find(collector);
if (present == unreported_metrics_.end())
{
// no unreported metrics for the collector, return.
return true;
}
auto unreported_list = std::move(present->second);
Expand Down
65 changes: 65 additions & 0 deletions sdk/test/metrics/sync_metric_storage_counter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,71 @@ TEST_P(WritableMetricStorageTestFixture, DoubleCounterSumAggregation)
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}

TEST(SyncMetricStorageTest, DeltaCounterStartTimestampTracksEmptyCycles)
{
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
std::shared_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::SyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor,
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(),
#endif
nullptr);

std::map<std::string, std::string> attributes = {{"RequestType", "GET"}};
std::shared_ptr<CollectorHandle> collector(
new MockCollectorHandle(AggregationTemporality::kDelta));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

auto sdk_start_ts = std::chrono::system_clock::now();
auto collection_ts1 = sdk_start_ts + std::chrono::seconds(1);
auto collection_ts2 = sdk_start_ts + std::chrono::seconds(2);
auto collection_ts3 = sdk_start_ts + std::chrono::seconds(3);

storage.RecordLong(10, KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});

MetricData metric_cycle1;
bool cycle1_called = false;
storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts1,
[&](const MetricData &metric_data) {
metric_cycle1 = metric_data;
cycle1_called = true;
return true;
});
EXPECT_TRUE(cycle1_called);

bool cycle2_called = false;
storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts2,
[&](const MetricData &) {
cycle2_called = true;
return true;
});
EXPECT_FALSE(cycle2_called); // Check if Empty cycle in the middle

storage.RecordLong(20, KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});

MetricData metric_cycle3;
bool cycle3_called = false;
storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts3,
[&](const MetricData &metric_data) {
metric_cycle3 = metric_data;
cycle3_called = true;
return true;
});
EXPECT_TRUE(cycle3_called);
// Check that the fast path correctly preserved the timestamp from the empty
// collection cycle (cycle 2)
EXPECT_EQ(metric_cycle3.start_ts, collection_ts2);
EXPECT_EQ(metric_cycle1.start_ts, sdk_start_ts);
EXPECT_EQ(metric_cycle1.end_ts, collection_ts1);
EXPECT_EQ(metric_cycle3.end_ts, collection_ts3);
}
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble,
WritableMetricStorageTestFixture,
::testing::Values(AggregationTemporality::kCumulative,
Expand Down
Loading