diff --git a/docs/process-typed.md b/docs/process-typed.md index b6d0660ea1..94550cfeb4 100644 --- a/docs/process-typed.md +++ b/docs/process-typed.md @@ -241,6 +241,28 @@ process grep { } ``` +:::{versionadded} 26.04.0 +::: + +Input files can be staged using a *staging closure* instead of a file pattern: + +```nextflow +process ls { + input: + slice: Set + + stage: + stageAs(slice) { file -> "${file.parent.name}/${file.name}" } + + script: + """ + ls -1 */*.txt | sort + """ +} +``` + +The staging closure is called with each input file and should return the name under which that file is staged in the task directory. + See {ref}`process-reference-typed` for available stage directives. ## Outputs diff --git a/docs/reference/process.md b/docs/reference/process.md index 18bad7863c..3e9ab8f867 100644 --- a/docs/reference/process.md +++ b/docs/reference/process.md @@ -79,6 +79,9 @@ The following directives can be used in the `stage:` section of a typed process: `stageAs( value: Iterable, filePattern: String )` : Stages a collection of files into the task directory under the given alias. +`stageAs( value: Iterable, transform: (Path) -> String )` +: Stages a collection of files into the task directory, with the alias of each file determined by the given closure. + `stdin( value: String )` : Stages the given value as the standard input (i.e., `stdin`) to the task script. 
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy index a965880259..9c276a6195 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskInputResolver.groovy @@ -88,7 +88,9 @@ class TaskInputResolver { List resolve(ProcessFileInput fileInput, Object value) { final ctx = task.context final normalized = normalizeInputToFiles(value, count, true) - final resolved = expandWildcards( fileInput.getFilePattern(ctx), normalized ) + final resolved = fileInput.getStagingClosure() != null + ? expandStagingClosure( fileInput.getStagingClosure(), normalized ) + : expandWildcards( fileInput.getFilePattern(ctx), normalized ) count += resolved.size() @@ -295,6 +297,24 @@ class TaskInputResolver { result.toString() } + /** + * When a staging closure is provided for a collection of files, + * stage each file by applying the staging closure. 
+ * + * @param stagingClosure closure that is called with the store path of each file and returns its stage name + * @param files file holders to be staged + */ + protected static List expandStagingClosure(Closure stagingClosure, List files) { + assert files != null + + final result = new ArrayBag(files.size()) + for( final holder : files ) { + final stageName = stagingClosure.call(holder.getStorePath()) + result << holder.withName(stageName) + } + return result + } + protected static Object singleItemOrList( List items, boolean single, ScriptType type ) { assert items != null diff --git a/modules/nextflow/src/main/groovy/nextflow/script/params/v2/ProcessFileInput.groovy b/modules/nextflow/src/main/groovy/nextflow/script/params/v2/ProcessFileInput.groovy index 154a9d26f1..61a078707d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/params/v2/ProcessFileInput.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/params/v2/ProcessFileInput.groovy @@ -33,6 +33,11 @@ class ProcessFileInput { */ private Object filePattern + /** + * Closure that defines the stage name for a given input file. + */ + private Closure stagingClosure + /** * Lazy expression (e.g. closure) which defines which files * to stage in terms of the task inputs. @@ -40,13 +45,20 @@ class ProcessFileInput { */ private Object value - ProcessFileInput(Object filePattern, Object value) { - this.filePattern = filePattern != null ? filePattern : '*' + ProcessFileInput(Object patternOrClosure, Object value) { + if( patternOrClosure instanceof Closure && patternOrClosure.getMaximumNumberOfParameters() == 1 ) + this.stagingClosure = patternOrClosure + else + this.filePattern = patternOrClosure != null ? patternOrClosure : '*' this.value = value } String getFilePattern(Map ctx) { - return ctx.resolveLazy(filePattern) + return filePattern != null ? 
ctx.resolveLazy(filePattern) : null + } + + Closure getStagingClosure() { + return stagingClosure } Object resolve(Map ctx) { diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy index 103375a006..916deef9a4 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskInputResolverTest.groovy @@ -151,6 +151,27 @@ class TaskInputResolverTest extends Specification { and: task.context.samples*.toString() == ['sample_1.fastq', 'sample_2.fastq'] + when: 'staging a file collection with a staging closure' + task.context = new TaskContext(holder: [:]) + task.inputs = [:] + and: + param = Mock(InParam) { getName() >> 'reads' } + fileInput = new ProcessFileInput({ path -> "staged_${path.name}" }, { -> reads }) + value = [ Path.of('/other/foo.fq'), Path.of('/other/bar.fq')] + task.setInput(param, value) + result = resolver.resolve(fileInput, value) + task.context.put( param.name, resolver.normalizeValue(value, holdersMap(result)) ) + then: + 1 * executor.isForeignFile(_ as Path) >> false + 1 * executor.isForeignFile(_ as Path) >> false + and: + result[0].storePath == value[0] + result[0].stageName == 'staged_foo.fq' + result[1].storePath == value[1] + result[1].stageName == 'staged_bar.fq' + and: + task.context.reads*.toString() == ['staged_foo.fq', 'staged_bar.fq'] + when: 'staging a file in a record' task.context = new TaskContext(holder: [:]) task.inputs = [:] @@ -308,6 +329,31 @@ class TaskInputResolverTest extends Specification { list2 *. 
stageName == ['dir/bar01/titi.fa', 'dir/bar02/toto.fa'] } + def 'should expand staging closure'() { + + when: 'empty list' + def result = TaskInputResolver.expandStagingClosure({ path -> "staged_${path.name}" }, []) + then: + result.size() == 0 + + when: 'single file' + result = TaskInputResolver.expandStagingClosure( + { path -> "renamed_${path.name}" }, + [FileHolder.get('sample.fastq')] ) + then: + result.size() == 1 + result[0].stageName == 'renamed_sample.fastq' + + when: 'multiple files' + result = TaskInputResolver.expandStagingClosure( + { path -> "prefix_${path.name}" }, + [FileHolder.get('foo.txt'), FileHolder.get('bar.txt')] ) + then: + result.size() == 2 + result[0].stageName == 'prefix_foo.txt' + result[1].stageName == 'prefix_bar.txt' + } + @Unroll def 'should expand wildcards rule' () { diff --git a/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessDslV2Test.groovy b/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessDslV2Test.groovy index d9073d0ee2..e8ee267913 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessDslV2Test.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessDslV2Test.groovy @@ -40,7 +40,7 @@ class ProcessDslV2Test extends Specification { dsl._input_('infile', Path, false) dsl._input_('x', String, false) dsl._input_('y', String, false) - dsl.stageAs({ infile }, 'filename.fa') + dsl.stageAs({ -> infile }, 'filename.fa') dsl.stdin { y } then: @@ -112,7 +112,7 @@ class ProcessDslV2Test extends Specification { dsl.memory '10 GB' dsl._input_('foo', String, false) dsl._input_('sample', Path, false) - dsl.stageAs({ sample }, 'sample.txt') + dsl.stageAs({ -> sample }, 'sample.txt') dsl._output_('result', Path, { file('$file0') }) dsl._unstage_files('$file0', 'result.txt') diff --git a/modules/nextflow/src/test/groovy/nextflow/script/params/v2/ProcessFileInputTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/params/v2/ProcessFileInputTest.groovy index 
a27009d66f..fb2f200482 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/params/v2/ProcessFileInputTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/params/v2/ProcessFileInputTest.groovy @@ -45,6 +45,34 @@ class ProcessFileInputTest extends Specification { input.getFilePattern([id: 'sample1']) == 'sample1.txt' } + def 'should resolve staging closure'() { + given: + def stagingClosure + def input + + when: 'no staging closure is set' + input = new ProcessFileInput(null, null) + then: + input.getStagingClosure() == null + + when: 'a string pattern is set' + input = new ProcessFileInput('*.txt', null) + then: + input.getStagingClosure() == null + + when: 'a zero-param closure is used as file pattern' + input = new ProcessFileInput({ -> 'pattern.txt' }, null) + then: + input.getStagingClosure() == null + + when: 'a one-param closure is used as staging closure' + stagingClosure = { file -> "staged_${file}" } + input = new ProcessFileInput(stagingClosure, null) + then: + input.getStagingClosure() == stagingClosure + input.getFilePattern([:]) == null + } + def 'should resolve file value'() { given: def value diff --git a/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java b/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java index 627735ab81..2099bc64ed 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java +++ b/modules/nf-lang/src/main/java/nextflow/script/control/ProcessToGroovyVisitorV2.java @@ -120,7 +120,7 @@ private void visitProcessDirectives(Statement directives) { private void visitProcessStagers(Statement directives) { asDirectives(directives).forEach((call) -> { var arguments = asMethodCallArguments(call).stream() - .map(arg -> sgh.transformToLazy(arg)) + .map(arg -> sgh.transformToLazy(arg, false)) .toList(); call.setArguments(args(arguments)); }); diff --git 
a/modules/nf-lang/src/main/java/nextflow/script/control/ScriptToGroovyHelper.java b/modules/nf-lang/src/main/java/nextflow/script/control/ScriptToGroovyHelper.java index db771ef757..f2333050ce 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/control/ScriptToGroovyHelper.java +++ b/modules/nf-lang/src/main/java/nextflow/script/control/ScriptToGroovyHelper.java @@ -21,6 +21,7 @@ import java.util.Set; import org.codehaus.groovy.ast.CodeVisitorSupport; +import org.codehaus.groovy.ast.Parameter; import org.codehaus.groovy.ast.Variable; import org.codehaus.groovy.ast.expr.ClosureExpression; import org.codehaus.groovy.ast.expr.Expression; @@ -119,16 +120,23 @@ private static String asPropertyChain(PropertyExpression node) { * wrapping it in a closure if it references variables. * * @param node + * @param implicitParam whether the generated closure declares the implicit {@code it} parameter ({@code Parameter.EMPTY_ARRAY}) or no parameters at all ({@code null}) */ - public Expression transformToLazy(Expression node) { + public Expression transformToLazy(Expression node, boolean implicitParam) { if( node instanceof ClosureExpression ) return node; var vars = new VariableCollector().collect(node); - if( !vars.isEmpty() ) - return closureX(stmt(node)); + if( !vars.isEmpty() ) { + var parameters = implicitParam ? 
Parameter.EMPTY_ARRAY : null; + return closureX(parameters, stmt(node)); + } return node; } + public Expression transformToLazy(Expression node) { + return transformToLazy(node, true); + } + private class VariableCollector extends CodeVisitorSupport { private Set vars; diff --git a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java index a75a875b3a..a9cc63ee12 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java +++ b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import groovy.transform.NamedParam; import groovy.transform.NamedParams; @@ -388,6 +389,11 @@ interface StageDsl extends DslScope { """) void stageAs(Iterable value, String filePattern); + @Description(""" + Stage a collection of files into the task directory with an alias determined by the given closure. + """) + void stageAs(Iterable value, Function transform); + @Description(""" Stage the given value as the standard input (i.e. `stdin`) to the task script. 
""") diff --git a/tests/checks/.IGNORE-PARSER-V2 b/tests/checks/.IGNORE-PARSER-V2 index 78480f28a1..495531eb71 100644 --- a/tests/checks/.IGNORE-PARSER-V2 +++ b/tests/checks/.IGNORE-PARSER-V2 @@ -9,6 +9,7 @@ output-dsl.nf params-dsl.nf record-types.nf records.nf +stage-file-closure.nf task-ext-block.nf topic-channel-typed.nf type-annotations.nf diff --git a/tests/checks/stage-file-closure.nf/.checks b/tests/checks/stage-file-closure.nf/.checks new file mode 100644 index 0000000000..1e21004508 --- /dev/null +++ b/tests/checks/stage-file-closure.nf/.checks @@ -0,0 +1,20 @@ +set -e + +# +# run normal mode +# +echo '' +$NXF_RUN | tee stdout +[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process'` == 2 ]] || false + +cmp stdout .expected || false + + +# +# RESUME mode +# +echo '' +$NXF_RUN -resume | tee stdout +[[ `grep 'INFO' .nextflow.log | grep -c 'Cached process'` == 2 ]] || false + +cmp stdout .expected || false diff --git a/tests/checks/stage-file-closure.nf/.expected b/tests/checks/stage-file-closure.nf/.expected new file mode 100644 index 0000000000..96c1f2f12a --- /dev/null +++ b/tests/checks/stage-file-closure.nf/.expected @@ -0,0 +1,10 @@ +group1/sample1.txt +group1/sample2.txt +group1/sample3.txt +group2/sample1.txt +group2/sample2.txt +group2/sample3.txt +group3/sample1.txt +group3/sample2.txt +group3/sample3.txt + diff --git a/tests/stage-file-closure.nf b/tests/stage-file-closure.nf new file mode 100644 index 0000000000..9ca1bff0cd --- /dev/null +++ b/tests/stage-file-closure.nf @@ -0,0 +1,41 @@ + +nextflow.preview.types = true + +process make_files { + input: + n_groups: Integer + group_size: Integer + + output: + files('*/*.txt') + + script: + """ + for i in `seq 1 ${n_groups}`; do + mkdir group\${i} + for j in `seq 1 ${group_size}`; do + touch group\${i}/sample\${j}.txt + done + done + """ +} + +process ls { + input: + slice: Set + + stage: + stageAs(slice) { file -> "${file.parent.name}/${file.name}" } + + output: + stdout() + + script: + """ + 
ls -1 */*.txt | sort + """ +} + +workflow { + ls(make_files(3, 3)).view() +}