Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .buildkite/scripts/health-report-tests/tests/recovery-1m.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: "Recovery in 1min pipeline"
config:
- pipeline.id: crashy-recovery-pp
pipeline.recoverable: true
config.string: |
input { heartbeat { interval => 0.1 } }
filter {
ruby {
'init' => '@crash_time = ::Time.now + 20'
'code' => 'event.set("[@metadata][poison]", ::Time.now > @crash_time)'
}
if [@metadata][poison] {
failure_injector { crash_at => filter }
}
}
output { stdout {} }
conditions:
full_start_required: true
wait_seconds: 35 # anticipate one crash 20 seconds after pipeline starts, not enough for two.
expectation:
status: "yellow"
symptom: "1 indicator is concerning (`pipelines`)"
indicators:
pipelines:
status: "yellow"
symptom: "1 indicator is concerning (`crashy-recovery-pp`)"
indicators:
crashy-recovery-pp:
status: "yellow"
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
diagnosis:
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
action: "inspect logs to determine source of crash"
help_url: { $include: "health-report-pipeline-recovery" }
impacts:
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
severity: 2
description: "pipeline recently recovered from a crash"
impact_areas: [ "pipeline_execution" ]
details:
status:
state: "RUNNING"
recovery_log:
- { $match: "{ISO8601}" }
9 changes: 9 additions & 0 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@
#
# config.reload.interval: 3s
#
# Whether to allow the pipeline reload feature to recover a pipeline that has crashed.
# Requires `config.reload.automatic`. Pipeline cannot include non-reloadable plugins.
# Accepts one of the following values:
# - `false` (default): never recover after observing crash
# - `auto`: recover if-and-only-if `queue.type` is set to `persisted`
# - `true`: always recover after observing crash (WARNING: risk of data loss)
#
# config.recoverable: false
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
Expand Down
20 changes: 13 additions & 7 deletions config/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,21 @@
# # Default is 1000 (effectively disabled). Set to a lower value to enable chunking.
# pipeline.batch.output_chunking.growth_threshold_factor: 1000
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on a pipeline and prevents logstash from starting
# a pipeline with multiple workers allocated.
# "false" disable any extra processing necessary for preserving ordering.
#
# # Set the pipeline event ordering. Options are "auto" (the default), "true", or "false".
# # "auto" automatically enables ordering if the 'pipeline.workers' setting
# # is also set to '1', and disables otherwise.
# # "true" enforces ordering on a pipeline and prevents logstash from starting
# # a pipeline with multiple workers allocated.
# # "false" disables any extra processing necessary for preserving ordering.
# pipeline.ordered: auto
#
# # Configure this pipeline to be recoverable if it crashes and `config.reload.automatic` is enabled.
# # Accepts one of the following values:
# # - `false` (default): never recover after observing crash
# # - `auto`: recover if-and-only-if `queue.type` is set to `persisted`
# # - `true`: always recover after observing crash (WARNING: risk of data loss)
# pipeline.recoverable: false
#
# # Internal queuing model, "memory" for legacy in-memory based queuing and
# # "persisted" for disk-based acked queueing. Defaults is memory
# queue.type: memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private Map<String, String> buildSettingMap() {
"node.name", "path.data", "pipeline.id", "pipeline.workers",
"pipeline.output.workers", "pipeline.batch.size", "pipeline.batch.delay", "pipeline.batch.output_chunking.growth_threshold_factor",
"pipeline.unsafe_shutdown", "pipeline.ecs_compatibility", "pipeline.ordered",
"pipeline.recoverable", "pipeline.reloadable",
"pipeline.plugin_classloaders", "pipeline.separate_logs", "path.config",
"config.string", "config.test_and_exit", "config.reload.automatic",
"config.reload.interval", "config.debug", "config.support_escapes",
Expand Down
1 change: 1 addition & 0 deletions docs/reference/logstash-settings-file.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
| `pipeline.unsafe_shutdown` | When set to `true`, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown. | `false` |
| `pipeline.plugin_classloaders` | (Beta) Load Java plugins in independent classloaders to isolate their dependencies. | `false` |
| `pipeline.ordered` | Set the pipeline event ordering. Valid options are:<br><br>* `auto`. Automatically enables ordering if the `pipeline.workers` setting is `1`, and disables otherwise.<br>* `true`. Enforces ordering on the pipeline and prevents Logstash from starting if there are multiple workers.<br>* `false`. Disables the processing required to preserve order. Ordering will not be guaranteed, but you save the processing cost of preserving order.<br> | `auto` |
| `pipeline.recoverable` | Configure the pipeline to be eligible for automated recovery. Requires using `config.reload.automatic`. All plugins in the pipeline must be reloadable. Possible values are:<br><br>* `auto`: enable only if using a persisted queue type.<br>* `true`: always enable, even if queue is ephemeral (may cause data loss)<br>* `false`: never enable<br> | `false` |
| `pipeline.ecs_compatibility` | Sets the pipeline’s default value for `ecs_compatibility`, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema. Possible values are:<br><br>* `disabled`<br>* `v1`<br>* `v8`<br><br>This option allows the [early opt-in (or preemptive opt-out) of ECS compatibility](/reference/ecs-ls.md) modes in plugins, which is scheduled to be on-by-default in a future major release of {{ls}}.<br><br>Values other than `disabled` are currently considered BETA, and may produce unintended consequences when upgrading {{ls}}.<br> | `disabled` |

