diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index 27bdd95f98..72f966ef8a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -240,6 +240,18 @@ class TaskConfig extends LazyMap implements Cloneable { throw new IllegalArgumentException("Not a valid `ErrorStrategy` value: ${strategy}") } + List getRetryOn() { + final value = get('retryOn') + if( value instanceof List ) + return (List) value + if( value == null ) + return Collections.emptyList() + // single string value + if( value instanceof CharSequence ) + return [ value.toString() ] + throw new IllegalArgumentException("Not a valid `retryOn` value: ${value}") + } + def getResourceLimit(String directive) { final limits = get('resourceLimits') as Map return limits?.get(directive) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy index b6bc4d5daa..ec8a2f4354 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskErrorFormatter.groovy @@ -117,6 +117,10 @@ class TaskErrorFormatter { // -- the exit status message << "\nCommand exit status:\n ${task.exitStatus != Integer.MAX_VALUE ? task.exitStatus : '-'}".toString() + // -- the termination reason (e.g. 
OOMKilled, Evicted) when provided by the executor + if( task.terminationReason ) + message << "\nTermination reason:\n ${task.terminationReason}".toString() + // -- the tail of the process stdout message << "\nCommand output:" final max = 50 diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index f0ff2152f0..0c27196df4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -1075,6 +1075,8 @@ class TaskProcessor { if( task && error instanceof ProcessException ) { // expose current task exit status task.config.exitStatus = task.exitStatus + task.config.terminationReason = task.terminationReason + task.config.aborted = task.aborted task.config.errorCount = procErrCount task.config.retryCount = taskErrCount //Add trace of the previous execution in the task context for next execution @@ -1085,6 +1087,8 @@ class TaskProcessor { errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries) if( errorStrategy.soft ) { def msg = "[$task.hashLog] NOTE: ${submitTimeout ? 
submitErrMsg : error.message}" + if( task.terminationReason ) + msg += " [reason: ${task.terminationReason}]" if( errorStrategy == IGNORE ) msg += " -- Error is ignored" else if( errorStrategy == RETRY ) @@ -1150,7 +1154,13 @@ class TaskProcessor { protected ErrorStrategy checkErrorStrategy( TaskRun task, ProcessException error, final int taskErrCount, final int procErrCount, final submitRetries ) { - final action = task.config.getErrorStrategy() + // always evaluate the error strategy (may have side effects like logging) + final configAction = task.config.getErrorStrategy() + // retryOn directive: if the termination reason matches, override to RETRY + final retryReasons = task.config.getRetryOn() + final action = (retryReasons && task.terminationReason in retryReasons) + ? RETRY + : configAction // retry is not allowed when the script cannot be compiled or similar errors if( error instanceof ProcessUnrecoverableException || error.cause instanceof ProcessUnrecoverableException ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 2213783bfe..a6775c4bea 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -342,6 +342,12 @@ class TaskRun implements Cloneable { */ volatile boolean aborted + /** + * The executor-specific reason for task termination (e.g. OOMKilled, Evicted). + * Set by the executor when the platform provides a termination reason. 
+ */ + volatile String terminationReason + /** * The action {@link ErrorStrategy} action applied if task has failed */ @@ -378,6 +384,7 @@ class TaskRun implements Cloneable { copy.name = null // <-- force to re-evaluate the name that can include a dynamic tag copy.error = null copy.exitStatus = Integer.MAX_VALUE + copy.terminationReason = null return copy } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy index 2f63c30d62..7dc7f8ea2f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy @@ -73,6 +73,7 @@ class ProcessBuilder { 'queue', 'resourceLabels', 'resourceLimits', + 'retryOn', 'scratch', 'secret', 'shell', diff --git a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java index a75a875b3a..d14080c1a8 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java +++ b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java @@ -303,6 +303,12 @@ void resourceLimits( Map opts ); + @Description(""" + The `retryOn` directive allows you to specify termination reasons (e.g. OOMKilled) that should trigger a retry, overriding the error strategy. + """) + void retryOn(String value); + void retryOn(List<String> value); + @Description(""" The `scratch` directive allows you to execute each task in a temporary directory that is local to the compute node.
diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy index 57bc2e4a57..6b6bdabde4 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy @@ -28,6 +28,8 @@ import groovy.util.logging.Slf4j import nextflow.SysEnv import nextflow.container.ContainerHelper import nextflow.container.DockerBuilder +import nextflow.exception.K8sOutOfCpuException +import nextflow.exception.K8sOutOfMemoryException import nextflow.exception.NodeTerminationException import nextflow.k8s.client.PodUnschedulableException import nextflow.exception.ProcessSubmitException @@ -358,7 +360,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { } return state } - catch (NodeTerminationException | PodUnschedulableException e) { + catch (NodeTerminationException | PodUnschedulableException | K8sOutOfCpuException | K8sOutOfMemoryException e) { // create a synthetic `state` object adding an extra `nodeTermination` // attribute to return the error to the caller method final instant = Instant.now() @@ -438,11 +440,28 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { // the K8s API is more reliable because the container may terminate before the exit file is written // See https://github.com/nextflow-io/nextflow/issues/6436 // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core - log.trace("[k8s] Container Terminated state ${state.terminated}") - final k8sExitCode = (state.terminated as Map)?.exitCode as Integer + log.debug("[k8s] Container Terminated state ${state.terminated}") + final terminated = state.terminated as Map + final k8sExitCode = terminated?.exitCode as Integer + final reason = terminated?.reason as String task.exitStatus = k8sExitCode != null ? 
k8sExitCode : readExitFile() task.stdout = outputFile task.stderr = errorFile + // expose termination reason so users can make decisions in errorStrategy + // K8s provides 'reason' when the container itself is killed (e.g. OOMKilled, Evicted) + // When the process inside is killed but the container exits normally, reason is null + // In that case, infer from exit code: 137 = SIGKILL (likely OOM), 143 = SIGTERM + if( reason && reason != 'Error' && reason != 'Completed' ) + task.terminationReason = reason + else if( k8sExitCode == 137 ) + task.terminationReason = 'OOMKilled(exit137)' + else if( k8sExitCode == 143 ) + task.terminationReason = 'SignalTerm(exit143)' + // classify infrastructure failures so Nextflow can auto-retry + if( isInfrastructureFailure(reason) ) { + task.error = new NodeTerminationException("K8s infrastructure failure: ${reason}") + task.aborted = true + } } status = TaskStatus.COMPLETED saveJobLogOnError(task) @@ -455,6 +474,24 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { return false } + /** + * Determine if the K8s container termination reason indicates an infrastructure + * failure (as opposed to an application failure). 
+ */ + protected static boolean isInfrastructureFailure(String reason) { + if( !reason ) + return false + switch( reason ) { + case 'Evicted': + case 'Preempting': + case 'DeadlineExceeded': + case 'Shutdown': + return true + default: + return false + } + } + protected void saveJobLogOnError(TaskRun task) { if( task.isSuccess() ) return diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy index 48c95929c4..65279a961a 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/client/K8sClient.groovy @@ -420,6 +420,7 @@ class K8sClient { final dummyPodStatus = [ terminated: [ reason: "Completed", + exitCode: 0, startedAt: jobStatus.startTime, finishedAt: jobStatus.completionTime, ] diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy index e63e36b0f4..4461ac180e 100644 --- a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -22,6 +22,8 @@ import java.nio.file.Paths import nextflow.Session import nextflow.SysEnv +import nextflow.exception.K8sOutOfCpuException +import nextflow.exception.K8sOutOfMemoryException import nextflow.exception.NodeTerminationException import nextflow.file.http.XPath import nextflow.fusion.FusionConfig @@ -500,8 +502,10 @@ class K8sTaskHandlerTest extends Specification { then: 1 * handler.getState() >> fullState 1 * handler.updateTimestamps(termState) + 0 * handler.readExitFile() 1 * handler.deleteJobIfSuccessful(task) >> null 1 * handler.saveJobLogOnError(task) >> null + // exitCode 0 from K8s is now used directly instead of falling through to readExitFile() handler.task.exitStatus == 0 handler.task.@stdout == OUT_FILE handler.task.@stderr == ERR_FILE @@ -678,6 +682,142 @@ class K8sTaskHandlerTest extends Specification { 
state.nodeTermination.message == "Pod failed for unknown reason" } + def 'should return nodeTermination state for K8sOutOfMemoryException' () { + given: + def POD_NAME = 'pod-xyz' + def client = Mock(K8sClient) + def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME)) + + when: + def state = handler.getState() + then: + 1 * client.podState(POD_NAME) >> { throw new K8sOutOfMemoryException("Pod out of memory") } + then: + state.terminated.startedAt + state.terminated.finishedAt + and: + state.nodeTermination instanceof K8sOutOfMemoryException + state.nodeTermination.message == "Pod out of memory" + } + + def 'should return nodeTermination state for K8sOutOfCpuException' () { + given: + def POD_NAME = 'pod-xyz' + def client = Mock(K8sClient) + def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME)) + + when: + def state = handler.getState() + then: + 1 * client.podState(POD_NAME) >> { throw new K8sOutOfCpuException("Pod out of CPU") } + then: + state.terminated.startedAt + state.terminated.finishedAt + and: + state.nodeTermination instanceof K8sOutOfCpuException + state.nodeTermination.message == "Pod out of CPU" + } + + def 'should classify infrastructure failures' () { + expect: + K8sTaskHandler.isInfrastructureFailure(reason) == expected + where: + reason | expected + 'Evicted' | true + 'Preempting' | true + 'Shutdown' | true + 'DeadlineExceeded' | true + 'OOMKilled' | false + 'Completed' | false + 'Error' | false + null | false + '' | false + } + + def 'should set task aborted for Evicted' () { + given: + def ERR_FILE = Paths.get('err.file') + def OUT_FILE = Paths.get('out.file') + def POD_NAME = 'pod-xyz' + def client = Mock(K8sClient) + def termState = [ reason: "Evicted", + startedAt: "2018-01-13T10:09:36Z", + finishedAt: "2018-01-13T10:19:36Z", + exitCode: 143 ] + def task = new TaskRun() + def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE)) + + when: + 
def result = handler.checkIfCompleted() + then: + 1 * handler.getState() >> [terminated: termState] + 1 * handler.updateTimestamps(termState) + 1 * handler.deleteJobIfSuccessful(task) >> null + 1 * handler.saveJobLogOnError(task) >> null + handler.task.exitStatus == 143 + handler.task.aborted == true + handler.task.terminationReason == 'Evicted' + handler.task.error instanceof NodeTerminationException + handler.status == TaskStatus.COMPLETED + result == true + } + + def 'should set terminationReason but not aborted for OOMKilled' () { + given: + def ERR_FILE = Paths.get('err.file') + def OUT_FILE = Paths.get('out.file') + def POD_NAME = 'pod-xyz' + def client = Mock(K8sClient) + def termState = [ reason: "OOMKilled", + startedAt: "2018-01-13T10:09:36Z", + finishedAt: "2018-01-13T10:19:36Z", + exitCode: 137 ] + def task = new TaskRun() + def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE)) + + when: + def result = handler.checkIfCompleted() + then: + 1 * handler.getState() >> [terminated: termState] + 1 * handler.updateTimestamps(termState) + 1 * handler.deleteJobIfSuccessful(task) >> null + 1 * handler.saveJobLogOnError(task) >> null + handler.task.exitStatus == 137 + handler.task.aborted == false + handler.task.terminationReason == 'OOMKilled' + handler.task.error == null + handler.status == TaskStatus.COMPLETED + result == true + } + + def 'should not set task aborted for application failure' () { + given: + def ERR_FILE = Paths.get('err.file') + def OUT_FILE = Paths.get('out.file') + def POD_NAME = 'pod-xyz' + def client = Mock(K8sClient) + def termState = [ reason: "Error", + startedAt: "2018-01-13T10:09:36Z", + finishedAt: "2018-01-13T10:19:36Z", + exitCode: 1 ] + def task = new TaskRun() + def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE)) + + when: + def result = handler.checkIfCompleted() + then: + 1 * 
handler.getState() >> [terminated: termState] + 1 * handler.updateTimestamps(termState) + 1 * handler.deleteJobIfSuccessful(task) >> null + 1 * handler.saveJobLogOnError(task) >> null + handler.task.exitStatus == 1 + handler.task.aborted == false + handler.task.terminationReason == null + handler.task.error == null + handler.status == TaskStatus.COMPLETED + result == true + } + def 'should return container mounts' () { given: diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy index 263a6e3cad..2b5de72c0e 100644 --- a/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy +++ b/plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy @@ -1060,15 +1060,13 @@ class K8sClientTest extends Specification { e.message == "K8s pod in Failed state" } - def 'should fallback to job status when pod is gone and not return hardcoded exit code' () { + def 'should fallback to job status when pod is gone and return exit code zero' () { given: def JOB_STATUS_JSON = ''' { "apiVersion": "batch/v1", "kind": "Job", - "metadata": { - "name": "test-job" - }, + "metadata": { "name": "nf-abc123" }, "status": { "succeeded": 1, "startTime": "2025-01-15T10:00:00Z", @@ -1084,22 +1082,21 @@ class K8sClientTest extends Specification { } } ''' + def JOB_NAME = 'nf-abc123' def client = Spy(K8sClient) - final JOB_NAME = 'test-job' when: - def result = client.jobStateFallback0(JOB_NAME) - + def result = client.jobState(JOB_NAME) then: + // findPodNameForJob returns null (pod is gone) + 1 * client.findPodNameForJob(JOB_NAME) >> null + // falls back to jobStateFallback0 which calls jobStatus 1 * client.jobStatus(JOB_NAME) >> new K8sResponseJson(JOB_STATUS_JSON) - and: - result.terminated != null result.terminated.reason == 'Completed' result.terminated.startedAt == '2025-01-15T10:00:00Z' result.terminated.finishedAt == '2025-01-15T10:05:00Z' - // The key assertion: exitCode should not be 
present (null) so fallback to .exitcode file works - result.terminated.exitCode == null - result.terminated.exitcode == null + // K8s only reports succeeded==1 when exit code is 0, so synthetic status should include it + result.terminated.exitCode == 0 } }