diff --git a/config/logstash.yml b/config/logstash.yml index c58843168ca..af0b378612b 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -282,6 +282,38 @@ # # path.dead_letter_queue: # +# ------------ OpenTelemetry Metrics Settings -------------- +# Export Logstash metrics to an OpenTelemetry-compatible backend via OTLP. +# This allows you to send Logstash metrics to any OTLP-compatible collector +# or observability platform (e.g., Elastic, Prometheus via the OTel Collector, Grafana Cloud). +# +# Enable OpenTelemetry metrics export (default: false) +# +# otel.metrics.enabled: false +# +# OTLP endpoint URL. For gRPC, typically port 4317. For HTTP, typically port 4318. +# +# otel.metrics.endpoint: "http://localhost:4317" +# +# Export interval in seconds (default: 10) +# +# otel.metrics.interval: 10 +# +# Protocol to use for OTLP export: "grpc" (default) or "http" +# +# otel.metrics.protocol: "grpc" +# +# Authorization header for authenticated OTLP endpoints. +# Examples: "ApiKey xxx" or "Bearer xxx" +# +# otel.metrics.authorization_header: +# +# Additional resource attributes as comma-separated key=value pairs. +# These are attached to all exported metrics to identify this Logstash instance. +# Example: "environment=production,cluster=us-west,team=platform" +# +# otel.resource.attributes: +# # ------------ Debugging Settings -------------- # # Options for log.level: diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index a83c125cadc..d187acab2e0 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -99,4 +99,10 @@ The `logstash.yml` file includes these settings. | `path.plugins` | Where to find custom plugins. You can specify this setting multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: `PATH/logstash/TYPE/NAME.rb` where `TYPE` is `inputs`, `filters`, `outputs`, or `codecs`, and `NAME` is the name of the plugin. 
| Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). | | `allow_superuser` | Setting to `true` to allow or `false` to block running Logstash as a superuser. | `false` | | `pipeline.buffer.type` | Determine where to allocate memory buffers, for plugins that leverage them.Defaults to `heap` but can be switched to `direct` to instruct Logstash to prefer allocation of buffers in direct memory. | `heap` Check out [Buffer Allocation types](/reference/jvm-settings.md#off-heap-buffers-allocation) for more info. | +| `otel.metrics.enabled` | Enable or disable OpenTelemetry metrics export. See [Monitoring with OpenTelemetry](/reference/monitoring-with-opentelemetry.md). | `false` | +| `otel.metrics.endpoint` | The OTLP endpoint URL for metrics export. For gRPC, typically port 4317. For HTTP, typically port 4318. | `http://localhost:4317` | +| `otel.metrics.interval` | Export interval in seconds. Controls how frequently metrics are sent to the OTLP endpoint. | `10` | +| `otel.metrics.protocol` | Protocol to use for OTLP export. Valid values are `grpc` or `http`. | `grpc` | +| `otel.metrics.authorization_header` | Authorization header for authenticated OTLP endpoints. Examples: `ApiKey xxx` or `Bearer xxx`. | *N/A* | +| `otel.resource.attributes` | Additional OpenTelemetry resource attributes as comma-separated key=value pairs. Example: `environment=production,cluster=us-west`. | *N/A* | diff --git a/docs/reference/monitoring-logstash.md b/docs/reference/monitoring-logstash.md index 52034e8db3e..a7a0b88560c 100644 --- a/docs/reference/monitoring-logstash.md +++ b/docs/reference/monitoring-logstash.md @@ -18,8 +18,10 @@ The metrics collected by Logstash include: You can use monitoring APIs provided by Logstash to retrieve these metrics. These APIs are available by default without requiring any extra configuration. -Alternatively, you can [configure Elastic Stack monitoring features](monitoring-logstash-legacy.md) to send -data to a monitoring cluster. 
+Alternatively, you can: + +* [Export metrics via OpenTelemetry](monitoring-with-opentelemetry.md) to send metrics to any OTLP-compatible backend, including Elastic Cloud's native OTLP endpoint. +* [Configure Elastic Stack monitoring features](monitoring-logstash-legacy.md) to send data to a monitoring cluster. ## APIs for monitoring Logstash [monitoring] diff --git a/docs/reference/monitoring-with-opentelemetry.md b/docs/reference/monitoring-with-opentelemetry.md new file mode 100644 index 00000000000..defc52fca8a --- /dev/null +++ b/docs/reference/monitoring-with-opentelemetry.md @@ -0,0 +1,212 @@ +--- +mapped_pages: + - https://www.elastic.co/guide/en/logstash/current/monitoring-with-opentelemetry.html +applies_to: + stack: preview +--- + +# Monitoring Logstash with OpenTelemetry + +Logstash can export metrics to any backend that supports the OpenTelemetry Protocol (OTLP), enabling integration with observability platforms such as Elastic and Prometheus. + +## Overview + +The OpenTelemetry metrics exporter sends Logstash runtime metrics directly via OTLP. This provides a standardized way to collect and export metrics without requiring an intermediate collector, though you can also route metrics through an OpenTelemetry Collector if needed. + +## Configuration + +To enable OpenTelemetry metrics export, add the following settings to your `logstash.yml` file: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "http://localhost:4317" +otel.metrics.interval: 10 +otel.metrics.protocol: "grpc" +``` + +### Settings + +| Setting | Description | Default | +| --- | --- | --- | +| `otel.metrics.enabled` | Enable or disable OpenTelemetry metrics export. | `false` | +| `otel.metrics.endpoint` | The OTLP endpoint URL. For gRPC, typically port 4317. For HTTP, typically port 4318. | `http://localhost:4317` | +| `otel.metrics.interval` | Export interval in seconds. Controls how frequently metrics are sent to the endpoint. 
| `10` | +| `otel.metrics.protocol` | Protocol to use for OTLP export. Valid values are `grpc` or `http`. | `grpc` | +| `otel.metrics.authorization_header` | Authorization header for authenticated endpoints. Examples: `ApiKey xxx` or `Bearer xxx`. | *N/A* | +| `otel.resource.attributes` | Additional resource attributes as comma-separated key=value pairs. Example: `environment=production,cluster=us-west`. | *N/A* | + +## Sending metrics to Elastic Cloud + +To send metrics directly to Elastic Cloud's native OTLP endpoint: + +1. Get your Elastic Cloud OTLP endpoint from your deployment's APM integration settings +2. Create an API key with appropriate permissions +3. Configure Logstash: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "https://your-deployment.apm.us-central1.gcp.cloud.es.io:443" +otel.metrics.protocol: "http" +otel.metrics.authorization_header: "ApiKey your-base64-encoded-api-key" +``` + +## Sending metrics to an OpenTelemetry Collector + +You can also send metrics to an OpenTelemetry Collector, which can then forward them to multiple backends: + +```yaml +otel.metrics.enabled: true +otel.metrics.endpoint: "http://otel-collector:4317" +otel.metrics.protocol: "grpc" +``` + +Example OpenTelemetry Collector configuration to forward to Elasticsearch: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + elasticsearch: + endpoints: ["https://your-elasticsearch-host:9200"] + api_key: "your-api-key" + mapping: + mode: otel + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [elasticsearch] +``` + +## Exported metrics + +Logstash exports the following metrics via OpenTelemetry: + +### Global metrics + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.events.in` | Counter | `{event}` | Total events received across all pipelines | +| `logstash.events.out` | Counter | `{event}` | 
Total events output across all pipelines | +| `logstash.events.filtered` | Counter | `{event}` | Total events filtered across all pipelines | +| `logstash.queue.events` | Gauge | `{event}` | Total events currently in queues | + +### Pipeline metrics + +Pipeline metrics include a `pipeline.id` attribute to identify the pipeline. + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.events.in` | Counter | `{event}` | Events received by pipeline | +| `logstash.pipeline.events.out` | Counter | `{event}` | Events output by pipeline | +| `logstash.pipeline.events.filtered` | Counter | `{event}` | Events filtered by pipeline | +| `logstash.pipeline.queue.events` | Gauge | `{event}` | Events in pipeline queue | + +### Persistent queue metrics + +These metrics are available when using persistent queues (`queue.type: persisted`). + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.queue.capacity.page_capacity` | Gauge | `By` | Size of each queue page in bytes | +| `logstash.pipeline.queue.capacity.max_size` | Gauge | `By` | Maximum queue size limit in bytes | +| `logstash.pipeline.queue.capacity.max_unread_events` | Gauge | `{event}` | Maximum unread events allowed | +| `logstash.pipeline.queue.capacity.size` | Gauge | `By` | Current persisted queue size in bytes | +| `logstash.pipeline.queue.data.free_space` | Gauge | `By` | Free disk space where queue is stored | + +### Dead letter queue metrics + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.pipeline.dlq.queue_size` | Gauge | `By` | Current dead letter queue size in bytes | +| `logstash.pipeline.dlq.max_queue_size` | Gauge | `By` | Maximum DLQ size limit in bytes | +| `logstash.pipeline.dlq.dropped_events` | Gauge | `{event}` | Events dropped when DLQ is full | +| `logstash.pipeline.dlq.expired_events` | Gauge | `{event}` | Events expired and removed from DLQ | + +### Plugin metrics + +Plugin 
metrics include `pipeline.id`, `plugin.type`, and `plugin.id` attributes. + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.plugin.events.in` | Counter | `{event}` | Events received by plugin | +| `logstash.plugin.events.out` | Counter | `{event}` | Events output by plugin | +| `logstash.plugin.events.duration` | Counter | `ms` | Time spent processing events | + +### Cgroup metrics (Linux only) + +These metrics are available when running on Linux with cgroups enabled (e.g., in containers). + +| Metric name | Type | Unit | Description | +| --- | --- | --- | --- | +| `logstash.os.cgroup.cpuacct.usage` | Counter | `ns` | Total CPU time consumed | +| `logstash.os.cgroup.cpu.cfs_period` | Gauge | `us` | CFS scheduling period | +| `logstash.os.cgroup.cpu.cfs_quota` | Gauge | `us` | CFS scheduling quota | +| `logstash.os.cgroup.cpu.stat.elapsed_periods` | Counter | `{period}` | Number of elapsed CFS periods | +| `logstash.os.cgroup.cpu.stat.nr_times_throttled` | Counter | `{occurrence}` | Number of times throttled | +| `logstash.os.cgroup.cpu.stat.time_throttled` | Counter | `ns` | Total time throttled | + +## Resource attributes + +The following resource attributes are automatically added to all metrics: + +| Attribute | Description | +| --- | --- | +| `service.name` | Always set to `logstash` | +| `service.instance.id` | The Logstash node ID | +| `service.version` | The Logstash version | +| `host.name` | The configured node name | + +Additional resource attributes can be added using the `otel.resource.attributes` setting. + +## Viewing metrics in Kibana + +When sending metrics to Elastic Cloud via the native OTLP endpoint, metrics are stored in APM data streams (`.ds-metrics-apm.app.logstash-*`). You can view them in: + +1. **Observability > APM > Services** - Find your Logstash service +2. **Observability > Metrics Explorer** - Query metrics directly +3. 
**Discover** - Search the `metrics-apm.app.logstash-*` data view + +When using an OpenTelemetry Collector with the Elasticsearch exporter, create a data view matching your configured index pattern (e.g., `metrics-otel-*`). + +## Troubleshooting + +### Enable debug logging + +To see detailed OpenTelemetry SDK logs, add the following to `config/log4j2.properties`: + +```properties +logger.otel.name = io.opentelemetry +logger.otel.level = debug +``` + +### Common issues + +**Connection refused errors** + +Verify the endpoint is accessible: +- For gRPC (default): Port 4317 +- For HTTP: Port 4318 with `/v1/metrics` path automatically appended + +**Authentication errors** + +Ensure the `otel.metrics.authorization_header` is correctly formatted: +- For API keys: `ApiKey base64-encoded-key` +- For Bearer tokens: `Bearer your-token` + +**Metrics not appearing** + +- Check that `otel.metrics.enabled` is set to `true` +- Verify the export interval hasn't been set too high +- Check Logstash logs for export errors diff --git a/docs/reference/toc.yml b/docs/reference/toc.yml index e27cf354956..49ed9a021db 100644 --- a/docs/reference/toc.yml +++ b/docs/reference/toc.yml @@ -108,6 +108,7 @@ toc: - file: logstash-pipeline-viewer.md - file: monitoring-troubleshooting.md - file: monitoring-logstash.md + - file: monitoring-with-opentelemetry.md - file: working-with-plugins.md children: - file: plugin-concepts.md diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 76140a9d1f2..b7284110ca1 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -259,4 +259,12 @@ dependencies { api group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14' api group: 'commons-codec', name: 'commons-codec', version: '1.17.0' api group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.16' + + // OpenTelemetry SDK for metrics export + def otelVersion = '1.59.0' + implementation platform("io.opentelemetry:opentelemetry-bom:${otelVersion}") 
+ implementation 'io.opentelemetry:opentelemetry-api' + implementation 'io.opentelemetry:opentelemetry-sdk' + implementation 'io.opentelemetry:opentelemetry-sdk-metrics' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp' } diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index ab854e8974b..21ad768b975 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -49,7 +49,7 @@ class LogStash::Agent include LogStash::Util::Loggable STARTED_AT = Time.now.freeze - attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus + attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus, :pipelines_registry attr_accessor :logger attr_reader :health_observer @@ -518,7 +518,7 @@ def configure_metrics_collectors LogStash::Instrument::NullMetric.new(@collector) end - @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric, settings.get("queue.type"), self) + @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric, settings, self) @periodic_pollers.start end diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 343202177cc..a1c04551b5c 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -110,7 +110,13 @@ def self.as_java_range(r) Setting::StringSetting.new("keystore.classname", "org.logstash.secret.store.backend.JavaKeyStore"), Setting::StringSetting.new("keystore.file", ::File.join(::File.join(LogStash::Environment::LOGSTASH_HOME, "config"), "logstash.keystore"), false), # will be populated on Setting::NullableStringSetting.new("monitoring.cluster_uuid"), - Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]) + Setting::StringSetting.new("pipeline.buffer.type", "heap", true, ["direct", "heap"]), + Setting::BooleanSetting.new("otel.metrics.enabled", false), + 
Setting::StringSetting.new("otel.metrics.endpoint", "http://localhost:4317"), + Setting::NumericSetting.new("otel.metrics.interval", 10), # seconds + Setting::StringSetting.new("otel.metrics.protocol", "grpc", true, ["grpc", "http"]), + Setting::NullableStringSetting.new("otel.metrics.authorization_header", nil, false), # e.g., "ApiKey xxx" or "Bearer xxx" + Setting::NullableStringSetting.new("otel.resource.attributes", nil, false) # key=value,key2=value2 format # post_process ].each {|setting| SETTINGS.register(setting) } diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/os.rb b/logstash-core/lib/logstash/instrument/periodic_poller/os.rb index 64fa87a13a2..f6c827ed078 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/os.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/os.rb @@ -25,25 +25,26 @@ def initialize(metric, options = {}) end def collect - collect_cgroup + self.class.collect_cgroup(metric) end - def collect_cgroup - if stats = Cgroup.get - save_metric([:os], :cgroup, stats) + class << self + def collect_cgroup(metric) + if stats = Cgroup.get + save_metric(metric, [:os], :cgroup, stats) + end end - end - # Recursive function to create the Cgroups values form the created hash - def save_metric(namespace, k, v) - if v.is_a?(Hash) - v.each do |new_key, new_value| - n = namespace.dup - n << k.to_sym - save_metric(n, new_key, new_value) + def save_metric(metric, namespace, k, v) + if v.is_a?(Hash) + v.each do |new_key, new_value| + n = namespace.dup + n << k.to_sym + save_metric(metric, n, new_key, new_value) + end + else + metric.gauge(namespace, k.to_sym, v) end - else - metric.gauge(namespace, k.to_sym, v) end end end diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb new file mode 100644 index 00000000000..3014dbe12da --- /dev/null +++ b/logstash-core/lib/logstash/instrument/periodic_poller/otel.rb @@ -0,0 +1,465 @@ +# 
Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/util/loggable" +require "logstash/instrument/periodic_poller/os" +require "set" + +java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService' +java_import 'io.opentelemetry.api.common.Attributes' +java_import 'io.opentelemetry.api.common.AttributeKey' + +module LogStash module Instrument module PeriodicPoller + # Exports Logstash metrics to an OpenTelemetry-compatible backend via OTLP. + # + # This class extends Base to use the periodic polling mechanism for collecting + # metrics into the metric store. The OTel SDK callbacks then read from the + # refreshed snapshot during export. 
+ # + # Configuration in logstash.yml: + # otel.metrics.enabled: true + # otel.metrics.endpoint: "http://localhost:4317" + # otel.metrics.interval: 10 + # otel.metrics.protocol: "grpc" + # otel.metrics.authorization_header: "ApiKey xxx" # or "Bearer xxx" + # otel.resource.attributes: "environment=production,cluster=us-west" + # + class Otel < Base + + def initialize(metric, agent, settings) + @agent = agent + @settings = settings + + # Call Base initializer - sets up @metric and configures the TimerTask + super(metric, :polling_interval => settings.get("otel.metrics.interval")) + @metric_store = @metric.collector + + # Initialize the Otel service - SDK handles its own export timing + @otel_service = OtelMetricsService.new( + settings.get("otel.metrics.endpoint"), + agent.id, + agent.name, + settings.get("otel.metrics.interval"), + settings.get("otel.metrics.protocol"), + settings.get("otel.resource.attributes"), + settings.get("otel.metrics.authorization_header") + ) + + # Take initial snapshot + @snapshot = @metric_store.snapshot_metric + + # Track which pipelines and plugins have been registered to avoid duplicates when + # adding new ones in #collect. 
+ @registered_pipelines = Set.new + @registered_plugins = Set.new + + # Register global and cgroup metrics immediately (not pipeline-specific) + register_global_metrics + register_cgroup_metrics + + logger.info("OpenTelemetry metrics poller initialized", + :endpoint => settings.get("otel.metrics.endpoint"), + :interval => settings.get("otel.metrics.interval")) + end + + def stop + logger.info("Stopping OpenTelemetry metrics poller") + super + @otel_service.flush + @otel_service.shutdown + end + + def collect + # Register metrics for any new pipelines that have started since initialization + register_new_pipeline_metrics + + # Register metrics for any new plugins (they appear after processing first event) + register_new_plugin_metrics + + # Note: plugin metrics are pushed automatically during event processing, no need to collect them here. + collect_cgroup_metrics + collect_pipeline_metrics + collect_dlq_metrics + @agent.capture_flow_metrics + # Refresh snapshot after collecting metrics so Otel callbacks read fresh data + @snapshot = @metric_store.snapshot_metric + end + + private + + def register_new_plugin_metrics + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + register_plugin_metrics_for(pipeline_id) + end + end + + def register_new_pipeline_metrics + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, _pipeline| + next if @registered_pipelines.include?(pipeline_id) + + logger.debug("Registering Otel metrics for pipeline", :pipeline_id => pipeline_id) + register_pipeline_counters_for(pipeline_id) + register_pipeline_gauges_for(pipeline_id) + register_dlq_metrics_for(pipeline_id) + @registered_pipelines.add(pipeline_id) + end + end + + def collect_cgroup_metrics + Os.collect_cgroup(@metric) + end + + def collect_dlq_metrics + pipelines = @agent.running_user_defined_pipelines + pipelines.each do |_, pipeline| + unless pipeline.nil? 
+ pipeline.collect_dlq_stats + end + end + end + + def collect_pipeline_metrics + pipelines = @agent.running_user_defined_pipelines + pipelines.each do |_, pipeline| + unless pipeline.nil? + pipeline.collect_stats + end + end + end + + # Register observable counters - SDK computes deltas from cumulative values + def register_global_metrics + # Global event counters + register_observable_counter("logstash.events.in", "Total events received", "{event}") do + get_metric_value(:stats, :events, :in) + end + + register_observable_counter("logstash.events.out", "Total events output", "{event}") do + get_metric_value(:stats, :events, :out) + end + + register_observable_counter("logstash.events.filtered", "Total events filtered", "{event}") do + get_metric_value(:stats, :events, :filtered) + end + + # Global queue gauge (total across all pipelines) + register_gauge("logstash.queue.events", "Total events in queues", "{event}") do + get_total_queue_events + end + end + + # Register Dead Letter Queue metrics for a specific pipeline + def register_dlq_metrics_for(pipeline_id) + attrs = create_pipeline_attributes(pipeline_id) + + register_gauge( + "logstash.pipeline.dlq.queue_size", + "Current dead letter queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.dlq.max_queue_size", + "Maximum dead letter queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :max_queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.dlq.dropped_events", + "Events dropped when DLQ is full", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :dropped_events) + end + + register_gauge( + "logstash.pipeline.dlq.expired_events", + "Events expired and removed from DLQ", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :dead_letter_queue, :expired_events) + end + end + + # 
Register cgroup metrics + def register_cgroup_metrics + # Reports the total CPU time consumed by all tasks in this cgroup (including tasks lower in the hierarchy) + # - observable counter (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpuacct.usage", "Total CPU time consumed", "ns") do + get_metric_value(:os, :cgroup, :cpuacct, :usage_nanos) + end + + # A period of time in microseconds for how regularly a cgroup's access to CPU resources should be + # reallocated - gauges (can change at runtime) + register_gauge("logstash.os.cgroup.cpu.cfs_period", "CFS scheduling period", "us") do + get_metric_value(:os, :cgroup, :cpu, :cfs_period_micros) + end + + # Total amount of time in microseconds for which all tasks in a cgroup can run during one period + # - gauges (can change at runtime) + register_gauge("logstash.os.cgroup.cpu.cfs_quota", "CFS scheduling quota", "us") do + get_metric_value(:os, :cgroup, :cpu, :cfs_quota_micros) + end + + # Number of period intervals that have elapsed - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.elapsed_periods", "Number of elapsed CFS periods", "{period}") do + get_metric_value(:os, :cgroup, :cpu, :stat, :number_of_elapsed_periods) + end + + # Number of times the tasks in this cgroup were throttled - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.nr_times_throttled", "Number of times throttled", "{occurrence}") do + get_metric_value(:os, :cgroup, :cpu, :stat, :number_of_times_throttled) + end + + # Total time in nanoseconds for which tasks in this cgroup were throttled - observable counters (monotonically increasing) + register_observable_counter("logstash.os.cgroup.cpu.stat.time_throttled", "Total time throttled", "ns") do + get_metric_value(:os, :cgroup, :cpu, :stat, :time_throttled_nanos) + end + end + + def register_pipeline_gauges_for(pipeline_id) + attrs = 
create_pipeline_attributes(pipeline_id) + + register_gauge( + "logstash.pipeline.queue.events", + "Events in pipeline queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :events) + end + + # Persistent queue capacity metrics + register_gauge( + "logstash.pipeline.queue.capacity.page_capacity", + "Size of each queue page", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :page_capacity_in_bytes) + end + + register_gauge( + "logstash.pipeline.queue.capacity.max_size", + "Maximum queue size limit", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_queue_size_in_bytes) + end + + register_gauge( + "logstash.pipeline.queue.capacity.max_unread_events", + "Maximum unread events allowed in queue", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :max_unread_events) + end + + register_gauge( + "logstash.pipeline.queue.capacity.size", + "Current persisted queue size", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :capacity, :queue_size_in_bytes) + end + + # Persistent queue data/storage metrics + register_gauge( + "logstash.pipeline.queue.data.free_space", + "Free disk space where queue is stored", + "By", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :queue, :data, :free_space_in_bytes) + end + end + + def register_pipeline_counters_for(pipeline_id) + attrs = create_pipeline_attributes(pipeline_id) + + register_observable_counter( + "logstash.pipeline.events.in", + "Events received by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :in) + end + + register_observable_counter( + "logstash.pipeline.events.out", + "Events output by pipeline", + "{event}", + attrs + ) do + get_pipeline_metric_value(pipeline_id, :events, :out) + end + + register_observable_counter( + "logstash.pipeline.events.filtered", + "Events filtered by pipeline", + "{event}", + attrs + ) do 
+ get_pipeline_metric_value(pipeline_id, :events, :filtered) + end + end + + def register_gauge(name, description, unit, attributes = Attributes.empty, &block) + supplier = -> { + begin + value = block.call + value.nil? ? nil : value.to_java(:long) + rescue => e + logger.debug("Error getting gauge value for #{name}", :error => e.message) + nil + end + } + @otel_service.registerGauge(name, description, unit, supplier, attributes) + end + + def register_observable_counter(name, description, unit, attributes = Attributes.empty, &block) + supplier = -> { + begin + value = block.call + value.nil? ? nil : value.to_java(:long) + rescue => e + logger.debug("Error getting observable counter value for #{name}", :error => e.message) + nil + end + } + + @otel_service.registerObservableCounter(name, description, unit, supplier, attributes) + end + + # Helper to get metric values from the store + def get_metric_value(*path) + store = @snapshot.metric_store + + result = store.get_shallow(*path) + result.is_a?(Hash) ? nil : result&.value + rescue LogStash::Instrument::MetricStore::MetricNotFound + nil + end + + def get_pipeline_metric_value(pipeline_id, *path) + full_path = [:stats, :pipelines, pipeline_id.to_sym] + path + get_metric_value(*full_path) + end + + def get_total_queue_events + total = 0 + @agent.pipelines_registry.running_pipelines.each do |pipeline_id, pipeline| + next if pipeline.system? 
+ queue_events = get_pipeline_metric_value(pipeline_id, :queue, :events) + total += queue_events if queue_events + end + total + end + + def create_pipeline_attributes(pipeline_id) + Attributes.of( + AttributeKey.stringKey("pipeline.id"), pipeline_id.to_s + ) + end + + def create_plugin_attributes(pipeline_id, plugin_type, plugin_id) + Attributes.of( + AttributeKey.stringKey("pipeline.id"), pipeline_id.to_s, + AttributeKey.stringKey("plugin.type"), plugin_type.to_s, + AttributeKey.stringKey("plugin.id"), plugin_id.to_s + ) + end + + # Register plugin metrics for a specific pipeline + # Called on each collect to discover newly available plugins + def register_plugin_metrics_for(pipeline_id) + [:filters, :outputs, :inputs].each do |plugin_type| + plugin_ids = get_plugin_ids(pipeline_id, plugin_type) + plugin_ids.each do |plugin_id| + plugin_key = "#{pipeline_id}:#{plugin_type}:#{plugin_id}" + next if @registered_plugins.include?(plugin_key) + + logger.debug("Registering Otel metrics for plugin", + :pipeline_id => pipeline_id, + :plugin_type => plugin_type, + :plugin_id => plugin_id) + register_plugin_counters_for(pipeline_id, plugin_type, plugin_id) + @registered_plugins.add(plugin_key) + end + end + end + + def get_plugin_ids(pipeline_id, plugin_type) + begin + store = @snapshot.metric_store + plugins_hash = store.get_shallow(:stats, :pipelines, pipeline_id.to_sym, :plugins, plugin_type) + return [] unless plugins_hash.is_a?(Hash) + plugins_hash.keys + rescue LogStash::Instrument::MetricStore::MetricNotFound + [] + end + end + + def register_plugin_counters_for(pipeline_id, plugin_type, plugin_id) + attrs = create_plugin_attributes(pipeline_id, plugin_type, plugin_id) + + register_observable_counter( + "logstash.plugin.events.in", + "Events received by plugin", + "{event}", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :in) + end + + register_observable_counter( + "logstash.plugin.events.out", + "Events output by plugin", + 
"{event}", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :out) + end + + register_observable_counter( + "logstash.plugin.events.duration", + "Time spent processing events", + "ms", + attrs + ) do + get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, :events, :duration_in_millis) + end + end + + def get_plugin_metric_value(pipeline_id, plugin_type, plugin_id, *path) + full_path = [:stats, :pipelines, pipeline_id.to_sym, :plugins, plugin_type, plugin_id.to_sym] + path + get_metric_value(*full_path) + end + end +end; end; end diff --git a/logstash-core/lib/logstash/instrument/periodic_pollers.rb b/logstash-core/lib/logstash/instrument/periodic_pollers.rb index 30c731495d9..38e1d14d16c 100644 --- a/logstash-core/lib/logstash/instrument/periodic_pollers.rb +++ b/logstash-core/lib/logstash/instrument/periodic_pollers.rb @@ -27,16 +27,32 @@ module LogStash module Instrument # of the stats, this class encapsulate the starting and stopping of the poller # if the unique timer uses too much resource we can refactor this behavior here. class PeriodicPollers + include LogStash::Util::Loggable attr_reader :metric - def initialize(metric, queue_type, agent) + def initialize(metric, settings, agent) @metric = metric + @settings = settings || LogStash::SETTINGS @periodic_pollers = [PeriodicPoller::Os.new(metric), PeriodicPoller::JVM.new(metric), - PeriodicPoller::PersistentQueue.new(metric, queue_type, agent), + PeriodicPoller::PersistentQueue.new(metric, @settings.get("queue.type"), agent), PeriodicPoller::DeadLetterQueue.new(metric, agent), PeriodicPoller::FlowRate.new(metric, agent), PeriodicPoller::BatchStructure.new(metric, agent)] + + # Add OpenTelemetry metrics exporter if enabled + if otel_metrics_enabled? + require "logstash/instrument/periodic_poller/otel" + @periodic_pollers << PeriodicPoller::Otel.new(metric, agent, @settings) + logger.info("OpenTelemetry metrics export enabled") + end + end + + def otel_metrics_enabled? 
+ @settings.get("otel.metrics.enabled") + rescue => e + logger.debug("Could not read otel.metrics.enabled setting", :error => e.message) + false end def start diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb new file mode 100644 index 00000000000..925a0055f63 --- /dev/null +++ b/logstash-core/spec/logstash/instrument/periodic_poller/otel_spec.rb @@ -0,0 +1,546 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require "logstash/instrument/periodic_poller/otel" +require "logstash/instrument/collector" + +describe LogStash::Instrument::PeriodicPoller::Otel do + let(:collector) { LogStash::Instrument::Collector.new } + let(:metric) { LogStash::Instrument::Metric.new(collector) } + let(:agent_metric) { double("agent_metric", :collector => collector) } + + let(:pipeline_id) { :main } + let(:pipeline) do + double("pipeline", + :system? 
=> false, + :collect_stats => nil, + :collect_dlq_stats => nil + ) + end + let(:pipelines_registry) do + double("pipelines_registry", + :running_pipelines => { pipeline_id => pipeline } + ) + end + + let(:agent) do + double("agent", + :id => "test-node-id", + :name => "test-node-name", + :metric => agent_metric, + :pipelines_registry => pipelines_registry, + :running_user_defined_pipelines => { pipeline_id => pipeline }, + :capture_flow_metrics => nil + ) + end + + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("http://localhost:4317") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("grpc") + allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return(nil) + end + end + + let(:otel_service) do + double("otel_service", + :registerGauge => nil, + :registerObservableCounter => nil, + :flush => nil, + :shutdown => nil + ) + end + + before do + allow(OtelMetricsService).to receive(:new).and_return(otel_service) + end + + subject(:otel_poller) { described_class.new(metric, agent, settings) } + + describe "#initialize" do + it "should initialize cleanly" do + expect { otel_poller }.not_to raise_error + end + + it "creates an OtelMetricsService with correct parameters" do + expect(OtelMetricsService).to receive(:new).with( + "http://localhost:4317", + "test-node-id", + "test-node-name", + 10, + "grpc", + nil, + nil + ).and_return(otel_service) + + otel_poller + end + + context "with authorization header" do + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("https://apm.example.com") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("http") + allow(s).to 
receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return("ApiKey my-secret-key") + end + end + + it "passes authorization_header to OtelMetricsService" do + expect(OtelMetricsService).to receive(:new).with( + "https://apm.example.com", + "test-node-id", + "test-node-name", + 10, + "http", + nil, + "ApiKey my-secret-key" + ).and_return(otel_service) + + otel_poller + end + end + + context "with Bearer token authorization" do + let(:settings) do + double("settings").tap do |s| + allow(s).to receive(:get).with("otel.metrics.endpoint").and_return("https://apm.example.com") + allow(s).to receive(:get).with("otel.metrics.interval").and_return(10) + allow(s).to receive(:get).with("otel.metrics.protocol").and_return("http") + allow(s).to receive(:get).with("otel.resource.attributes").and_return(nil) + allow(s).to receive(:get).with("otel.metrics.authorization_header").and_return("Bearer my-bearer-token") + end + end + + it "passes Bearer token to OtelMetricsService" do + expect(OtelMetricsService).to receive(:new).with( + "https://apm.example.com", + "test-node-id", + "test-node-name", + 10, + "http", + nil, + "Bearer my-bearer-token" + ).and_return(otel_service) + + otel_poller + end + end + + it "registers global metrics" do + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.in", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.out", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.events.filtered", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.queue.events", anything, anything, anything, anything + ) + + otel_poller + end + + it "registers cgroup metrics" do + expect(otel_service).to receive(:registerObservableCounter).with( + 
"logstash.os.cgroup.cpuacct.usage", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.os.cgroup.cpu.cfs_period", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.os.cgroup.cpu.cfs_quota", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.elapsed_periods", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.nr_times_throttled", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.os.cgroup.cpu.stat.time_throttled", anything, anything, anything, anything + ) + + otel_poller + end + + it "registers pipeline metrics for each running pipeline on first collect" do + # Pipeline event counters + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.in", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.out", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.pipeline.events.filtered", anything, anything, anything, anything + ) + + # Pipeline queue gauges + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.events", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.page_capacity", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.max_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.max_unread_events", anything, anything, 
anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.capacity.size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.queue.data.free_space", anything, anything, anything, anything + ) + + # Pipeline DLQ gauges + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.queue_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.max_queue_size", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.dropped_events", anything, anything, anything, anything + ) + expect(otel_service).to receive(:registerGauge).with( + "logstash.pipeline.dlq.expired_events", anything, anything, anything, anything + ) + + otel_poller.collect + end + end + + describe "#collect" do + before do + otel_poller + end + + it "collects pipeline stats from each running pipeline" do + expect(pipeline).to receive(:collect_stats) + otel_poller.collect + end + + it "collects DLQ stats from each running pipeline" do + expect(pipeline).to receive(:collect_dlq_stats) + otel_poller.collect + end + + it "captures flow metrics from the agent" do + expect(agent).to receive(:capture_flow_metrics) + otel_poller.collect + end + + it "collects cgroup metrics via Os.collect_cgroup" do + expect(LogStash::Instrument::PeriodicPoller::Os).to receive(:collect_cgroup).with(metric) + otel_poller.collect + end + + it "refreshes the metric store snapshot" do + initial_snapshot = otel_poller.instance_variable_get(:@snapshot) + otel_poller.collect + new_snapshot = otel_poller.instance_variable_get(:@snapshot) + expect(new_snapshot).not_to be(initial_snapshot) + end + + context "with plugins" do + let(:metric_store) do + double("metric_store").tap do |store| + allow(store).to receive(:get_shallow).and_return(nil) + allow(store).to 
receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :filters) + .and_return({ :mutate_abc123 => {} }) + allow(store).to receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :outputs) + .and_return({ :elasticsearch_def456 => {} }) + allow(store).to receive(:get_shallow) + .with(:stats, :pipelines, :main, :plugins, :inputs) + .and_return({ :beats_ghi789 => {} }) + end + end + + let(:snapshot) do + double("snapshot", :metric_store => metric_store) + end + + before do + # First collect registers pipeline metrics + otel_poller.collect + # Stub snapshot and set it on the poller so plugin discovery works + allow(collector).to receive(:snapshot_metric).and_return(snapshot) + otel_poller.instance_variable_set(:@snapshot, snapshot) + end + + it "registers plugin metrics for filters, outputs, and inputs" do + # Expect 3 metrics per plugin (in, out, duration) x 3 plugins = 9 calls + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.in", anything, anything, anything, anything + ).exactly(3).times + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.out", anything, anything, anything, anything + ).exactly(3).times + expect(otel_service).to receive(:registerObservableCounter).with( + "logstash.plugin.events.duration", anything, anything, anything, anything + ).exactly(3).times + + otel_poller.collect + end + + it "only registers each plugin once across multiple collects" do + # First collect in this test registers plugins + otel_poller.collect + + # Second collect should not register the same plugins again + expect(otel_service).not_to receive(:registerObservableCounter).with( + "logstash.plugin.events.in", anything, anything, anything, anything + ) + + otel_poller.collect + end + end + end + + describe "#stop" do + before do + otel_poller + end + + it "flushes and shuts down the Otel service" do + expect(otel_service).to receive(:flush) + expect(otel_service).to 
receive(:shutdown) + otel_poller.stop + end + end + + describe "metric value retrieval" do + before do + otel_poller + end + + context "when metrics exist in the store" do + before do + metric.gauge([:stats, :events], :in, 100) + metric.gauge([:stats, :events], :out, 50) + metric.gauge([:stats, :pipelines, pipeline_id, :queue], :events, 25) + otel_poller.collect + end + + it "retrieves global metric values" do + value = otel_poller.send(:get_metric_value, :stats, :events, :in) + expect(value).to eq(100) + end + + it "retrieves pipeline metric values" do + value = otel_poller.send(:get_pipeline_metric_value, pipeline_id, :queue, :events) + expect(value).to eq(25) + end + end + + context "when metrics do not exist" do + it "returns nil for missing metrics" do + value = otel_poller.send(:get_metric_value, :nonexistent, :path) + expect(value).to be_nil + end + + it "returns nil for missing pipeline metrics" do + value = otel_poller.send(:get_pipeline_metric_value, :nonexistent_pipeline, :events, :in) + expect(value).to be_nil + end + + it "returns nil for missing plugin metrics" do + value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :nonexistent_plugin, :events, :in) + expect(value).to be_nil + end + end + + context "#get_total_queue_events" do + let(:pipeline2) { double("pipeline2", :system? => false) } + let(:system_pipeline) { double("system_pipeline", :system? 
=> true) } + + before do + metric.gauge([:stats, :pipelines, :main, :queue], :events, 10) + metric.gauge([:stats, :pipelines, :secondary, :queue], :events, 20) + metric.gauge([:stats, :pipelines, :monitoring, :queue], :events, 100) + + allow(pipelines_registry).to receive(:running_pipelines).and_return({ + :main => pipeline, + :secondary => pipeline2, + :monitoring => system_pipeline + }) + + otel_poller.collect + end + + it "sums queue events across all user pipelines" do + total = otel_poller.send(:get_total_queue_events) + expect(total).to eq(30) + end + + it "excludes system pipelines from the total" do + total = otel_poller.send(:get_total_queue_events) + expect(total).not_to eq(130) + end + end + + context "#get_plugin_metric_value" do + before do + metric.gauge([:stats, :pipelines, :main, :plugins, :filters, :mutate_abc, :events], :in, 500) + metric.gauge([:stats, :pipelines, :main, :plugins, :filters, :mutate_abc, :events], :out, 450) + otel_poller.collect + end + + it "retrieves plugin metric values" do + value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :mutate_abc, :events, :in) + expect(value).to eq(500) + end + + it "retrieves different plugin metric paths" do + value = otel_poller.send(:get_plugin_metric_value, :main, :filters, :mutate_abc, :events, :out) + expect(value).to eq(450) + end + end + end + + describe "#get_total_queue_events" do + let(:pipeline2_id) { :secondary } + let(:pipeline2) { double("pipeline2", :system? => false) } + let(:system_pipeline) { double("system_pipeline", :system? 
=> true) } + + let(:pipelines_registry) do + double("pipelines_registry", + :running_pipelines => { + pipeline_id => pipeline, + pipeline2_id => pipeline2, + :monitoring => system_pipeline + } + ) + end + + before do + otel_poller + metric.gauge([:stats, :pipelines, pipeline_id, :queue], :events, 10) + metric.gauge([:stats, :pipelines, pipeline2_id, :queue], :events, 20) + metric.gauge([:stats, :pipelines, :monitoring, :queue], :events, 5) + otel_poller.collect + end + + it "sums queue events from non-system pipelines only" do + total = otel_poller.send(:get_total_queue_events) + expect(total).to eq(30) + end + end + + context "with mocked cgroup environment" do + let(:relative_path) { "/docker/abc123" } + let(:proc_self_cgroup_content) do + %W(4:cpuacct:#{relative_path} + 3:cpu:#{relative_path}) + end + + let(:cpuacct_usage) { 1982 } + let(:cpu_period_micros) { 100000 } + let(:cpu_quota_micros) { 50000 } + let(:cpu_stat_file_content) do + ["nr_periods 10", "nr_throttled 2", "throttled_time 1000000"] + end + + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpuacct#{relative_path}/cpuacct.usage").and_return([cpuacct_usage]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_period_us").and_return([cpu_period_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_quota_us").and_return([cpu_quota_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.stat").and_return(cpu_stat_file_content) + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(proc_self_cgroup_content) + end + + describe "cgroup metrics collection" do + let(:snapshot_store) { metric.collector.snapshot_metric.metric_store } + + before do + otel_poller.collect + end + + def mval(*metric_path) + metric_path.reduce(snapshot_store.get_shallow(:os)) { |acc, k| acc[k] }.value + end + + it "collects cpuacct usage" do 
+ expect(mval(:cgroup, :cpuacct, :usage_nanos)).to eq(cpuacct_usage) + end + + it "collects cpu cfs_period" do + expect(mval(:cgroup, :cpu, :cfs_period_micros)).to eq(cpu_period_micros) + end + + it "collects cpu cfs_quota" do + expect(mval(:cgroup, :cpu, :cfs_quota_micros)).to eq(cpu_quota_micros) + end + + it "collects cpu stat metrics" do + expect(mval(:cgroup, :cpu, :stat, :number_of_elapsed_periods)).to eq(10) + expect(mval(:cgroup, :cpu, :stat, :number_of_times_throttled)).to eq(2) + expect(mval(:cgroup, :cpu, :stat, :time_throttled_nanos)).to eq(1000000) + end + end + end + + describe "attribute creation" do + before do + otel_poller + end + + describe "#create_pipeline_attributes" do + it "creates Attributes with pipeline.id" do + attrs = otel_poller.send(:create_pipeline_attributes, :main) + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + end + + it "converts symbol pipeline_id to string" do + attrs = otel_poller.send(:create_pipeline_attributes, :my_pipeline) + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("my_pipeline") + end + end + + describe "#create_plugin_attributes" do + it "creates Attributes with pipeline.id, plugin.type, and plugin.id" do + attrs = otel_poller.send(:create_plugin_attributes, :main, :filters, :mutate_abc123) + + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("main") + expect(attrs.get(AttributeKey.stringKey("plugin.type"))).to eq("filters") + expect(attrs.get(AttributeKey.stringKey("plugin.id"))).to eq("mutate_abc123") + end + + it "converts all symbol arguments to strings" do + attrs = otel_poller.send(:create_plugin_attributes, :secondary, :outputs, :elasticsearch_xyz) + + expect(attrs.get(AttributeKey.stringKey("pipeline.id"))).to eq("secondary") + expect(attrs.get(AttributeKey.stringKey("plugin.type"))).to eq("outputs") + expect(attrs.get(AttributeKey.stringKey("plugin.id"))).to eq("elasticsearch_xyz") + end + end + end +end diff --git 
a/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java new file mode 100644 index 00000000000..52dda97db0a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/otel/OtelMetricsService.java @@ -0,0 +1,230 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.instrument.metrics.otel; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Service that manages OpenTelemetry metrics export for Logstash. 
+ *
+ * This service:
+ * - Creates and configures the OTel SDK MeterProvider
+ * - Provides methods to create metrics instruments (counters, gauges)
+ * - Manages the lifecycle of the OTel exporter
+ *
+ * Usage from Ruby:
+ *   java_import 'org.logstash.instrument.metrics.otel.OtelMetricsService'
+ *   service = OtelMetricsService.new(endpoint, node_id, node_name, interval_secs, "grpc", nil, nil)
+ *   service.registerGauge("metric.name", "description", "unit", -> { get_value() }, Attributes.empty())
+ */
+public class OtelMetricsService {
+
+    private static final Logger LOGGER = LogManager.getLogger(OtelMetricsService.class);
+
+    private final SdkMeterProvider meterProvider;
+    private final Meter meter;
+    // Keep references to prevent garbage collection of observable instruments
+    private final Map<String, ObservableLongGauge> gauges = new ConcurrentHashMap<>();
+    private final Map<String, ObservableLongCounter> observableCounters = new ConcurrentHashMap<>();
+
+    private final String authorizationHeader;
+
+    /**
+     * Creates a new Otel metrics service.
+     *
+     * @param endpoint OTLP endpoint (e.g., "http://localhost:4317")
+     * @param nodeId Logstash node ID
+     * @param nodeName Logstash node name
+     * @param intervalSeconds Export interval in seconds
+     * @param protocol "grpc" or "http"
+     * @param resourceAttributes Additional resource attributes (comma-separated key=value pairs)
+     * @param authorizationHeader Authorization header value (e.g., "ApiKey xxx" or "Bearer xxx"), or null
+     */
+    public OtelMetricsService(String endpoint, String nodeId, String nodeName,
+                              int intervalSeconds, String protocol, String resourceAttributes,
+                              String authorizationHeader) {
+        LOGGER.info("Initializing OpenTelemetry metrics export to {} (protocol: {}, interval: {}s)",
+                endpoint, protocol, intervalSeconds);
+        this.authorizationHeader = authorizationHeader;
+
+        // Build resource attributes
+        AttributesBuilder resourceAttrsBuilder = Attributes.builder()
+                .put(AttributeKey.stringKey("service.name"), "logstash")
+
.put(AttributeKey.stringKey("service.instance.id"), nodeId) + .put(AttributeKey.stringKey("host.name"), nodeName); + + // Parse additional resource attributes if provided + if (resourceAttributes != null && !resourceAttributes.isEmpty()) { + parseResourceAttributes(resourceAttributes, resourceAttrsBuilder); + } + + Resource resource = Resource.getDefault().merge( + Resource.create(resourceAttrsBuilder.build()) + ); + + // Create the appropriate exporter based on protocol + MetricExporter exporter = createExporter(endpoint, protocol); + + // Create periodic reader + PeriodicMetricReader metricReader = PeriodicMetricReader.builder(exporter) + .setInterval(Duration.ofSeconds(intervalSeconds)) + .build(); + + // Create meter provider + this.meterProvider = SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader(metricReader) + .build(); + + // Get meter for creating instruments + this.meter = meterProvider.get("logstash"); + + LOGGER.info("OpenTelemetry metrics service initialized successfully"); + } + + private MetricExporter createExporter(String endpoint, String protocol) { + if ("http".equalsIgnoreCase(protocol)) { + var builder = OtlpHttpMetricExporter.builder() + .setEndpoint(endpoint + "/v1/metrics") + .setTimeout(Duration.ofSeconds(10)); + if (authorizationHeader != null && !authorizationHeader.isEmpty()) { + builder.addHeader("Authorization", authorizationHeader); + } + return builder.build(); + } else { + // Default to gRPC + var builder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(Duration.ofSeconds(10)); + if (authorizationHeader != null && !authorizationHeader.isEmpty()) { + builder.addHeader("Authorization", authorizationHeader); + } + return builder.build(); + } + } + + private void parseResourceAttributes(String attributes, AttributesBuilder builder) { + for (String pair : attributes.split(",")) { + String[] keyValue = pair.trim().split("=", 2); + if (keyValue.length == 2) { + 
builder.put(AttributeKey.stringKey(keyValue[0].trim()), keyValue[1].trim());
+            }
+        }
+    }
+
+    /**
+     * Registers an observable gauge with a callback.
+     * The callback is invoked by the SDK when metrics are exported.
+     *
+     * @param name Metric name
+     * @param description Human-readable description
+     * @param unit Unit of measurement
+     * @param valueSupplier Callback that returns the current value
+     * @param attributes Attributes/labels for this gauge
+     */
+    public void registerGauge(String name, String description, String unit,
+                              Supplier<Long> valueSupplier, Attributes attributes) {
+        ObservableLongGauge gauge = meter.gaugeBuilder(name)
+                .setDescription(description)
+                .setUnit(unit)
+                .ofLongs()
+                .buildWithCallback(measurement -> {
+                    try {
+                        Long value = valueSupplier.get();
+                        if (value != null) {
+                            measurement.record(value, attributes);
+                        }
+                    } catch (Exception e) {
+                        LOGGER.debug("Error collecting gauge {}: {}", name, e.getMessage());
+                    }
+                });
+        gauges.put(name, gauge);
+    }
+
+    /**
+     * Registers an observable counter with a callback.
+     * Observable counters are for monotonically increasing values where you observe
+     * the cumulative total (e.g., CPU time consumed, total bytes read from disk).
+     * The SDK automatically computes deltas between observations.
+     *
+     * @param name Metric name
+     * @param description Human-readable description
+     * @param unit Unit of measurement
+     * @param valueSupplier Callback that returns the current cumulative value
+     * @param attributes Attributes/labels for this counter
+     */
+    public void registerObservableCounter(String name, String description, String unit,
+                                          Supplier<Long> valueSupplier, Attributes attributes) {
+        ObservableLongCounter counter = meter.counterBuilder(name)
+                .setDescription(description)
+                .setUnit(unit)
+                .buildWithCallback(measurement -> {
+                    try {
+                        Long value = valueSupplier.get();
+                        if (value != null && value >= 0) {
+                            measurement.record(value, attributes);
+                        }
+                    } catch (Exception e) {
+                        LOGGER.debug("Error collecting observable counter {}: {}", name, e.getMessage());
+                    }
+                });
+        observableCounters.put(name, counter);
+    }
+
+    /**
+     * Forces an immediate flush of pending metrics.
+     */
+    public void flush() {
+        if (meterProvider != null) {
+            meterProvider.forceFlush();
+        }
+    }
+
+    /**
+     * Shuts down the service and releases resources.
+     * Flushes any pending metrics before closing.
+ */ + public void shutdown() { + LOGGER.info("Shutting down OpenTelemetry metrics service"); + if (meterProvider != null) { + meterProvider.close(); + } + LOGGER.info("OpenTelemetry metrics service shut down"); + } +} diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv index 9960fc726c0..a944c8cde7e 100644 --- a/tools/dependencies-report/src/main/resources/licenseMapping.csv +++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv @@ -36,7 +36,6 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "com.github.luben:zstd-jni:1.5.7-4",https://github.com/luben/zstd-jni,BSD-2-Clause "com.google.googlejavaformat:google-java-format:",https://github.com/google/google-java-format,Apache-2.0 "com.google.guava:guava:",https://github.com/google/guava,Apache-2.0 -"com.google.j2objc:j2objc-annotations:",https://github.com/google/j2objc/,Apache-2.0 "commons-codec:commons-codec:",http://commons.apache.org/proper/commons-codec/,Apache-2.0 "commons-logging:commons-logging:",http://commons.apache.org/proper/commons-logging/,Apache-2.0 "concurrent-ruby:",http://www.concurrent-ruby.com,MIT @@ -97,6 +96,11 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL "io-console:",https://github.com/ruby/io-console,BSD-2-Clause "io-wait:",https://github.com/ruby/io-wait,BSD-2-Clause "io.netty:netty-all:",https://github.com/netty/netty,Apache-2.0 +"io.opentelemetry:opentelemetry-api:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-bom:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-exporter-otlp:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 +"io.opentelemetry:opentelemetry-sdk-metrics:",https://github.com/open-telemetry/opentelemetry-java,Apache-2.0 
"ipaddr:",https://github.com/ruby/ipaddr,BSD-2-Clause "irb:",https://github.com/ruby/irb,BSD-2-Clause "jar-dependencies:",https://github.com/mkristian/jar-dependencies,MIT diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-api-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-bom-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-exporter-otlp-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). 
+ +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0. diff --git a/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt new file mode 100644 index 00000000000..e416b4ba168 --- /dev/null +++ b/tools/dependencies-report/src/main/resources/notices/io.opentelemetry!opentelemetry-sdk-metrics-NOTICE.txt @@ -0,0 +1,7 @@ +OpenTelemetry Java +Copyright The OpenTelemetry Authors + +This product includes software developed at +The OpenTelemetry Authors (https://opentelemetry.io/). + +Licensed under the Apache License, Version 2.0.
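The `otel.resource.attributes` setting introduced in this patch is parsed as comma-separated `key=value` pairs (`parseResourceAttributes` in `OtelMetricsService`). A minimal standalone sketch of that parsing, collecting into a plain `Map` instead of the OTel `AttributesBuilder` so it runs without SDK dependencies (the `ResourceAttributesParser` class and `parse` method names are hypothetical, not part of the patch):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ResourceAttributesParser {

    // Parse "k1=v1,k2=v2" into an ordered map; pairs without '=' are skipped,
    // mirroring the keyValue.length == 2 check in parseResourceAttributes.
    static Map<String, String> parse(String attributes) {
        Map<String, String> result = new LinkedHashMap<>();
        if (attributes == null || attributes.isEmpty()) {
            return result;
        }
        for (String pair : attributes.split(",")) {
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length == 2) {
                result.put(keyValue[0].trim(), keyValue[1].trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = parse("environment=production, cluster=us-west,team=platform");
        System.out.println(attrs.get("environment")); // production
        System.out.println(attrs.get("cluster"));     // us-west
        System.out.println(attrs.size());             // 3
    }
}
```

Note that `split("=", 2)` keeps any later `=` characters inside the value, so values such as base64-ish tokens survive intact.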
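Both `registerGauge` and `registerObservableCounter` wrap the supplied callback so that `null` values (a metric not yet populated in the store) are skipped and collection errors never propagate into the exporter thread. A dependency-free sketch of that guard pattern, recording into a list in place of the OTel measurement; `CallbackGuard` and `recordIfPresent` are illustrative names, not part of the patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CallbackGuard {

    // Invokes the supplier the way the service's buildWithCallback lambda does:
    // record only non-null values, and never let a collection error propagate.
    static void recordIfPresent(Supplier<Long> valueSupplier, List<Long> sink) {
        try {
            Long value = valueSupplier.get();
            if (value != null) {
                sink.add(value);
            }
        } catch (Exception e) {
            // matches the service's LOGGER.debug(...) swallow-and-continue behavior
        }
    }

    public static void main(String[] args) {
        List<Long> recorded = new ArrayList<>();
        recordIfPresent(() -> 42L, recorded);                                     // recorded
        recordIfPresent(() -> null, recorded);                                    // skipped
        recordIfPresent(() -> { throw new RuntimeException("boom"); }, recorded); // swallowed
        System.out.println(recorded); // [42]
    }
}
```

This is why the Ruby side can hand over metric lookups that may legitimately return `nil` (see the "returns nil for missing metrics" specs above) without any extra guarding.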