Check notice on line 60 in docs/reference/logstash-settings-file.md

View workflow job for this annotation

GitHub Actions / build / vale

Elastic.WordChoice: Consider using 'can, might' instead of 'may', unless the term is in the UI.
| `path.config` | The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. | Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). |
| `config.string` | A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. | *N/A* |
| `config.test_and_exit` | When set to `true`, checks that the configuration is valid and then exits. Note that grok patterns are not checked for correctness with this setting. Logstash can read multiple config files from a directory. If you combine this setting with `log.level: debug`, Logstash will log the combined config file, annotating each config block with the source file it came from. | `false` |
Expand Down
37 changes: 37 additions & 0 deletions docs/static/spec/openapi/logstash-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,12 @@ paths:
properties:
worker_utilization:
$ref: '#/components/schemas/FlowWindows'
recovery_log:
type: array
description: "If the pipeline has successfully recovered from one or more recent crashes, a list of ISO8601 timestamps marking each recovery"
items:
type: string
description: An ISO8601-compliant timestamp
impacts:
type: array
description: "If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on Logstash."
Expand Down Expand Up @@ -1860,6 +1866,37 @@ paths:
worker_utilization:
last_1_minute: 100.0
last_5_minutes: 100.0
recoveryCase:
description: "A pipeline has recently recovered from a crash"
value:
status: "yellow"
symptom: "1 indicator is concerning (`pipelines`)"
indicators:
pipelines:
status: "yellow"
symptom: "1 indicator is concerning (`crashy-pipeline`)"
indicators:
crashy-pipeline:
status: "yellow"
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
diagnosis:
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
action: "inspect logs to determine source of crash"
impacts:
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
severity: 2
description: "the pipeline recently recovered from a crash"
impact_areas: [ "pipeline_execution" ]
details:
status:
state: "RUNNING"
flow:
worker_utilization:
last_1_minute: 0.1
last_5_minutes: 0.1
recovery_log:
- "2026-04-01T21:12:04.498144Z"
x-metaTags:
- content: Logstash
name: product_name
Expand Down
6 changes: 4 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ def pipeline_details(pipeline_id)
else PipelineIndicator::Status::UNKNOWN
end

PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation)
PipelineIndicator::Details.new(status,
sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation,
sync_state.recovery_log)
end
end

Expand Down Expand Up @@ -564,7 +566,7 @@ def update_success_metrics(action, action_result)
# When a pipeline is successfully created we create the metric
# place holder related to the lifecycle of the pipeline
initialize_pipeline_metrics(action)
when LogStash::PipelineAction::Reload
when LogStash::PipelineAction::Reload, LogStash::PipelineAction::Recover
update_successful_reload_metrics(action, action_result)
end
end
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def self.as_java_range(r)
Setting::PositiveIntegerSetting.new("pipeline.batch.output_chunking.growth_threshold_factor", 1000),
Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false),
Setting::BooleanSetting.new("pipeline.reloadable", true),
Setting::CoercibleStringSetting.new("pipeline.recoverable", "false", true, %w(auto true false)),
Setting::BooleanSetting.new("pipeline.plugin_classloaders", false),
Setting::BooleanSetting.new("pipeline.separate_logs", false),
Setting::CoercibleStringSetting.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Expand Down
34 changes: 33 additions & 1 deletion logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,37 @@ def start_workers
batch_output_chunking_growth_threshold_factor = settings.get("pipeline.batch.output_chunking.growth_threshold_factor")
batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode")


queue_type = settings.get('queue.type')
queue_is_ephemeral = (queue_type == MEMORY)
pipeline_recoverable = settings.get('pipeline.recoverable')
pipeline_is_recoverable = case pipeline_recoverable
when 'true' then true
when 'false' then false
when 'auto' then !queue_is_ephemeral
end
config_reload_automatic = settings.get('config.reload.automatic')

if pipeline_is_recoverable
if !config_reload_automatic
@logger.warn("Pipeline is configured to be recoverable with `pipeline.recoverable: #{pipeline_recoverable}`, " +
"but config reloading has been disabled with `config.reload.automatic: #{config_reload_automatic}`; " +
"if this pipeline crashes it will NOT be recovered.",
default_logging_keys)
elsif queue_is_ephemeral
@logger.warn("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " +
"with `pipeline.recoverable: #{pipeline_recoverable}`; " +
"in the event of a crash in-flight events will be lost, " +
"so enabling auto-recovery increases the risk of data loss.",
default_logging_keys)
else
@logger.info("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " +
"with `pipeline.recoverable: #{pipeline_recoverable}`; " +
"in the event of a crash some in-flight events may be re-processed",
default_logging_keys)
end
end

max_inflight = batch_size * pipeline_workers

config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config])
Expand All @@ -290,7 +321,8 @@ def start_workers
"pipeline.batch.output_chunking.growth_threshold_factor" => batch_output_chunking_growth_threshold_factor,
"pipeline.max_inflight" => max_inflight,
"batch_metric_sampling" => batch_metric_sampling,
"pipeline.sources" => pipeline_source_details)
"pipeline.sources" => pipeline_source_details,
"pipeline.recoverable" => pipeline_is_recoverable && config_reload_automatic)
@logger.info("Starting pipeline", pipeline_log_params)

filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/pipeline_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
require "logstash/pipeline_action/create"
require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
require "logstash/pipeline_action/recover"
require "logstash/pipeline_action/delete"
require "logstash/pipeline_action/stop_and_delete"

module LogStash module PipelineAction
ORDERING = {
LogStash::PipelineAction::Recover => 99,
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300,
Expand Down
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ def <=>(other)
order = self.execution_priority <=> other.execution_priority
order.nonzero? ? order : self.pipeline_id <=> other.pipeline_id
end

# Build a FailedAction converge result for this action.
# @param reason [String] human-readable explanation of why the action failed
# @return [LogStash::ConvergeResult::FailedAction]
def new_failed_action(reason)
  ConvergeResult::FailedAction.from_action(self, reason)
end

# Build a SuccessfulAction converge result for this action.
# @return [LogStash::ConvergeResult::SuccessfulAction]
def new_success_action
  ConvergeResult::SuccessfulAction.new
end
end
end end
84 changes: 84 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/recover.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/pipeline_action/base"
require "logstash/java_pipeline"

module LogStash module PipelineAction
  # Converge-cycle action that recovers a crashed pipeline: it shuts down the
  # crashed instance and starts a fresh JavaPipeline from @pipeline_config.
  # Listed in PipelineAction::ORDERING with priority 99, so recoveries run
  # before Create/Reload/Stop actions in the same converge cycle.
  class Recover < Base
    include LogStash::Util::Loggable

    # @param pipeline_config [PipelineConfig] configuration to start the replacement pipeline from
    # @param metric [Metric] metric collector handed to the new pipeline
    def initialize(pipeline_config, metric)
      @pipeline_config = pipeline_config
      @metric = metric
    end

    # @return [Symbol] id of the pipeline this action targets
    def pipeline_id
      @pipeline_config.pipeline_id.to_sym
    end

    def to_s
      "PipelineAction::Recover<#{pipeline_id}>"
    end

    # Perform the recovery inside the registry's reload protocol.
    # Fails fast (without touching the old pipeline) unless the old pipeline
    # exists, has settled into a crashed (non-running) state, is configured as
    # recoverable, and contains only reloadable plugins.
    # @param agent [LogStash::Agent] passed through to the new JavaPipeline
    # @param pipelines_registry [LogStash::PipelinesRegistry]
    # @return [ConvergeResult::ActionResult] success iff the replacement pipeline started
    def execute(agent, pipelines_registry)
      old_pipeline = pipelines_registry.get_pipeline(pipeline_id)

      # guard with descriptive errors
      if old_pipeline.nil?
        return new_failed_action("pipeline does not exist")
      elsif old_pipeline.running? || !old_pipeline.crashed?
        return new_failed_action("existing pipeline is not in a settled crashed state")
      elsif !old_pipeline.configured_as_recoverable?
        return new_failed_action("existing pipeline not configured to be recoverable (see: `pipeline.recoverable`)")
      elsif (nrp = old_pipeline.non_reloadable_plugins) && !nrp.empty?
        return new_failed_action("existing pipeline has non-reloadable plugins: #{nrp.map(&:readable_spec).join(', ')}")
      end

      # validate that the *replacement* config is reloadable before committing.
      # NOTE(review): instantiating AbstractPipeline with nil namespaced-metric/
      # settings args appears to be validation-only — confirm it has no side effects
      begin
        pipeline_validator = AbstractPipeline.new(@pipeline_config, nil, logger, nil)
      rescue => e
        return ConvergeResult::FailedAction.from_exception(e)
      end

      if !pipeline_validator.reloadable?
        return new_failed_action("Cannot recover pipeline, because the new pipeline is not reloadable")
      end

      logger.info("Recovering pipeline", "pipeline.id" => pipeline_id)

      success = pipelines_registry.reload_pipeline(pipeline_id) do
        # important NOT to explicitly return from block here
        # the block must emit a success boolean value

        # first cleanup the old pipeline
        old_pipeline.shutdown

        # Then create a new pipeline
        new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
        success = new_pipeline.start # block until the pipeline is correctly started or crashed

        # return success and new_pipeline to registry reload_pipeline
        [success, new_pipeline]
      end
      # record the recovery timestamp so the health API can surface it
      pipelines_registry.states.get(pipeline_id)&.mark_recovery if success

      LogStash::ConvergeResult::ActionResult.create(self, success)
    end

  end
end; end
17 changes: 17 additions & 0 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(pipeline_id, pipeline)
@pipeline_id = pipeline_id
@pipeline = pipeline
@loading = Concurrent::AtomicBoolean.new(false)
@recovery_log = java.util.ArrayList.new

# this class uses a reentrant lock to ensure thread safe visibility.
@lock = Monitor.new
Expand Down Expand Up @@ -76,6 +77,22 @@ def set_pipeline(pipeline)
end
end

# Record that this pipeline just recovered from a crash.
# Prunes entries older than the rolling 5-minute window before
# appending the new timestamp, so @recovery_log only ever holds
# recent recoveries.
def mark_recovery
  @lock.synchronize do
    recovered_at = java.time.Instant.now
    window_start = recovered_at.minus(java.time.Duration.ofMinutes(5))
    # drop stale entries from the head (oldest-first list)
    @recovery_log.remove(0) while !@recovery_log.isEmpty && !@recovery_log.get(0).isAfter(window_start)
    @recovery_log.add(recovered_at)
  end
end

# @return [java.util.List] an immutable snapshot of the recovery
#   timestamps recorded so far (safe to hand out without the lock)
def recovery_log
  @lock.synchronize { java.util.List.copyOf(@recovery_log) }
end

def synchronize
@lock.synchronize do
yield self
Expand Down
9 changes: 9 additions & 0 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ def config_name
self.class.config_name
end

# Generic fallback label for a plugin's type; concrete type base
# classes are expected to override this with their own type name.
def self.plugin_type
  'PLUGIN'
end

# Human-readable identifier for this plugin instance, rendered as
# "[<type>:<config_name>](<id>)" for use in logs and diagnostics.
def readable_spec
  format('[%s:%s](%s)', self.class.plugin_type, config_name, id)
end

# This is keep for backward compatibility, the logic was moved into the registry class
# but some plugins use this method to return a specific instance on lookup
#
Expand Down
Loading
Loading