diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 3a33bbc8a6..eb3c1a3f9f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -59,6 +59,7 @@ import nextflow.plugin.Plugins import nextflow.processor.ErrorStrategy import nextflow.processor.TaskFault import nextflow.processor.TaskHandler +import nextflow.processor.hash.TaskHasherFactory import nextflow.processor.TaskProcessor import nextflow.script.BaseScript import nextflow.script.ProcessFactory @@ -179,6 +180,11 @@ class Session implements ISession { */ String runName + /** + * The task hash strategy version + */ + TaskHasherFactory.Version hashStrategy + /** * Enable stub run mode */ @@ -385,6 +391,9 @@ class Session implements ISession { this.runName = config.runName ?: NameGenerator.next() log.debug "Run name: $runName" + // -- hash strategy + this.hashStrategy = TaskHasherFactory.Version.DEFAULT() + // -- dry run this.stubRun = config.stubRun diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index 27bdd95f98..c933a51ad8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -570,7 +570,7 @@ class TaskConfig extends LazyMap implements Cloneable { } - protected TaskClosure getStubBlock() { + TaskClosure getStubBlock() { final code = target.get(NextflowDSLImpl.PROCESS_STUB) if( !code ) return null diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index f0ff2152f0..90e97d82f2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -99,6 +99,7 @@ import nextflow.script.types.Record import nextflow.script.types.Types import nextflow.trace.TraceRecord import nextflow.util.Escape +import nextflow.processor.hash.TaskHasherFactory import nextflow.util.HashBuilder import nextflow.util.LockManager import nextflow.util.TestOnly @@ -668,7 +669,7 @@ class TaskProcessor { // -- download foreign files session.filePorter.transfer(foreignFiles) - final hash = new TaskHasher(task).compute() + final hash = TaskHasherFactory.create(task).compute() checkCachedOrLaunchTask(task, hash, resumable) } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 4537cf623c..3b8e5a47a1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -1010,6 +1010,43 @@ class TaskRun implements Cloneable { return result } + /** + * Get the mapping of global variables referenced by the task script, + * including {@code task.ext.*} directive variables. + */ + Map getTaskGlobalVars() { + final result = getGlobalVars(processor.getOwnerScript().getBinding()) + final variableNames = getVariableNames() + final taskConfig = config + for( final key : variableNames ) { + if( !key.startsWith('task.ext.') ) + continue + final value = taskConfig.eval(key.substring(5)) + result.put(key, value) + } + return result + } + + /** + * Scan the task command string looking for invocations of scripts + * defined in the project bin folder. + * + * @param script The task command string + * @return The list of paths of scripts in the project bin folder referenced in the task command + */ + List getTaskBinEntries(String script) { + List result = [] + final entries = processor.session.binEntries + final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`") + while( tokenizer.hasMoreTokens() ) { + final token = tokenizer.nextToken() + final path = entries.get(token) + if( path ) + result.add(path) + } + return result + } + TaskBean toTaskBean() { return new TaskBean(this) } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/hash/AbstractTaskHasher.groovy similarity index 62% rename from modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy rename to modules/nextflow/src/main/groovy/nextflow/processor/hash/AbstractTaskHasher.groovy index 723488a6de..844fc84ba4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHasher.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/hash/AbstractTaskHasher.groovy @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package nextflow.processor +package nextflow.processor.hash import java.nio.file.Path @@ -24,29 +24,41 @@ import groovy.transform.Memoized import groovy.util.logging.Slf4j import nextflow.Session import nextflow.exception.UnexpectedException +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun import nextflow.util.CacheHelper +import nextflow.util.HashBuilder /** - * Implement task hash computation + * Common logic for task hash computation strategies. + * + * Subclasses implement {@link #createHashBuilder()} to configure + * version-specific hashing behaviour. * * @author Paolo Di Tommaso */ @Slf4j @CompileStatic -class TaskHasher { +abstract class AbstractTaskHasher implements TaskHasher { - private TaskRun task + protected TaskRun task - private TaskProcessor processor + protected TaskProcessor processor - private Session session + protected Session session - public TaskHasher(TaskRun task) { + AbstractTaskHasher(TaskRun task) { this.task = task this.processor = task.processor this.session = task.processor.session } - public HashCode compute() { + /** + * Create a {@link HashBuilder} configured for this hashing strategy. + */ + abstract protected HashBuilder createHashBuilder() + + @Override + HashCode compute() { final keys = new ArrayList() @@ -141,94 +153,39 @@ class TaskHasher { /** * Compute a deterministic string representation of eval output commands for cache hashing. - * This method creates a consistent hash key based on the semantic names and command values - * of eval outputs, ensuring cache invalidation when eval outputs change. - * - * @param outEvals Map of eval parameter names to their command strings - * @return A concatenated string of "name=command" pairs, sorted for deterministic hashing */ protected static String computeEvalOutputCommands(Map outEvals) { - // Assert precondition that outEvals should not be null or empty when this method is called assert outEvals != null && !outEvals.isEmpty(), "Eval outputs should not be null or empty" final result = new StringBuilder() - - // Sort entries by key for deterministic ordering. This ensures that the same set of - // eval outputs always produces the same hash regardless of map iteration order, - // which is critical for cache consistency across different JVM runs. - // Without sorting, HashMap iteration order can vary between executions, leading to - // different cache keys for identical eval output configurations and causing - // unnecessary cache misses and task re-execution final sortedEntries = outEvals.entrySet().sort { a, b -> a.key.compareTo(b.key) } - // Build content using for loop to concatenate "name=command" pairs. - // This creates a symmetric pattern with input parameter hashing where both - // the parameter name and its value contribute to the cache key for( final entry : sortedEntries ) { - // Add newline separator between entries for readability in debug scenarios if( result.length() > 0 ) { result.append('\n') } - // Format: "semantic_name=bash_command" - both name and command value are - // included because changing either should invalidate the task cache result.append(entry.key).append('=').append(entry.value) } return result.toString() } - /** - * Get the mapping of global variables that were referenced by - * the task script, excluding references to `task.ext`. - */ - Map getTaskGlobalVars() { - final result = task.getGlobalVars(task.processor.getOwnerScript().getBinding()) - final directives = getTaskExtensionDirectiveVars() - result.putAll(directives) - return result - } - - protected Map getTaskExtensionDirectiveVars() { - final variableNames = task.getVariableNames() - final result = new HashMap(variableNames.size()) - final taskConfig = task.config - for( final key : variableNames ) { - if( !key.startsWith('task.ext.') ) - continue - final value = taskConfig.eval(key.substring(5)) - result.put(key, value) - } - - return result + protected Map getTaskGlobalVars() { + return task.getTaskGlobalVars() } - /** - * This method scans the task command string looking for invocations of scripts - * defined in the project bin folder. - * - * @param script The task command string - * @return The list of paths of scripts in the project bin folder referenced in the task command - */ @Memoized - List getTaskBinEntries(String script) { - List result = [] - final tokenizer = new StringTokenizer(script, " \t\n\r\f()[]{};&|<>`") - while( tokenizer.hasMoreTokens() ) { - final token = tokenizer.nextToken() - final path = session.binEntries.get(token) - if( path ) - result.add(path) - } - return result + protected List getTaskBinEntries(String script) { + return task.getTaskBinEntries(script) } private String safeTaskName(TaskRun task) { return task != null ? task.lazyName() : task.processor.name } - private HashCode computeHash(List keys, CacheHelper.HashMode mode) { + protected HashCode computeHash(List keys, CacheHelper.HashMode mode) { try { - return CacheHelper.hasher(keys, mode).hash() + return createHashBuilder().withMode(mode).with(keys).build() } catch (Throwable e) { final msg = "Something went wrong while creating task hash for process '${task.processor.name}' -- Offending keys: ${ keys.collect { k -> "\n - type=${k.getClass().getName()} value=$k" } }" @@ -238,7 +195,7 @@ class TaskHasher { private void dumpHashEntriesJson(TaskRun task, List entries, CacheHelper.HashMode mode, hash) { final collector = (item) -> [ - hash: CacheHelper.hasher(item, mode).hash().toString(), + hash: createHashBuilder().withMode(mode).with(item).build().toString(), type: item?.getClass()?.getName(), value: item?.toString() ] @@ -250,7 +207,7 @@ class TaskHasher { final buffer = new StringBuilder() buffer.append("[${safeTaskName(task)}] cache hash: ${hash}; mode: $mode; entries: \n") for( final entry : entries ) { - buffer.append( " ${CacheHelper.hasher(entry, mode).hash()} [${entry?.getClass()?.getName()}] $entry \n") + buffer.append( " ${createHashBuilder().withMode(mode).with(entry).build()} [${entry?.getClass()?.getName()}] $entry \n") } log.info(buffer.toString()) } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasher.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasher.groovy new file mode 100644 index 0000000000..3e2ab62797 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasher.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.processor.hash + +import com.google.common.hash.HashCode +import groovy.transform.CompileStatic +/** + * Define the interface for task hash computation + * + * @author Paolo Di Tommaso + */ +@CompileStatic +interface TaskHasher { + + HashCode compute() +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherFactory.groovy new file mode 100644 index 0000000000..f53b76a447 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherFactory.groovy @@ -0,0 +1,66 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.processor.hash + +import groovy.transform.CompileStatic +import nextflow.SysEnv +import nextflow.processor.TaskRun +/** + * Factory for creating versioned {@link TaskHasher} instances. + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class TaskHasherFactory { + + enum Version { + STD_V1('std/v1'), + STD_V2('std/v2') + + final String value + + Version(String value) { + this.value = value + } + + String toString() { value } + + static Version of(String val) { + for( Version v : values() ) { + if( v.value == val ) + return v + } + throw new IllegalArgumentException("Unknown task hasher version: ${val}") + } + + static Version DEFAULT() { + final val = SysEnv.get('NXF_TASK_HASH_VER') + return val ? of(val) : STD_V2 + } + } + + static TaskHasher create(TaskRun task) { + final version = task.processor.session.hashStrategy + switch( version ) { + case Version.STD_V1: + return new TaskHasherV1(task) + case Version.STD_V2: + return new TaskHasherV2(task) + default: + throw new IllegalArgumentException("Unknown task hasher version: ${version}") + } + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV1.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV1.groovy new file mode 100644 index 0000000000..8f006248c8 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV1.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.processor.hash + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.processor.TaskRun +import nextflow.util.HashBuilder +/** + * V1 task hash computation strategy. + * + * This is the original hashing behavior before the record types change. + * Maps are hashed by values only (order-dependent) and CacheFunnel + * is checked after Map and SerializableMarker. + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class TaskHasherV1 extends AbstractTaskHasher { + + TaskHasherV1(TaskRun task) { + super(task) + } + + @Override + protected HashBuilder createHashBuilder() { + return new HashBuilder() + .withOrderIndependentMaps(false) + .withCacheFunnelFirst(false) + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV2.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV2.groovy new file mode 100644 index 0000000000..b0369a64d4 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/hash/TaskHasherV2.groovy @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.processor.hash + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.processor.TaskRun +import nextflow.util.HashBuilder +/** + * V2 task hash computation strategy. + * + * This version uses order-independent Map hashing (via entrySet) + * and checks CacheFunnel before Map in the hash builder. + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +class TaskHasherV2 extends AbstractTaskHasher { + + TaskHasherV2(TaskRun task) { + super(task) + } + + @Override + protected HashBuilder createHashBuilder() { + return new HashBuilder() + .withOrderIndependentMaps(true) + .withCacheFunnelFirst(true) + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy index 03d05751df..9a710fbfa0 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy @@ -28,6 +28,8 @@ import nextflow.container.resolver.ContainerInfo import nextflow.container.resolver.ContainerMeta import nextflow.container.resolver.ContainerResolver import nextflow.executor.Executor +import nextflow.script.BaseScript +import nextflow.script.ProcessConfig import nextflow.file.FileHolder import nextflow.script.BodyDef import nextflow.script.ScriptBinding @@ -984,4 +986,37 @@ class TaskRunTest extends Specification { task.template == file task.traceScript == 'echo Ciao mondo' } + + def 'should get task extension directive vars' () { + given: + def binding = new ScriptBinding() + def ownerScript = Mock(BaseScript) { + getBinding() >> binding + } + def processor = Mock(TaskProcessor) { + getConfig() >> Mock(ProcessConfig) + getOwnerScript() >> ownerScript + } + and: + def config = new TaskConfig() + config.cpus = 4 + config.ext.alpha = 'AAAA' + config.ext.delta = { foo } + config.ext.omega = "${-> bar}" + config.context = [foo: 'DDDD', bar: 'OOOO'] + and: + def task = Spy(TaskRun) + task.processor = processor + task.config = config + task.context = Mock(TaskContext) + task.getVariableNames() >> { [ 'task.cpus', 'task.ext.alpha', 'task.ext.delta', 'task.ext.omega' ] as Set } + + when: + def result = task.getTaskGlobalVars() + then: + result['task.ext.alpha'] == 'AAAA' + result['task.ext.delta'] == 'DDDD' + result['task.ext.omega'] == 'OOOO' + !result.containsKey('task.cpus') + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherFactoryTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherFactoryTest.groovy new file mode 100644 index 0000000000..d99b28af01 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherFactoryTest.groovy @@ -0,0 +1,113 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.processor.hash + +import nextflow.Session +import nextflow.SysEnv +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun +import spock.lang.Specification +/** + * + * @author Paolo Di Tommaso + */ +class TaskHasherFactoryTest extends Specification { + + def 'should create v1 hasher when configured'() { + given: + def session = Mock(Session) { + getHashStrategy() >> TaskHasherFactory.Version.STD_V1 + } + def processor = Mock(TaskProcessor) { + getSession() >> session + } + def task = Mock(TaskRun) { + getProcessor() >> processor + } + + when: + def hasher = TaskHasherFactory.create(task) + + then: + hasher instanceof TaskHasherV1 + hasher.class == TaskHasherV1 + } + + def 'should create v2 hasher when configured'() { + given: + def session = Mock(Session) { + getHashStrategy() >> TaskHasherFactory.Version.STD_V2 + } + def processor = Mock(TaskProcessor) { + getSession() >> session + } + def task = Mock(TaskRun) { + getProcessor() >> processor + } + + when: + def hasher = TaskHasherFactory.create(task) + + then: + hasher instanceof TaskHasherV2 + hasher.class == TaskHasherV2 + } + + def 'should resolve version from env var'() { + given: + SysEnv.push(['NXF_TASK_HASH_VER': 'std/v1']) + + expect: + TaskHasherFactory.Version.DEFAULT() == TaskHasherFactory.Version.STD_V1 + + cleanup: + SysEnv.pop() + } + + def 'should default to std/v2 when env var not set'() { + given: + SysEnv.push([:]) + + expect: + TaskHasherFactory.Version.DEFAULT() == TaskHasherFactory.Version.STD_V2 + + cleanup: + SysEnv.pop() + } + + def 'should have correct string values'() { + expect: + TaskHasherFactory.Version.STD_V1.value == 'std/v1' + TaskHasherFactory.Version.STD_V2.value == 'std/v2' + TaskHasherFactory.Version.STD_V1.toString() == 'std/v1' + TaskHasherFactory.Version.STD_V2.toString() == 'std/v2' + } + + def 'should resolve version from string value'() { + expect: + TaskHasherFactory.Version.of('std/v1') == TaskHasherFactory.Version.STD_V1 + TaskHasherFactory.Version.of('std/v2') == TaskHasherFactory.Version.STD_V2 + } + + def 'should throw on unknown version string'() { + when: + TaskHasherFactory.Version.of('unknown') + + then: + thrown(IllegalArgumentException) + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherTest.groovy similarity index 69% rename from modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy rename to modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherTest.groovy index 4e7461205e..8076ea5abb 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHasherTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/hash/TaskHasherTest.groovy @@ -14,11 +14,14 @@ * limitations under the License. */ -package nextflow.processor +package nextflow.processor.hash import java.nio.file.Path import nextflow.Session +import nextflow.processor.TaskConfig +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun import nextflow.script.ProcessConfig import spock.lang.Specification /** @@ -48,7 +51,7 @@ class TaskHasherTest extends Specification { getProcessor() >> processor } and: - def hasher = Spy(new TaskHasher(task)) + def hasher = Spy(new TaskHasherV1(task)) when: def uuid1 = hasher.compute() @@ -75,64 +78,33 @@ class TaskHasherTest extends Specification { getName() >> 'hello' getSession() >> session } - def task = Mock(TaskRun) { - getProcessor() >> processor - } - def hasher = new TaskHasher(task) + def task = new TaskRun() + task.processor = processor when: - def result = hasher.getTaskBinEntries('var=x foo.sh') + def result = task.getTaskBinEntries('var=x foo.sh') then: result.size() == 1 result.contains(Path.of('/some/path/foo.sh')) when: - result = hasher.getTaskBinEntries('echo $(foo.sh); bar.sh') + result = task.getTaskBinEntries('echo $(foo.sh); bar.sh') then: result.size() == 2 result.contains(Path.of('/some/path/foo.sh')) result.contains(Path.of('/some/path/bar.sh')) } - def 'should get task directive vars' () { - given: - def processor = Spy(TaskProcessor) { - getConfig() >> Mock(ProcessConfig) - } - and: - def config = new TaskConfig() - config.cpus = 4 - config.ext.alpha = 'AAAA' - config.ext.delta = { foo } - config.ext.omega = "${-> bar}" - config.context = [foo: 'DDDD', bar: 'OOOO'] - and: - def task = Mock(TaskRun) { - getConfig() >> config - getProcessor() >> processor - getVariableNames() >> { [ 'task.cpus', 'task.ext.alpha', 'task.ext.delta', 'task.ext.omega' ] as Set } - } - - when: - def result = new TaskHasher(task).getTaskExtensionDirectiveVars() - then: - result == [ - 'task.ext.alpha': 'AAAA', - 'task.ext.delta': 'DDDD', - 'task.ext.omega': 'OOOO', - ] - } - def 'should compute hash entries for eval outputs'() { when: - def result1 = TaskHasher.computeEvalOutputCommands([ + def result1 = AbstractTaskHasher.computeEvalOutputCommands([ 'nxf_out_eval_2': 'echo "value2"', 'nxf_out_eval_1': 'echo "value1"', 'nxf_out_eval_3': 'echo "value3"' ]) - def result2 = TaskHasher.computeEvalOutputCommands([ + def result2 = AbstractTaskHasher.computeEvalOutputCommands([ 'nxf_out_eval_3': 'echo "value3"', 'nxf_out_eval_1': 'echo "value1"', 'nxf_out_eval_2': 'echo "value2"' diff --git a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java index 8c14e282cb..825f340e7b 100644 --- a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java +++ b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java @@ -85,8 +85,32 @@ public static Hasher defaultHasher() { private Path basePath; + private boolean orderIndependentMaps = true; + + private boolean cacheFunnelFirst = true; + public HashBuilder() {} + /** + * When {@code true} (default), Maps are hashed using order-independent + * hashing on their entrySet. When {@code false}, only the values are + * hashed in iteration order (legacy V1 behaviour). + */ + public HashBuilder withOrderIndependentMaps(boolean value) { + this.orderIndependentMaps = value; + return this; + } + + /** + * When {@code true} (default), {@link CacheFunnel} is checked before + * Map and SerializableMarker. When {@code false}, it is checked after + * them (legacy V1 behaviour). + */ + public HashBuilder withCacheFunnelFirst(boolean value) { + this.cacheFunnelFirst = value; + return this; + } + public HashBuilder withHasher(Hasher hasher) { this.hasher = hasher; return this; @@ -146,11 +170,16 @@ else if( value instanceof Object[]) { with(item); } - else if( value instanceof CacheFunnel ) + else if( cacheFunnelFirst && value instanceof CacheFunnel ) ((CacheFunnel)value).funnel(hasher, mode); - else if( value instanceof Map ) - hashUnorderedCollection(hasher, ((Map) value).entrySet(), mode); + else if( value instanceof Map ) { + if( orderIndependentMaps ) + hashUnorderedCollection(hasher, ((Map) value).entrySet(), mode); + else + for( Object item : ((Map)value).values() ) + with(item); + } else if( value instanceof Map.Entry ) { Map.Entry entry = (Map.Entry)value; @@ -182,6 +211,9 @@ else if( value instanceof VersionNumber ) else if( value instanceof SerializableMarker) hasher.putInt( value.hashCode() ); + else if( !cacheFunnelFirst && value instanceof CacheFunnel ) + ((CacheFunnel)value).funnel(hasher, mode); + else if( value instanceof Enum ) hasher.putUnencodedChars( value.getClass().getName() + "." + value ); diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index fc0ddfb4b0..5ec791c1ad 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -38,7 +38,6 @@ import nextflow.lineage.model.v1beta1.WorkflowOutput import nextflow.lineage.model.v1beta1.WorkflowRun import nextflow.file.FileHelper import nextflow.file.FileHolder -import nextflow.processor.TaskHasher import nextflow.processor.TaskRun import nextflow.script.ScriptMeta import nextflow.script.params.BaseParam @@ -273,11 +272,11 @@ class LinObserver implements TraceObserverV2 { } protected Map getTaskGlobalVars(TaskRun task) { - return new TaskHasher(task).getTaskGlobalVars() + return task.getTaskGlobalVars() } protected List getTaskBinEntries(TaskRun task) { - return new TaskHasher(task).getTaskBinEntries(task.source) + return task.getTaskBinEntries(task.source) } protected String storeTaskOutput(TaskRun task, Path path) {