Skip to content

Support REACTIVE pipeline recovery with config.reload manager#18930

Merged
yaauie merged 12 commits intoelastic:mainfrom
yaauie:pipeline-recovery-reactive
Apr 7, 2026
Merged

Support REACTIVE pipeline recovery with config.reload manager#18930
yaauie merged 12 commits intoelastic:mainfrom
yaauie:pipeline-recovery-reactive

Conversation

@yaauie
Copy link
Copy Markdown
Member

@yaauie yaauie commented Apr 1, 2026

Release notes

  • Adds opt-in crashed-pipeline recovery with a new pipeline.recovery option that works when config.reload.automatic is enabled and accepts the following values:
    • auto: recovers crashed pipelines that are backed by the persistent queue
    • false (default): do not automate recovery of crashed pipelines
    • true: recovers all crashed pipelines, even if backed by the ephemeral memory queue (risk: data loss)

What does this PR do?

  • Enables the config-reload facility to observe crashed pipelines and recover (restart) them.
  • Exposes recent crashed-pipeline recoveries in the health report
    • 5+ crash recoveries in the last 5 minutes is RED (critical)
    • 1 crash recovery in the last 5 minutes is YELLOW (concerning)

Why is it important/What is the impact to the user?

While pipeline crashes are rare (typically caused by a plugin crashing while handling events), in some cases users running managed pipelines would prefer that the pipeline be automatically restarted.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • [ ]

How to test this PR locally

With the following crashy.conf pipeline definition:

input {
  heartbeat { interval => 1 }
}
filter {
  ruby {
    id => "signal"
    init => '@crash_time = ::Time.now + 20; $stderr.puts("=== EXPECT CRASH AT #{@crash_time}")'
    code => 'event.set("[@metadata][poison]", ::Time.now > @crash_time)'
  }
  if [@metadata][poison] {
    # emulate the failure-injector plugin by escaping ruby filter's error handling
    ruby {
      id => "pseudo:failure-injector"
      init => '@poison = Class.new(::RuntimeError) { def message; fail("poison"); end; };'
      code => 'raise @poison.new'
    }
  }
}
output {
  stdout { codec => rubydebug }
}
  1. Checkout the branch and build Logstash normally:
    ./gradlew clean assemble installDefaultGems
    
  2. Invoke Logstash with reloading enabled and the pipeline flagged as recoverable:
    bin/logstash -Sconfig.reload.automatic=true -Spipeline.recoverable=true -f crashy.conf
    
  3. Wait ~30s for first crash
  4. Observe pipeline recovers successfully
  5. Validate degraded status in health report:
    curl --silent 'localhost:9600/_health_report?pretty=true' | jq
    
  6. Wait ~2min until there have been 5 crashes, then observe health report is further degraded:
    curl --silent 'localhost:9600/_health_report?pretty=true' | jq
    

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 1, 2026

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • run exhaustive tests : Run the exhaustive tests Buildkite pipeline.

@mergify
Copy link
Copy Markdown
Contributor

mergify bot commented Apr 1, 2026

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 1, 2026

🔍 Preview links for changed docs

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 1, 2026

Vale Linting Results

Summary: 1 suggestion found

💡 Suggestions (1)
File Line Rule Message
docs/reference/logstash-settings-file.md 60 Elastic.WordChoice Consider using 'can, might' instead of 'may', unless the term is in the UI.

The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to Elastic style guide for Vale.

@jsvd
Copy link
Copy Markdown
Member

jsvd commented Apr 2, 2026

❯ curl -s 'localhost:9600/_node/stats/pipelines/main' | jq .pipelines.main.reloads
{
  "last_error": {
    "message": "Could not execute action: PipelineAction::Recover<main>, action_result: existing pipeline is not in a settled crashed state",
    "backtrace": null
  },
  "last_success_timestamp": null,
  "successes": 0,
  "last_failure_timestamp": "2026-04-02T14:43:32.935629Z",
  "failures": 23
}

This is odd, given that the crashy.conf does allow the pipeline to boot up but occasionally crash. so seeing reload.failures: 23 vs reload.successes: 0 doesn't seem correct.

