Skip to content

Commit f0638f2

Browse files
yaauiedonoghuc
andcommitted
Support REACTIVE pipeline recovery with config.reload manager (elastic#18930)
* reload automatic: recover crashed pipelines during convergence * recovery: add health report probes * derp: fix invocation of failure_injector filter * PR feedback: - do not resolve recovery action until pipeline has settled into its crash state - capture a successful recovery as a successful reload in metrics * health tests: back out local branch changes * update recovery test assertions to use new $match helper * Apply suggestion from @yaauie * add logging, examples for `pipeline.recoverable` setting * recovery: clean up old pipeline * recovery: keep only last 5min of recovery log * Apply suggestions from code review Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com> --------- Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com>
1 parent 91bf043 commit f0638f2

File tree

18 files changed

+385
-19
lines changed

18 files changed

+385
-19
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
name: "Recovery in 1min pipeline"
2+
config:
3+
- pipeline.id: crashy-recovery-pp
4+
pipeline.recoverable: true
5+
config.string: |
6+
input { heartbeat { interval => 0.1 } }
7+
filter {
8+
ruby {
9+
'init' => '@crash_time = ::Time.now + 20'
10+
'code' => 'event.set("[@metadata][poison]", ::Time.now > @crash_time)'
11+
}
12+
if [@metadata][poison] {
13+
failure_injector { crash_at => filter }
14+
}
15+
}
16+
output { stdout {} }
17+
conditions:
18+
full_start_required: true
19+
wait_seconds: 35 # anticipate one crash 20 seconds after pipeline starts, not enough for two.
20+
expectation:
21+
status: "yellow"
22+
symptom: "1 indicator is concerning (`pipelines`)"
23+
indicators:
24+
pipelines:
25+
status: "yellow"
26+
symptom: "1 indicator is concerning (`crashy-recovery-pp`)"
27+
indicators:
28+
crashy-recovery-pp:
29+
status: "yellow"
30+
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
31+
diagnosis:
32+
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
33+
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
34+
action: "inspect logs to determine source of crash"
35+
help_url: { $include: "health-report-pipeline-recovery" }
36+
impacts:
37+
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
38+
severity: 2
39+
description: "pipeline recently recovered from a crash"
40+
impact_areas: [ "pipeline_execution" ]
41+
details:
42+
status:
43+
state: "RUNNING"
44+
recovery_log:
45+
- { $match: "{ISO8601}" }

config/logstash.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,15 @@
122122
#
123123
# config.reload.interval: 3s
124124
#
125+
# Whether to allow the pipeline reload feature to recover a pipeline that has crashed.
126+
# Requires `config.reload.automatic`. Pipeline cannot include non-reloadable plugins.
127+
# Accepts one of the following values:
128+
# - `false` (default): never recover after observing crash
129+
# - `auto`: recover if-and-only-if `queue.type` is set to `persisted`
130+
# - `true`: always recover after observing crash (WARNING: risk of data loss)
131+
#
132+
# pipeline.recoverable: false
133+
#
125134
# Show fully compiled configuration as debug log message
126135
# NOTE: --log.level must be 'debug'
127136
#

config/pipelines.yml

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,21 @@
4444
# # Default is 1000 (effectively disabled). Set to a lower value to enable chunking.
4545
# pipeline.batch.output_chunking.growth_threshold_factor: 1000
4646
#
47-
# Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false".
48-
# "auto" automatically enables ordering if the 'pipeline.workers' setting
49-
# is also set to '1', and disables otherwise.
50-
# "true" enforces ordering on a pipeline and prevents logstash from starting
51-
# a pipeline with multiple workers allocated.
52-
# "false" disable any extra processing necessary for preserving ordering.
53-
#
47+
# # Set the pipeline event ordering. Options are "auto" (the default), "true" # # or "false".
48+
# # "auto" automatically enables ordering if the 'pipeline.workers' setting
49+
# # is also set to '1', and disables otherwise.
50+
# # "true" enforces ordering on a pipeline and prevents logstash from starting
51+
# # a pipeline with multiple workers allocated.
52+
# # "false" disable any extra processing necessary for preserving ordering.
5453
# pipeline.ordered: auto
5554
#
55+
# # Configure this pipeline to be recoverable if it crashes and `config.reload.automatic` is enabled.
56+
# # Accepts one of the following values:
57+
# # - `false` (default): never recover after observing crash
58+
# # - `auto`: recover if-and-only-if `queue.type` is set to `persisted`
59+
# # - `true`: always recover after observing crash (WARNING: risk of data loss)
60+
# pipeline.recoverable: false
61+
#
5662
# # Internal queuing model, "memory" for legacy in-memory based queuing and
5763
# # "persisted" for disk-based acked queueing. Defaults is memory
5864
# queue.type: memory

docker/data/logstash/env2yaml/src/main/java/org/logstash/env2yaml/Env2Yaml.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ private Map<String, String> buildSettingMap() {
4343
"node.name", "path.data", "pipeline.id", "pipeline.workers",
4444
"pipeline.output.workers", "pipeline.batch.size", "pipeline.batch.delay", "pipeline.batch.output_chunking.growth_threshold_factor",
4545
"pipeline.unsafe_shutdown", "pipeline.ecs_compatibility", "pipeline.ordered",
46+
"pipeline.recoverable", "pipeline.reloadable",
4647
"pipeline.plugin_classloaders", "pipeline.separate_logs", "path.config",
4748
"config.string", "config.test_and_exit", "config.reload.automatic",
4849
"config.reload.interval", "config.debug", "config.support_escapes",

docs/reference/logstash-settings-file.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ The `logstash.yml` file includes these settings.
5656
| `pipeline.unsafe_shutdown` | When set to `true`, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown. | `false` |
5757
| `pipeline.plugin_classloaders` | (Beta) Load Java plugins in independent classloaders to isolate their dependencies. | `false` |
5858
| `pipeline.ordered` | Set the pipeline event ordering. Valid options are:<br><br>* `auto`. Automatically enables ordering if the `pipeline.workers` setting is `1`, and disables otherwise.<br>* `true`. Enforces ordering on the pipeline and prevents Logstash from starting if there are multiple workers.<br>* `false`. Disables the processing required to preserve order. Ordering will not be guaranteed, but you save the processing cost of preserving order.<br> | `auto` |
59+
| `pipeline.recoverable` | Configure the pipeline to be eligible for automated recovery. Requires using `config.reload.automatic`. All plugins in the pipeline must be reloadable. Possible values are:<br><br>* `auto`: enable only if using a persisted queue type.<br>* `true`: always enable, even if queue is ephemeral (may cause data loss)<br>* `false`: never enable<br> | `false` |
5960
| `pipeline.ecs_compatibility` | Sets the pipeline’s default value for `ecs_compatibility`, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema. Possible values are:<br><br>* `disabled`<br>* `v1`<br>* `v8`<br><br>This option allows the [early opt-in (or preemptive opt-out) of ECS compatibility](/reference/ecs-ls.md) modes in plugins, which is scheduled to be on-by-default in a future major release of {{ls}}.<br><br>Values other than `disabled` are currently considered BETA, and may produce unintended consequences when upgrading {{ls}}.<br> | `disabled` |
6061
| `path.config` | The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. | Platform-specific. See [Logstash Directory Layout](/reference/dir-layout.md). |
6162
| `config.string` | A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. | *N/A* |

docs/static/spec/openapi/logstash-api.yaml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,6 +1741,12 @@ paths:
17411741
properties:
17421742
worker_utilization:
17431743
$ref: '#/components/schemas/FlowWindows'
1744+
recovery_log:
1745+
type: array
1746+
description: "If the pipeline has successfully recovered from a crash, include a list of timestamps"
1747+
items:
1748+
type: string
1749+
description: An ISO8601-compliant timestamp
17441750
impacts:
17451751
type: array
17461752
description: "If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on Logstash."
@@ -1860,6 +1866,37 @@ paths:
18601866
worker_utilization:
18611867
last_1_minute: 100.0
18621868
last_5_minutes: 100.0
1869+
recoveryCase:
1870+
description: "A pipeline has recently recovered from a crash"
1871+
value:
1872+
status: "yellow"
1873+
symptom: "1 indicator is unhealthy (`pipelines`)"
1874+
indicators:
1875+
pipelines:
1876+
status: "yellow"
1877+
symptom: "1 indicator is concerning (`crashy-pipeline`)"
1878+
indicators:
1879+
crashy-pipeline:
1880+
status: "yellow"
1881+
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
1882+
diagnosis:
1883+
- id: "logstash:health:pipeline:recovery:diagnosis:5m-recovery-recent"
1884+
cause: "pipeline has recovered from crashes 1 time in the last 5 minutes"
1885+
action: "inspect logs to determine source of crash"
1886+
impacts:
1887+
- id: "logstash:health:pipeline:recovery:impact:intermittent_processing"
1888+
severity: 2
1889+
description: "the pipeline recently recovered from a crash"
1890+
impact_areas: [ "pipeline_execution" ]
1891+
details:
1892+
status:
1893+
state: "RUNNING"
1894+
flow:
1895+
worker_utilization:
1896+
last_1_minute: 0.1
1897+
last_5_minutes: 0.1
1898+
recovery_log:
1899+
- "2026-04-01T21:12:04.498144Z"
18631900
x-metaTags:
18641901
- content: Logstash
18651902
name: product_name

logstash-core/lib/logstash/agent.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,9 @@ def pipeline_details(pipeline_id)
190190
else PipelineIndicator::Status::UNKNOWN
191191
end
192192

193-
PipelineIndicator::Details.new(status, sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation)
193+
PipelineIndicator::Details.new(status,
194+
sync_state.pipeline&.to_java.collectWorkerUtilizationFlowObservation,
195+
sync_state.recovery_log)
194196
end
195197
end
196198

@@ -564,7 +566,7 @@ def update_success_metrics(action, action_result)
564566
# When a pipeline is successfully created we create the metric
565567
# place holder related to the lifecycle of the pipeline
566568
initialize_pipeline_metrics(action)
567-
when LogStash::PipelineAction::Reload
569+
when LogStash::PipelineAction::Reload, LogStash::PipelineAction::Recover
568570
update_successful_reload_metrics(action, action_result)
569571
end
570572
end

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def self.as_java_range(r)
5757
Setting::PositiveIntegerSetting.new("pipeline.batch.output_chunking.growth_threshold_factor", 1000),
5858
Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false),
5959
Setting::BooleanSetting.new("pipeline.reloadable", true),
60+
Setting::CoercibleStringSetting.new("pipeline.recoverable", "false", true, %w(auto true false)),
6061
Setting::BooleanSetting.new("pipeline.plugin_classloaders", false),
6162
Setting::BooleanSetting.new("pipeline.separate_logs", false),
6263
Setting::CoercibleStringSetting.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),

