Merged
Changes from 4 commits
6 changes: 5 additions & 1 deletion .buildkite/scripts/health-report-tests/scenario_executor.py
@@ -2,6 +2,7 @@
A class to execute the given scenario for Logstash Health Report integration test
"""
import time
import re
from logstash_health_report import LogstashHealthReport


@@ -31,7 +32,10 @@ def __get_difference(self, differences: list, expectations: dict, reports: dict)
return differences

if isinstance(expectations.get(key), str):
if expectations.get(key) != reports.get(key):
if expectations.get(key) == "$ISO8601$":
if not re.search("^[-0-9TZ.:]+$", reports.get(key)): # naive ISO8601 pattern
differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}})
elif expectations.get(key) != reports.get(key):
differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}})
continue
elif isinstance(expectations.get(key), dict):
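The `$ISO8601$` placeholder comparison added above can be sketched in isolation. This is a minimal reproduction of the naive pattern and the branch logic, assuming nothing beyond what the diff shows (the helper name is illustrative, not part of the test harness):

```python
import re

# Naive ISO8601 pattern from the scenario executor: accepts digits,
# dashes, colons, dots, and the T/Z markers, without validating field order.
ISO8601_NAIVE = re.compile(r"^[-0-9TZ.:]+$")

def matches_expectation(expected: str, got: str) -> bool:
    """Return True when `got` satisfies `expected`, treating the
    special `$ISO8601$` token as a loose timestamp wildcard."""
    if expected == "$ISO8601$":
        return bool(ISO8601_NAIVE.match(got))
    return expected == got
```

For example, `matches_expectation("$ISO8601$", "2026-04-01T21:12:04.498144Z")` is true, while a non-timestamp string fails the wildcard.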
44 changes: 44 additions & 0 deletions .buildkite/scripts/health-report-tests/tests/recovery-1m.yaml
@@ -0,0 +1,44 @@
name: "Recovery in 1min pipeline"
config:
- pipeline.id: crashy-recovery-pp
pipeline.recoverable: true
config.string: |
input { heartbeat { interval => 0.1 } }
filter {
ruby {
'init' => '@crash_time = ::Time.now + 20'
'code' => 'event.set("[@metadata][poison]", ::Time.now > @crash_time)'
}
if [@metadata][poison] {
failure_injector { crash_at => filter }
}
}
output { stdout {} }
conditions:
full_start_required: true
wait_seconds: 35 # anticipate one crash 20 seconds after pipeline starts, not enough for two.
expectation:
status: "yellow"
symptom: "1 indicator is concerning (`pipelines`)"
indicators:
pipelines:
status: "yellow"
symptom: "1 indicator is concerning (`crashy-recovery-pp`)"
indicators:
crashy-recovery-pp:
status: "yellow"
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
diagnosis:
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
action: "inspect logs to determine source of crash"
impacts:
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
severity: 2
description: "pipeline recently recovered from a crash"
impact_areas: [ "pipeline_execution" ]
details:
status:
state: "RUNNING"
recovery_log:
- "$ISO8601$"
@@ -43,6 +43,7 @@ private Map<String, String> buildSettingMap() {
"node.name", "path.data", "pipeline.id", "pipeline.workers",
"pipeline.output.workers", "pipeline.batch.size", "pipeline.batch.delay", "pipeline.batch.output_chunking.growth_threshold_factor",
"pipeline.unsafe_shutdown", "pipeline.ecs_compatibility", "pipeline.ordered",
"pipeline.recoverable", "pipeline.reloadable",
"pipeline.plugin_classloaders", "pipeline.separate_logs", "path.config",
"config.string", "config.test_and_exit", "config.reload.automatic",
"config.reload.interval", "config.debug", "config.support_escapes",
1 change: 1 addition & 0 deletions docs/reference/logstash-settings-file.md
@@ -56,7 +56,8 @@
| `pipeline.unsafe_shutdown` | When set to `true`, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown. | `false` |
| `pipeline.plugin_classloaders` | (Beta) Load Java plugins in independent classloaders to isolate their dependencies. | `false` |
| `pipeline.ordered` | Set the pipeline event ordering. Valid options are:<br><br>* `auto`. Automatically enables ordering if the `pipeline.workers` setting is `1`, and disables otherwise.<br>* `true`. Enforces ordering on the pipeline and prevents Logstash from starting if there are multiple workers.<br>* `false`. Disables the processing required to preserve order. Ordering will not be guaranteed, but you save the processing cost of preserving order.<br> | `auto` |
| `pipeline.recoverable` | Configure the pipeline to be eligible for automated recovery. Requires `config.reload.automatic`. Possible values are:<br><br>* `auto`: enable only if the queue type is persisted.<br>* `true`: always enable, even if the queue is ephemeral (can cause data loss).<br>* `false`: never enable.<br> | `false` |
| `pipeline.ecs_compatibility` | Sets the pipeline’s default value for `ecs_compatibility`, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema. Possible values are:<br><br>* `disabled`<br>* `v1`<br>* `v8`<br><br>This option allows the [early opt-in (or preemptive opt-out) of ECS compatibility](/reference/ecs-ls.md) modes in plugins, which is scheduled to be on-by-default in a future major release of {{ls}}.<br><br>Values other than `disabled` are currently considered BETA, and may produce unintended consequences when upgrading {{ls}}.<br> | `disabled` |
| `path.config` | The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. | Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). |
| `config.string` | A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. | *N/A* |
| `config.test_and_exit` | When set to `true`, checks that the configuration is valid and then exits. Note that grok patterns are not checked for correctness with this setting. Logstash can read multiple config files from a directory. If you combine this setting with `log.level: debug`, Logstash will log the combined config file, annotating each config block with the source file it came from. | `false` |
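As the table row above notes, `pipeline.recoverable` only has an effect when the config reloader is running. A hypothetical `logstash.yml` fragment enabling recovery for a persisted-queue pipeline might look like this (a sketch based on the documented settings, not taken from the PR):

```yaml
# Recovery requires the config reloader to be running.
config.reload.automatic: true

queue.type: persisted
# `auto` resolves to true only because the queue is persisted;
# with an in-memory queue it would resolve to false.
pipeline.recoverable: auto
```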
37 changes: 37 additions & 0 deletions docs/static/spec/openapi/logstash-api.yaml
@@ -1741,6 +1741,12 @@ paths:
properties:
worker_utilization:
$ref: '#/components/schemas/FlowWindows'
recovery_log:
type: array
description: "If the pipeline has successfully recovered from a crash, includes a list of recovery timestamps"
items:
type: string
description: An ISO8601-compliant timestamp
impacts:
type: array
description: "If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on Logstash."
@@ -1860,6 +1866,37 @@
worker_utilization:
last_1_minute: 100.0
last_5_minutes: 100.0
recoveryCase:
description: "A pipeline has recently recovered from a crash"
value:
status: "yellow"
symptom: "1 indicator is unhealthy (`pipelines`)"
indicators:
pipelines:
status: "yellow"
symptom: "1 indicator is concerning (`crashy-pipeline`)"
indicators:
crashy-pipeline:
status: "yellow"
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
diagnosis:
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
action: "inspect logs to determine source of crash"
impacts:
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
severity: 2
description: "the pipeline recently recovered from a crash"
impact_areas: [ "pipeline_execution" ]
details:
status:
state: "RUNNING"
flow:
worker_utilization:
last_1_minute: 0.1
last_5_minutes: 0.1
recovery_log:
- "2026-04-01T21:12:04.498144Z"
x-metaTags:
- content: Logstash
name: product_name
6 changes: 4 additions & 2 deletions logstash-core/lib/logstash/agent.rb
@@ -190,7 +190,9 @@ def pipeline_details(pipeline_id)
else PipelineIndicator::Status::UNKNOWN
end

PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation)
PipelineIndicator::Details.new(status,
sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation,
sync_state.recovery_log)
end
end

@@ -564,7 +566,7 @@ def update_success_metrics(action, action_result)
# When a pipeline is successfully created we create the metric
# place holder related to the lifecycle of the pipeline
initialize_pipeline_metrics(action)
when LogStash::PipelineAction::Reload
when LogStash::PipelineAction::Reload, LogStash::PipelineAction::Recover
update_successful_reload_metrics(action, action_result)
end
end
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -57,6 +57,7 @@ def self.as_java_range(r)
Setting::PositiveIntegerSetting.new("pipeline.batch.output_chunking.growth_threshold_factor", 1000),
Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false),
Setting::BooleanSetting.new("pipeline.reloadable", true),
Setting::CoercibleStringSetting.new("pipeline.recoverable", "false", true, %w(auto true false)),
Setting::BooleanSetting.new("pipeline.plugin_classloaders", false),
Setting::BooleanSetting.new("pipeline.separate_logs", false),
Setting::CoercibleStringSetting.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/pipeline_action.rb
@@ -19,11 +19,13 @@
require "logstash/pipeline_action/create"
require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
require "logstash/pipeline_action/recover"
require "logstash/pipeline_action/delete"
require "logstash/pipeline_action/stop_and_delete"

module LogStash module PipelineAction
ORDERING = {
LogStash::PipelineAction::Recover => 99,
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300,
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/base.rb
@@ -41,5 +41,13 @@ def <=>(other)
order = self.execution_priority <=> other.execution_priority
order.nonzero? ? order : self.pipeline_id <=> other.pipeline_id
end

def new_failed_action(reason)
ConvergeResult::FailedAction.from_action(self, reason)
end

def new_success_action
ConvergeResult::SuccessfulAction.new
end
end
end end
81 changes: 81 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/recover.rb
@@ -0,0 +1,81 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/pipeline_action/base"
require "logstash/java_pipeline"

module LogStash module PipelineAction
class Recover < Base
include LogStash::Util::Loggable

def initialize(pipeline_config, metric)
@pipeline_config = pipeline_config
@metric = metric
end

def pipeline_id
@pipeline_config.pipeline_id.to_sym
end

def to_s
"PipelineAction::Recover<#{pipeline_id}>"
end

def execute(agent, pipelines_registry)
old_pipeline = pipelines_registry.get_pipeline(pipeline_id)

# guard with descriptive errors
if old_pipeline.nil?
return new_failed_action("pipeline does not exist")
elsif old_pipeline.running? || !old_pipeline.crashed?
return new_failed_action("existing pipeline is not in a settled crashed state")
elsif !old_pipeline.configured_as_recoverable?
return new_failed_action("existing pipeline not configured to be recoverable (see: `pipeline.recoverable`)")
elsif (nrp = old_pipeline.non_reloadable_plugins) && !nrp.empty?
return new_failed_action("existing pipeline has non-reloadable plugins: #{nrp.map(&:readable_spec).join(', ')}")
end

begin
pipeline_validator = AbstractPipeline.new(@pipeline_config, nil, logger, nil)
rescue => e
return ConvergeResult::FailedAction.from_exception(e)
end

if !pipeline_validator.reloadable?
return new_failed_action("Cannot recover pipeline, because the new pipeline is not reloadable")
end

logger.info("Recovering pipeline", "pipeline.id" => pipeline_id)

success = pipelines_registry.reload_pipeline(pipeline_id) do
# important NOT to explicitly return from block here
# the block must emit a success boolean value

# Then create a new pipeline
new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start # block until the pipeline is correctly started or crashed

# return success and new_pipeline to registry reload_pipeline
[success, new_pipeline]
end
pipelines_registry.states.get(pipeline_id)&.mark_recovery if success

LogStash::ConvergeResult::ActionResult.create(self, success)
end

end
end; end
14 changes: 14 additions & 0 deletions logstash-core/lib/logstash/pipelines_registry.rb
@@ -23,6 +23,7 @@ def initialize(pipeline_id, pipeline)
@pipeline_id = pipeline_id
@pipeline = pipeline
@loading = Concurrent::AtomicBoolean.new(false)
@recovery_log = java.util.ArrayList.new

# this class uses a reentrant lock to ensure thread safe visibility.
@lock = Monitor.new
@@ -76,6 +77,19 @@ def set_pipeline(pipeline)
end
end

def mark_recovery
@lock.synchronize do
@recovery_log.add(java.time.Instant.now)
end
end

def recovery_log
@lock.synchronize do
# returns an immutable copy of the recovery log
java.util.List::copyOf(@recovery_log)
end
end

def synchronize
@lock.synchronize do
yield self
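The `mark_recovery`/`recovery_log` pair added above guards a mutable list with a reentrant lock and hands readers an immutable copy, mirroring `java.util.List.copyOf`. A rough Python equivalent (class and method names are illustrative, not Logstash API):

```python
import threading
from datetime import datetime, timezone

class PipelineState:
    """Tracks recovery timestamps; readers get immutable snapshots."""

    def __init__(self):
        self._lock = threading.RLock()  # reentrant, like Ruby's Monitor
        self._recovery_log = []

    def mark_recovery(self):
        # Record the moment of a successful recovery under the lock.
        with self._lock:
            self._recovery_log.append(datetime.now(timezone.utc))

    @property
    def recovery_log(self):
        # Return an immutable copy so callers cannot mutate shared state,
        # mirroring java.util.List.copyOf in the original.
        with self._lock:
            return tuple(self._recovery_log)
```

Returning a tuple (rather than the list itself) keeps concurrent readers safe from later `mark_recovery` calls, which is the same design choice the diff makes.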
9 changes: 9 additions & 0 deletions logstash-core/lib/logstash/plugin.rb
@@ -155,6 +155,15 @@ def config_name
self.class.config_name
end

# to be overridden by type implementations
def self.plugin_type
"PLUGIN"
end

def readable_spec
"[#{self.class.plugin_type}:#{config_name}](#{id})"
end

# This is keep for backward compatibility, the logic was moved into the registry class
# but some plugins use this method to return a specific instance on lookup
#
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
@@ -60,6 +60,7 @@ def self.included(base)
"pipeline.batch.size",
"pipeline.id",
"pipeline.reloadable",
"pipeline.recoverable",
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/state_resolver.rb
@@ -34,10 +34,10 @@ def resolve(pipelines_registry, pipeline_configs)

if pipeline.nil?
actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric)
else
if pipeline_config != pipeline.pipeline_config
actions << LogStash::PipelineAction::Reload.new(pipeline_config, @metric)
end
elsif pipeline_config != pipeline.pipeline_config
actions << LogStash::PipelineAction::Reload.new(pipeline_config, @metric)
elsif pipeline.crashed? && !pipeline.running? && pipeline.configured_as_recoverable?
actions << LogStash::PipelineAction::Recover.new(pipeline_config, @metric)
end
end

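The reshaped `resolve` branch above now has three outcomes instead of two: create when the pipeline does not exist, reload when the configuration changed, and recover when an unchanged pipeline has settled into a crashed state and is configured as recoverable. The decision order can be sketched like this (function and action names are stand-ins for the Logstash classes):

```python
def resolve_action(pipeline, pipeline_config):
    """Decide which converge action applies to one pipeline slot.

    `pipeline` is None when no pipeline exists yet; otherwise it exposes
    pipeline_config, crashed(), running(), and configured_as_recoverable(),
    mirroring the registry state consulted by the Ruby resolver.
    """
    if pipeline is None:
        return "create"
    if pipeline_config != pipeline.pipeline_config:
        return "reload"
    if pipeline.crashed() and not pipeline.running() and pipeline.configured_as_recoverable():
        return "recover"
    return None  # converged: nothing to do
```

Note that recovery is only attempted for an *unchanged* config; a changed config takes the reload path instead, which already restarts the pipeline.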
@@ -539,6 +539,26 @@ public RubyArray nonReloadablePlugins(final ThreadContext context) {
return result;
}

@JRubyMethod(name = "configured_queue_type")
public final IRubyObject configuredQueueType(final ThreadContext context) {
return getSetting(context, "queue.type");
}

@JRubyMethod(name = "configured_as_recoverable?")
public final IRubyObject isConfiguredAsRecoverable(final ThreadContext context) {
final String recoverableSettingValue = getSetting(context, "pipeline.recoverable").asJavaString();
final boolean result = switch (recoverableSettingValue) {
[review comment]
Member: Should we warn/fail loading when `config.reload.automatic` + `true` or `auto` is set?

@yaauie (Member, Author), Apr 7, 2026:

Handled in f7ae601:

  • queue.type: memory + pipeline.recoverable: auto + config.reload.automatic: true:
    resolves to false, no warning
    [2026-04-07T20:21:04,173][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => false, thread: "#<Thread:0x1dcd1110 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: persisted + pipeline.recoverable: auto + config.reload.automatic: true
    resolves to true; informational message
    [2026-04-07T20:21:58,949][INFO ][logstash.javapipeline    ][main] Pipeline with `queue.type: persisted` is configured to be recoverable with `pipeline.recoverable: auto`; in the event of a crash some in-flight events may be re-processed {pipeline_id: "main", thread: "#<Thread:0x4c14afd /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:21:58,952][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x4c14afd /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: memory + pipeline.recoverable: true + config.reload.automatic: true
    logs warning about automating data loss
    [2026-04-07T20:22:43,473][WARN ][logstash.javapipeline    ][main] Pipeline with `queue.type: memory` is configured to be recoverable with `pipeline.recoverable: true`; in the event of a crash in-flight events will be lost, so enabling auto-recovery increases the risk of data loss. {pipeline_id: "main", thread: "#<Thread:0x58a27296 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:22:43,477][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x58a27296 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: persisted + pipeline.recoverable: true + config.reload.automatic: true
    logs info about possible re-processing
    [2026-04-07T20:23:27,604][INFO ][logstash.javapipeline    ][main] Pipeline with `queue.type: persisted` is configured to be recoverable with `pipeline.recoverable: true`; in the event of a crash some in-flight events may be re-processed {pipeline_id: "main", thread: "#<Thread:0x4f6df6d3 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:23:27,609][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x4f6df6d3 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • pipeline.recoverable: true + config.reload.automatic: false
    logs warning about not actually being recoverable
    [2026-04-07T20:26:18,539][WARN ][logstash.javapipeline    ][main] Pipeline is configured to be recoverable with `pipeline.recoverable: true`, but config reloading has been disabled with `config.reload.automatic: false`; if this pipeline crashes it will NOT be recovered. {pipeline_id: "main", thread: "#<Thread:0x1007a0c /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:26:18,544][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => false, thread: "#<Thread:0x1007a0c /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    

case "true" -> true;
case "false" -> false;
case "auto" -> getSetting(context, "queue.type").asJavaString().equals(QueueFactoryExt.PERSISTED_TYPE);
default -> {
LOGGER.warn("Unsupported `pipeline.recoverable` value {}; defaulting to `false`", recoverableSettingValue);
yield false;
}
};
return result ? context.tru : context.fals;
}

@JRubyMethod(name = "collect_stats")
public final IRubyObject collectStats(final ThreadContext context) throws IOException {
final AbstractNamespacedMetricExt pipelineMetric =
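The Java `switch` in `isConfiguredAsRecoverable` resolves the tri-state setting against the queue type: `auto` is recoverable only when the queue survives a crash. The same resolution in a compact Python sketch (the `PERSISTED` constant is an assumption mirroring `QueueFactoryExt.PERSISTED_TYPE`):

```python
PERSISTED = "persisted"  # assumed value of QueueFactoryExt.PERSISTED_TYPE

def configured_as_recoverable(recoverable_setting: str, queue_type: str) -> bool:
    """Resolve `pipeline.recoverable` the way the Java switch does:
    explicit true/false wins; `auto` depends on queue persistence."""
    if recoverable_setting == "true":
        return True
    if recoverable_setting == "false":
        return False
    if recoverable_setting == "auto":
        return queue_type == PERSISTED
    # Unsupported values fall back to false (the Java code also logs a warning).
    return False
```

So `configured_as_recoverable("auto", "persisted")` is true, while the same `auto` with an in-memory queue resolves to false, matching the log examples in the review thread above.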