From 1d6afcb3580c094f17e8bbde684eef1d2662c6c9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rayan=20Hassa=C3=AFne?=
Date: Thu, 12 Mar 2026 16:51:58 +0100
Subject: [PATCH] Add K8s failure classification, terminationReason plumbing,
 and retryOn directive
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When running Nextflow pipelines on Kubernetes, all task failures are
treated identically: there is no distinction between infrastructure
failures (OOMKilled, Evicted, Preempted) and application errors (exit
code 1, script bugs). This makes it impossible to implement smart retry
strategies such as "retry OOM with more memory" or "fail fast on
application errors" without resorting to fragile exit-code matching in
error strategy closures.

Additionally, task.terminationReason was not accessible in error
strategy closures because the value was set on TaskRun but never
plumbed through to TaskConfig (the object exposed as `task` in
closures).

The K8s task handler now extracts the container termination reason from
the K8s API and classifies failures into categories:

- **Application failures** (OOMKilled, Error, etc.): `terminationReason`
  is set on the task but there is no special handling, so the user's
  error strategy is respected.
- **Infrastructure failures** (Evicted, Preempting, DeadlineExceeded,
  Shutdown): sets `task.aborted = true` and throws
  `NodeTerminationException` for automatic retry, since these are
  transient platform issues not caused by the task itself.
- **Inferred reasons**: when K8s doesn't provide a reason but the exit
  code is informative (137 = SIGKILL/OOM, 143 = SIGTERM), a synthetic
  reason is set (e.g. `OOMKilled(exit137)`).

Also fixes the `exitcode` → `exitCode` typo in
`K8sClient.jobStateFallback0()` and the `0 ?: readExitFile()` Groovy
truthiness bug (0 is falsy in Groovy, so exit code 0 was incorrectly
falling through to readExitFile).
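To make the "fragile exit-code matching" concrete, this is the kind of
workaround users previously needed; a hypothetical config sketch, not
taken from this patch:

```groovy
// Hypothetical pre-existing workaround: guess OOM from raw exit codes.
// 137 = SIGKILL (often, but not always, the OOM killer), so this also
// retries unrelated kills and misses OOMs reported only via K8s reason.
process {
    errorStrategy = { task.exitStatus == 137 ? 'retry' : 'finish' }
    maxRetries    = 3
    memory        = { 2.GB * task.attempt }
}
```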
Building on the work in #6436 and #6442, which introduced K8s exit code
reading from the container terminated state.

- Added `volatile String terminationReason` field to `TaskRun`
- Plumbed `terminationReason` and `aborted` from `TaskRun` to
  `TaskConfig` in `TaskProcessor.resumeOrDie()`, following the existing
  `exitStatus` pattern
- `terminationReason` is now accessible as `task.terminationReason` in
  error strategy closures
- Native logging: retry messages now include `[reason: OOMKilled]` and
  hard failure error blocks include a "Termination reason" section, so
  no custom error strategy closure is needed for visibility

The new process directive `retryOn` provides a declarative way to retry
based on termination reasons without writing Groovy closures:

    process FOO {
        retryOn 'OOMKilled'
        memory { 2.GB * task.attempt }
        ...
    }

Or in config files (assignment syntax):

    process {
        retryOn = ['OOMKilled', 'OOMKilled(exit137)']
    }

When the task's `terminationReason` matches any value in the `retryOn`
list, the error strategy is overridden to RETRY. The user's error
strategy closure is still evaluated first (preserving side effects such
as logging), but its return value is overridden.

Tested on GKE with nf-core/oncoanalyser using a config that forces OOM:

    process {
        withName: "BWAMEM2_ALIGN" {
            memory = { 1500.MB * task.attempt }
            retryOn = ['OOMKilled']
        }
    }

BWAMEM2_ALIGN OOMs at 1500MB, and Nextflow logs:

    [2b/54e1be] NOTE: Process `BWAMEM2_ALIGN (...)` terminated with an
    error exit status (137) [reason: OOMKilled] -- Execution is retried (1)

The task is retried with 3000MB (attempt 2), then 4500MB (attempt 3) if
needed.
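For users who prefer custom logic over the declarative directive, the
plumbed-through field can also be read directly in a closure. A sketch
under the semantics described above (the reason strings are examples,
not an exhaustive list):

```groovy
// Sketch: branch on the newly exposed task.terminationReason in a
// dynamic errorStrategy closure instead of matching raw exit codes.
process {
    errorStrategy = {
        task.terminationReason in ['OOMKilled', 'OOMKilled(exit137)']
            ? 'retry'
            : 'finish'
    }
    maxRetries = 3
    memory     = { 1500.MB * task.attempt }
}
```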
- TaskRun.groovy: added terminationReason field, cleared on makeCopy()
- TaskConfig.groovy: added getRetryOn() getter
- TaskProcessor.groovy: plumbed terminationReason/aborted to config,
  implemented retryOn override in checkErrorStrategy(), added native
  terminationReason logging
- ProcessBuilder.groovy: registered retryOn as valid directive
- K8sTaskHandler.groovy: failure classification, terminationReason
  extraction, infrastructure failure detection,
  K8sOutOfCpu/MemoryException in catch clause
- K8sClient.groovy: fixed exitcode→exitCode typo, explicit exitCode: 0
- K8sTaskHandlerTest.groovy: 6 new tests + 1 updated test
- K8sClientTest.groovy: 1 new test for job fallback exit code

Signed-off-by: Rayan Hassaine
Co-Authored-By: Claude Opus 4.6
Signed-off-by: Rayan Hassaïne
---
 .../nextflow/processor/TaskConfig.groovy      |  12 ++
 .../processor/TaskErrorFormatter.groovy       |   4 +
 .../nextflow/processor/TaskProcessor.groovy   |  12 +-
 .../groovy/nextflow/processor/TaskRun.groovy  |   7 +
 .../nextflow/script/dsl/ProcessBuilder.groovy |   1 +
 .../java/nextflow/script/dsl/ProcessDsl.java  |   6 +
 .../main/nextflow/k8s/K8sTaskHandler.groovy   |  43 +++++-
 .../main/nextflow/k8s/client/K8sClient.groovy |   1 +
 .../nextflow/k8s/K8sTaskHandlerTest.groovy    | 140 ++++++++++++++++++
 .../nextflow/k8s/client/K8sClientTest.groovy  |  21 ++-
 10 files changed, 231 insertions(+), 16 deletions(-)

diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
index 27bdd95f98..72f966ef8a 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
@@ -240,6 +240,18 @@ class TaskConfig extends LazyMap implements Cloneable {
         throw new IllegalArgumentException("Not a valid `ErrorStrategy` value: ${strategy}")
     }
 
+    List getRetryOn() {
+        final value = get('retryOn')
+        if( value instanceof List )
+            return (List) value
+        if( value == null )
+            return Collections.emptyList()
+        // single string value
+        if( value instanceof CharSequence )
+            return [ value.toString() ]
+        throw new IllegalArgumentException("Not a valid `retryOn` value: ${value}")
+    }
+
     def getResourceLimit(String directive) {
         final limits = get('resourceLimits') as Map
         return limits?.get(directive)
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy
index b6bc4d5daa..ec8a2f4354 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy
@@ -117,6 +117,10 @@ class TaskErrorFormatter {
         // -- the exit status
         message << "\nCommand exit status:\n  ${task.exitStatus != Integer.MAX_VALUE ? task.exitStatus : '-'}".toString()
 
+        // -- the termination reason (e.g. OOMKilled, Evicted) when provided by the executor
+        if( task.terminationReason )
+            message << "\nTermination reason:\n  ${task.terminationReason}".toString()
+
         // -- the tail of the process stdout
         message << "\nCommand output:"
         final max = 50
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
index f0ff2152f0..0c27196df4 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
@@ -1075,6 +1075,8 @@ class TaskProcessor {
         if( task && error instanceof ProcessException ) {
             // expose current task exit status
             task.config.exitStatus = task.exitStatus
+            task.config.terminationReason = task.terminationReason
+            task.config.aborted = task.aborted
             task.config.errorCount = procErrCount
             task.config.retryCount = taskErrCount
             //Add trace of the previous execution in the task context for next execution
@@ -1085,6 +1087,8 @@ class TaskProcessor {
             errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries)
             if( errorStrategy.soft ) {
                 def msg = "[$task.hashLog] NOTE: ${submitTimeout ? submitErrMsg : error.message}"
+                if( task.terminationReason )
+                    msg += " [reason: ${task.terminationReason}]"
                 if( errorStrategy == IGNORE )
                     msg += " -- Error is ignored"
                 else if( errorStrategy == RETRY )
@@ -1150,7 +1154,13 @@ class TaskProcessor {
     protected ErrorStrategy checkErrorStrategy( TaskRun task, ProcessException error, final int taskErrCount, final int procErrCount, final submitRetries ) {
 
-        final action = task.config.getErrorStrategy()
+        // always evaluate the error strategy (may have side effects like logging)
+        final configAction = task.config.getErrorStrategy()
+        // retryOn directive: if the termination reason matches, override to RETRY
+        final retryReasons = task.config.getRetryOn()
+        final action = (retryReasons && task.terminationReason in retryReasons)
+            ? RETRY
+            : configAction
 
         // retry is not allowed when the script cannot be compiled or similar errors
         if( error instanceof ProcessUnrecoverableException || error.cause instanceof ProcessUnrecoverableException ) {
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy
index 2213783bfe..a6775c4bea 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy
@@ -342,6 +342,12 @@ class TaskRun implements Cloneable {
      */
     volatile boolean aborted
 
+    /**
+     * The executor-specific reason for task termination (e.g. OOMKilled, Evicted).
+     * Set by the executor when the platform provides a termination reason.
+     */
+    volatile String terminationReason
+
     /**
      * The action {@link ErrorStrategy} action applied if task has failed
      */
@@ -378,6 +384,7 @@ class TaskRun implements Cloneable {
         copy.name = null // <-- force to re-evaluate the name that can include a dynamic tag
         copy.error = null
         copy.exitStatus = Integer.MAX_VALUE
+        copy.terminationReason = null
         return copy
     }
diff --git a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy
index 2f63c30d62..7dc7f8ea2f 100644
--- a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy
@@ -73,6 +73,7 @@ class ProcessBuilder {
         'queue',
         'resourceLabels',
         'resourceLimits',
+        'retryOn',
         'scratch',
         'secret',
         'shell',
diff --git a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java
index a75a875b3a..d14080c1a8 100644
--- a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java
+++ b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java
@@ -303,6 +303,12 @@
     void resourceLimits( Map opts );
 
+    @Description("""
+        The `retryOn` directive allows you to specify termination reasons (e.g. OOMKilled) that should trigger a retry, overriding the error strategy.
+    """)
+    void retryOn(String value);
+
+    void retryOn(List value);
+
     @Description("""
         The `scratch` directive allows you to execute each task in a temporary directory that is local to the compute node.
diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
index 57bc2e4a57..6b6bdabde4 100644
--- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
+++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
@@ -28,6 +28,8 @@ import groovy.util.logging.Slf4j
 import nextflow.SysEnv
 import nextflow.container.ContainerHelper
 import nextflow.container.DockerBuilder
+import nextflow.exception.K8sOutOfCpuException
+import nextflow.exception.K8sOutOfMemoryException
 import nextflow.exception.NodeTerminationException
 import nextflow.k8s.client.PodUnschedulableException
 import nextflow.exception.ProcessSubmitException
@@ -358,7 +360,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
             }
             return state
         }
-        catch (NodeTerminationException | PodUnschedulableException e) {
+        catch (NodeTerminationException | PodUnschedulableException | K8sOutOfCpuException | K8sOutOfMemoryException e) {
             // create a synthetic `state` object adding an extra `nodeTermination`
             // attribute to return the error to the caller method
             final instant = Instant.now()
@@ -438,11 +440,28 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
             // the K8s API is more reliable because the container may terminate before the exit file is written
             // See https://github.com/nextflow-io/nextflow/issues/6436
             // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core
-            log.trace("[k8s] Container Terminated state ${state.terminated}")
-            final k8sExitCode = (state.terminated as Map)?.exitCode as Integer
+            log.debug("[k8s] Container Terminated state ${state.terminated}")
+            final terminated = state.terminated as Map
+            final k8sExitCode = terminated?.exitCode as Integer
+            final reason = terminated?.reason as String
             task.exitStatus = k8sExitCode != null ? k8sExitCode : readExitFile()
             task.stdout = outputFile
             task.stderr = errorFile
+            // expose termination reason so users can make decisions in errorStrategy
+            // K8s provides 'reason' when the container itself is killed (e.g. OOMKilled, Evicted)
+            // When the process inside is killed but the container exits normally, reason is null
+            // In that case, infer from exit code: 137 = SIGKILL (likely OOM), 143 = SIGTERM
+            if( reason && reason != 'Error' && reason != 'Completed' )
+                task.terminationReason = reason
+            else if( k8sExitCode == 137 )
+                task.terminationReason = 'OOMKilled(exit137)'
+            else if( k8sExitCode == 143 )
+                task.terminationReason = 'SignalTerm(exit143)'
+            // classify infrastructure failures so Nextflow can auto-retry
+            if( isInfrastructureFailure(reason) ) {
+                task.error = new NodeTerminationException("K8s infrastructure failure: ${reason}")
+                task.aborted = true
+            }
         }
         status = TaskStatus.COMPLETED
         saveJobLogOnError(task)
@@ -455,6 +474,24 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
         return false
     }
 
+    /**
+     * Determine if the K8s container termination reason indicates an infrastructure
+     * failure (as opposed to an application failure).
+     */
+    protected static boolean isInfrastructureFailure(String reason) {
+        if( !reason )
+            return false
+        switch( reason ) {
+            case 'Evicted':
+            case 'Preempting':
+            case 'DeadlineExceeded':
+            case 'Shutdown':
+                return true
+            default:
+                return false
+        }
+    }
+
     protected void saveJobLogOnError(TaskRun task) {
         if( task.isSuccess() )
             return
diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
index 48c95929c4..65279a961a 100644
--- a/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
+++ b/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy
@@ -420,6 +420,7 @@ class K8sClient {
         final dummyPodStatus = [
             terminated: [
                 reason: "Completed",
+                exitCode: 0,
                 startedAt: jobStatus.startTime,
                 finishedAt: jobStatus.completionTime,
             ]
diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
index e63e36b0f4..4461ac180e 100644
--- a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
+++ b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
@@ -22,6 +22,8 @@ import java.nio.file.Paths
 
 import nextflow.Session
 import nextflow.SysEnv
+import nextflow.exception.K8sOutOfCpuException
+import nextflow.exception.K8sOutOfMemoryException
 import nextflow.exception.NodeTerminationException
 import nextflow.file.http.XPath
 import nextflow.fusion.FusionConfig
@@ -500,8 +502,10 @@ class K8sTaskHandlerTest extends Specification {
         then:
         1 * handler.getState() >> fullState
         1 * handler.updateTimestamps(termState)
+        0 * handler.readExitFile()
         1 * handler.deleteJobIfSuccessful(task) >> null
         1 * handler.saveJobLogOnError(task) >> null
+        // exitCode 0 from K8s is now used directly instead of falling through to readExitFile()
         handler.task.exitStatus == 0
         handler.task.@stdout == OUT_FILE
         handler.task.@stderr == ERR_FILE
@@ -678,6 +682,142 @@ class K8sTaskHandlerTest extends Specification {
         state.nodeTermination.message == "Pod failed for unknown reason"
     }
 
+    def 'should return nodeTermination state for K8sOutOfMemoryException' () {
+        given:
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
+
+        when:
+        def state = handler.getState()
+        then:
+        1 * client.podState(POD_NAME) >> { throw new K8sOutOfMemoryException("Pod out of memory") }
+        then:
+        state.terminated.startedAt
+        state.terminated.finishedAt
+        and:
+        state.nodeTermination instanceof K8sOutOfMemoryException
+        state.nodeTermination.message == "Pod out of memory"
+    }
+
+    def 'should return nodeTermination state for K8sOutOfCpuException' () {
+        given:
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))
+
+        when:
+        def state = handler.getState()
+        then:
+        1 * client.podState(POD_NAME) >> { throw new K8sOutOfCpuException("Pod out of CPU") }
+        then:
+        state.terminated.startedAt
+        state.terminated.finishedAt
+        and:
+        state.nodeTermination instanceof K8sOutOfCpuException
+        state.nodeTermination.message == "Pod out of CPU"
+    }
+
+    def 'should classify infrastructure failures' () {
+        expect:
+        K8sTaskHandler.isInfrastructureFailure(reason) == expected
+        where:
+        reason             | expected
+        'Evicted'          | true
+        'Preempting'       | true
+        'Shutdown'         | true
+        'DeadlineExceeded' | true
+        'OOMKilled'        | false
+        'Completed'        | false
+        'Error'            | false
+        null               | false
+        ''                 | false
+    }
+
+    def 'should set task aborted for Evicted' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "Evicted",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 143 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 143
+        handler.task.aborted == true
+        handler.task.terminationReason == 'Evicted'
+        handler.task.error instanceof NodeTerminationException
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
+    def 'should set terminationReason but not aborted for OOMKilled' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "OOMKilled",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 137 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 137
+        handler.task.aborted == false
+        handler.task.terminationReason == 'OOMKilled'
+        handler.task.error == null
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
+    def 'should not set task aborted for application failure' () {
+        given:
+        def ERR_FILE = Paths.get('err.file')
+        def OUT_FILE = Paths.get('out.file')
+        def POD_NAME = 'pod-xyz'
+        def client = Mock(K8sClient)
+        def termState = [ reason: "Error",
+                          startedAt: "2018-01-13T10:09:36Z",
+                          finishedAt: "2018-01-13T10:19:36Z",
+                          exitCode: 1 ]
+        def task = new TaskRun()
+        def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))
+
+        when:
+        def result = handler.checkIfCompleted()
+        then:
+        1 * handler.getState() >> [terminated: termState]
+        1 * handler.updateTimestamps(termState)
+        1 * handler.deleteJobIfSuccessful(task) >> null
+        1 * handler.saveJobLogOnError(task) >> null
+        handler.task.exitStatus == 1
+        handler.task.aborted == false
+        handler.task.terminationReason == null
+        handler.task.error == null
+        handler.status == TaskStatus.COMPLETED
+        result == true
+    }
+
     def 'should return container mounts' () {
         given:
diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy
index 263a6e3cad..2b5de72c0e 100644
--- a/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy
+++ b/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy
@@ -1060,15 +1060,13 @@ class K8sClientTest extends Specification {
         e.message == "K8s pod in Failed state"
     }
 
-    def 'should fallback to job status when pod is gone and not return hardcoded exit code' () {
+    def 'should fallback to job status when pod is gone and return exit code zero' () {
         given:
         def JOB_STATUS_JSON = '''
             {
                 "apiVersion": "batch/v1",
                 "kind": "Job",
-                "metadata": {
-                    "name": "test-job"
-                },
+                "metadata": { "name": "nf-abc123" },
                 "status": {
                     "succeeded": 1,
                     "startTime": "2025-01-15T10:00:00Z",
@@ -1084,22 +1082,21 @@ class K8sClientTest extends Specification {
                 }
             }
             '''
+        def JOB_NAME = 'nf-abc123'
         def client = Spy(K8sClient)
-        final JOB_NAME = 'test-job'
 
         when:
-        def result = client.jobStateFallback0(JOB_NAME)
-
+        def result = client.jobState(JOB_NAME)
         then:
+        // findPodNameForJob returns null (pod is gone)
+        1 * client.findPodNameForJob(JOB_NAME) >> null
+        // falls back to jobStateFallback0 which calls jobStatus
        1 * client.jobStatus(JOB_NAME) >> new K8sResponseJson(JOB_STATUS_JSON)
-        and:
-        result.terminated != null
+        and:
         result.terminated.reason == 'Completed'
         result.terminated.startedAt == '2025-01-15T10:00:00Z'
         result.terminated.finishedAt == '2025-01-15T10:05:00Z'
-        // The key assertion: exitCode should not be present (null) so fallback to .exitcode file works
-        result.terminated.exitCode == null
-        result.terminated.exitcode == null
+        // K8s only reports succeeded==1 when exit code is 0, so synthetic status should include it
+        result.terminated.exitCode == 0
     }
 }