diff --git a/.buildkite/scripts/health-report-tests/tests/recovery-1m.yaml b/.buildkite/scripts/health-report-tests/tests/recovery-1m.yaml new file mode 100644 index 00000000000..71f21b03920 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/recovery-1m.yaml @@ -0,0 +1,45 @@ +name: "Recovery in 1min pipeline" +config: + - pipeline.id: crashy-recovery-pp + pipeline.recoverable: true + config.string: | + input { heartbeat { interval => 0.1 } } + filter { + ruby { + 'init' => '@crash_time = ::Time.now + 20' + 'code' => 'event.set("[@metadata][poison]", ::Time.now > @crash_time)' + } + if [@metadata][poison] { + failure_injector { crash_at => filter } + } + } + output { stdout {} } +conditions: + full_start_required: true + wait_seconds: 35 # anticipate one crash 20 seconds after pipeline starts, not enough for two. +expectation: + status: "yellow" + symptom: "1 indicator is concerning (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`crashy-recovery-pp`)" + indicators: + crashy-recovery-pp: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent" + cause: "pipeline has recovered from crashes 1 time in the last 5 minutes" + action: "inspect logs to determine source of crash" + help_url: { $include: "health-report-pipeline-recovery" } + impacts: + - id: "logstash:health:pipeline:recovery:impact:intermittent_processing" + severity: 2 + description: "pipeline recently recovered from a crash" + impact_areas: [ "pipeline_execution" ] + details: + status: + state: "RUNNING" + recovery_log: + - { $match: "{ISO8601}" } diff --git a/config/logstash.yml b/config/logstash.yml index c58843168ca..d2903c47fd5 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -122,6 +122,15 @@ # # config.reload.interval: 3s # +# Whether to allow the pipeline reload feature to recover a 
pipeline that has crashed. +# Requires `config.reload.automatic`. Pipeline cannot include non-reloadable plugins. +# Accepts one of the following values: +# - `false` (default): never recover after observing crash +# - `auto`: recover if-and-only-if `queue.type` is set to `persisted` +# - `true`: always recover after observing crash (WARNING: risk of data loss) +# +# pipeline.recoverable: false +# # Show fully compiled configuration as debug log message # NOTE: --log.level must be 'debug' # diff --git a/config/pipelines.yml b/config/pipelines.yml index 560f0b187c5..dded37c192d 100644 --- a/config/pipelines.yml +++ b/config/pipelines.yml @@ -44,15 +44,21 @@ # # Default is 1000 (effectively disabled). Set to a lower value to enable chunking. # pipeline.batch.output_chunking.growth_threshold_factor: 1000 # -# Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false". -# "auto" automatically enables ordering if the 'pipeline.workers' setting -# is also set to '1', and disables otherwise. -# "true" enforces ordering on a pipeline and prevents logstash from starting -# a pipeline with multiple workers allocated. -# "false" disable any extra processing necessary for preserving ordering. -# +# # Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false". +# # "auto" automatically enables ordering if the 'pipeline.workers' setting +# # is also set to '1', and disables otherwise. +# # "true" enforces ordering on a pipeline and prevents logstash from starting +# # a pipeline with multiple workers allocated. +# # "false" disable any extra processing necessary for preserving ordering. # pipeline.ordered: auto # +# # Configure this pipeline to be recoverable if it crashes and `config.reload.automatic` is enabled. 
+# # Accepts one of the following values: +# # - `false` (default): never recover after observing crash +# # - `auto`: recover if-and-only-if `queue.type` is set to `persisted` +# # - `true`: always recover after observing crash (WARNING: risk of data loss) +# pipeline.recoverable: false +# # # Internal queuing model, "memory" for legacy in-memory based queuing and # # "persisted" for disk-based acked queueing. Defaults is memory # queue.type: memory diff --git a/docker/data/logstash/env2yaml/src/main/java/org/logstash/env2yaml/Env2Yaml.java b/docker/data/logstash/env2yaml/src/main/java/org/logstash/env2yaml/Env2Yaml.java index c2591d7bc9a..bfff8ee8cfc 100644 --- a/docker/data/logstash/env2yaml/src/main/java/org/logstash/env2yaml/Env2Yaml.java +++ b/docker/data/logstash/env2yaml/src/main/java/org/logstash/env2yaml/Env2Yaml.java @@ -43,6 +43,7 @@ private Map buildSettingMap() { "node.name", "path.data", "pipeline.id", "pipeline.workers", "pipeline.output.workers", "pipeline.batch.size", "pipeline.batch.delay", "pipeline.batch.output_chunking.growth_threshold_factor", "pipeline.unsafe_shutdown", "pipeline.ecs_compatibility", "pipeline.ordered", + "pipeline.recoverable", "pipeline.reloadable", "pipeline.plugin_classloaders", "pipeline.separate_logs", "path.config", "config.string", "config.test_and_exit", "config.reload.automatic", "config.reload.interval", "config.debug", "config.support_escapes", diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md index a83c125cadc..5932eeaf4e5 100644 --- a/docs/reference/logstash-settings-file.md +++ b/docs/reference/logstash-settings-file.md @@ -56,6 +56,7 @@ The `logstash.yml` file includes these settings. | `pipeline.unsafe_shutdown` | When set to `true`, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. 
Enabling this option can lead to data loss during shutdown. | `false` | | `pipeline.plugin_classloaders` | (Beta) Load Java plugins in independent classloaders to isolate their dependencies. | `false` | | `pipeline.ordered` | Set the pipeline event ordering. Valid options are:

