From 34fde9e02e073784729a3f67f8535aa8df712068 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 3 Mar 2026 16:37:27 +0100 Subject: [PATCH 01/17] Add pipeline and global metrics via OTLP --- config/logstash.yml | 27 ++ logstash-core/build.gradle | 8 + logstash-core/lib/logstash/agent.rb | 2 +- logstash-core/lib/logstash/environment.rb | 6 + .../instrument/periodic_poller/otel.rb | 344 ++++++++++++++++++ .../logstash/instrument/periodic_pollers.rb | 20 +- .../metrics/otel/OtelMetricsService.java | 311 ++++++++++++++++ 7 files changed, 715 insertions(+), 3 deletions(-) create mode 100644 logstash-core/lib/logstash/instrument/periodic_poller/otel.rb create mode 100644 logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java diff --git a/config/logstash.yml b/config/logstash.yml index c58843168ca..f1fcb0eda41 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -282,6 +282,33 @@ # # path.dead_letter_queue: # +# ------------ OpenTelemetry Metrics Settings -------------- +# Export Logstash metrics to an OpenTelemetry-compatible backend via OTLP. +# This allows you to send Logstash metrics to any OTLP-compatible collector +# or observability platform (e.g., Jaeger, Prometheus via OTel Collector, Grafana Cloud). +# +# Enable OpenTelemetry metrics export (default: false) +# +# otel.metrics.enabled: false +# +# OTLP endpoint URL. For gRPC, typically port 4317. For HTTP, typically port 4318. +# +# otel.metrics.endpoint: "http://localhost:4317" +# +# Export interval in seconds (default: 10) +# +# otel.metrics.interval: 10 +# +# Protocol to use for OTLP export: "grpc" (default) or "http" +# +# otel.metrics.protocol: "grpc" +# +# Additional resource attributes as comma-separated key=value pairs. +# These are attached to all exported metrics to identify this Logstash instance. 
+# Example: "environment=production,cluster=us-west,team=platform" +# +# otel.resource.attributes: +# # ------------ Debugging Settings -------------- # # Options for log.level: diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 76140a9d1f2..b7284110ca1 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -259,4 +259,12 @@ dependencies { api group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14' api group: 'commons-codec', name: 'commons-codec', version: '1.17.0' api group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.16' + + // OpenTelemetry SDK for metrics export + def otelVersion = '1.59.0' + implementation platform("io.opentelemetry:opentelemetry-bom:${otelVersion}") + implementation 'io.opentelemetry:opentelemetry-api' + implementation 'io.opentelemetry:opentelemetry-sdk' + implementation 'io.opentelemetry:opentelemetry-sdk-metrics' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp' } diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index ab854e8974b..a6637c3b6e5 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -518,7 +518,7 @@ def configure_metrics_collectors LogStash::Instrument::NullMetric.new(@collector) end - @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric, settings.get("queue.type"), self) + @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric, settings, self) @periodic_pollers.start end diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 343202177cc..9181a76152f 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -111,6 +111,12 @@ def self.as_java_range(r) Setting::StringSetting.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on 
Setting::NullableStringSetting.new("monitoring.cluster_uuid"), Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]) + # OpenTelemetry metrics export settings + Setting::BooleanSetting.new("otel.metrics.enabled", false), + Setting::StringSetting.new("otel.metrics.endpoint", "http://localhost:4317"), + Setting::NumericSetting.new("otel.metrics.interval", 10), # seconds + Setting::StringSetting.new("otel.metrics.protocol", "grpc", true, ["grpc", "http"]), + Setting::NullableStringSetting.new("otel.resource.attributes", nil, false) # key=value,key2=value2 format # post_process ].each {|setting| SETTINGS.register(setting) } diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb new file mode 100644 index 00000000000..7ddba780b5c --- /dev/null +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -0,0 +1,344 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +require "logstash/util/loggable" + +java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' +java_import 'io.opentelemetry.api.common.Attributes' +java_import 'io.opentelemetry.api.common.AttributeKey' + +module LogStash module Instrument module PeriodicPoller + # Exports Logstash metrics to an OpenTelemetry-compatible backend via OTLP. + # + # This class does NOT extend Base because: + # - All metrics use observable callbacks that the OTel SDK invokes at export time + # - The OTel SDK handles its own export timing via PeriodicMetricReader + # - No periodic polling/TimerTask is needed from the Ruby side + # + # Configuration in logstash.yml: + # otel.metrics.enabled: true + # otel.metrics.endpoint: "http://localhost:4317" + # otel.metrics.interval: 10 + # otel.metrics.protocol: "grpc" + # otel.resource.attributes: "environment=production,cluster=us-west" + # + class Otel + include LogStash::Util::Loggable + + def initialize(metric, agent, settings) + @agent = agent + @settings = settings + @metric_store = agent.metric.collector + + # Initialize the OTel service - SDK handles its own export timing + @otel_service = OtelMetricsService.new( + settings.get("otel.metrics.endpoint"), + agent.id, + agent.name, + settings.get("otel.metrics.interval"), + settings.get("otel.metrics.protocol"), + settings.get("otel.resource.attributes") + ) + + # Register all metrics with callbacks - SDK invokes them at export time + register_global_metrics + register_pipeline_metrics + register_cgroup_metrics + + logger.info("OpenTelemetry metrics poller initialized", + :endpoint => settings.get("otel.metrics.endpoint"), + :interval => settings.get("otel.metrics.interval")) + end + + def start + # No-op: OTel SDK handles export timing via PeriodicMetricReader + logger.debug("OpenTelemetry metrics exporter active") + end + + def stop + logger.info("Stopping OpenTelemetry metrics poller") + @otel_service.flush + @otel_service.shutdown + end + + private + + # Register Pipeline 
metrics from pipeline.rb + def register_pipeline_metrics + # Queue gauge (total across all pipelines) + register_gauge("logstash.queue.events", "Total events in queues", "{event}") do + get_total_queue_events + end + + # Per-pipeline metrics + register_pipeline_counters + register_pipeline_gauges + register_dlq_metrics + end + + # Register Dead Letter Queue metrics + def register_dlq_metrics + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + attrs = create_pipeline_attributes(pipeline_id) + + register_gauge( + "logstash.pipeline.dlq.queue_size", + "Current dead letter queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.dlq.max_queue_size", + "Maximum dead letter queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :max_queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.dlq.dropped_events", + "Events dropped when DLQ is full", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :dropped_events) + end + + register_gauge( + "logstash.pipeline.dlq.expired_events", + "Events expired and removed from DLQ", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :expired_events) + end + end + end + + # Register cgroup metrics + def register_cgroup_metrics + # Reports the total CPU time consumed by all tasks in this cgroup (including tasks lower in the hierarchy) + # - observable counter (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpuacct.usage", "Total CPU time consumed", "ns") do + get_metric_value(:os, :cgroup, :cpuacct, :usage_nanos) + end + + # A period of time in microseconds for how regularly a cgroup's access to CPU resources should be + # reallocated - gauges (can change at runtime) + register_gauge("logstash.os.cgroup.cpu.cfs_period", "CFS scheduling 
period", "us") do + get_metric_value(:os, :cgroup, :cpu, :cfs_period_micros) + end + + # Total amount of time in microseconds for which all tasks in a cgroup can run during one period + # - gauges (can change at runtime) + register_gauge("logstash.os.cgroup.cpu.cfs_quota", "CFS scheduling quota", "us") do + get_metric_value(:os, :cgroup, :cpu, :cfs_quota_micros) + end + + # Number of period intervals that have elapsed - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.elapsed_periods", "Number of elapsed CFS periods", "{period}") do + get_metric_value(:os, :cgroup, :cpu, :stat, :number_of_elapsed_periods) + end + + # Number of times the tasks in this cgroup were throttled - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.nr_times_throttled", "Number of times throttled", "{occurrence}") do + get_metric_value(:os, :cgroup, :cpu, :stat, :number_of_times_throttled) + end + + # Total time in nanoseconds for which tasks in this cgroup were throttled - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.time_throttled", "Total time throttled", "ns") do + get_metric_value(:os, :cgroup, :cpu, :stat, :time_throttled_nanos) + end + end + + def register_pipeline_gauges + # These will be registered for each running pipeline + # TODO: Handle dynamic pipeline add/remove + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + attrs = create_pipeline_attributes(pipeline_id) + + register_gauge( + "logstash.pipeline.queue.events", + "Events in pipeline queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :events) + end + + # Persistent queue capacity metrics + register_gauge( + "logstash.pipeline.queue.capacity.page_capacity", + "Size of each queue page", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :page_capacity_in_bytes) 
+ end + + register_gauge( + "logstash.pipeline.queue.capacity.max_size", + "Maximum queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.queue.capacity.max_unread_events", + "Maximum unread events allowed in queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_unread_events) + end + + register_gauge( + "logstash.pipeline.queue.capacity.size", + "Current persisted queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :queue_size_in_bytes) + end + + # Persistent queue data/storage metrics + register_gauge( + "logstash.pipeline.queue.data.free_space", + "Free disk space where queue is stored", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :data, :free_space_in_bytes) + end + end + end + + # Register observable counters - SDK computes deltas from cumulative values + def register_global_metrics + # Global event counters + register_observable_counter("logstash.events.in", "Total events received", "{event}") do + get_metric_value(:stats, :events, :in) + end + + register_observable_counter("logstash.events.out", "Total events output", "{event}") do + get_metric_value(:stats, :events, :out) + end + + register_observable_counter("logstash.events.filtered", "Total events filtered", "{event}") do + get_metric_value(:stats, :events, :filtered) + end + end + + def register_pipeline_counters + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + attrs = create_pipeline_attributes(pipeline_id) + + register_observable_counter( + "logstash.pipeline.events.in", + "Events received by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :in) + end + + register_observable_counter( + "logstash.pipeline.events.out", + "Events output by pipeline", + "{event}", + attrs + ) do + 
get_pipeline_metric_value(pipeline_id, :events, :out) + end + + register_observable_counter( + "logstash.pipeline.events.filtered", + "Events filtered by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :filtered) + end + end + end + + def register_gauge(name, description, unit, attributes = Attributes.empty, &block) + supplier = -> { + begin + value = block.call + value.nil? ? nil : value.to_java(:long) + rescue => e + logger.debug("Error getting gauge value for #{name}", :error => e.message) + nil + end + } + @otel_service.registerGauge(name, description, unit, supplier, attributes) + end + + def register_observable_counter(name, description, unit, attributes = Attributes.empty, &block) + supplier = -> { + begin + value = block.call + value.nil? ? nil : value.to_java(:long) + rescue => e + logger.debug("Error getting observable counter value for #{name}", :error => e.message) + nil + end + } + + @otel_service.registerObservableCounter(name, description, unit, supplier, attributes) + end + + # Helper to get metric values from the store + def get_metric_value(*path) + snapshot = @metric_store.snapshot_metric + store = snapshot.metric_store + + result = store.get_shallow(*path) + result.is_a?(Hash) ? nil : result&.value + rescue LogStash::Instrument::MetricStore::MetricNotFound + nil + end + + def get_pipeline_metric_value(pipeline_id, *path) + full_path = [:stats, :pipelines, pipeline_id.to_sym] + path + get_metric_value(*full_path) + end + + def get_total_queue_events + total = 0 + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, pipeline| + next if pipeline.system? 
+ queue_events = get_pipeline_metric_value(pipeline_id, :queue, :events) + total += queue_events if queue_events + end + total + end + + def create_pipeline_attributes(pipeline_id) + Attributes.of( + AttributeKey.stringKey("pipeline.id"), pipeline_id.to_s + ) + end + end +end; end; end \ No newline at end of file diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index 30c731495d9..a05e09f0f01 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -29,14 +29,30 @@ module LogStash module Instrument class PeriodicPollers attr_reader :metric - def initialize(metric, queue_type, agent) + def initialize(metric, settings, agent) @metric = metric + @settings = settings || LogStash::SETTINGS @periodic_pollers = [PeriodicPoller::Os.new(metric), PeriodicPoller::JVM.new(metric), - PeriodicPoller::PersistentQueue.new(metric, queue_type, agent), + PeriodicPoller::PersistentQueue.new(metric, @settings.get("queue.type"), agent), PeriodicPoller::DeadLetterQueue.new(metric, agent), PeriodicPoller::FlowRate.new(metric, agent), PeriodicPoller::BatchStructure.new(metric, agent)] + PeriodicPoller::FlowRate.new(metric, agent)] + + # Add OpenTelemetry metrics exporter if enabled + if otel_metrics_enabled? + require "logstash/instrument/periodic_poller/otel" + @periodic_pollers << PeriodicPoller::Otel.new(metric, agent, @settings) + logger.info("OpenTelemetry metrics export enabled") + end + end + + def otel_metrics_enabled? 
+ @settings.get("otel.metrics.enabled") + rescue => e + logger.debug("Could not read otel.metrics.enabled setting", :error => e.message) + false end def start diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java new file mode 100644 index 00000000000..1028f85c2ce --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -0,0 +1,311 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.instrument.metrics.otel; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Service that manages OpenTelemetry metrics export for Logstash. 
+ * + * This service: + * - Creates and configures the OTel SDK MeterProvider + * - Provides methods to create metrics instruments (counters, gauges) + * - Manages the lifecycle of the OTel exporter + * + * Usage from Ruby: + * java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' + * service = OtelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil) + * service.registerGauge("metric.name", "description", "unit", -> { get_value() }, Attributes.empty) + */ +public class OtelMetricsService { + + private static final Logger LOGGER = LogManager.getLogger(OtelMetricsService.class); + + private final SdkMeterProvider meterProvider; + private final Meter meter; + private final Map counters = new ConcurrentHashMap<>(); + private final Map gauges = new ConcurrentHashMap<>(); + private final Map observableCounters = new ConcurrentHashMap<>(); + private volatile boolean running = false; + + /** + * Creates a new OTel metrics service. + * + * @param endpoint OTLP endpoint (e.g., "http://localhost:4317") + * @param nodeId Logstash node ID + * @param nodeName Logstash node name + * @param intervalSeconds Export interval in seconds + * @param protocol "grpc" or "http" + * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs) + */ + public OtelMetricsService(String endpoint, String nodeId, String nodeName, + int intervalSeconds, String protocol, String resourceAttributes) { + LOGGER.info("Initializing OpenTelemetry metrics export to {} (protocol: {}, interval: {}s)", + endpoint, protocol, intervalSeconds); + + // Build resource attributes + AttributesBuilder resourceAttrsBuilder = Attributes.builder() + .put(AttributeKey.stringKey("service.name"), "logstash") + .put(AttributeKey.stringKey("service.instance.id"), nodeId) + .put(AttributeKey.stringKey("host.name"), nodeName); + + // Parse additional resource attributes if provided + if (resourceAttributes != null && !resourceAttributes.isEmpty()) { + 
parseResourceAttributes(resourceAttributes, resourceAttrsBuilder); + } + + Resource resource = Resource.getDefault().merge( + Resource.create(resourceAttrsBuilder.build()) + ); + + // Create the appropriate exporter based on protocol + MetricExporter exporter = createExporter(endpoint, protocol); + + // Create periodic reader + PeriodicMetricReader metricReader = PeriodicMetricReader.builder(exporter) + .setInterval(Duration.ofSeconds(intervalSeconds)) + .build(); + + // Create meter provider + this.meterProvider = SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader(metricReader) + .build(); + + // Get meter for creating instruments + this.meter = meterProvider.get("logstash"); + this.running = true; + + LOGGER.info("OpenTelemetry metrics service initialized successfully"); + } + + private MetricExporter createExporter(String endpoint, String protocol) { + if ("http".equalsIgnoreCase(protocol)) { + return OtlpHttpMetricExporter.builder() + .setEndpoint(endpoint + "/v1/metrics") + .setTimeout(Duration.ofSeconds(10)) + .build(); + } else { + // Default to gRPC + return OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(Duration.ofSeconds(10)) + .build(); + } + } + + private void parseResourceAttributes(String attributes, AttributesBuilder builder) { + for (String pair : attributes.split(",")) { + String[] keyValue = pair.trim().split("=", 2); + if (keyValue.length == 2) { + builder.put(AttributeKey.stringKey(keyValue[0].trim()), keyValue[1].trim()); + } + } + } + + /** + * Gets or creates a Counter instrument. + * Counters are for monotonically increasing values (events processed, bytes sent, etc.) 
+ * + * @param name Metric name (e.g., "logstash.events.in") + * @param description Human-readable description + * @param unit Unit of measurement (e.g., "{events}", "By") + * @return The counter instrument + */ + public LongCounter getOrCreateCounter(String name, String description, String unit) { + return counters.computeIfAbsent(name, n -> + meter.counterBuilder(n) + .setDescription(description) + .setUnit(unit) + .build() + ); + } + + /** + * Increments a counter by the specified amount. + * + * @param name Metric name + * @param amount Amount to increment by + * @param attributes Attributes/labels for this measurement + */ + public void incrementCounter(String name, long amount, Attributes attributes) { + LongCounter counter = counters.get(name); + if (counter != null && amount > 0) { + counter.add(amount, attributes); + } + } + + /** + * Registers an observable gauge with a callback. + * The callback is invoked by the SDK when metrics are exported. + * + * @param name Metric name + * @param description Human-readable description + * @param unit Unit of measurement + * @param valueSupplier Callback that returns the current value + * @param attributes Attributes/labels for this gauge + */ + public void registerGauge(String name, String description, String unit, + Supplier valueSupplier, Attributes attributes) { + ObservableLongGauge gauge = meter.gaugeBuilder(name) + .setDescription(description) + .setUnit(unit) + .ofLongs() + .buildWithCallback(measurement -> { + try { + Long value = valueSupplier.get(); + if (value != null) { + measurement.record(value, attributes); + } + } catch (Exception e) { + LOGGER.debug("Error collecting gauge {}: {}", name, e.getMessage()); + } + }); + gauges.put(name, gauge); + } + + /** + * Registers an observable counter with a callback. + * Observable counters are for monotonically increasing values where you observe + * the cumulative total (e.g., CPU time consumed, total bytes read from disk). 
+ * The SDK automatically computes deltas between observations. + * + * @param name Metric name + * @param description Human-readable description + * @param unit Unit of measurement + * @param valueSupplier Callback that returns the current cumulative value + * @param attributes Attributes/labels for this counter + */ + public void registerObservableCounter(String name, String description, String unit, + Supplier valueSupplier, Attributes attributes) { + ObservableLongCounter counter = meter.counterBuilder(name) + .setDescription(description) + .setUnit(unit) + .buildWithCallback(measurement -> { + try { + Long value = valueSupplier.get(); + if (value != null && value >= 0) { + measurement.record(value, attributes); + } + } catch (Exception e) { + LOGGER.debug("Error collecting observable counter {}: {}", name, e.getMessage()); + } + }); + observableCounters.put(name, counter); + } + + /** + * Registers an observable double gauge with a callback. + * + * @param name Metric name + * @param description Human-readable description + * @param unit Unit of measurement + * @param valueSupplier Callback that returns the current value + * @param attributes Attributes/labels for this gauge + */ + public void registerDoubleGauge(String name, String description, String unit, + Supplier valueSupplier, Attributes attributes) { + meter.gaugeBuilder(name) + .setDescription(description) + .setUnit(unit) + .buildWithCallback(measurement -> { + try { + Double value = valueSupplier.get(); + if (value != null && !value.isNaN()) { + measurement.record(value, attributes); + } + } catch (Exception e) { + LOGGER.debug("Error collecting gauge {}: {}", name, e.getMessage()); + } + }); + } + + /** + * Creates Attributes from a map of key-value pairs. + * Convenience method for Ruby callers. 
+ * + * @param attributeMap Map of attribute names to values + * @return Attributes object + */ + public static Attributes createAttributes(Map attributeMap) { + if (attributeMap == null || attributeMap.isEmpty()) { + return Attributes.empty(); + } + AttributesBuilder builder = Attributes.builder(); + attributeMap.forEach((key, value) -> builder.put(AttributeKey.stringKey(key), value)); + return builder.build(); + } + + /** + * Gets the raw Meter for advanced use cases. + */ + public Meter getMeter() { + return meter; + } + + /** + * Returns whether the service is running. + */ + public boolean isRunning() { + return running; + } + + /** + * Forces an immediate flush of pending metrics. + */ + public void flush() { + if (meterProvider != null) { + meterProvider.forceFlush(); + } + } + + /** + * Shuts down the service and releases resources. + * Flushes any pending metrics before closing. + */ + public void shutdown() { + LOGGER.info("Shutting down OpenTelemetry metrics service"); + running = false; + if (meterProvider != null) { + meterProvider.close(); + } + LOGGER.info("OpenTelemetry metrics service shut down"); + } +} From 1f25ff5850ea7266de5b2974fb57386e8803ab17 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 6 Mar 2026 14:39:44 +0100 Subject: [PATCH 02/17] Use metric store to get metrics, refresh snapshot on collect --- .../instrument/periodic_poller/otel.rb | 71 +++++++++++++++---- .../logstash/instrument/periodic_pollers.rb | 2 +- .../metrics/otel/OtelMetricsService.java | 10 +-- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index 7ddba780b5c..e4ab5b0f8bc 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -16,6 +16,7 @@ # under the License. 
require "logstash/util/loggable" +require "logstash/instrument/periodic_poller/cgroup" java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' java_import 'io.opentelemetry.api.common.Attributes' @@ -24,10 +25,9 @@ module LogStash module Instrument module PeriodicPoller # Exports Logstash metrics to an OpenTelemetry-compatible backend via OTLP. # - # This class does NOT extend Base because: - # - All metrics use observable callbacks that the OTel SDK invokes at export time - # - The OTel SDK handles its own export timing via PeriodicMetricReader - # - No periodic polling/TimerTask is needed from the Ruby side + # This class extends Base to use the periodic polling mechanism for collecting + # metrics into the metric store. The OTel SDK callbacks then read from the + # refreshed snapshot during export. # # Configuration in logstash.yml: # otel.metrics.enabled: true @@ -36,8 +36,7 @@ module LogStash module Instrument module PeriodicPoller # otel.metrics.protocol: "grpc" # otel.resource.attributes: "environment=production,cluster=us-west" # - class Otel - include LogStash::Util::Loggable + class OTel < Base def initialize(metric, agent, settings) @agent = agent @@ -54,6 +53,12 @@ def initialize(metric, agent, settings) settings.get("otel.resource.attributes") ) + # Call Base initializer - sets up @metric and configures the TimerTask + super(metric, :polling_interval => settings.get("otel.metrics.interval")) + + # Take initial snapshot + @snapshot = @metric_store.snapshot_metric + # Register all metrics with callbacks - SDK invokes them at export time register_global_metrics register_pipeline_metrics @@ -64,19 +69,60 @@ def initialize(metric, agent, settings) :interval => settings.get("otel.metrics.interval")) end - def start - # No-op: OTel SDK handles export timing via PeriodicMetricReader - logger.debug("OpenTelemetry metrics exporter active") - end - def stop logger.info("Stopping OpenTelemetry metrics poller") + super @otel_service.flush 
@otel_service.shutdown end + def collect + collect_cgroup_metrics + collect_pipeline_metrics + collect_dlq_metrics + # Refresh snapshot after collecting metrics so OTel callbacks read fresh data + @snapshot = @metric_store.snapshot_metric + end + private + def collect_cgroup_metrics + if stats = Cgroup.get + save_metric([:os], :cgroup, stats) + end + end + + # Recursive function to save cgroup metrics to the metric store + def save_metric(namespace, k, v) + if v.is_a?(Hash) + v.each do |new_key, new_value| + n = namespace.dup + n << k.to_sym + save_metric(n, new_key, new_value) + end + else + metric.gauge(namespace, k.to_sym, v) + end + end + + def collect_dlq_metrics + pipelines = @agent.running_user_defined_pipelines + pipelines.each do |_, pipeline| + unless pipeline.nil? + pipeline.collect_dlq_stats + end + end + end + + def collect_pipeline_metrics + pipelines = @agent.running_user_defined_pipelines + pipelines.each do |_, pipeline| + unless pipeline.nil? + pipeline.collect_stats + end + end + end + # Register Pipeline metrics from pipeline.rb def register_pipeline_metrics # Queue gauge (total across all pipelines) @@ -311,8 +357,7 @@ def register_observable_counter(name, description, unit, attributes = Attributes # Helper to get metric values from the store def get_metric_value(*path) - snapshot = @metric_store.snapshot_metric - store = snapshot.metric_store + store = @snapshot.metric_store result = store.get_shallow(*path) result.is_a?(Hash) ? nil : result&.value diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index a05e09f0f01..01722c4e25e 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -43,7 +43,7 @@ def initialize(metric, settings, agent) # Add OpenTelemetry metrics exporter if enabled if otel_metrics_enabled? 
require "logstash/instrument/periodic_poller/otel" - @periodic_pollers << PeriodicPoller::Otel.new(metric, agent, @settings) + @periodic_pollers << PeriodicPoller::OTel.new(metric, agent, @settings) logger.info("OpenTelemetry metrics export enabled") end end diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java index 1028f85c2ce..c5ddd9d0969 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -50,13 +50,13 @@ * - Manages the lifecycle of the OTel exporter * * Usage from Ruby: - * java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' - * service = OtelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil) + * java_import 'org.logstash.instrument.metrics.otel.OTelMetricsService' + * service = OTelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil) * service.registerGauge("metric.name", "description", "unit", -> { get_value() }, Attributes.empty) */ -public class OtelMetricsService { +public class OTelMetricsService { - private static final Logger LOGGER = LogManager.getLogger(OtelMetricsService.class); + private static final Logger LOGGER = LogManager.getLogger(OTelMetricsService.class); private final SdkMeterProvider meterProvider; private final Meter meter; @@ -75,7 +75,7 @@ public class OtelMetricsService { * @param protocol "grpc" or "http" * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs) */ - public OtelMetricsService(String endpoint, String nodeId, String nodeName, + public OTelMetricsService(String endpoint, String nodeId, String nodeName, int intervalSeconds, String protocol, String resourceAttributes) { LOGGER.info("Initializing OpenTelemetry metrics export to {} 
(protocol: {}, interval: {}s)", endpoint, protocol, intervalSeconds); From ab44dd1b32695adeda8b41206c07037c7dd0325a Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 9 Mar 2026 10:47:01 +0100 Subject: [PATCH 03/17] Use class methods on Os to collect CGroup metrics --- .../logstash/instrument/periodic_poller/os.rb | 29 ++++++++++--------- .../instrument/periodic_poller/otel.rb | 23 +++------------ 2 files changed, 19 insertions(+), 33 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/os.rb b/logstash-core/lib/logstash/instrument/periodic_poller/os.rb index 64fa87a13a2..f6c827ed078 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/os.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/os.rb @@ -25,25 +25,26 @@ def initialize(metric, options = {}) end def collect - collect_cgroup + self.class.collect_cgroup(metric) end - def collect_cgroup - if stats = Cgroup.get - save_metric([:os], :cgroup, stats) + class << self + def collect_cgroup(metric) + if stats = Cgroup.get + save_metric(metric, [:os], :cgroup, stats) + end end - end - # Recursive function to create the Cgroups values form the created hash - def save_metric(namespace, k, v) - if v.is_a?(Hash) - v.each do |new_key, new_value| - n = namespace.dup - n << k.to_sym - save_metric(n, new_key, new_value) + def save_metric(metric, namespace, k, v) + if v.is_a?(Hash) + v.each do |new_key, new_value| + n = namespace.dup + n << k.to_sym + save_metric(metric, n, new_key, new_value) + end + else + metric.gauge(namespace, k.to_sym, v) end - else - metric.gauge(namespace, k.to_sym, v) end end end diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index e4ab5b0f8bc..f131d0711d6 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -16,9 +16,9 @@ # under the License. 
require "logstash/util/loggable" -require "logstash/instrument/periodic_poller/cgroup" +require "logstash/instrument/periodic_poller/os" -java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' +java_import 'org.logstash.instrument.metrics.otel.OTelMetricsService' java_import 'io.opentelemetry.api.common.Attributes' java_import 'io.opentelemetry.api.common.AttributeKey' @@ -44,7 +44,7 @@ def initialize(metric, agent, settings) @metric_store = agent.metric.collector # Initialize the OTel service - SDK handles its own export timing - @otel_service = OtelMetricsService.new( + @otel_service = OTelMetricsService.new( settings.get("otel.metrics.endpoint"), agent.id, agent.name, @@ -87,22 +87,7 @@ def collect private def collect_cgroup_metrics - if stats = Cgroup.get - save_metric([:os], :cgroup, stats) - end - end - - # Recursive function to save cgroup metrics to the metric store - def save_metric(namespace, k, v) - if v.is_a?(Hash) - v.each do |new_key, new_value| - n = namespace.dup - n << k.to_sym - save_metric(n, new_key, new_value) - end - else - metric.gauge(namespace, k.to_sym, v) - end + Os.collect_cgroup(@metric) end def collect_dlq_metrics From 9e8190a4eb1f77bd4f192ff01c2918e7c3446d84 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 9 Mar 2026 11:45:31 +0100 Subject: [PATCH 04/17] Capture flow metrics also --- logstash-core/lib/logstash/instrument/periodic_poller/otel.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index f131d0711d6..a52a3ed5893 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -80,6 +80,7 @@ def collect collect_cgroup_metrics collect_pipeline_metrics collect_dlq_metrics + @agent.capture_flow_metrics # Refresh snapshot after collecting metrics so OTel callbacks read fresh data @snapshot = 
@metric_store.snapshot_metric end From c340fb84ea710f68febaed67114236d5fd3a43bb Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 9 Mar 2026 11:45:43 +0100 Subject: [PATCH 05/17] Add tests --- .../instrument/periodic_poller/otel_spec.rb | 312 ++++++++++++++++++ 1 file changed, 312 insertions(+) create mode 100644 logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb new file mode 100644 index 00000000000..4e183f936d7 --- /dev/null +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -0,0 +1,312 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require "logstash/instrument/periodic_poller/otel" +require "logstash/instrument/collector" + +describe LogStash::Instrument::PeriodicPoller::OTel do + let(:collector) { LogStash::Instrument::Collector.new } + let(:metric) { LogStash::Instrument::Metric.new(collector) } + let(:agent_metric) { double("agent_metric", :collector => collector) } + + let(:pipeline_id) { :main } + let(:pipeline) do + double("pipeline", + :system? 
=> false, + :collect_stats => nil, + :collect_dlq_stats => nil + ) + end + let(:pipelines_registry) do + double("pipelines_registry", + :running_pipelines => { pipeline_id => pipeline } + ) + end + + let(:agent) do + double("agent", + :id => "test-node-id", + :name => "test-node-name", + :metric => agent_metric, + :pipelines_registry => pipelines_registry, + :running_user_defined_pipelines => { pipeline_id => pipeline }, + :capture_flow_metrics => nil + ) + end + + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("http://localhost:4317") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("grpc") + allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + end + end + + let(:otel_service) do + double("otel_service", + :registerGauge => nil, + :registerObservableCounter => nil, + :flush => nil, + :shutdown => nil + ) + end + + before do + allow(OTelMetricsService).to receive(:new).and_return(otel_service) + end + + subject(:otel_poller) { described_class.new(metric, agent, settings) } + + describe "#initialize" do + it "should initialize cleanly" do + expect { otel_poller }.not_to raise_error + end + + it "creates an OTelMetricsService with correct parameters" do + expect(OTelMetricsService).to receive(:new).with( + "http://localhost:4317", + "test-node-id", + "test-node-name", + 10, + "grpc", + nil + ).and_return(otel_service) + + otel_poller + end + + it "registers global metrics" do + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.in", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.out", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.filtered", anything, anything, anything, anything + ) + 
+ otel_poller + end + + it "registers cgroup metrics" do + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpuacct.usage", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.os.cgroup.cpu.cfs_period", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.os.cgroup.cpu.cfs_quota", anything, anything, anything, anything + ) + + otel_poller + end + + it "registers pipeline metrics for each running pipeline" do + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.in", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.out", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.filtered", anything, anything, anything, anything + ) + + otel_poller + end + end + + describe "#collect" do + before do + otel_poller + end + + it "collects pipeline stats from each running pipeline" do + expect(pipeline).to receive(:collect_stats) + otel_poller.collect + end + + it "collects DLQ stats from each running pipeline" do + expect(pipeline).to receive(:collect_dlq_stats) + otel_poller.collect + end + + it "captures flow metrics from the agent" do + expect(agent).to receive(:capture_flow_metrics) + otel_poller.collect + end + + it "collects cgroup metrics via Os.collect_cgroup" do + expect(LogStash::Instrument::PeriodicPoller::Os).to receive(:collect_cgroup).with(metric) + otel_poller.collect + end + + it "refreshes the metric store snapshot" do + initial_snapshot = otel_poller.instance_variable_get(:@snapshot) + otel_poller.collect + new_snapshot = otel_poller.instance_variable_get(:@snapshot) + expect(new_snapshot).not_to be(initial_snapshot) + end + end + + describe "#stop" do + before do + otel_poller + end + + it 
"flushes and shuts down the OTel service" do + expect(otel_service).to receive(:flush) + expect(otel_service).to receive(:shutdown) + otel_poller.stop + end + end + + describe "metric value retrieval" do + before do + otel_poller + end + + context "when metrics exist in the store" do + before do + metric.gauge([:stats, :events], :in, 100) + metric.gauge([:stats, :events], :out, 50) + metric.gauge([:stats, :pipelines, pipeline_id, :queue], :events, 25) + otel_poller.collect + end + + it "retrieves global metric values" do + value = otel_poller.send(:get_metric_value, :stats, :events, :in) + expect(value).to eq(100) + end + + it "retrieves pipeline metric values" do + value = otel_poller.send(:get_pipeline_metric_value, pipeline_id, :queue, :events) + expect(value).to eq(25) + end + end + + context "when metrics do not exist" do + it "returns nil for missing metrics" do + value = otel_poller.send(:get_metric_value, :nonexistent, :path) + expect(value).to be_nil + end + + it "returns nil for missing pipeline metrics" do + value = otel_poller.send(:get_pipeline_metric_value, :nonexistent_pipeline, :events, :in) + expect(value).to be_nil + end + end + end + + describe "#get_total_queue_events" do + let(:pipeline2_id) { :secondary } + let(:pipeline2) { double("pipeline2", :system? => false) } + let(:system_pipeline) { double("system_pipeline", :system? 
=> true) } + + let(:pipelines_registry) do + double("pipelines_registry", + :running_pipelines => { + pipeline_id => pipeline, + pipeline2_id => pipeline2, + :monitoring => system_pipeline + } + ) + end + + before do + otel_poller + metric.gauge([:stats, :pipelines, pipeline_id, :queue], :events, 10) + metric.gauge([:stats, :pipelines, pipeline2_id, :queue], :events, 20) + metric.gauge([:stats, :pipelines, :monitoring, :queue], :events, 5) + otel_poller.collect + end + + it "sums queue events from non-system pipelines only" do + total = otel_poller.send(:get_total_queue_events) + expect(total).to eq(30) + end + end + + context "with mocked cgroup environment" do + let(:relative_path) { "/docker/abc123" } + let(:proc_self_cgroup_content) do + %W(4:cpuacct:#{relative_path} + 3:cpu:#{relative_path}) + end + + let(:cpuacct_usage) { 1982 } + let(:cpu_period_micros) { 100000 } + let(:cpu_quota_micros) { 50000 } + let(:cpu_stat_file_content) do + ["nr_periods 10", "nr_throttled 2", "throttled_time 1000000"] + end + + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpuacct#{relative_path}/cpuacct.usage").and_return([cpuacct_usage]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_period_us").and_return([cpu_period_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_quota_us").and_return([cpu_quota_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.stat").and_return(cpu_stat_file_content) + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(proc_self_cgroup_content) + end + + describe "cgroup metrics collection" do + let(:snapshot_store) { metric.collector.snapshot_metric.metric_store } + + before do + otel_poller.collect + end + + def mval(*metric_path) + metric_path.reduce(snapshot_store.get_shallow(:os)) { |acc, k| acc[k] }.value + end + + it "collects cpuacct usage" do 
+ expect(mval(:cgroup, :cpuacct, :usage_nanos)).to eq(cpuacct_usage) + end + + it "collects cpu cfs_period" do + expect(mval(:cgroup, :cpu, :cfs_period_micros)).to eq(cpu_period_micros) + end + + it "collects cpu cfs_quota" do + expect(mval(:cgroup, :cpu, :cfs_quota_micros)).to eq(cpu_quota_micros) + end + + it "collects cpu stat metrics" do + expect(mval(:cgroup, :cpu, :stat, :number_of_elapsed_periods)).to eq(10) + expect(mval(:cgroup, :cpu, :stat, :number_of_times_throttled)).to eq(2) + expect(mval(:cgroup, :cpu, :stat, :time_throttled_nanos)).to eq(1000000) + end + end + end + + describe "create_pipeline_attributes" do + before do + otel_poller + end + + it "creates Attributes with pipeline.id" do + attrs = otel_poller.send(:create_pipeline_attributes, :main) + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + end + end +end From 6568168877147c86cd0c44bbba1a635b2c919c2f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 13 Mar 2026 13:50:50 +0100 Subject: [PATCH 06/17] Add plugin metrics and refresh plugin and pipeline lists on collect --- logstash-core/lib/logstash/agent.rb | 2 +- logstash-core/lib/logstash/environment.rb | 3 +- .../instrument/periodic_poller/otel.rb | 356 +++++++++++------- .../logstash/instrument/periodic_pollers.rb | 1 + 4 files changed, 224 insertions(+), 138 deletions(-) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index a6637c3b6e5..21ad768b975 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -49,7 +49,7 @@ class LogStash::Agent include LogStash::Util::Loggable STARTED_AT = Time.now.freeze - attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus + attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus, :pipelines_registry attr_accessor :logger attr_reader :health_observer diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 
9181a76152f..6bc12d6544d 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -110,8 +110,7 @@ def self.as_java_range(r) Setting::StringSetting.new("keystore.classname", "org.logstash.secret.store.backend.JavaKeyStore"), Setting::StringSetting.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on Setting::NullableStringSetting.new("monitoring.cluster_uuid"), - Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]) - # OpenTelemetry metrics export settings + Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]), Setting::BooleanSetting.new("otel.metrics.enabled", false), Setting::StringSetting.new("otel.metrics.endpoint", "http://localhost:4317"), Setting::NumericSetting.new("otel.metrics.interval", 10), # seconds diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index a52a3ed5893..4d427f9e559 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -17,6 +17,7 @@ require "logstash/util/loggable" require "logstash/instrument/periodic_poller/os" +require "set" java_import 'org.logstash.instrument.metrics.otel.OTelMetricsService' java_import 'io.opentelemetry.api.common.Attributes' @@ -59,9 +60,12 @@ def initialize(metric, agent, settings) # Take initial snapshot @snapshot = @metric_store.snapshot_metric - # Register all metrics with callbacks - SDK invokes them at export time + # Track which pipelines and plugins have been registered to avoid duplicates + @registered_pipelines = Set.new + @registered_plugins = Set.new + + # Register global and cgroup metrics immediately (not pipeline-specific) register_global_metrics - register_pipeline_metrics register_cgroup_metrics 
logger.info("OpenTelemetry metrics poller initialized", @@ -77,6 +81,12 @@ def stop end def collect + # Register metrics for any new pipelines that have started since initialization + register_new_pipeline_metrics + + # Register metrics for any new plugins (they appear after processing first event) + register_new_plugin_metrics + collect_cgroup_metrics collect_pipeline_metrics collect_dlq_metrics @@ -85,6 +95,24 @@ def collect @snapshot = @metric_store.snapshot_metric end + def register_new_plugin_metrics + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + register_plugin_metrics_for(pipeline_id) + end + end + + def register_new_pipeline_metrics + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + next if @registered_pipelines.include?(pipeline_id) + + logger.debug("Registering OTel metrics for pipeline", :pipeline_id => pipeline_id) + register_pipeline_counters_for(pipeline_id) + register_pipeline_gauges_for(pipeline_id) + register_dlq_metrics_for(pipeline_id) + @registered_pipelines.add(pipeline_id) + end + end + private def collect_cgroup_metrics @@ -109,59 +137,44 @@ def collect_pipeline_metrics end end - # Register Pipeline metrics from pipeline.rb - def register_pipeline_metrics - # Queue gauge (total across all pipelines) - register_gauge("logstash.queue.events", "Total events in queues", "{event}") do - get_total_queue_events + # Register Dead Letter Queue metrics for a specific pipeline + def register_dlq_metrics_for(pipeline_id) + attrs = create_pipeline_attributes(pipeline_id) + + register_gauge( + "logstash.pipeline.dlq.queue_size", + "Current dead letter queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :queue_size_in_bytes) end - # Per-pipeline metrics - register_pipeline_counters - register_pipeline_gauges - register_dlq_metrics - end - - # Register Dead Letter Queue metrics - def register_dlq_metrics - 
@agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| - attrs = create_pipeline_attributes(pipeline_id) - - register_gauge( - "logstash.pipeline.dlq.queue_size", - "Current dead letter queue size", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :queue_size_in_bytes) - end - - register_gauge( - "logstash.pipeline.dlq.max_queue_size", - "Maximum dead letter queue size limit", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :max_queue_size_in_bytes) - end + register_gauge( + "logstash.pipeline.dlq.max_queue_size", + "Maximum dead letter queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :max_queue_size_in_bytes) + end - register_gauge( - "logstash.pipeline.dlq.dropped_events", - "Events dropped when DLQ is full", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :dropped_events) - end + register_gauge( + "logstash.pipeline.dlq.dropped_events", + "Events dropped when DLQ is full", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :dropped_events) + end - register_gauge( - "logstash.pipeline.dlq.expired_events", - "Events expired and removed from DLQ", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :expired_events) - end + register_gauge( + "logstash.pipeline.dlq.expired_events", + "Events expired and removed from DLQ", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :expired_events) end end @@ -201,67 +214,63 @@ def register_cgroup_metrics end end - def register_pipeline_gauges - # These will be registered for each running pipeline - # TODO: Handle dynamic pipeline add/remove - @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| - attrs = create_pipeline_attributes(pipeline_id) - - register_gauge( - "logstash.pipeline.queue.events", - 
"Events in pipeline queue", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :events) - end + def register_pipeline_gauges_for(pipeline_id) + attrs = create_pipeline_attributes(pipeline_id) - # Persistent queue capacity metrics - register_gauge( - "logstash.pipeline.queue.capacity.page_capacity", - "Size of each queue page", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :capacity, :page_capacity_in_bytes) - end + register_gauge( + "logstash.pipeline.queue.events", + "Events in pipeline queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :events) + end - register_gauge( - "logstash.pipeline.queue.capacity.max_size", - "Maximum queue size limit", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_queue_size_in_bytes) - end + # Persistent queue capacity metrics + register_gauge( + "logstash.pipeline.queue.capacity.page_capacity", + "Size of each queue page", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :page_capacity_in_bytes) + end - register_gauge( - "logstash.pipeline.queue.capacity.max_unread_events", - "Maximum unread events allowed in queue", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_unread_events) - end + register_gauge( + "logstash.pipeline.queue.capacity.max_size", + "Maximum queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_queue_size_in_bytes) + end - register_gauge( - "logstash.pipeline.queue.capacity.size", - "Current persisted queue size", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :capacity, :queue_size_in_bytes) - end + register_gauge( + "logstash.pipeline.queue.capacity.max_unread_events", + "Maximum unread events allowed in queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_unread_events) + end - # 
Persistent queue data/storage metrics - register_gauge( - "logstash.pipeline.queue.data.free_space", - "Free disk space where queue is stored", - "By", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :queue, :data, :free_space_in_bytes) - end + register_gauge( + "logstash.pipeline.queue.capacity.size", + "Current persisted queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :queue_size_in_bytes) + end + + # Persistent queue data/storage metrics + register_gauge( + "logstash.pipeline.queue.data.free_space", + "Free disk space where queue is stored", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :data, :free_space_in_bytes) end end @@ -279,38 +288,41 @@ def register_global_metrics register_observable_counter("logstash.events.filtered", "Total events filtered", "{event}") do get_metric_value(:stats, :events, :filtered) end + + # Global queue gauge (total across all pipelines) + register_gauge("logstash.queue.events", "Total events in queues", "{event}") do + get_total_queue_events + end end - def register_pipeline_counters - @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| - attrs = create_pipeline_attributes(pipeline_id) - - register_observable_counter( - "logstash.pipeline.events.in", - "Events received by pipeline", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :events, :in) - end + def register_pipeline_counters_for(pipeline_id) + attrs = create_pipeline_attributes(pipeline_id) - register_observable_counter( - "logstash.pipeline.events.out", - "Events output by pipeline", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :events, :out) - end + register_observable_counter( + "logstash.pipeline.events.in", + "Events received by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :in) + end - register_observable_counter( - "logstash.pipeline.events.filtered", - "Events filtered by 
pipeline", - "{event}", - attrs - ) do - get_pipeline_metric_value(pipeline_id, :events, :filtered) - end + register_observable_counter( + "logstash.pipeline.events.out", + "Events output by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :out) + end + + register_observable_counter( + "logstash.pipeline.events.filtered", + "Events filtered by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :filtered) end end @@ -371,5 +383,79 @@ def create_pipeline_attributes(pipeline_id) AttributeKey.stringKey("pipeline.id"), pipeline_id.to_s ) end + + def create_plugin_attributes(pipeline_id, plugin_type, plugin_id) + Attributes.of( + AttributeKey.stringKey("pipeline.id"), pipeline_id.to_s, + AttributeKey.stringKey("plugin.type"), plugin_type.to_s, + AttributeKey.stringKey("plugin.id"), plugin_id.to_s + ) + end + + # Register plugin metrics for a specific pipeline + # Called on each collect to discover newly available plugins + def register_plugin_metrics_for(pipeline_id) + [:filters, :outputs, :inputs].each do |plugin_type| + plugin_ids = get_plugin_ids(pipeline_id, plugin_type) + plugin_ids.each do |plugin_id| + plugin_key = "#{pipeline_id}:#{plugin_type}:#{plugin_id}" + next if @registered_plugins.include?(plugin_key) + + logger.debug("Registering OTel metrics for plugin", + :pipeline_id => pipeline_id, + :plugin_type => plugin_type, + :plugin_id => plugin_id) + register_plugin_counters_for(pipeline_id, plugin_type, plugin_id) + @registered_plugins.add(plugin_key) + end + end + end + + def get_plugin_ids(pipeline_id, plugin_type) + begin + store = @snapshot.metric_store + plugins_hash = store.get_shallow(:stats, :pipelines, pipeline_id.to_sym, :plugins, plugin_type) + return [] unless plugins_hash.is_a?(Hash) + plugins_hash.keys + rescue LogStash::Instrument::MetricStore::MetricNotFound + [] + end + end + + def register_plugin_counters_for(pipeline_id, plugin_type, plugin_id) + attrs = 
create_plugin_attributes(pipeline_id, plugin_type, plugin_id) + + register_observable_counter( + "logstash.plugin.events.in", + "Events received by plugin", + "{event}", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :in) + end + + register_observable_counter( + "logstash.plugin.events.out", + "Events output by plugin", + "{event}", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :out) + end + + register_observable_counter( + "logstash.plugin.events.duration", + "Time spent processing events", + "ms", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :duration_in_millis) + end + end + + def get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, *path) + full_path = [:stats, :pipelines, pipeline_id.to_sym, :plugins, plugin_type, plugin_id.to_sym] + path + get_metric_value(*full_path) + end end end; end; end \ No newline at end of file diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index 01722c4e25e..b2c5ac49e05 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -27,6 +27,7 @@ module LogStash module Instrument # of the stats, this class encapsulate the starting and stopping of the poller # if the unique timer uses too much resource we can refactor this behavior here. 
class PeriodicPollers + include LogStash::Util::Loggable attr_reader :metric def initialize(metric, settings, agent) From 19d69623a6a9ee4a7186feb38f52f30036ad5ccb Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 13 Mar 2026 15:56:51 +0100 Subject: [PATCH 07/17] Small updates to otel periodic poller --- .../instrument/periodic_poller/otel.rb | 58 ++++++++++--------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index 4d427f9e559..561e093be79 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -42,7 +42,10 @@ class OTel < Base def initialize(metric, agent, settings) @agent = agent @settings = settings - @metric_store = agent.metric.collector + + # Call Base initializer - sets up @metric and configures the TimerTask + super(metric, :polling_interval => settings.get("otel.metrics.interval")) + @metric_store = @metric.collector # Initialize the OTel service - SDK handles its own export timing @otel_service = OTelMetricsService.new( @@ -54,13 +57,11 @@ def initialize(metric, agent, settings) settings.get("otel.resource.attributes") ) - # Call Base initializer - sets up @metric and configures the TimerTask - super(metric, :polling_interval => settings.get("otel.metrics.interval")) - # Take initial snapshot @snapshot = @metric_store.snapshot_metric - # Track which pipelines and plugins have been registered to avoid duplicates + # Track which pipelines and plugins have been registered to avoid duplicates when + # adding new ones in #collect. 
@registered_pipelines = Set.new @registered_plugins = Set.new @@ -87,6 +88,7 @@ def collect # Register metrics for any new plugins (they appear after processing first event) register_new_plugin_metrics + # Note: plugin metrics are pushed automatically during event processing, no need to collect them here. collect_cgroup_metrics collect_pipeline_metrics collect_dlq_metrics @@ -95,6 +97,8 @@ def collect @snapshot = @metric_store.snapshot_metric end + private + def register_new_plugin_metrics @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| register_plugin_metrics_for(pipeline_id) @@ -113,8 +117,6 @@ def register_new_pipeline_metrics end end - private - def collect_cgroup_metrics Os.collect_cgroup(@metric) end @@ -137,6 +139,27 @@ def collect_pipeline_metrics end end + # Register observable counters - SDK computes deltas from cumulative values + def register_global_metrics + # Global event counters + register_observable_counter("logstash.events.in", "Total events received", "{event}") do + get_metric_value(:stats, :events, :in) + end + + register_observable_counter("logstash.events.out", "Total events output", "{event}") do + get_metric_value(:stats, :events, :out) + end + + register_observable_counter("logstash.events.filtered", "Total events filtered", "{event}") do + get_metric_value(:stats, :events, :filtered) + end + + # Global queue gauge (total across all pipelines) + register_gauge("logstash.queue.events", "Total events in queues", "{event}") do + get_total_queue_events + end + end + # Register Dead Letter Queue metrics for a specific pipeline def register_dlq_metrics_for(pipeline_id) attrs = create_pipeline_attributes(pipeline_id) @@ -274,27 +297,6 @@ def register_pipeline_gauges_for(pipeline_id) end end - # Register observable counters - SDK computes deltas from cumulative values - def register_global_metrics - # Global event counters - register_observable_counter("logstash.events.in", "Total events received", "{event}") do - 
get_metric_value(:stats, :events, :in) - end - - register_observable_counter("logstash.events.out", "Total events output", "{event}") do - get_metric_value(:stats, :events, :out) - end - - register_observable_counter("logstash.events.filtered", "Total events filtered", "{event}") do - get_metric_value(:stats, :events, :filtered) - end - - # Global queue gauge (total across all pipelines) - register_gauge("logstash.queue.events", "Total events in queues", "{event}") do - get_total_queue_events - end - end - def register_pipeline_counters_for(pipeline_id) attrs = create_pipeline_attributes(pipeline_id) From 6d59cc7c728277263111132c97218d789984bab7 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 13 Mar 2026 15:57:10 +0100 Subject: [PATCH 08/17] Add missing tests --- .../instrument/periodic_poller/otel_spec.rb | 192 +++++++++++++++++- 1 file changed, 186 insertions(+), 6 deletions(-) diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb index 4e183f936d7..e7550dcb87e 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -101,6 +101,9 @@ expect(otel_service).to receive(:registerObservableCounter).with( "logstash.events.filtered", anything, anything, anything, anything ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.queue.events", anything, anything, anything, anything + ) otel_poller end @@ -115,11 +118,21 @@ expect(otel_service).to receive(:registerGauge).with( "logstash.os.cgroup.cpu.cfs_quota", anything, anything, anything, anything ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.elapsed_periods", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.nr_times_throttled", anything, anything, anything, 
anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.time_throttled", anything, anything, anything, anything + ) otel_poller end - it "registers pipeline metrics for each running pipeline" do + it "registers pipeline metrics for each running pipeline on first collect" do + # Pipeline event counters expect(otel_service).to receive(:registerObservableCounter).with( "logstash.pipeline.events.in", anything, anything, anything, anything ) @@ -130,7 +143,41 @@ "logstash.pipeline.events.filtered", anything, anything, anything, anything ) - otel_poller + # Pipeline queue gauges + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.events", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.page_capacity", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.max_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.max_unread_events", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.data.free_space", anything, anything, anything, anything + ) + + # Pipeline DLQ gauges + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.queue_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.max_queue_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.dropped_events", anything, anything, anything, anything + ) + expect(otel_service).to 
receive(:registerGauge).with( + "logstash.pipeline.dlq.expired_events", anything, anything, anything, anything + ) + + otel_poller.collect end end @@ -165,6 +212,62 @@ new_snapshot = otel_poller.instance_variable_get(:@snapshot) expect(new_snapshot).not_to be(initial_snapshot) end + + context "with plugins" do + let(:metric_store) do + double("metric_store").tap do |store| + allow(store).to receive(:get_shallow).and_return(nil) + allow(store).to receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :filters) + .and_return({ :mutate_abc123 => {} }) + allow(store).to receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :outputs) + .and_return({ :elasticsearch_def456 => {} }) + allow(store).to receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :inputs) + .and_return({ :beats_ghi789 => {} }) + end + end + + let(:snapshot) do + double("snapshot", :metric_store => metric_store) + end + + before do + # First collect registers pipeline metrics + otel_poller.collect + # Stub snapshot and set it on the poller so plugin discovery works + allow(collector).to receive(:snapshot_metric).and_return(snapshot) + otel_poller.instance_variable_set(:@snapshot, snapshot) + end + + it "registers plugin metrics for filters, outputs, and inputs" do + # Expect 3 metrics per plugin (in, out, duration) x 3 plugins = 9 calls + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.in", anything, anything, anything, anything + ).exactly(3).times + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.out", anything, anything, anything, anything + ).exactly(3).times + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.duration", anything, anything, anything, anything + ).exactly(3).times + + otel_poller.collect + end + + it "only registers each plugin once across multiple collects" do + # First collect in this test registers plugins + 
otel_poller.collect + + # Second collect should not register the same plugins again + expect(otel_service).not_to receive(:registerObservableCounter).with( + "logstash.plugin.events.in", anything, anything, anything, anything + ) + + otel_poller.collect + end + end end describe "#stop" do @@ -213,6 +316,58 @@ value = otel_poller.send(:get_pipeline_metric_value, :nonexistent_pipeline, :events, :in) expect(value).to be_nil end + + it "returns nil for missing plugin metrics" do + value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :nonexistent_plugin, :events, :in) + expect(value).to be_nil + end + end + + context "#get_total_queue_events" do + let(:pipeline2) { double("pipeline2", :system? => false) } + let(:system_pipeline) { double("system_pipeline", :system? => true) } + + before do + metric.gauge([:stats, :pipelines, :main, :queue], :events, 10) + metric.gauge([:stats, :pipelines, :secondary, :queue], :events, 20) + metric.gauge([:stats, :pipelines, :monitoring, :queue], :events, 100) + + allow(pipelines_registry).to receive(:running_pipelines).and_return({ + :main => pipeline, + :secondary => pipeline2, + :monitoring => system_pipeline + }) + + otel_poller.collect + end + + it "sums queue events across all user pipelines" do + total = otel_poller.send(:get_total_queue_events) + expect(total).to eq(30) + end + + it "excludes system pipelines from the total" do + total = otel_poller.send(:get_total_queue_events) + expect(total).not_to eq(130) + end + end + + context "#get_plugin_metric_value" do + before do + metric.gauge([:stats, :pipelines, :main, :plugins, :filters, :mutate_abc, :events], :in, 500) + metric.gauge([:stats, :pipelines, :main, :plugins, :filters, :mutate_abc, :events], :out, 450) + otel_poller.collect + end + + it "retrieves plugin metric values" do + value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :mutate_abc, :events, :in) + expect(value).to eq(500) + end + + it "retrieves different plugin metric paths" do + 
value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :mutate_abc, :events, :out) + expect(value).to eq(450) + end end end @@ -299,14 +454,39 @@ def mval(*metric_path) end end - describe "create_pipeline_attributes" do + describe "attribute creation" do before do otel_poller end - it "creates Attributes with pipeline.id" do - attrs = otel_poller.send(:create_pipeline_attributes, :main) - expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + describe "#create_pipeline_attributes" do + it "creates Attributes with pipeline.id" do + attrs = otel_poller.send(:create_pipeline_attributes, :main) + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + end + + it "converts symbol pipeline_id to string" do + attrs = otel_poller.send(:create_pipeline_attributes, :my_pipeline) + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("my_pipeline") + end + end + + describe "#create_plugin_attributes" do + it "creates Attributes with pipeline.id, plugin.type, and plugin.id" do + attrs = otel_poller.send(:create_plugin_attributes, :main, :filters, :mutate_abc123) + + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + expect(attrs.get(AttributeKey.stringKey("plugin.type"))).to eq("filters") + expect(attrs.get(AttributeKey.stringKey("plugin.id"))).to eq("mutate_abc123") + end + + it "converts all symbol arguments to strings" do + attrs = otel_poller.send(:create_plugin_attributes, :secondary, :outputs, :elasticsearch_xyz) + + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("secondary") + expect(attrs.get(AttributeKey.stringKey("plugin.type"))).to eq("outputs") + expect(attrs.get(AttributeKey.stringKey("plugin.id"))).to eq("elasticsearch_xyz") + end end end end From f20ff617cc62a878f4be089b47e9d7471095e235 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 13 Mar 2026 16:09:19 +0100 Subject: [PATCH 09/17] Rename OTel to Otel --- .../instrument/periodic_poller/otel.rb | 4 +- 
.../instrument/periodic_poller/otel_spec.rb | 6 +- .../metrics/otel/OtelMetricsService.java | 104 +----------------- 3 files changed, 11 insertions(+), 103 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index 561e093be79..f5f7f8652a5 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -19,7 +19,7 @@ require "logstash/instrument/periodic_poller/os" require "set" -java_import 'org.logstash.instrument.metrics.otel.OTelMetricsService' +java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' java_import 'io.opentelemetry.api.common.Attributes' java_import 'io.opentelemetry.api.common.AttributeKey' @@ -48,7 +48,7 @@ def initialize(metric, agent, settings) @metric_store = @metric.collector # Initialize the OTel service - SDK handles its own export timing - @otel_service = OTelMetricsService.new( + @otel_service = OtelMetricsService.new( settings.get("otel.metrics.endpoint"), agent.id, agent.name, diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb index e7550dcb87e..ddac2ee2452 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -68,7 +68,7 @@ end before do - allow(OTelMetricsService).to receive(:new).and_return(otel_service) + allow(OtelMetricsService).to receive(:new).and_return(otel_service) end subject(:otel_poller) { described_class.new(metric, agent, settings) } @@ -78,8 +78,8 @@ expect { otel_poller }.not_to raise_error end - it "creates an OTelMetricsService with correct parameters" do - expect(OTelMetricsService).to receive(:new).with( + it "creates an OtelMetricsService with correct parameters" do + expect(OtelMetricsService).to receive(:new).with( 
"http://localhost:4317", "test-node-id", "test-node-name", diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java index c5ddd9d0969..1b0bf79070a 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -22,7 +22,6 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; @@ -50,20 +49,19 @@ * - Manages the lifecycle of the OTel exporter * * Usage from Ruby: - * java_import 'org.logstash.instrument.metrics.otel.OTelMetricsService' - * service = OTelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil) + * java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' + * service = OtelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil) * service.registerGauge("metric.name", "description", "unit", -> { get_value() }, Attributes.empty) */ -public class OTelMetricsService { +public class OtelMetricsService { - private static final Logger LOGGER = LogManager.getLogger(OTelMetricsService.class); + private static final Logger LOGGER = LogManager.getLogger(OtelMetricsService.class); private final SdkMeterProvider meterProvider; private final Meter meter; - private final Map counters = new ConcurrentHashMap<>(); + // Keep references to prevent garbage collection of observable instruments private final Map gauges = new ConcurrentHashMap<>(); private final Map observableCounters = new ConcurrentHashMap<>(); - private volatile boolean running = 
false; /** * Creates a new OTel metrics service. @@ -75,7 +73,7 @@ public class OTelMetricsService { * @param protocol "grpc" or "http" * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs) */ - public OTelMetricsService(String endpoint, String nodeId, String nodeName, + public OtelMetricsService(String endpoint, String nodeId, String nodeName, int intervalSeconds, String protocol, String resourceAttributes) { LOGGER.info("Initializing OpenTelemetry metrics export to {} (protocol: {}, interval: {}s)", endpoint, protocol, intervalSeconds); @@ -111,7 +109,6 @@ public OTelMetricsService(String endpoint, String nodeId, String nodeName, // Get meter for creating instruments this.meter = meterProvider.get("logstash"); - this.running = true; LOGGER.info("OpenTelemetry metrics service initialized successfully"); } @@ -140,38 +137,6 @@ private void parseResourceAttributes(String attributes, AttributesBuilder builde } } - /** - * Gets or creates a Counter instrument. - * Counters are for monotonically increasing values (events processed, bytes sent, etc.) - * - * @param name Metric name (e.g., "logstash.events.in") - * @param description Human-readable description - * @param unit Unit of measurement (e.g., "{events}", "By") - * @return The counter instrument - */ - public LongCounter getOrCreateCounter(String name, String description, String unit) { - return counters.computeIfAbsent(name, n -> - meter.counterBuilder(n) - .setDescription(description) - .setUnit(unit) - .build() - ); - } - - /** - * Increments a counter by the specified amount. - * - * @param name Metric name - * @param amount Amount to increment by - * @param attributes Attributes/labels for this measurement - */ - public void incrementCounter(String name, long amount, Attributes attributes) { - LongCounter counter = counters.get(name); - if (counter != null && amount > 0) { - counter.add(amount, attributes); - } - } - /** * Registers an observable gauge with a callback. 
* The callback is invoked by the SDK when metrics are exported. @@ -231,62 +196,6 @@ public void registerObservableCounter(String name, String description, String un observableCounters.put(name, counter); } - /** - * Registers an observable double gauge with a callback. - * - * @param name Metric name - * @param description Human-readable description - * @param unit Unit of measurement - * @param valueSupplier Callback that returns the current value - * @param attributes Attributes/labels for this gauge - */ - public void registerDoubleGauge(String name, String description, String unit, - Supplier valueSupplier, Attributes attributes) { - meter.gaugeBuilder(name) - .setDescription(description) - .setUnit(unit) - .buildWithCallback(measurement -> { - try { - Double value = valueSupplier.get(); - if (value != null && !value.isNaN()) { - measurement.record(value, attributes); - } - } catch (Exception e) { - LOGGER.debug("Error collecting gauge {}: {}", name, e.getMessage()); - } - }); - } - - /** - * Creates Attributes from a map of key-value pairs. - * Convenience method for Ruby callers. - * - * @param attributeMap Map of attribute names to values - * @return Attributes object - */ - public static Attributes createAttributes(Map attributeMap) { - if (attributeMap == null || attributeMap.isEmpty()) { - return Attributes.empty(); - } - AttributesBuilder builder = Attributes.builder(); - attributeMap.forEach((key, value) -> builder.put(AttributeKey.stringKey(key), value)); - return builder.build(); - } - - /** - * Gets the raw Meter for advanced use cases. - */ - public Meter getMeter() { - return meter; - } - - /** - * Returns whether the service is running. - */ - public boolean isRunning() { - return running; - } - /** * Forces an immediate flush of pending metrics. 
*/ @@ -302,7 +211,6 @@ public void flush() { */ public void shutdown() { LOGGER.info("Shutting down OpenTelemetry metrics service"); - running = false; if (meterProvider != null) { meterProvider.close(); } From 70c0e3e8915e848278aa14e090ca211d77904eec Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 16 Mar 2026 13:58:30 +0100 Subject: [PATCH 10/17] Support authorization header for java OTel SDK --- config/logstash.yml | 5 ++ logstash-core/lib/logstash/environment.rb | 1 + .../instrument/periodic_poller/otel.rb | 4 +- .../instrument/periodic_poller/otel_spec.rb | 54 +++++++++++++++++++ .../metrics/otel/OtelMetricsService.java | 37 ++++++++----- 5 files changed, 87 insertions(+), 14 deletions(-) diff --git a/config/logstash.yml b/config/logstash.yml index f1fcb0eda41..af0b378612b 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -303,6 +303,11 @@ # # otel.metrics.protocol: "grpc" # +# Authorization header for authenticated OTLP endpoints. +# Examples: "ApiKey xxx" or "Bearer xxx" +# +# otel.metrics.authorization_header: +# # Additional resource attributes as comma-separated key=value pairs. # These are attached to all exported metrics to identify this Logstash instance. 
# Example: "environment=production,cluster=us-west,team=platform" diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 6bc12d6544d..a1c04551b5c 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -115,6 +115,7 @@ def self.as_java_range(r) Setting::StringSetting.new("otel.metrics.endpoint", "http://localhost:4317"), Setting::NumericSetting.new("otel.metrics.interval", 10), # seconds Setting::StringSetting.new("otel.metrics.protocol", "grpc", true, ["grpc", "http"]), + Setting::NullableStringSetting.new("otel.metrics.authorization_header", nil, false), # e.g., "ApiKey xxx" or "Bearer xxx" Setting::NullableStringSetting.new("otel.resource.attributes", nil, false) # key=value,key2=value2 format # post_process ].each {|setting| SETTINGS.register(setting) } diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index f5f7f8652a5..bd0497012a9 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -35,6 +35,7 @@ module LogStash module Instrument module PeriodicPoller # otel.metrics.endpoint: "http://localhost:4317" # otel.metrics.interval: 10 # otel.metrics.protocol: "grpc" + # otel.metrics.authorization_header: "ApiKey xxx" # or "Bearer xxx" # otel.resource.attributes: "environment=production,cluster=us-west" # class OTel < Base @@ -54,7 +55,8 @@ def initialize(metric, agent, settings) agent.name, settings.get("otel.metrics.interval"), settings.get("otel.metrics.protocol"), - settings.get("otel.resource.attributes") + settings.get("otel.resource.attributes"), + settings.get("otel.metrics.authorization_header") ) # Take initial snapshot diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb index 
ddac2ee2452..52de17581dc 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -55,6 +55,7 @@ allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) allow(s).to receive(:get).with("otel.metrics.protocol").and_return("grpc") allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return(nil) end end @@ -85,12 +86,65 @@ "test-node-name", 10, "grpc", + nil, nil ).and_return(otel_service) otel_poller end + context "with authorization header" do + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("https://apm.example.com") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("http") + allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return("ApiKey my-secret-key") + end + end + + it "passes authorization_header to OtelMetricsService" do + expect(OtelMetricsService).to receive(:new).with( + "https://apm.example.com", + "test-node-id", + "test-node-name", + 10, + "http", + nil, + "ApiKey my-secret-key" + ).and_return(otel_service) + + otel_poller + end + end + + context "with Bearer token authorization" do + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("https://apm.example.com") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("http") + allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return("Bearer my-bearer-token") + end + end + + it "passes 
Bearer token to OtelMetricsService" do + expect(OtelMetricsService).to receive(:new).with( + "https://apm.example.com", + "test-node-id", + "test-node-name", + 10, + "http", + nil, + "Bearer my-bearer-token" + ).and_return(otel_service) + + otel_poller + end + end + it "registers global metrics" do expect(otel_service).to receive(:registerObservableCounter).with( "logstash.events.in", anything, anything, anything, anything diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java index 1b0bf79070a..aa69f13d70a 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -63,20 +63,25 @@ public class OtelMetricsService { private final Map gauges = new ConcurrentHashMap<>(); private final Map observableCounters = new ConcurrentHashMap<>(); + private final String authorizationHeader; + /** * Creates a new OTel metrics service. 
* - * @param endpoint OTLP endpoint (e.g., "http://localhost:4317") - * @param nodeId Logstash node ID - * @param nodeName Logstash node name - * @param intervalSeconds Export interval in seconds - * @param protocol "grpc" or "http" - * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs) + * @param endpoint OTLP endpoint (e.g., "http://localhost:4317") + * @param nodeId Logstash node ID + * @param nodeName Logstash node name + * @param intervalSeconds Export interval in seconds + * @param protocol "grpc" or "http" + * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs) + * @param authorizationHeader Authorization header value (e.g., "ApiKey xxx" or "Bearer xxx"), or null */ public OtelMetricsService(String endpoint, String nodeId, String nodeName, - int intervalSeconds, String protocol, String resourceAttributes) { + int intervalSeconds, String protocol, String resourceAttributes, + String authorizationHeader) { LOGGER.info("Initializing OpenTelemetry metrics export to {} (protocol: {}, interval: {}s)", endpoint, protocol, intervalSeconds); + this.authorizationHeader = authorizationHeader; // Build resource attributes AttributesBuilder resourceAttrsBuilder = Attributes.builder() @@ -115,16 +120,22 @@ public OtelMetricsService(String endpoint, String nodeId, String nodeName, private MetricExporter createExporter(String endpoint, String protocol) { if ("http".equalsIgnoreCase(protocol)) { - return OtlpHttpMetricExporter.builder() + var builder = OtlpHttpMetricExporter.builder() .setEndpoint(endpoint + "/v1/metrics") - .setTimeout(Duration.ofSeconds(10)) - .build(); + .setTimeout(Duration.ofSeconds(10)); + if (authorizationHeader != null && !authorizationHeader.isEmpty()) { + builder.addHeader("Authorization", authorizationHeader); + } + return builder.build(); } else { // Default to gRPC - return OtlpGrpcMetricExporter.builder() + var builder = OtlpGrpcMetricExporter.builder() 
.setEndpoint(endpoint) - .setTimeout(Duration.ofSeconds(10)) - .build(); + .setTimeout(Duration.ofSeconds(10)); + if (authorizationHeader != null && !authorizationHeader.isEmpty()) { + builder.addHeader("Authorization", authorizationHeader); + } + return builder.build(); } } From 9b99ff3ba4bffe118e8db2fd7e9c53c27ace8e02 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 17 Mar 2026 14:37:11 +0100 Subject: [PATCH 11/17] Fix rebase merge mistake --- logstash-core/lib/logstash/instrument/periodic_pollers.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index b2c5ac49e05..b225d793035 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -39,7 +39,6 @@ def initialize(metric, settings, agent) PeriodicPoller::DeadLetterQueue.new(metric, agent), PeriodicPoller::FlowRate.new(metric, agent), PeriodicPoller::BatchStructure.new(metric, agent)] - PeriodicPoller::FlowRate.new(metric, agent)] # Add OpenTelemetry metrics exporter if enabled if otel_metrics_enabled? 
From b2c60cabfb58069339cb9d1ea30be50fb0f97319 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 17 Mar 2026 14:38:28 +0100 Subject: [PATCH 12/17] Add newline at end of file --- logstash-core/lib/logstash/instrument/periodic_poller/otel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index bd0497012a9..fd985090582 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -462,4 +462,4 @@ def get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, *path) get_metric_value(*full_path) end end -end; end; end \ No newline at end of file +end; end; end From fde38dc17c40827e5ef04d5444eb8e3f7510b0ef Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 17 Mar 2026 14:43:34 +0100 Subject: [PATCH 13/17] Rename OTel ruby class to Otel to match Java SDK convention --- logstash-core/lib/logstash/instrument/periodic_poller/otel.rb | 2 +- logstash-core/lib/logstash/instrument/periodic_pollers.rb | 2 +- .../spec/logstash/instrument/periodic_poller/otel_spec.rb | 4 ++-- .../logstash/instrument/metrics/otel/OtelMetricsService.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index fd985090582..f7b58b4abba 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -38,7 +38,7 @@ module LogStash module Instrument module PeriodicPoller # otel.metrics.authorization_header: "ApiKey xxx" # or "Bearer xxx" # otel.resource.attributes: "environment=production,cluster=us-west" # - class OTel < Base + class Otel < Base def initialize(metric, agent, settings) @agent = agent diff --git 
a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index b225d793035..38e1d14d16c 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -43,7 +43,7 @@ def initialize(metric, settings, agent) # Add OpenTelemetry metrics exporter if enabled if otel_metrics_enabled? require "logstash/instrument/periodic_poller/otel" - @periodic_pollers << PeriodicPoller::OTel.new(metric, agent, @settings) + @periodic_pollers << PeriodicPoller::Otel.new(metric, agent, @settings) logger.info("OpenTelemetry metrics export enabled") end end diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb index 52de17581dc..925a0055f63 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -19,7 +19,7 @@ require "logstash/instrument/periodic_poller/otel" require "logstash/instrument/collector" -describe LogStash::Instrument::PeriodicPoller::OTel do +describe LogStash::Instrument::PeriodicPoller::Otel do let(:collector) { LogStash::Instrument::Collector.new } let(:metric) { LogStash::Instrument::Metric.new(collector) } let(:agent_metric) { double("agent_metric", :collector => collector) } @@ -329,7 +329,7 @@ otel_poller end - it "flushes and shuts down the OTel service" do + it "flushes and shuts down the Otel service" do expect(otel_service).to receive(:flush) expect(otel_service).to receive(:shutdown) otel_poller.stop diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java index aa69f13d70a..52dda97db0a 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java +++ 
b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -66,7 +66,7 @@ public class OtelMetricsService { private final String authorizationHeader; /** - * Creates a new OTel metrics service. + * Creates a new Otel metrics service. * * @param endpoint OTLP endpoint (e.g., "http://localhost:4317") * @param nodeId Logstash node ID From d4185bb449cb8052440c67ef4f6d6f1ab96f0ee2 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 17 Mar 2026 15:25:36 +0100 Subject: [PATCH 14/17] Add dependencies to license mapping --- .../src/main/resources/licenseMapping.csv | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv index 9960fc726c0..cabf2fa7e31 100644 --- a/tools/dependencies-report/src/main/resources/licenseMapping.csv +++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv @@ -26,6 +26,7 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "bundler:",https://bundler.io/,MIT "cabin:",https://github.com/jordansissel/ruby-cabin,Apache-2.0 "cgi:",https://github.com/ruby/cgi,BSD-2-Clause +"org.checkerframework:checker-qual:",https://github.com/typetools/checker-framework,MIT "clamp:",http://github.com/mdub/clamp,MIT "coderay:",http://coderay.rubychan.de,MIT "com.fasterxml.jackson.core:jackson-annotations:",https://github.com/FasterXML/jackson-annotations,Apache-2.0 @@ -34,8 +35,20 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:",https://github.com/FasterXML/jackson-dataformats-binary,Apache-2.0 "com.fasterxml.jackson.module:jackson-module-afterburner:",https://github.com/FasterXML/jackson-modules-base,Apache-2.0 "com.github.luben:zstd-jni:1.5.7-4",https://github.com/luben/zstd-jni,BSD-2-Clause +"com.google.code.findbugs:jsr305:",http://findbugs.sourceforge.net/,Apache-2.0 
+"com.google.errorprone:error_prone_annotations:",https://github.com/google/error-prone,Apache-2.0 "com.google.googlejavaformat:google-java-format:",https://github.com/google/google-java-format,Apache-2.0 "com.google.guava:guava:",https://github.com/google/guava,Apache-2.0 +"com.google.protobuf:protobuf-java:",https://github.com/protocolbuffers/protobuf,BSD-3-Clause +"io.grpc:grpc-api:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-context:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-core:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-netty:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-netty-shaded:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-protobuf:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-protobuf-lite:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.grpc:grpc-stub:",https://github.com/grpc/grpc-java,Apache-2.0 +"io.perfmark:perfmark-api:",https://github.com/perfmark/perfmark,Apache-2.0 "com.google.j2objc:j2objc-annotations:",https://github.com/google/j2objc/,Apache-2.0 "commons-codec:commons-codec:",http://commons.apache.org/proper/commons-codec/,Apache-2.0 "commons-logging:commons-logging:",http://commons.apache.org/proper/commons-logging/,Apache-2.0 @@ -97,6 +110,18 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "io-console:",https://github.com/ruby/io-console,BSD-2-Clause "io-wait:",https://github.com/ruby/io-wait,BSD-2-Clause "io.netty:netty-all:",https://github.com/netty/netty,Apache-2.0 +"io.opentelemetry:opentelemetry-api:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-context:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-exporter-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-exporter-otlp:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 
+"io.opentelemetry:opentelemetry-exporter-otlp-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-exporter-sender-okhttp:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk-logs:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk-metrics:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk-trace:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry.semconv:opentelemetry-semconv:",https://github.com/open-telemetry/semantic-conventions-java,Apache-2.0 "ipaddr:",https://github.com/ruby/ipaddr,BSD-2-Clause "irb:",https://github.com/ruby/irb,BSD-2-Clause "jar-dependencies:",https://github.com/mkristian/jar-dependencies,MIT @@ -110,6 +135,11 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "jruby-stdin-channel:","https://github.com/colinsurprenant/jruby-stdin-channel",Apache-2.0 "jruby:",https://jruby.org,EPL-2.0 "json:",http://json-jruby.rubyforge.org/,Ruby +"org.jetbrains.kotlin:kotlin-stdlib:",https://github.com/JetBrains/kotlin,Apache-2.0 +"org.jetbrains.kotlin:kotlin-stdlib-common:",https://github.com/JetBrains/kotlin,Apache-2.0 +"org.jetbrains.kotlin:kotlin-stdlib-jdk7:",https://github.com/JetBrains/kotlin,Apache-2.0 +"org.jetbrains.kotlin:kotlin-stdlib-jdk8:",https://github.com/JetBrains/kotlin,Apache-2.0 +"org.jetbrains:annotations:",https://github.com/JetBrains/java-annotations,Apache-2.0 "logger:",https://github.com/ruby/logger,BSD-2-Clause "lru_redux:","https://github.com/SamSaffron/lru_redux/",MIT "mail:","https://github.com/mikel/mail/",MIT @@ -136,6 +166,9 @@ 
dependency,dependencyUrl,licenseOverride,copyright,sourceURL "nio4r:","https://github.com/socketry/nio4r",MIT "nkf:",https://github.com/ruby/nkf,BSD-2-Clause "nokogiri:","http://nokogiri.org/",MIT +"com.squareup.okhttp3:okhttp:",https://github.com/square/okhttp,Apache-2.0 +"com.squareup.okio:okio:",https://github.com/square/okio,Apache-2.0 +"com.squareup.okio:okio-jvm:",https://github.com/square/okio,Apache-2.0 "observer:",https://github.com/ruby/observer,BSD-2-Clause "open-uri:",https://github.com/ruby/open-uri,BSD-2-Clause "open3:",https://github.com/ruby/open3,BSD-2-Clause From 0b3eb129f47cf6e05af4dee80d95334b649dd58f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 31 Mar 2026 12:48:03 +0200 Subject: [PATCH 15/17] Fix licensing and NOTICE files --- .../src/main/resources/licenseMapping.csv | 31 +------------------ ...opentelemetry!opentelemetry-api-NOTICE.txt | 7 +++++ ...opentelemetry!opentelemetry-bom-NOTICE.txt | 7 +++++ ...try!opentelemetry-exporter-otlp-NOTICE.txt | 7 +++++ ...opentelemetry!opentelemetry-sdk-NOTICE.txt | 7 +++++ ...metry!opentelemetry-sdk-metrics-NOTICE.txt | 7 +++++ 6 files changed, 36 insertions(+), 30 deletions(-) create mode 100644 tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt create mode 100644 tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt create mode 100644 tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt create mode 100644 tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt create mode 100644 tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv index cabf2fa7e31..a944c8cde7e 100644 --- 
a/tools/dependencies-report/src/main/resources/licenseMapping.csv +++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv @@ -26,7 +26,6 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "bundler:",https://bundler.io/,MIT "cabin:",https://github.com/jordansissel/ruby-cabin,Apache-2.0 "cgi:",https://github.com/ruby/cgi,BSD-2-Clause -"org.checkerframework:checker-qual:",https://github.com/typetools/checker-framework,MIT "clamp:",http://github.com/mdub/clamp,MIT "coderay:",http://coderay.rubychan.de,MIT "com.fasterxml.jackson.core:jackson-annotations:",https://github.com/FasterXML/jackson-annotations,Apache-2.0 @@ -35,21 +34,8 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:",https://github.com/FasterXML/jackson-dataformats-binary,Apache-2.0 "com.fasterxml.jackson.module:jackson-module-afterburner:",https://github.com/FasterXML/jackson-modules-base,Apache-2.0 "com.github.luben:zstd-jni:1.5.7-4",https://github.com/luben/zstd-jni,BSD-2-Clause -"com.google.code.findbugs:jsr305:",http://findbugs.sourceforge.net/,Apache-2.0 -"com.google.errorprone:error_prone_annotations:",https://github.com/google/error-prone,Apache-2.0 "com.google.googlejavaformat:google-java-format:",https://github.com/google/google-java-format,Apache-2.0 "com.google.guava:guava:",https://github.com/google/guava,Apache-2.0 -"com.google.protobuf:protobuf-java:",https://github.com/protocolbuffers/protobuf,BSD-3-Clause -"io.grpc:grpc-api:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-context:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-core:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-netty:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-netty-shaded:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-protobuf:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.grpc:grpc-protobuf-lite:",https://github.com/grpc/grpc-java,Apache-2.0 
-"io.grpc:grpc-stub:",https://github.com/grpc/grpc-java,Apache-2.0 -"io.perfmark:perfmark-api:",https://github.com/perfmark/perfmark,Apache-2.0 -"com.google.j2objc:j2objc-annotations:",https://github.com/google/j2objc/,Apache-2.0 "commons-codec:commons-codec:",http://commons.apache.org/proper/commons-codec/,Apache-2.0 "commons-logging:commons-logging:",http://commons.apache.org/proper/commons-logging/,Apache-2.0 "concurrent-ruby:",http://www.concurrent-ruby.com,MIT @@ -111,17 +97,10 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "io-wait:",https://github.com/ruby/io-wait,BSD-2-Clause "io.netty:netty-all:",https://github.com/netty/netty,Apache-2.0 "io.opentelemetry:opentelemetry-api:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-context:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-exporter-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-bom:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 "io.opentelemetry:opentelemetry-exporter-otlp:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-exporter-otlp-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-exporter-sender-okhttp:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 "io.opentelemetry:opentelemetry-sdk:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-sdk-common:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-sdk-logs:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 "io.opentelemetry:opentelemetry-sdk-metrics:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 -"io.opentelemetry:opentelemetry-sdk-trace:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 
-"io.opentelemetry.semconv:opentelemetry-semconv:",https://github.com/open-telemetry/semantic-conventions-java,Apache-2.0 "ipaddr:",https://github.com/ruby/ipaddr,BSD-2-Clause "irb:",https://github.com/ruby/irb,BSD-2-Clause "jar-dependencies:",https://github.com/mkristian/jar-dependencies,MIT @@ -135,11 +114,6 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "jruby-stdin-channel:","https://github.com/colinsurprenant/jruby-stdin-channel",Apache-2.0 "jruby:",https://jruby.org,EPL-2.0 "json:",http://json-jruby.rubyforge.org/,Ruby -"org.jetbrains.kotlin:kotlin-stdlib:",https://github.com/JetBrains/kotlin,Apache-2.0 -"org.jetbrains.kotlin:kotlin-stdlib-common:",https://github.com/JetBrains/kotlin,Apache-2.0 -"org.jetbrains.kotlin:kotlin-stdlib-jdk7:",https://github.com/JetBrains/kotlin,Apache-2.0 -"org.jetbrains.kotlin:kotlin-stdlib-jdk8:",https://github.com/JetBrains/kotlin,Apache-2.0 -"org.jetbrains:annotations:",https://github.com/JetBrains/java-annotations,Apache-2.0 "logger:",https://github.com/ruby/logger,BSD-2-Clause "lru_redux:","https://github.com/SamSaffron/lru_redux/",MIT "mail:","https://github.com/mikel/mail/",MIT @@ -166,9 +140,6 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "nio4r:","https://github.com/socketry/nio4r",MIT "nkf:",https://github.com/ruby/nkf,BSD-2-Clause "nokogiri:","http://nokogiri.org/",MIT -"com.squareup.okhttp3:okhttp:",https://github.com/square/okhttp,Apache-2.0 -"com.squareup.okio:okio:",https://github.com/square/okio,Apache-2.0 -"com.squareup.okio:okio-jvm:",https://github.com/square/okio,Apache-2.0 "observer:",https://github.com/ruby/observer,BSD-2-Clause "open-uri:",https://github.com/ruby/open-uri,BSD-2-Clause "open3:",https://github.com/ruby/open3,BSD-2-Clause diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt new file mode 100644 index 
00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. 
diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. 
From 073a15cd4b93337427ac381adab7f2d05c26efd5 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Apr 2026 11:51:27 +0200 Subject: [PATCH 16/17] OTel should be spelled Otel for consistency --- .../lib/logstash/instrument/periodic_poller/otel.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb index f7b58b4abba..3014dbe12da 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -48,7 +48,7 @@ def initialize(metric, agent, settings) super(metric, :polling_interval => settings.get("otel.metrics.interval")) @metric_store = @metric.collector - # Initialize the OTel service - SDK handles its own export timing + # Initialize the Otel service - SDK handles its own export timing @otel_service = OtelMetricsService.new( settings.get("otel.metrics.endpoint"), agent.id, @@ -95,7 +95,7 @@ def collect collect_pipeline_metrics collect_dlq_metrics @agent.capture_flow_metrics - # Refresh snapshot after collecting metrics so OTel callbacks read fresh data + # Refresh snapshot after collecting metrics so Otel callbacks read fresh data @snapshot = @metric_store.snapshot_metric end @@ -111,7 +111,7 @@ def register_new_pipeline_metrics @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| next if @registered_pipelines.include?(pipeline_id) - logger.debug("Registering OTel metrics for pipeline", :pipeline_id => pipeline_id) + logger.debug("Registering Otel metrics for pipeline", :pipeline_id => pipeline_id) register_pipeline_counters_for(pipeline_id) register_pipeline_gauges_for(pipeline_id) register_dlq_metrics_for(pipeline_id) @@ -405,7 +405,7 @@ def register_plugin_metrics_for(pipeline_id) plugin_key = "#{pipeline_id}:#{plugin_type}:#{plugin_id}" next if @registered_plugins.include?(plugin_key) - 
logger.debug("Registering OTel metrics for plugin", + logger.debug("Registering Otel metrics for plugin", :pipeline_id => pipeline_id, :plugin_type => plugin_type, :plugin_id => plugin_id) From 7ea06868de24bb819c26609e0071f8b8dc7d7ea7 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Apr 2026 17:38:41 +0200 Subject: [PATCH 17/17] Add documentation --- docs/reference/logstash-settings-file.md | 6 + docs/reference/monitoring-logstash.md | 6 +- .../monitoring-with-opentelemetry.md | 212 ++++++++++++++++++ docs/reference/toc.yml | 1 + 4 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 docs/reference/monitoring-with-opentelemetry.md diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index a83c125cadc..d187acab2e0 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -99,4 +99,10 @@ The `logstash.yml` file includes these settings. | `path.plugins` | Where to find custom plugins. You can specify this setting multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: `PATH/logstash/TYPE/NAME.rb` where `TYPE` is `inputs`, `filters`, `outputs`, or `codecs`, and `NAME` is the name of the plugin. | Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). | | `allow_superuser` | Setting to `true` to allow or `false` to block running Logstash as a superuser. | `false` | | `pipeline.buffer.type` | Determine where to allocate memory buffers, for plugins that leverage them.Defaults to `heap` but can be switched to `direct` to instruct Logstash to prefer allocation of buffers in direct memory. | `heap` Check out [Buffer Allocation types](/reference/jvm-settings.md#off-heap-buffers-allocation) for more info. | +| `otel.metrics.enabled` | Enable or disable OpenTelemetry metrics export. See [Monitoring with OpenTelemetry](/reference/monitoring-with-opentelemetry.md). 
| `false` | +| `otel.metrics.endpoint` | The OTLP endpoint URL for metrics export. For gRPC, typically port 4317. For HTTP, typically port 4318. | `http://localhost:4317` | +| `otel.metrics.interval` | Export interval in seconds. Controls how frequently metrics are sent to the OTLP endpoint. | `10` | +| `otel.metrics.protocol` | Protocol to use for OTLP export. Valid values are `grpc` or `http`. | `grpc` | +| `otel.metrics.authorization_header` | Authorization header for authenticated OTLP endpoints. Examples: `ApiKey xxx` or `Bearer xxx`. | *N/A* | +| `otel.resource.attributes` | Additional OpenTelemetry resource attributes as comma-separated key=value pairs. Example: `environment=production,cluster=us-west`. | *N/A* | diff --git a/docs/reference/monitoring-logstash.md b/docs/reference/monitoring-logstash.md index 52034e8db3e..a7a0b88560c 100644 --- a/docs/reference/monitoring-logstash.md +++ b/docs/reference/monitoring-logstash.md @@ -18,8 +18,10 @@ The metrics collected by Logstash include: You can use monitoring APIs provided by Logstash to retrieve these metrics. These APIs are available by default without requiring any extra configuration. -Alternatively, you can [configure Elastic Stack monitoring features](monitoring-logstash-legacy.md) to send -data to a monitoring cluster. +Alternatively, you can: + +* [Export metrics via OpenTelemetry](monitoring-with-opentelemetry.md) to send metrics to any OTLP-compatible backend, including Elastic Cloud's native OTLP endpoint. +* [Configure Elastic Stack monitoring features](monitoring-logstash-legacy.md) to send data to a monitoring cluster. 
## APIs for monitoring Logstash [monitoring] diff --git a/docs/reference/monitoring-with-opentelemetry.md b/docs/reference/monitoring-with-opentelemetry.md new file mode 100644 index 00000000000..defc52fca8a --- /dev/null +++ b/docs/reference/monitoring-with-opentelemetry.md @@ -0,0 +1,212 @@ +--- +mapped_pages: + - https://www.elastic.co/guide/en/logstash/current/monitoring-with-opentelemetry.html +applies_to: + stack: preview +--- + +# Monitoring Logstash with OpenTelemetry + +Logstash can export metrics to any OpenTelemetry Protocol (OTLP) compatible backend, enabling integration with observability platforms like Elastic, Prometheus, etc. + +## Overview + +The OpenTelemetry metrics exporter sends Logstash runtime metrics directly via OTLP (OpenTelemetry Protocol). This provides a standardized way to collect and export metrics without requiring an intermediate collector, though you can also route metrics through an OpenTelemetry Collector if needed. + +## Configuration + +To enable OpenTelemetry metrics export, add the following settings to your `logstash.yml` file: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "http://localhost:4317" +otel.metrics.interval: 10 +otel.metrics.protocol: "grpc" +``` + +### Settings + +| Setting | Description | Default | +| --- | --- | --- | +| `otel.metrics.enabled` | Enable or disable OpenTelemetry metrics export. | `false` | +| `otel.metrics.endpoint` | The OTLP endpoint URL. For gRPC, typically port 4317. For HTTP, typically port 4318. | `http://localhost:4317` | +| `otel.metrics.interval` | Export interval in seconds. Controls how frequently metrics are sent to the endpoint. | `10` | +| `otel.metrics.protocol` | Protocol to use for OTLP export. Valid values are `grpc` or `http`. | `grpc` | +| `otel.metrics.authorization_header` | Authorization header for authenticated endpoints. Examples: `ApiKey xxx` or `Bearer xxx`. 
| *N/A* | +| `otel.resource.attributes` | Additional resource attributes as comma-separated key=value pairs. Example: `environment=production,cluster=us-west`. | *N/A* | + +## Sending metrics to Elastic Cloud + +To send metrics directly to Elastic Cloud's native OTLP endpoint: + +1. Get your Elastic Cloud OTLP endpoint from your deployment's APM integration settings +2. Create an API key with appropriate permissions +3. Configure Logstash: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "https://your-deployment.apm.us-central1.gcp.cloud.es.io:443" +otel.metrics.protocol: "http" +otel.metrics.authorization_header: "ApiKey your-base64-encoded-api-key" +``` + +## Sending metrics to an OpenTelemetry Collector + +You can also send metrics to an OpenTelemetry Collector, which can then forward them to multiple backends: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "http://otel-collector:4317" +otel.metrics.protocol: "grpc" +``` + +Example OpenTelemetry Collector configuration to forward to Elasticsearch: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + elasticsearch: + endpoints: ["https://your-elasticsearch-host:9200"] + api_key: "your-api-key" + mapping: + mode: otel + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [elasticsearch] +``` + +## Exported metrics + +Logstash exports the following metrics via OpenTelemetry: + +### Global metrics + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.events.in` | Counter | `{event}` | Total events received across all pipelines | +| `logstash.events.out` | Counter | `{event}` | Total events output across all pipelines | +| `logstash.events.filtered` | Counter | `{event}` | Total events filtered across all pipelines | +| `logstash.queue.events` | Gauge | `{event}` | Total events currently in queues | + +### Pipeline metrics 
+ +Pipeline metrics include a `pipeline.id` attribute to identify the pipeline. + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.events.in` | Counter | `{event}` | Events received by pipeline | +| `logstash.pipeline.events.out` | Counter | `{event}` | Events output by pipeline | +| `logstash.pipeline.events.filtered` | Counter | `{event}` | Events filtered by pipeline | +| `logstash.pipeline.queue.events` | Gauge | `{event}` | Events in pipeline queue | + +### Persistent queue metrics + +These metrics are available when using persistent queues (`queue.type: persisted`). + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.queue.capacity.page_capacity` | Gauge | `By` | Size of each queue page in bytes | +| `logstash.pipeline.queue.capacity.max_size` | Gauge | `By` | Maximum queue size limit in bytes | +| `logstash.pipeline.queue.capacity.max_unread_events` | Gauge | `{event}` | Maximum unread events allowed | +| `logstash.pipeline.queue.capacity.size` | Gauge | `By` | Current persisted queue size in bytes | +| `logstash.pipeline.queue.data.free_space` | Gauge | `By` | Free disk space where queue is stored | + +### Dead letter queue metrics + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.dlq.queue_size` | Gauge | `By` | Current dead letter queue size in bytes | +| `logstash.pipeline.dlq.max_queue_size` | Gauge | `By` | Maximum DLQ size limit in bytes | +| `logstash.pipeline.dlq.dropped_events` | Gauge | `{event}` | Events dropped when DLQ is full | +| `logstash.pipeline.dlq.expired_events` | Gauge | `{event}` | Events expired and removed from DLQ | + +### Plugin metrics + +Plugin metrics include `pipeline.id`, `plugin.type`, and `plugin.id` attributes. 
+ +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.plugin.events.in` | Counter | `{event}` | Events received by plugin | +| `logstash.plugin.events.out` | Counter | `{event}` | Events output by plugin | +| `logstash.plugin.events.duration` | Counter | `ms` | Time spent processing events | + +### Cgroup metrics (Linux only) + +These metrics are available when running on Linux with cgroups enabled (e.g., in containers). + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.os.cgroup.cpuacct.usage` | Counter | `ns` | Total CPU time consumed | +| `logstash.os.cgroup.cpu.cfs_period` | Gauge | `us` | CFS scheduling period | +| `logstash.os.cgroup.cpu.cfs_quota` | Gauge | `us` | CFS scheduling quota | +| `logstash.os.cgroup.cpu.stat.elapsed_periods` | Counter | `{period}` | Number of elapsed CFS periods | +| `logstash.os.cgroup.cpu.stat.nr_times_throttled` | Counter | `{occurrence}` | Number of times throttled | +| `logstash.os.cgroup.cpu.stat.time_throttled` | Counter | `ns` | Total time throttled | + +## Resource attributes + +The following resource attributes are automatically added to all metrics: + +| Attribute | Description | +| --- | --- | +| `service.name` | Always set to `logstash` | +| `service.instance.id` | The Logstash node ID | +| `service.version` | The Logstash version | +| `host.name` | The configured node name | + +Additional resource attributes can be added using the `otel.resource.attributes` setting. + +## Viewing metrics in Kibana + +When sending metrics to Elastic Cloud via the native OTLP endpoint, metrics are stored in APM data streams (`.ds-metrics-apm.app.logstash-*`). You can view them in: + +1. **Observability > APM > Services** - Find your Logstash service +2. **Observability > Metrics Explorer** - Query metrics directly +3. 
**Discover** - Search the `metrics-apm.app.logstash-*` data view + +When using an OpenTelemetry Collector with the Elasticsearch exporter, create a data view matching your configured index pattern (e.g., `metrics-otel-*`). + +## Troubleshooting + +### Enable debug logging + +To see detailed OpenTelemetry SDK logs, add the following to `config/log4j2.properties`: + +```properties +logger.otel.name = io.opentelemetry +logger.otel.level = debug +``` + +### Common issues + +**Connection refused errors** + +Verify the endpoint is accessible: +- For gRPC (default): Port 4317 +- For HTTP: Port 4318 with `/v1/metrics` path automatically appended + +**Authentication errors** + +Ensure the `otel.metrics.authorization_header` is correctly formatted: +- For API keys: `ApiKey base64-encoded-key` +- For Bearer tokens: `Bearer your-token` + +**Metrics not appearing** + +- Check that `otel.metrics.enabled` is set to `true` +- Verify the export interval hasn't been set too high +- Check Logstash logs for export errors diff --git a/docs/reference/toc.yml b/docs/reference/toc.yml index e27cf354956..49ed9a021db 100644 --- a/docs/reference/toc.yml +++ b/docs/reference/toc.yml @@ -108,6 +108,7 @@ toc: - file: logstash-pipeline-viewer.md - file: monitoring-troubleshooting.md - file: monitoring-logstash.md + - file: monitoring-with-opentelemetry.md - file: working-with-plugins.md children: - file: plugin-concepts.md