logstash-core/lib/logstash/java_pipeline.rb

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,37 @@ def start_workers
270270
batch_output_chunking_growth_threshold_factor = settings.get("pipeline.batch.output_chunking.growth_threshold_factor")
271271
batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode")
272272

273+
274+
queue_type = settings.get('queue.type')
275+
queue_is_ephemeral = (queue_type == MEMORY)
276+
pipeline_recoverable = settings.get('pipeline.recoverable')
277+
pipeline_is_recoverable = case pipeline_recoverable
278+
when 'true' then true
279+
when 'false' then false
280+
when 'auto' then !queue_is_ephemeral
281+
end
282+
config_reload_automatic = settings.get('config.reload.automatic')
283+
284+
if pipeline_is_recoverable
285+
if !config_reload_automatic
286+
@logger.warn("Pipeline is configured to be recoverable with `pipeline.recoverable: #{pipeline_recoverable}`, " +
287+
"but config reloading has been disabled with `config.reload.automatic: #{config_reload_automatic}`; " +
288+
"if this pipeline crashes it will NOT be recovered.",
289+
default_logging_keys)
290+
elsif queue_is_ephemeral
291+
@logger.warn("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " +
292+
"with `pipeline.recoverable: #{pipeline_recoverable}`; " +
293+
"in the event of a crash in-flight events will be lost, " +
294+
"so enabling auto-recovery increases the risk of data loss.",
295+
default_logging_keys)
296+
else
297+
@logger.info("Pipeline with `queue.type: #{queue_type}` is configured to be recoverable " +
298+
"with `pipeline.recoverable: #{pipeline_recoverable}`; " +
299+
"in the event of a crash some in-flight events may be re-processed",
300+
default_logging_keys)
301+
end
302+
end
303+
273304
max_inflight = batch_size * pipeline_workers
274305