Comment on lines +39 to +40
elsif pipeline.crashed? && pipeline.configured_as_recoverable?
actions << LogStash::PipelineAction::Recover.new(pipeline_config, @metric)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that a PipelineAction::Recover isn't initiated until the crash has fully settled.

Suggested change
elsif pipeline.crashed? && pipeline.configured_as_recoverable?
actions << LogStash::PipelineAction::Recover.new(pipeline_config, @metric)
elsif pipeline.crashed? && !pipeline.running? && pipeline.configured_as_recoverable?
actions << LogStash::PipelineAction::Recover.new(pipeline_config, @metric)

@yaauie
Copy link
Copy Markdown
Member Author

yaauie commented Apr 3, 2026

❯ curl -s 'localhost:9600/_node/stats/pipelines/main' | jq .pipelines.main.reloads
{
  "last_error": {
    "message": "Could not execute action: PipelineAction::Recover<main>, action_result: existing pipeline is not in a settled crashed state",
    "backtrace": null
  },
  "last_success_timestamp": null,
  "successes": 0,
  "last_failure_timestamp": "2026-04-02T14:43:32.935629Z",
  "failures": 23
}

This is odd, given that the crashy.conf does allow the pipeline to boot up but occasionally crash. so seeing reload.failures: 23 vs reload.successes: 0 doesn't seem correct.

I failed to capture this in Agent#update_success_metrics. Since we are effectively reloading, I'm going to consider it a reload for the sake of these metrics.

If we also avoid kicking off the action until the pipeline has settled its crash, we will avoid seeing the failures there, too.

 - do not resolve recovery action until pipeline has settled into
   its crash state
 - capture a successful recovery as a successful reload in metrics
Copy link
Copy Markdown
Member Author

@yaauie yaauie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test will need minor change following the landing of #18937 refactor of the testing assertions implementation.

@robbavey
Copy link
Copy Markdown
Member

robbavey commented Apr 6, 2026

@yaauie Can you move this PR out of draft please?

@yaauie yaauie marked this pull request as ready for review April 6, 2026 22:25
@yaauie
Copy link
Copy Markdown
Member Author

yaauie commented Apr 6, 2026

Health report tests against f02a99a are green here

