Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public RawMetricSamplesHandler(MeasurementAccessor accessor, ExecutionMetricSamp
}

@Override
public void processMeasurements(List<Measurement> measurements) {
public void processMeasurements(ExecutionContext executionContext, List<Measurement> measurements) {
List<?> castedMeasurements = measurements;
if (measurements.size() > 0) {
accessor.saveManyMeasurements((List<Object>) castedMeasurements);
}
}

@Override
public void processMetrics(List<ExecutionMetricSample> metrics) {
public void processMetrics(ExecutionContext executionContext, List<ExecutionMetricSample> metrics) {
if (metrics != null && !metrics.isEmpty()) {
executionMetricSampleAccessor.save(metrics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.LoggerFactory;
import step.controller.services.async.AsyncTaskManager;
import step.controller.services.async.AsyncTaskManagerPlugin;
import step.core.Constants;
import step.core.GlobalContext;
import step.core.collections.Collection;
import step.core.collections.CollectionFactory;
Expand All @@ -13,6 +14,9 @@
import step.core.deployment.WebApplicationConfigurationManager;
import step.core.entities.Entity;
import step.core.entities.EntityConstants;
import step.core.execution.model.ExecutionNoticeSeverity;
import step.core.execution.notices.ExecutionNoticeManager;
import step.core.execution.notices.ExecutionNoticeType;
import step.core.metrics.MetricsConstants;
import step.core.plugins.AbstractControllerPlugin;
import step.core.plugins.Plugin;
Expand Down Expand Up @@ -126,7 +130,21 @@ public void serverStart(GlobalContext context) {
TimeSeriesIngestionPipeline mainIngestionPipeline = timeSeries.getIngestionPipeline();

TimeSeriesAggregationPipeline aggregationPipeline = timeSeries.getAggregationPipeline();
TimeSeriesMetricSamplesHandler handler = new TimeSeriesMetricSamplesHandler(timeSeries, includedAttributes, excludedAttributes);
// Safeguard against high cardinality on user-defined metric/measurement labels during executions.
int maxUniqueLabelValues = configuration.getPropertyAsInteger("timeseries.attributes.max-unique-label-values", 20);
ExecutionNoticeManager executionNoticeManager = context.require(ExecutionNoticeManager.class);
// The documentation URL is keyed on the Step "doc" version (the minor component, e.g. "30" for 3.30.0),
// derived from the version constant so it tracks version upgrades automatically.
String docUrl = "https://step.dev/knowledgebase/" + Constants.STEP_VERSION.getMinor()
+ "/userdocs/analytics/measurements-and-metrics/#label-cardinality-safeguard";
executionNoticeManager.register(new ExecutionNoticeType(
TimeSeriesMetricSamplesHandler.CARDINALITY_NOTICE_TYPE_ID,
"Time-series",
ExecutionNoticeSeverity.WARNING,
"High cardinality detected on the custom metric label <b>{labelName}</b> of metric <b>{metricName}</b>. " +
"Unique values exceeding the quota of {quota} were dismissed and are reported under a single placeholder value. " +
"<a href=\"" + docUrl + "\" target=\"_blank\">Learn more</a>"));
TimeSeriesMetricSamplesHandler handler = new TimeSeriesMetricSamplesHandler(timeSeries, includedAttributes, excludedAttributes, maxUniqueLabelValues, executionNoticeManager);

context.put(TimeSeries.class, timeSeries);
context.put(TimeSeriesIngestionPipeline.class, mainIngestionPipeline);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package step.plugins.timeseries;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.execution.ExecutionContext;
import step.core.execution.model.ExecutionNotice;
import step.core.execution.notices.ExecutionNoticeManager;
import step.core.metrics.InstrumentType;
import step.core.metrics.MetricSample;
import step.core.timeseries.TimeSeries;
Expand All @@ -15,6 +19,7 @@
import step.core.metrics.ExecutionMetricSample;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static step.core.metrics.StepMetricSample.METRIC_TYPE;
import static step.core.metrics.MetricsConstants.INSTRUMENT_TYPE_ATTRIBUTE;
Expand All @@ -26,11 +31,44 @@
*/
public class TimeSeriesMetricSamplesHandler implements MetricSamplesHandler {

private static final Logger logger = LoggerFactory.getLogger(TimeSeriesMetricSamplesHandler.class);

/** Static value substituted for label values dropped once a label exceeds its unique-value quota. */
public static final String QUOTA_EXCEEDED_VALUE = "values dismissed due to quota exceeded";

/** Id of the execution notice type raised when a label's unique-value quota is exceeded. */
public static final String CARDINALITY_NOTICE_TYPE_ID = "timeseries.label-cardinality-quota-exceeded";

private final TimeSeries timeSeries;

private final Set<String> handledAttributes;
private final Set<String> excludedAttributes;

/**
* Maximum number of unique values tolerated per (execution, metric name, label name) before new
* values are masked with {@link #QUOTA_EXCEEDED_VALUE}. A value {@code <= 0} disables the safeguard.
*/
private final int maxUniqueLabelValues;

/**
* In-memory cardinality tracking, bounded to the lifecycle of an execution and cleaned up in
* {@link #afterExecutionEnd(ExecutionContext)}:
* {@code executionId -> metricName -> labelName -> set of observed values}.
*/
private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, Set<String>>>> labelValueTracking = new ConcurrentHashMap<>();

/**
* Tracks the {@code metricName + '\0' + labelName} keys for which a quota warning has already been
* raised, per execution, so the user is notified only once per label.
*/
private final ConcurrentHashMap<String, Set<String>> warnedLabels = new ConcurrentHashMap<>();

/**
* Used to raise an execution notice the first time a label exceeds its quota. May be {@code null}
* for re-ingestion/rebuild and test paths, in which case the quota breach is only logged.
*/
private final ExecutionNoticeManager executionNoticeManager;

/**
* @param timeSeries the time series to ingest metric samples into
* @param handledAttributes allowlist of attribute keys to retain when building bucket
Expand All @@ -48,31 +86,58 @@ public class TimeSeriesMetricSamplesHandler implements MetricSamplesHandler {
* supported and throws {@link IllegalArgumentException}.
*/
public TimeSeriesMetricSamplesHandler(TimeSeries timeSeries, Set<String> handledAttributes, Set<String> excludedAttributes) {
// Cardinality safeguard disabled by default: used by re-ingestion/rebuild paths (raw data was
// already filtered at first ingestion) and by tests that don't exercise the quota.
this(timeSeries, handledAttributes, excludedAttributes, 0, null);
}

/**
* @param maxUniqueLabelValues maximum number of unique values per (execution, metric name, label name)
* before new user-defined label values are masked. {@code <= 0} disables the safeguard.
*/
public TimeSeriesMetricSamplesHandler(TimeSeries timeSeries, Set<String> handledAttributes, Set<String> excludedAttributes, int maxUniqueLabelValues) {
this(timeSeries, handledAttributes, excludedAttributes, maxUniqueLabelValues, null);
}

/**
* @param maxUniqueLabelValues maximum number of unique values per (execution, metric name, label name)
* before new user-defined label values are masked. {@code <= 0} disables the safeguard.
* @param executionNoticeManager manager used to raise an execution notice the first time a label exceeds
* its quota; may be {@code null} (the breach is then only logged).
*/
public TimeSeriesMetricSamplesHandler(TimeSeries timeSeries, Set<String> handledAttributes, Set<String> excludedAttributes, int maxUniqueLabelValues, ExecutionNoticeManager executionNoticeManager) {
this.timeSeries = timeSeries;
this.handledAttributes = Objects.requireNonNull(handledAttributes);
this.excludedAttributes = Objects.requireNonNull(excludedAttributes);
this.maxUniqueLabelValues = maxUniqueLabelValues;
this.executionNoticeManager = executionNoticeManager;
if (!handledAttributes.isEmpty() && !excludedAttributes.isEmpty()) {
throw new IllegalArgumentException("Either a set of handled attributes or a set of excluded attributes is required, setting both is not supported.");
}
MetricHeartbeatRegistry.getInstance().registerHandler(this);
}

@Override
public void processMeasurements(List<Measurement> measurements) {
measurements.forEach(this::processMeasurement);
public void processMeasurements(ExecutionContext executionContext, List<Measurement> measurements) {
measurements.forEach(m -> processMeasurement(executionContext, m));
}

/** Re-ingestion entry point (no execution context, no cardinality safeguard). */
public void processMeasurement(Measurement measurement) {
processMeasurement(null, measurement);
}

public void processMeasurement(ExecutionContext executionContext, Measurement measurement) {
long begin = measurement.getBegin();
long value = measurement.getValue();

BucketAttributes bucketAttributes = measurementToBucketAttributes(measurement);
BucketAttributes bucketAttributes = measurementToBucketAttributes(executionContext, measurement);
bucketAttributes.put(METRIC_TYPE, RESPONSE_TIME);
TimeSeriesIngestionPipeline ingestionPipeline = this.timeSeries.getIngestionPipeline();
ingestionPipeline.ingestPoint(bucketAttributes, begin, value);
}

private BucketAttributes measurementToBucketAttributes(Measurement measurement) {
private BucketAttributes measurementToBucketAttributes(ExecutionContext executionContext, Measurement measurement) {
Map<String, Object> bucketAttributesMap = new HashMap<>();
if (handledAttributes.isEmpty()) {
bucketAttributesMap.putAll(measurement);
Expand All @@ -84,12 +149,15 @@ private BucketAttributes measurementToBucketAttributes(Measurement measurement)
}
});
}
// Apply the cardinality safeguard to the keyword-author-defined custom data labels only.
applyLabelQuota(executionContext, measurement.getExecId(), measurement.getName(),
measurement.getCustomMetricLabelKeys(), bucketAttributesMap);
return new BucketAttributes(bucketAttributesMap);
}

private void processThreadGroupAsMeasurement(Measurement measurement) {
if (measurement != null) {
BucketAttributes bucketAttributes = measurementToBucketAttributes(measurement);
BucketAttributes bucketAttributes = measurementToBucketAttributes(null, measurement);
bucketAttributes.put(METRIC_TYPE, measurement.getType());
bucketAttributes.put(INSTRUMENT_TYPE_ATTRIBUTE.getName(), InstrumentType.GAUGE.toLowerCase());
TimeSeriesIngestionPipeline ingestionPipeline = this.timeSeries.getIngestionPipeline();
Expand All @@ -111,27 +179,33 @@ private void processThreadGroupAsMeasurement(Measurement measurement) {
* full distribution for percentile queries. Empty intervals (count == 0) are skipped.
*/
@Override
public void processMetrics(List<ExecutionMetricSample> metrics) {
metrics.forEach(this::processMetric);
public void processMetrics(ExecutionContext executionContext, List<ExecutionMetricSample> metrics) {
metrics.forEach(mm -> processMetric(executionContext, mm));
}

@Override
public void processControllerMetrics(List<ControllerMetricSample> metrics) {
metrics.forEach(this::processMetric);
// Controller metrics are not bound to an execution and carry no user-defined labels.
metrics.forEach(mm -> processMetric(null, mm));
}

/** Re-ingestion entry point (no execution context, no cardinality safeguard). */
public void processMetric(StepMetricSample mm) {
processMetric(null, mm);
}

public void processMetric(ExecutionContext executionContext, StepMetricSample mm) {
MetricSample sample = mm.sample;
long begin = sample.getSampleTime();
BucketAttributes attributes = metricSampleToBucketAttributes(mm);
BucketAttributes attributes = metricSampleToBucketAttributes(executionContext, mm);
String instrumentType = sample.getType().toLowerCase();
attributes.put(METRIC_TYPE, mm.metricType != null ? mm.metricType : instrumentType);
attributes.put(INSTRUMENT_TYPE_ATTRIBUTE.getName(), instrumentType);
TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.getIngestionPipeline();
ingestionPipeline.ingestBucket(buildMetricBucket(attributes, begin, sample));
}

private BucketAttributes metricSampleToBucketAttributes(StepMetricSample mm) {
private BucketAttributes metricSampleToBucketAttributes(ExecutionContext executionContext, StepMetricSample mm) {
Map<String, Object> attributesMap = new HashMap<>();
Map<String, String> effectiveLabels = mm.getEffectiveLabels();
if (handledAttributes.isEmpty()) {
Expand All @@ -146,9 +220,88 @@ private BucketAttributes metricSampleToBucketAttributes(StepMetricSample mm) {
}
// Metric name always present under "name" for consistent time-series grouping
attributesMap.put("name", mm.sample.getName());
// Apply the cardinality safeguard to the keyword-developer-defined labels only (sample.getLabels()),
// not to the context labels merged in by getEffectiveLabels(). Only execution metrics are tracked.
if (mm instanceof ExecutionMetricSample) {
ExecutionMetricSample ems = (ExecutionMetricSample) mm;
Set<String> userLabelKeys = (mm.sample.getLabels() != null) ? mm.sample.getLabels().keySet() : null;
applyLabelQuota(executionContext, ems.eId, mm.sample.getName(), userLabelKeys, attributesMap);
}
return new BucketAttributes(attributesMap);
}

/**
* Enforces the per-execution unique-value quota on the given user-defined label keys, mutating
* {@code attributes} in place. Values beyond the quota are replaced with {@link #QUOTA_EXCEEDED_VALUE}.
* No-op when the safeguard is disabled, when there is no execution scope, or when there are no
* user-defined labels to police.
*/
private void applyLabelQuota(ExecutionContext executionContext, String execId, String metricName,
Set<String> userLabelKeys, Map<String, Object> attributes) {
if (maxUniqueLabelValues <= 0 || execId == null || execId.isEmpty()
|| userLabelKeys == null || userLabelKeys.isEmpty()) {
return;
}
String name = (metricName != null) ? metricName : "";
for (String labelName : userLabelKeys) {
Object value = attributes.get(labelName);
if (value == null) {
continue;
}
String masked = maskIfOverQuota(executionContext, execId, name, labelName, String.valueOf(value));
attributes.put(labelName, masked);
}
}

/**
* Atomically records the value for the given (execution, metric, label) tuple and decides whether it
* passes through or must be masked:
* <ul>
* <li>known value, or new value while under quota → returned unchanged (and recorded);</li>
* <li>new value once the quota is reached → returned as {@link #QUOTA_EXCEEDED_VALUE}, and a warning
* is raised once per label.</li>
* </ul>
*/
private String maskIfOverQuota(ExecutionContext executionContext, String execId, String metricName,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me time to understand that value means labelValue and not the value of the metric itself

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored

String labelName, String value) {
Set<String> values = labelValueTracking
.computeIfAbsent(execId, k -> new ConcurrentHashMap<>())
.computeIfAbsent(metricName, k -> new ConcurrentHashMap<>())
.computeIfAbsent(labelName, k -> ConcurrentHashMap.newKeySet());
if (values.contains(value)) {
return value;
}
// Serialize the size-check-and-add so concurrent live ingestion can't overshoot the quota.
synchronized (values) {
if (values.contains(value)) {
return value;
}
if (values.size() < maxUniqueLabelValues) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we do the check to ensure that the number of label values per label remains under the defined max. Don't we want to also ensure that we don't exceed the number of labels?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added now also a quota on the number of labels per exec id and measurement/metric name

values.add(value);
return value;
}
}
raiseQuotaWarningOnce(executionContext, execId, metricName, labelName);
return QUOTA_EXCEEDED_VALUE;
}

private void raiseQuotaWarningOnce(ExecutionContext executionContext, String execId, String metricName, String labelName) {
Set<String> warned = warnedLabels.computeIfAbsent(execId, k -> ConcurrentHashMap.newKeySet());
if (!warned.add(metricName + '\0' + labelName)) {
return;
}
logger.warn("Execution {}: high cardinality detected on custom metric label [{}] for metric [{}]. " +
"Unique values exceeding the quota of {} have been dismissed (replaced with \"{}\").",
execId, labelName, metricName, maxUniqueLabelValues, QUOTA_EXCEEDED_VALUE);
if (executionNoticeManager != null) {
ExecutionNotice notice = new ExecutionNotice(CARDINALITY_NOTICE_TYPE_ID, Map.of(
"labelName", labelName,
"metricName", metricName,
"quota", String.valueOf(maxUniqueLabelValues)));
executionNoticeManager.raiseNotice(executionContext, notice);
}
}

private static Bucket buildMetricBucket(BucketAttributes attributes, long begin, MetricSample sample) {
Bucket bucket = new Bucket();
bucket.setBegin(begin);
Expand Down Expand Up @@ -192,4 +345,14 @@ public Set<String> getExcludedAttributes() {
public void flush() {
this.timeSeries.getIngestionPipeline().flush();
}

@Override
public void afterExecutionEnd(ExecutionContext context) {
// Explicitly release the per-execution cardinality tracking state as soon as the execution ends.
if (context != null) {
String executionId = context.getExecutionId();
labelValueTracking.remove(executionId);
warnedLabels.remove(executionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void counter_ingestsAccumulatedDiffAsPoint() {

ExecutionMetricSample mm = buildMetricMeasurement(snapshot);

handler.processMetrics(List.of(mm));
handler.processMetrics(null, List.of(mm));
handler.flush();

List<Bucket> buckets = allBuckets();
Expand All @@ -75,7 +75,7 @@ public void gauge_ingestsFullBucket() {
3, 57, 15, 42, 42, null);
ExecutionMetricSample mm = buildMetricMeasurement(snapshot);

handler.processMetrics(List.of(mm));
handler.processMetrics(null, List.of(mm));
handler.flush();

List<Bucket> buckets = allBuckets();
Expand All @@ -100,7 +100,7 @@ public void histogram_ingestsFullBucketWithDistribution() {
2, 300, 100, 200, 200, dist);
ExecutionMetricSample mm = buildMetricMeasurement(snapshot);

handler.processMetrics(List.of(mm));
handler.processMetrics(null, List.of(mm));
handler.flush();

List<Bucket> buckets = allBuckets();
Expand All @@ -126,7 +126,7 @@ public void onlyHandledAttributesAreIncludedInBucket() {
1, 80, 80, 80, 80, null);
ExecutionMetricSample mm = buildMetricMeasurement(snapshot);

handler.processMetrics(List.of(mm));
handler.processMetrics(null, List.of(mm));
handler.flush();

List<Bucket> buckets = allBuckets();
Expand Down
Loading