275306
config_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :config])
@@ -290,7 +321,8 @@ def start_workers
290321
"pipeline.batch.output_chunking.growth_threshold_factor" => batch_output_chunking_growth_threshold_factor,
291322
"pipeline.max_inflight" => max_inflight,
292323
"batch_metric_sampling" => batch_metric_sampling,
293-
"pipeline.sources" => pipeline_source_details)
324+
"pipeline.sources" => pipeline_source_details,
325+
"pipeline.recoverable" => pipeline_is_recoverable && config_reload_automatic)
294326
@logger.info("Starting pipeline", pipeline_log_params)
295327

296328
filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

logstash-core/lib/logstash/pipeline_action.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
require "logstash/pipeline_action/create"
2020
require "logstash/pipeline_action/stop"
2121
require "logstash/pipeline_action/reload"
22+
require "logstash/pipeline_action/recover"
2223
require "logstash/pipeline_action/delete"
2324
require "logstash/pipeline_action/stop_and_delete"
2425

2526
module LogStash module PipelineAction
2627
ORDERING = {
28+
LogStash::PipelineAction::Recover => 99,
2729
LogStash::PipelineAction::Create => 100,
2830
LogStash::PipelineAction::Reload => 200,
2931
LogStash::PipelineAction::Stop => 300,

0 commit comments

Comments
 (0)