@JRubyMethod(name = "configured_as_recoverable?")
public final IRubyObject isConfiguredAsRecoverable(final ThreadContext context) {
final String recoverableSettingValue = getSetting(context, "pipeline.recoverable").asJavaString();
final boolean result = switch (recoverableSettingValue) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we warn/fail loading when config.reload.automatic + true or auto is set?

Copy link
Copy Markdown
Member Author

@yaauie yaauie Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled in f7ae601:

  • queue.type: memory + pipeline.recoverable: auto + config.reload.automatic: true:
    resolves to false, no warning
    [2026-04-07T20:21:04,173][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => false, thread: "#<Thread:0x1dcd1110 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: persisted + pipeline.recoverable: auto + config.reload.automatic: true
    resolves to true; informational message
    [2026-04-07T20:21:58,949][INFO ][logstash.javapipeline    ][main] Pipeline with `queue.type: persisted` is configured to be recoverable with `pipeline.recoverable: auto`; in the event of a crash some in-flight events may be re-processed {pipeline_id: "main", thread: "#<Thread:0x4c14afd /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:21:58,952][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x4c14afd /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: memory + pipeline.recoverable: true + config.reload.automatic: true
    logs warning about automating data loss
    [2026-04-07T20:22:43,473][WARN ][logstash.javapipeline    ][main] Pipeline with `queue.type: memory` is configured to be recoverable with `pipeline.recoverable: true`; in the event of a crash in-flight events will be lost, so enabling auto-recovery increases the risk of data loss. {pipeline_id: "main", thread: "#<Thread:0x58a27296 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:22:43,477][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x58a27296 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • queue.type: persisted + pipeline.recoverable: true + config.reload.automatic: true
    logs info about possible re-processing
    [2026-04-07T20:23:27,604][INFO ][logstash.javapipeline    ][main] Pipeline with `queue.type: persisted` is configured to be recoverable with `pipeline.recoverable: true`; in the event of a crash some in-flight events may be re-processed {pipeline_id: "main", thread: "#<Thread:0x4f6df6d3 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:23:27,609][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => true, thread: "#<Thread:0x4f6df6d3 /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    
  • pipeline.recoverable: true + config.reload.automatic: false
    logs warning about not actually being recoverable
    [2026-04-07T20:26:18,539][WARN ][logstash.javapipeline    ][main] Pipeline is configured to be recoverable with `pipeline.recoverable: true`, but config reloading has been disabled with `config.reload.automatic: false`; if this pipeline crashes it will NOT be recovered. {pipeline_id: "main", thread: "#<Thread:0x1007a0c /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    [2026-04-07T20:26:18,544][INFO ][logstash.javapipeline    ][main] Starting pipeline {pipeline_id: "main", "pipeline.workers" => 12, "pipeline.batch.size" => 125, "pipeline.batch.delay" => 50, "pipeline.batch.output_chunking.growth_threshold_factor" => 1000, "pipeline.max_inflight" => 1500, "batch_metric_sampling" => "minimal", "pipeline.sources" => ["/Users/rye/src/elastic/logstash@main/crashy.conf"], "pipeline.recoverable" => false, thread: "#<Thread:0x1007a0c /Users/rye/src/elastic/logstash@main/logstash-core/lib/logstash/java_pipeline.rb:147 run>"}
    

@donoghuc donoghuc self-requested a review April 7, 2026 18:43
@yaauie yaauie requested a review from robbavey April 7, 2026 20:38
Copy link
Copy Markdown
Member

@donoghuc donoghuc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general all my manual testing looked great. I think this is absolutely solid from a behavior standpoint. Have a few niche and nitpicky comments.

Copy link
Copy Markdown
Member

@robbavey robbavey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Once @donoghuc's concerns have been addressed, I'll be happy to see this ship!

@yaauie yaauie requested a review from donoghuc April 7, 2026 22:02
Copy link
Copy Markdown
Member

@donoghuc donoghuc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe some trivial documentation issues in the example configs, but i tested the updates and they work as expected!

Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com>
@yaauie yaauie requested a review from donoghuc April 7, 2026 22:36
Copy link
Copy Markdown
Member

@donoghuc donoghuc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solid! I think this will be a very helpful feature

@yaauie yaauie merged commit af19ca6 into elastic:main Apr 7, 2026
9 of 11 checks passed
@yaauie yaauie deleted the pipeline-recovery-reactive branch April 7, 2026 22:37
@elasticmachine
Copy link
Copy Markdown

💚 Build Succeeded

History

yaauie added a commit to yaauie/logstash that referenced this pull request Apr 8, 2026
…c#18930)

* reload automatic: recover crashed pipelines during convergence

* recovery: add health report probes

* derp: fix invocation of failure_injector filter

* PR feedback:

 - do not resolve recovery action until pipeline has settled into
   its crash state
 - capture a successful recovery as a successful reload in metrics

* health tests: back out local branch changes

* update recovery test assertions to use new $match helper

* Apply suggestion from @yaauie

* add logging, examples for `pipeline.recoverable` setting

* recovery: clean up old pipeline

* recovery: keep only last 5min of recovery log

* Apply suggestions from code review

Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com>

---------

Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com>
yaauie added a commit that referenced this pull request Apr 8, 2026
#18967)

* reload automatic: recover crashed pipelines during convergence

* recovery: add health report probes

* derp: fix invocation of failure_injector filter

* PR feedback:

 - do not resolve recovery action until pipeline has settled into
   its crash state
 - capture a successful recovery as a successful reload in metrics

* health tests: back out local branch changes

* update recovery test assertions to use new $match helper

* Apply suggestion from @yaauie

* add logging, examples for `pipeline.recoverable` setting

* recovery: clean up old pipeline

* recovery: keep only last 5min of recovery log

* Apply suggestions from code review



---------

Co-authored-by: Cas Donoghue <cas.donoghue@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants