Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ class TaskProcessor {
@CompileStatic
final protected void checkCachedOrLaunchTask( TaskRun task, HashCode hash, boolean shouldTryCache ) {

int tries = task.failCount +1
int tries = task.failCount + task.abortedCount + 1
while( true ) {
hash = HashBuilder.defaultHasher().putBytes(hash.asBytes()).putInt(tries).hash()

Expand All @@ -812,6 +812,11 @@ class TaskProcessor {
final cached = shouldTryCache && exists && entry.trace.isCompleted() && checkCachedOutput(task.clone(), resumeDir, hash, entry)
if( cached )
break

// Issue #6884: https://github.com/nextflow-io/nextflow/issues/6884
// When the task is not cached but there is an entry whose status is ABORTED or FAILED, the matching task counter must be incremented so that future retries take it into account when calculating the hash.
if( entry?.trace?.isFailed() ) task.failCount += 1
if( entry?.trace?.isAborted() ) task.abortedCount+=1
}
catch (Throwable t) {
log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t)
Expand All @@ -837,7 +842,12 @@ class TaskProcessor {
finally {
lock.release()
}

// Issue #6884: https://github.com/nextflow-io/nextflow/issues/6884
// When the cache holds failed or aborted entries for a task that is not cached, task.attempt can be out of sync with the cached failures. In that case we update the task attempt and re-resolve the task script.
if( task.failCount > 0 && task.config.getAttempt() != task.failCount + 1 ) {
task.config.attempt = task.failCount + 1
task.resolve(taskBody)
}
Comment on lines +848 to +851
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is not currently required to fix the case of the issue.
However, when a task has been executed with previous failures but has not completed, it is not cached and is re-executed. Currently that execution is done with task.attempt = 1; with this code it is re-executed with task.attempt = failCount + 1.

This case occurs in the added test. A process is defined to fail on the first attempt and succeed on every later one, so it should execute twice in total. In the test, the execution is aborted after the first retry. Without this code, the task is re-executed twice more (the first run fails and the second succeeds). With this code, the previously failed task is counted as an attempt, so the task runs only once.

I am not sure if reexecuting with attempt =1 was intended or if it should be managed as in this code and update the attempts according to cached failures. @bentsherman @pditommaso what's your opinion about it?

// submit task for execution
submitTask( task, hash, workDir )
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ class TaskRun implements Cloneable {
*/
volatile int failCount

/**
* The number of times the execution of the task has been aborted
*/
volatile int abortedCount

/**
* The number of times the submit of the task has been retried
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,14 @@ class TraceRecord implements Serializable {
store.status == 'COMPLETED'
}

/**
 * @return {@code true} when the status held in {@code store} equals {@code 'ABORTED'}
 */
boolean isAborted() {
    return store.status == 'ABORTED'
}

/**
 * @return {@code true} when the status held in {@code store} equals {@code 'FAILED'}
 */
boolean isFailed() {
    return store.status == 'FAILED'
}

/**
 * @return the current value of {@code executorName}
 */
String getExecutorName() {
return executorName
}
Expand Down
34 changes: 34 additions & 0 deletions tests/checks/resume-retried-with-abort.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
set -e

# Print the number of lines in .nextflow.log that match the given pattern.
count_log() {
  < .nextflow.log grep -c "$1"
}

#
# run with abort
#
# The run is interrupted with SIGINT after 3 seconds; `|| true` keeps
# `set -e` from stopping the script when the pipeline status is non-zero.
echo ''
timeout --signal=INT 3 $NXF_RUN | tee stdout || true

# exactly one submission of SMALL_SLEEP_RETRY must have been logged
[[ $(count_log 'Submitted process > SMALL_SLEEP_RETRY') == 1 ]] || false


#
# RESUME mode with fail and abort
#
# Resumed run, interrupted with SIGINT after 9 seconds.
echo ''
timeout --signal=INT 9 $NXF_RUN -resume | tee stdout || true

# exactly one submission and exactly one re-submission must have been logged
[[ $(count_log 'Submitted process > SMALL_SLEEP_RETRY') == 1 ]] || false
[[ $(count_log 'Re-submitted process > SMALL_SLEEP_RETRY') == 1 ]] || false
#
# RESUME mode with completed and abort
#
# Second resumed run, again interrupted with SIGINT after 9 seconds.
echo ''
timeout --signal=INT 9 $NXF_RUN -resume | tee stdout || true

[[ $(count_log 'Submitted process > SMALL_SLEEP_RETRY') == 1 ]] || false

#
# RESUME mode with cached and finish
#
# Final resumed run without a timeout; a failure here stops the script.
echo ''
$NXF_RUN -resume | tee stdout

# exactly one cached SMALL_SLEEP_RETRY task must have been logged
[[ $(count_log 'Cached process > SMALL_SLEEP_RETRY') == 1 ]] || false
65 changes: 65 additions & 0 deletions tests/resume-retried-with-abort.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env nextflow
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Process that takes one value, prints a start marker, sleeps for 30 seconds
// and prints a done marker; its standard output is the process output.
// NOTE(review): presumably this keeps the run alive long enough for the
// `timeout` calls in the .checks script to interrupt it -- confirm.
process LONG_SLEEP {
tag "long-sleep"

input:
val x

output:
stdout

script:
'''
echo "LONG_SLEEP start"
sleep 30
echo "LONG_SLEEP done"
'''
}

// Process that takes one value, prints the current attempt number and sleeps
// for 5 seconds. It exits with status 1 when task.attempt is 1 and otherwise
// prints a success message; its standard output is the process output.
process SMALL_SLEEP_RETRY {
tag "small-sleep-retry"

// failed executions are retried, with at most one retry
errorStrategy 'retry'
maxRetries 1

input:
val x

output:
stdout

// task.attempt is interpolated into the script text, so the script differs
// between attempts
script:
"""
echo "SMALL_SLEEP_RETRY attempt: ${task.attempt}"
sleep 5

if [[ ${task.attempt} -eq 1 ]]; then
echo "Failing first attempt on purpose"
exit 1
fi

echo "Second attempt succeeded"
"""
}

// Entry workflow: creates a channel emitting the single value 1 and passes it
// to both LONG_SLEEP and SMALL_SLEEP_RETRY.
workflow {
ch = Channel.of(1)
LONG_SLEEP(ch)
SMALL_SLEEP_RETRY(ch)
}
Loading