Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ class TaskConfig extends LazyMap implements Cloneable {
throw new IllegalArgumentException("Not a valid `ErrorStrategy` value: ${strategy}")
}

/**
 * Resolve the value of the `retryOn` directive into a list of termination
 * reason strings (e.g. `OOMKilled`).
 *
 * Accepts either a list of values or a single string/GString; a missing
 * directive yields an empty list.
 *
 * @return the list of termination reasons that should trigger a retry,
 *         never {@code null}
 * @throws IllegalArgumentException when the directive holds a value that is
 *         neither a list nor a character sequence
 */
List<String> getRetryOn() {
    final value = get('retryOn')
    if( value == null )
        return Collections.<String>emptyList()
    if( value instanceof List )
        // normalize each entry to a plain java.lang.String — a bare cast would
        // leave GString elements in place, and GString does not `equals` a
        // String, which would break `in`/`contains` membership checks downstream
        return value.collect { it?.toString() }
    // single string value
    if( value instanceof CharSequence )
        return [ value.toString() ]
    throw new IllegalArgumentException("Not a valid `retryOn` value: ${value}")
}

def getResourceLimit(String directive) {
final limits = get('resourceLimits') as Map
return limits?.get(directive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ class TaskErrorFormatter {
// -- the exit status
message << "\nCommand exit status:\n ${task.exitStatus != Integer.MAX_VALUE ? task.exitStatus : '-'}".toString()

// -- the termination reason (e.g. OOMKilled, Evicted) when provided by the executor
if( task.terminationReason )
message << "\nTermination reason:\n ${task.terminationReason}".toString()

// -- the tail of the process stdout
message << "\nCommand output:"
final max = 50
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,8 @@ class TaskProcessor {
if( task && error instanceof ProcessException ) {
// expose current task exit status
task.config.exitStatus = task.exitStatus
task.config.terminationReason = task.terminationReason
task.config.aborted = task.aborted
task.config.errorCount = procErrCount
task.config.retryCount = taskErrCount
//Add trace of the previous execution in the task context for next execution
Expand All @@ -1085,6 +1087,8 @@ class TaskProcessor {
errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries)
if( errorStrategy.soft ) {
def msg = "[$task.hashLog] NOTE: ${submitTimeout ? submitErrMsg : error.message}"
if( task.terminationReason )
msg += " [reason: ${task.terminationReason}]"
if( errorStrategy == IGNORE )
msg += " -- Error is ignored"
else if( errorStrategy == RETRY )
Expand Down Expand Up @@ -1150,7 +1154,13 @@ class TaskProcessor {

protected ErrorStrategy checkErrorStrategy( TaskRun task, ProcessException error, final int taskErrCount, final int procErrCount, final submitRetries ) {

final action = task.config.getErrorStrategy()
// always evaluate the error strategy (may have side effects like logging)
final configAction = task.config.getErrorStrategy()
// retryOn directive: if the termination reason matches, override to RETRY
final retryReasons = task.config.getRetryOn()
final action = (retryReasons && task.terminationReason in retryReasons)
? RETRY
: configAction

// retry is not allowed when the script cannot be compiled or similar errors
if( error instanceof ProcessUnrecoverableException || error.cause instanceof ProcessUnrecoverableException ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ class TaskRun implements Cloneable {
*/
volatile boolean aborted

/**
* The executor-specific reason for task termination (e.g. OOMKilled, Evicted).
* Set by the executor when the platform provides a termination reason.
*/
volatile String terminationReason

/**
* The action {@link ErrorStrategy} action applied if task has failed
*/
Expand Down Expand Up @@ -378,6 +384,7 @@ class TaskRun implements Cloneable {
copy.name = null // <-- force to re-evaluate the name that can include a dynamic tag
copy.error = null
copy.exitStatus = Integer.MAX_VALUE
copy.terminationReason = null
return copy
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ProcessBuilder {
'queue',
'resourceLabels',
'resourceLimits',
'retryOn',
'scratch',
'secret',
'shell',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ void resourceLimits(
Map<String,?> opts
);

@Description("""
The `retryOn` directive allows you to specify termination reasons (e.g. OOMKilled) that should trigger a retry, overriding the error strategy.
""")
void retryOn(String value);
void retryOn(List<String> value);

@Description("""
The `scratch` directive allows you to execute each task in a temporary directory that is local to the compute node.

Expand Down
43 changes: 40 additions & 3 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.container.ContainerHelper
import nextflow.container.DockerBuilder
import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException
import nextflow.exception.NodeTerminationException
import nextflow.k8s.client.PodUnschedulableException
import nextflow.exception.ProcessSubmitException
Expand Down Expand Up @@ -358,7 +360,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
}
return state
}
catch (NodeTerminationException | PodUnschedulableException e) {
catch (NodeTerminationException | PodUnschedulableException | K8sOutOfCpuException | K8sOutOfMemoryException e) {
// create a synthetic `state` object adding an extra `nodeTermination`
// attribute to return the error to the caller method
final instant = Instant.now()
Expand Down Expand Up @@ -438,11 +440,28 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
// the K8s API is more reliable because the container may terminate before the exit file is written
// See https://github.com/nextflow-io/nextflow/issues/6436
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core
log.trace("[k8s] Container Terminated state ${state.terminated}")
final k8sExitCode = (state.terminated as Map)?.exitCode as Integer
log.debug("[k8s] Container Terminated state ${state.terminated}")
final terminated = state.terminated as Map
final k8sExitCode = terminated?.exitCode as Integer
final reason = terminated?.reason as String
task.exitStatus = k8sExitCode != null ? k8sExitCode : readExitFile()
task.stdout = outputFile
task.stderr = errorFile
// expose termination reason so users can make decisions in errorStrategy
// K8s provides 'reason' when the container itself is killed (e.g. OOMKilled, Evicted)
// When the process inside is killed but the container exits normally, reason is null
// In that case, infer from exit code: 137 = SIGKILL (likely OOM), 143 = SIGTERM
if( reason && reason != 'Error' && reason != 'Completed' )
task.terminationReason = reason
else if( k8sExitCode == 137 )
task.terminationReason = 'OOMKilled(exit137)'
else if( k8sExitCode == 143 )
task.terminationReason = 'SignalTerm(exit143)'
// classify infrastructure failures so Nextflow can auto-retry
if( isInfrastructureFailure(reason) ) {
task.error = new NodeTerminationException("K8s infrastructure failure: ${reason}")
task.aborted = true
}
}
status = TaskStatus.COMPLETED
saveJobLogOnError(task)
Expand All @@ -455,6 +474,24 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
return false
}

/**
* Determine if the K8s container termination reason indicates an infrastructure
* failure (as opposed to an application failure).
*/
/**
 * Determine if the K8s container termination reason indicates an infrastructure
 * failure (as opposed to an application failure).
 *
 * @param reason the K8s termination reason, may be {@code null} or empty
 * @return {@code true} when the reason denotes a platform-level failure
 */
protected static boolean isInfrastructureFailure(String reason) {
    // no reason reported by K8s -> cannot be classified as an infra failure
    if( !reason )
        return false
    // reasons raised by the platform rather than by the user workload
    final infraReasons = [ 'Evicted', 'Preempting', 'DeadlineExceeded', 'Shutdown' ]
    return reason in infraReasons
}

protected void saveJobLogOnError(TaskRun task) {
if( task.isSuccess() )
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class K8sClient {
final dummyPodStatus = [
terminated: [
reason: "Completed",
exitCode: 0,
startedAt: jobStatus.startTime,
finishedAt: jobStatus.completionTime,
]
Expand Down
140 changes: 140 additions & 0 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.nio.file.Paths

import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException
import nextflow.exception.NodeTerminationException
import nextflow.file.http.XPath
import nextflow.fusion.FusionConfig
Expand Down Expand Up @@ -500,8 +502,10 @@ class K8sTaskHandlerTest extends Specification {
then:
1 * handler.getState() >> fullState
1 * handler.updateTimestamps(termState)
0 * handler.readExitFile()
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
// exitCode 0 from K8s is now used directly instead of falling through to readExitFile()
handler.task.exitStatus == 0
handler.task.@stdout == OUT_FILE
handler.task.@stderr == ERR_FILE
Expand Down Expand Up @@ -678,6 +682,142 @@ class K8sTaskHandlerTest extends Specification {
state.nodeTermination.message == "Pod failed for unknown reason"
}

// Spy the handler so getState() runs for real while the K8s client is mocked;
// the client throws K8sOutOfMemoryException to simulate the platform killing
// the pod before its state could be read.
def 'should return nodeTermination state for K8sOutOfMemoryException' () {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
def state = handler.getState()
then:
// the pod-state lookup fails with the out-of-memory platform error
1 * client.podState(POD_NAME) >> { throw new K8sOutOfMemoryException("Pod out of memory") }
then:
// a synthetic `terminated` state is built with generated timestamps
state.terminated.startedAt
state.terminated.finishedAt
and:
// the original exception is surfaced via the extra `nodeTermination` attribute
state.nodeTermination instanceof K8sOutOfMemoryException
state.nodeTermination.message == "Pod out of memory"
}

// Same scenario as the OOM case above, but for CPU exhaustion: the mocked
// client raises K8sOutOfCpuException and getState() must wrap it into a
// synthetic terminated state instead of propagating the error.
def 'should return nodeTermination state for K8sOutOfCpuException' () {
given:
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
def state = handler.getState()
then:
// the pod-state lookup fails with the out-of-CPU platform error
1 * client.podState(POD_NAME) >> { throw new K8sOutOfCpuException("Pod out of CPU") }
then:
// a synthetic `terminated` state is built with generated timestamps
state.terminated.startedAt
state.terminated.finishedAt
and:
// the original exception is surfaced via the extra `nodeTermination` attribute
state.nodeTermination instanceof K8sOutOfCpuException
state.nodeTermination.message == "Pod out of CPU"
}

// Data-driven check of the infrastructure-failure classifier: platform-level
// reasons are retryable, application-level reasons and blank input are not.
def 'should classify infrastructure failures' () {
expect:
K8sTaskHandler.isInfrastructureFailure(terminationReason) == isInfra
where:
terminationReason  | isInfra
null               | false
''                 | false
'Completed'        | false
'Error'            | false
'OOMKilled'        | false
'Evicted'          | true
'Preempting'       | true
'Shutdown'         | true
'DeadlineExceeded' | true
}

// When the K8s termination reason is an infrastructure failure (Evicted),
// checkIfCompleted() must record the reason, flag the task as aborted and
// attach a NodeTerminationException so Nextflow can auto-retry the task.
def 'should set task aborted for Evicted' () {
given:
def ERR_FILE = Paths.get('err.file')
def OUT_FILE = Paths.get('out.file')
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def termState = [ reason: "Evicted",
startedAt: "2018-01-13T10:09:36Z",
finishedAt: "2018-01-13T10:19:36Z",
exitCode: 143 ]
def task = new TaskRun()
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))

when:
def result = handler.checkIfCompleted()
then:
1 * handler.getState() >> [terminated: termState]
1 * handler.updateTimestamps(termState)
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
// exit code comes straight from the K8s terminated state
handler.task.exitStatus == 143
// Evicted is classified as an infrastructure failure -> aborted + error set
handler.task.aborted == true
// the explicit K8s reason wins over any exit-code based inference
handler.task.terminationReason == 'Evicted'
handler.task.error instanceof NodeTerminationException
handler.status == TaskStatus.COMPLETED
result == true
}

// OOMKilled is an application-level failure: the termination reason is exposed
// (so `retryOn`/errorStrategy can act on it) but the task is NOT marked as
// aborted and no error is attached — the normal error flow handles it.
def 'should set terminationReason but not aborted for OOMKilled' () {
given:
def ERR_FILE = Paths.get('err.file')
def OUT_FILE = Paths.get('out.file')
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def termState = [ reason: "OOMKilled",
startedAt: "2018-01-13T10:09:36Z",
finishedAt: "2018-01-13T10:19:36Z",
exitCode: 137 ]
def task = new TaskRun()
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))

when:
def result = handler.checkIfCompleted()
then:
1 * handler.getState() >> [terminated: termState]
1 * handler.updateTimestamps(termState)
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
// exit code comes straight from the K8s terminated state
handler.task.exitStatus == 137
// not an infrastructure failure -> no abort, no synthetic error
handler.task.aborted == false
handler.task.terminationReason == 'OOMKilled'
handler.task.error == null
handler.status == TaskStatus.COMPLETED
result == true
}

// A plain application failure (reason "Error", non-signal exit code) must not
// produce a termination reason nor an abort: "Error" is filtered out and
// exit code 1 matches neither of the signal-based fallbacks (137/143).
def 'should not set task aborted for application failure' () {
given:
def ERR_FILE = Paths.get('err.file')
def OUT_FILE = Paths.get('out.file')
def POD_NAME = 'pod-xyz'
def client = Mock(K8sClient)
def termState = [ reason: "Error",
startedAt: "2018-01-13T10:09:36Z",
finishedAt: "2018-01-13T10:19:36Z",
exitCode: 1 ]
def task = new TaskRun()
def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE))

when:
def result = handler.checkIfCompleted()
then:
1 * handler.getState() >> [terminated: termState]
1 * handler.updateTimestamps(termState)
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
// exit code comes straight from the K8s terminated state
handler.task.exitStatus == 1
// generic failure -> no reason exposed, no abort, no synthetic error
handler.task.aborted == false
handler.task.terminationReason == null
handler.task.error == null
handler.status == TaskStatus.COMPLETED
result == true
}

def 'should return container mounts' () {

given:
Expand Down
21 changes: 9 additions & 12 deletions plugins/nf-k8s/src/test/nextflow/k8s/client/K8sClientTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1060,15 +1060,13 @@ class K8sClientTest extends Specification {
e.message == "K8s pod in Failed state"
}

def 'should fallback to job status when pod is gone and not return hardcoded exit code' () {
def 'should fallback to job status when pod is gone and return exit code zero' () {
given:
def JOB_STATUS_JSON = '''
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": "test-job"
},
"metadata": { "name": "nf-abc123" },
"status": {
"succeeded": 1,
"startTime": "2025-01-15T10:00:00Z",
Expand All @@ -1084,22 +1082,21 @@ class K8sClientTest extends Specification {
}
}
'''
def JOB_NAME = 'nf-abc123'
def client = Spy(K8sClient)
final JOB_NAME = 'test-job'

when:
def result = client.jobStateFallback0(JOB_NAME)

def result = client.jobState(JOB_NAME)
then:
// findPodNameForJob returns null (pod is gone)
1 * client.findPodNameForJob(JOB_NAME) >> null
// falls back to jobStateFallback0 which calls jobStatus
1 * client.jobStatus(JOB_NAME) >> new K8sResponseJson(JOB_STATUS_JSON)

and:
result.terminated != null
result.terminated.reason == 'Completed'
result.terminated.startedAt == '2025-01-15T10:00:00Z'
result.terminated.finishedAt == '2025-01-15T10:05:00Z'
// The key assertion: exitCode should not be present (null) so fallback to .exitcode file works
result.terminated.exitCode == null
result.terminated.exitcode == null
// K8s only reports succeeded==1 when exit code is 0, so synthetic status should include it
result.terminated.exitCode == 0
}
}