* `auto`. Automatically enables ordering if the `pipeline.workers` setting is `1`, and disables otherwise.
* `true`. Enforces ordering on the pipeline and prevents Logstash from starting if there are multiple workers.
* `false`. Disables the processing required to preserve order. Ordering will not be guaranteed, but you save the processing cost of preserving order.
| `auto` | +| `pipeline.recoverable` | Configure the pipeline to be eligible for automated recovery. Requires using `config.reload.automatic`. All plugins in the pipeline must be reloadable. Possible values are:

* `auto`: enable only if using a persisted queue type.
* `true`: always enable, even if the queue is ephemeral (may cause data loss).
* `false`: never enable.
| `false` | | `pipeline.ecs_compatibility` | Sets the pipeline’s default value for `ecs_compatibility`, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema. Possible values are:

* `disabled`
* `v1`
* `v8`

This option allows the [early opt-in (or preemptive opt-out) of ECS compatibility](/reference/ecs-ls.md) modes in plugins, which is scheduled to be on-by-default in a future major release of {{ls}}.

Values other than `disabled` are currently considered BETA, and may produce unintended consequences when upgrading {{ls}}.
| `disabled` | | `path.config` | The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. | Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). | | `config.string` | A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. | *N/A* | diff --git a/docs/static/spec/openapi/logstash-api.yaml b/docs/static/spec/openapi/logstash-api.yaml index 1ed68e1b111..7460d9eacf6 100644 --- a/docs/static/spec/openapi/logstash-api.yaml +++ b/docs/static/spec/openapi/logstash-api.yaml @@ -1741,6 +1741,12 @@ paths: properties: worker_utilization: $ref: '#/components/schemas/FlowWindows' + recovery_log: + type: array + description: "If the pipeline has successfully recovered from a crash, include a list of timestamps" + items: + type: string + description: An ISO8601-compliant timestamp impacts: type: array description: "If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on Logstash." 
@@ -1860,6 +1866,37 @@ paths: worker_utilization: last_1_minute: 100.0 last_5_minutes: 100.0 + recoveryCase: + description: "A pipeline has recently recovered from a crash" + value: + status: "yellow" + symptom: "1 indicator is unhealthy (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`crashy-pipeline`)" + indicators: + crashy-pipeline: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent" + cause: "pipeline has recovered from crashes 1 time in the last 5 minutes" + action: "inspect logs to determine source of crash" + impacts: + - id: "logstash:health:pipeline:recovery:impact:intermittent_processing" + severity: 2 + description: "the pipeline recently recovered from a crash" + impact_areas: [ "pipeline_execution" ] + details: + status: + state: "RUNNING" + flow: + worker_utilization: + last_1_minute: 0.1 + last_5_minutes: 0.1 + recovery_log: + - "2026-04-01T21:12:04.498144Z" x-metaTags: - content: Logstash name: product_name diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index ab854e8974b..c16b47ec91b 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -190,7 +190,9 @@ def pipeline_details(pipeline_id) else PipelineIndicator::Status::UNKNOWN end - PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation) + PipelineIndicator::Details.new(status, + sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation, + sync_state.recovery_log) end end @@ -564,7 +566,7 @@ def update_success_metrics(action, action_result) # When a pipeline is successfully created we create the metric # place holder related to the lifecycle of the pipeline initialize_pipeline_metrics(action) - when LogStash::PipelineAction::Reload + when 
LogStash::PipelineAction::Reload, LogStash::PipelineAction::Recover update_successful_reload_metrics(action, action_result) end end diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 343202177cc..5ceeca979ff 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -57,6 +57,7 @@ def self.as_java_range(r) Setting::PositiveIntegerSetting.new("pipeline.batch.output_chunking.growth_threshold_factor", 1000), Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false), Setting::BooleanSetting.new("pipeline.reloadable", true), + Setting::CoercibleStringSetting.new("pipeline.recoverable", "false", true, %w(auto true false)), Setting::BooleanSetting.new("pipeline.plugin_classloaders", false), Setting::BooleanSetting.new("pipeline.separate_logs", false), Setting::CoercibleStringSetting.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]), diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index f64dd415e62..d3005b2075e 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -270,6 +270,37 @@ def start_workers batch_output_chunking_growth_threshold_factor = settings.get("pipeline.batch.output_chunking.growth_threshold_factor") batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode") + + queue_type = settings.get('queue.type') + queue_is_ephemeral = (queue_type == MEMORY) + pipeline_recoverable = settings.get('pipeline.recoverable') + pipeline_is_recoverable = case pipeline_recoverable + when 'true' then true + when 'false' then false + when 'auto' then !queue_is_ephemeral + end + config_reload_automatic = settings.get('config.reload.automatic') + + if pipeline_is_recoverable + if !config_reload_automatic + @logger.warn("Pipeline is configured to be recoverable with `pipeline.recoverable: #{pipeline_recoverable}`, " + + "but config 
reloading has been disabled with `config.reload.automatic: #{config_reload_automatic}`; " + + "if this pipeline crashes it will NOT be recovered.", + default_logging_keys) + elsif queue_is_ephemeral + @logger.warn("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " + + "with `pipeline.recoverable: #{pipeline_recoverable}`; " + + "in the event of a crash in-flight events will be lost, " + + "so enabling auto-recovery increases the risk of data loss.", + default_logging_keys) + else + @logger.info("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " + + "with `pipeline.recoverable: #{pipeline_recoverable}`; " + + "in the event of a crash some in-flight events may be re-processed", + default_logging_keys) + end + end + max_inflight = batch_size * pipeline_workers config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config]) @@ -290,7 +321,8 @@ def start_workers "pipeline.batch.output_chunking.growth_threshold_factor" => batch_output_chunking_growth_threshold_factor, "pipeline.max_inflight" => max_inflight, "batch_metric_sampling" => batch_metric_sampling, - "pipeline.sources" => pipeline_source_details) + "pipeline.sources" => pipeline_source_details, + "pipeline.recoverable" => pipeline_is_recoverable && config_reload_automatic) @logger.info("Starting pipeline", pipeline_log_params) filter_queue_client.set_batch_dimensions(batch_size, batch_delay) diff --git a/logstash-core/lib/logstash/pipeline_action.rb b/logstash-core/lib/logstash/pipeline_action.rb index 3ae612ec058..dc6d5e9e493 100644 --- a/logstash-core/lib/logstash/pipeline_action.rb +++ b/logstash-core/lib/logstash/pipeline_action.rb @@ -19,11 +19,13 @@ require "logstash/pipeline_action/create" require "logstash/pipeline_action/stop" require "logstash/pipeline_action/reload" +require "logstash/pipeline_action/recover" require "logstash/pipeline_action/delete" require "logstash/pipeline_action/stop_and_delete" module LogStash module 
PipelineAction ORDERING = { + LogStash::PipelineAction::Recover => 99, LogStash::PipelineAction::Create => 100, LogStash::PipelineAction::Reload => 200, LogStash::PipelineAction::Stop => 300, diff --git a/logstash-core/lib/logstash/pipeline_action/base.rb b/logstash-core/lib/logstash/pipeline_action/base.rb index 8060cbcc956..7629a029b3b 100644 --- a/logstash-core/lib/logstash/pipeline_action/base.rb +++ b/logstash-core/lib/logstash/pipeline_action/base.rb @@ -41,5 +41,13 @@ def <=>(other) order = self.execution_priority <=> other.execution_priority order.nonzero? ? order : self.pipeline_id <=> other.pipeline_id end + + def new_failed_action(reason) + ConvergeResult::FailedAction.from_action(self, reason); + end + + def new_success_action + ConvergeResult::SuccessfulAction.new + end end end end diff --git a/logstash-core/lib/logstash/pipeline_action/recover.rb b/logstash-core/lib/logstash/pipeline_action/recover.rb new file mode 100644 index 00000000000..121ea015cf2 --- /dev/null +++ b/logstash-core/lib/logstash/pipeline_action/recover.rb @@ -0,0 +1,84 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +require "logstash/pipeline_action/base" +require "logstash/java_pipeline" + +module LogStash module PipelineAction + class Recover < Base + include LogStash::Util::Loggable + + def initialize(pipeline_config, metric) + @pipeline_config = pipeline_config + @metric = metric + end + + def pipeline_id + @pipeline_config.pipeline_id.to_sym + end + + def to_s + "PipelineAction::Recover<#{pipeline_id}>" + end + + def execute(agent, pipelines_registry) + old_pipeline = pipelines_registry.get_pipeline(pipeline_id) + + # guard with descriptive errors + if old_pipeline.nil? + return new_failed_action("pipeline does not exist") + elsif old_pipeline.running? || !old_pipeline.crashed? + return new_failed_action("existing pipeline is not in a settled crashed state") + elsif !old_pipeline.configured_as_recoverable? + return new_failed_action("existing pipeline not configured to be recoverable (see: `pipeline.recoverable`)") + elsif (nrp = old_pipeline.non_reloadable_plugins) && !nrp.empty? + return new_failed_action("existing pipeline has non-reloadable plugins: #{nrp.map(&:readable_spec).join(', ')}") + end + + begin + pipeline_validator = AbstractPipeline.new(@pipeline_config, nil, logger, nil) + rescue => e + return ConvergeResult::FailedAction.from_exception(e) + end + + if !pipeline_validator.reloadable? 
+ return new_failed_action("Cannot recover pipeline, because the new pipeline is not reloadable") + end + + logger.info("Recovering pipeline", "pipeline.id" => pipeline_id) + + success = pipelines_registry.reload_pipeline(pipeline_id) do + # important NOT to explicitly return from block here + # the block must emit a success boolean value + + # first cleanup the old pipeline + old_pipeline.shutdown + + # Then create a new pipeline + new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) + success = new_pipeline.start # block until the pipeline is correctly started or crashed + + # return success and new_pipeline to registry reload_pipeline + [success, new_pipeline] + end + pipelines_registry.states.get(pipeline_id)&.mark_recovery if success + + LogStash::ConvergeResult::ActionResult.create(self, success) + end + + end +end; end \ No newline at end of file diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 3752003477c..1d359199766 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -23,6 +23,7 @@ def initialize(pipeline_id, pipeline) @pipeline_id = pipeline_id @pipeline = pipeline @loading = Concurrent::AtomicBoolean.new(false) + @recovery_log = java.util.ArrayList.new # this class uses a reentrant lock to ensure thread safe visibility. 
@lock = Monitor.new @@ -76,6 +77,22 @@ def set_pipeline(pipeline) end end + def mark_recovery + @lock.synchronize do + now = java.time.Instant.now() + five_minutes_ago = now.minus(java.time.Duration::ofMinutes(5)) + @recovery_log.remove(0) until @recovery_log.isEmpty() || @recovery_log.get(0).isAfter(five_minutes_ago) + @recovery_log.add(now) + end + end + + def recovery_log + @lock.synchronize do + # returns an immutable copy of the recovery log + java.util.List::copyOf(@recovery_log) + end + end + def synchronize @lock.synchronize do yield self diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index 107a1319b95..6395058d819 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -155,6 +155,15 @@ def config_name self.class.config_name end + # to be overridden by type implementations + def self.plugin_type + "PLUGIN" + end + + def readable_spec + "[#{self.class.plugin_type}:#{config_name}](#{id})" + end + # This is keep for backward compatibility, the logic was moved into the registry class # but some plugins use this method to return a specific instance on lookup # diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index c5bfbb3ef58..e706792027b 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -60,6 +60,7 @@ def self.included(base) "pipeline.batch.size", "pipeline.id", "pipeline.reloadable", + "pipeline.recoverable", "pipeline.system", "pipeline.workers", "pipeline.ordered", diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb index efa6e44a6f2..ff009dfc7c3 100644 --- a/logstash-core/lib/logstash/state_resolver.rb +++ b/logstash-core/lib/logstash/state_resolver.rb @@ -34,10 +34,10 @@ def resolve(pipelines_registry, pipeline_configs) if pipeline.nil? 
actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric) - else - if pipeline_config != pipeline.pipeline_config - actions << LogStash::PipelineAction::Reload.new(pipeline_config, @metric) - end + elsif pipeline_config != pipeline.pipeline_config + actions << LogStash::PipelineAction::Reload.new(pipeline_config, @metric) + elsif pipeline.crashed? && !pipeline.running? && pipeline.configured_as_recoverable? + actions << LogStash::PipelineAction::Recover.new(pipeline_config, @metric) end end diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index f92fbf61a06..35722710648 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -539,6 +539,26 @@ public RubyArray nonReloadablePlugins(final ThreadContext context) { return result; } + @JRubyMethod(name = "configured_queue_type") + public final IRubyObject configuredQueueType(final ThreadContext context) { + return getSetting(context, "queue.type"); + } + + @JRubyMethod(name = "configured_as_recoverable?") + public final IRubyObject isConfiguredAsRecoverable(final ThreadContext context) { + final String recoverableSettingValue = getSetting(context, "pipeline.recoverable").asJavaString(); + final boolean result = switch (recoverableSettingValue) { + case "true" -> true; + case "false" -> false; + case "auto" -> getSetting(context, "queue.type").asJavaString().equals(QueueFactoryExt.PERSISTED_TYPE); + default -> { + LOGGER.warn("Unsupported `pipeline.recoverable` value {}; defaulting to `false`", recoverableSettingValue); + yield false; + } + }; + return result ? 
context.tru : context.fals; + } + @JRubyMethod(name = "collect_stats") public final IRubyObject collectStats(final ThreadContext context) throws IOException { final AbstractNamespacedMetricExt pipelineMetric = diff --git a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java index ce6a5ed10bc..92017c2238c 100644 --- a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java +++ b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java @@ -24,6 +24,11 @@ import org.logstash.instrument.metrics.MetricKeys; import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.OptionalDouble; @@ -42,6 +47,7 @@ public static PipelineIndicator forPipeline(final String pipelineId, PipelineIndicator pipelineIndicator = new PipelineIndicator(new DetailsSupplier(pipelineId, pipelineDetailsProvider)); pipelineIndicator.attachProbe("status", new StatusProbe()); pipelineIndicator.attachProbe("flow:worker_utilization", new FlowWorkerUtilizationProbe()); + pipelineIndicator.attachProbe("recovery", new RecoveryProbe()); return pipelineIndicator; } @@ -87,20 +93,35 @@ public void serialize(Status value, JsonGenerator gen, SerializerProvider serial public static class Details implements Observation { private final Status status; private final FlowObservation flow; + private final List recoveryLog; + + public Details(final Status status) { + this(status, null); + } public Details(Status status, FlowObservation flow) { - this.status = Objects.requireNonNull(status, "status cannot be null"); - this.flow = Objects.requireNonNullElse(flow, FlowObservation.EMPTY); + this(status, flow, List.of()); } - public Details(final Status status) { - this(status, null); + public Details(final Status status, + 
final FlowObservation flow, + final List&lt;Instant&gt; recoveryLog) { + this.status = Objects.requireNonNull(status, "status cannot be null"); + this.flow = Objects.requireNonNullElse(flow, FlowObservation.EMPTY); + this.recoveryLog = List.copyOf(recoveryLog); } public Status getStatus() { return this.status; } - public FlowObservation getFlow() { return this.flow; } + + public FlowObservation getFlow() { + return this.flow; + } + + public List&lt;Instant&gt; getRecoveryLog() { + return this.recoveryLog; + } public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer&lt;Details&gt;
{ @Override @@ -113,6 +134,15 @@ public void serialize(final Details details, if (flow != null && !flow.isEmpty()) { jsonGenerator.writeObjectField("flow", flow); } + List&lt;Instant&gt; recoveryLog = details.getRecoveryLog(); + if (recoveryLog != null && !recoveryLog.isEmpty()) { + jsonGenerator.writeArrayFieldStart("recovery_log"); + final ZoneId zoneId = ZoneId.systemDefault(); + for (Instant instant : recoveryLog) { + jsonGenerator.writeString(OffsetDateTime.ofInstant(instant, zoneId).toString()); + } + jsonGenerator.writeEndArray(); + } jsonGenerator.writeEndObject(); } } @@ -333,4 +363,65 @@ static String impactId(final String state) { return String.format("logstash:health:pipeline:flow:impact:%s", state); } } + + static class RecoveryProbe implements Probe&lt;Details&gt;
{ static final HelpUrl HELP_URL = new HelpUrl("health-report-pipeline-recovery"); + + static final Impact.Builder INTERMITTENT_PROCESSING = Impact.builder() + .withId(impactId("intermittent_processing")) + .withDescription("pipeline recently recovered from a crash") + .withAdditionalImpactArea(ImpactArea.PIPELINE_EXECUTION); + + @Override + public Analysis analyze(Details observation) { + final List&lt;Instant&gt; recoveryLog = observation.recoveryLog; + if (Objects.nonNull(recoveryLog) && !recoveryLog.isEmpty()) { + // if we've recovered 5 or more times in the last 5 minutes, set status to red + final Instant fiveMinutesAgo = Instant.now().minus(5, ChronoUnit.MINUTES); + final long recoveriesLastFiveMinutes = recoveryLog.stream().filter(fiveMinutesAgo::isBefore).count(); + if (recoveriesLastFiveMinutes >= 5) { + return Analysis.builder().withStatus(RED) + .withDiagnosis(db -> db + .withId(diagnosisId("5m-recovery-repeated")) + .withCause(diagnosisCause(recoveriesLastFiveMinutes, "5 minutes")) + .withAction("inspect logs to determine source of crash") + .withHelpUrl(HELP_URL.withAnchor("recovery-5m").toString()) + ) + .withImpact(INTERMITTENT_PROCESSING.withSeverity(1).build()) + .build(); + } + if (recoveriesLastFiveMinutes >= 1) { + return Analysis.builder().withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("5m-recovery-recent")) + .withCause(diagnosisCause(recoveriesLastFiveMinutes, "5 minutes")) + .withAction("inspect logs to determine source of crash") + .withHelpUrl(HELP_URL.withAnchor("recovery-5m").toString()) + ) + .withImpact(INTERMITTENT_PROCESSING.withSeverity(2).build()) + .build(); + } + } + + return Analysis.builder().build(); + } + + static String diagnosisCause(long recoveryCount, String period) { + return new StringBuilder() + .append("pipeline has recovered from crashes ") + .append(recoveryCount).append(recoveryCount == 1 ?
" time " : " times " ) + .append("in the last ") + .append(period) + .toString(); + } + + static String diagnosisId(final String state) { + return String.format("logstash:health:pipeline:recovery:diagnosis:%s", state); + } + + static String impactId(final String state) { + return String.format("logstash:health:pipeline:recovery:impact:%s", state); + } + } }