Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/process-typed.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,28 @@ process grep {
}
```

:::{versionadded} 26.04.0
:::

Input files can be staged using a *staging closure* instead of a file pattern:

```nextflow
process ls {
input:
slice: Set<Path>

stage:
stageAs(slice) { file -> "${file.parent.name}/${file.name}" }

script:
"""
ls -1 */*.txt | sort
"""
}
```

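The staging closure is called once for each input file and must return the name under which that file is staged in the task directory. The returned name must be fully resolved; glob patterns are not expanded.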

See {ref}`process-reference-typed` for available stage directives.

## Outputs
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ The following directives can be used in the `stage:` section of a typed process:
`stageAs( value: Iterable<Path>, filePattern: String )`
: Stages a collection of files into the task directory under the given alias.

`stageAs( value: Iterable<Path>, transform: (Path) -> String )`
: Stages a collection of files into the task directory with an alias determined by the given closure.

`stdin( value: String )`
: Stages the given value as the standard input (i.e., `stdin`) to the task script.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class TaskInputResolver {
List<FileHolder> resolve(ProcessFileInput fileInput, Object value) {
final ctx = task.context
final normalized = normalizeInputToFiles(value, count, true)
final resolved = expandWildcards( fileInput.getFilePattern(ctx), normalized )
final resolved = fileInput.getStagingClosure() != null
? expandStagingClosure( fileInput.getStagingClosure(), normalized )
: expandWildcards( fileInput.getFilePattern(ctx), normalized )

count += resolved.size()

Expand Down Expand Up @@ -295,6 +297,24 @@ class TaskInputResolver {
result.toString()
}

/**
* When a staging closure is provided for a collection of files,
* stage each file by applying the staging closure.
*
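* @param stagingClosure closure that is called with the store path of each file and returns its stage name
* @param files file holders to be staged
* @return the file holders renamed with the stage names returned by the closure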
*/
protected static List<FileHolder> expandStagingClosure(Closure stagingClosure, List<FileHolder> files) {
assert files != null

final result = new ArrayBag(files.size())
for( final holder : files ) {
final stageName = stagingClosure.call(holder.getStorePath())
result << holder.withName(stageName)
}
return result
Comment on lines +310 to +315
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lehmann-Fabian the main difference I saw with your PR is that it seems like you allowed the staging closure to still use glob patterns

I wasn't sure if this is actually needed, and the implementation is much simpler if we say that the staging closure must return a fully-resolved file name (no glob patterns), so I left it out

But let me know if you actually needed this

}

protected static Object singleItemOrList( List<FileHolder> items, boolean single, ScriptType type ) {
assert items != null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,32 @@ class ProcessFileInput {
*/
private Object filePattern

/**
* Closure that defines the stage name for a given input file.
*/
private Closure stagingClosure

/**
* Lazy expression (e.g. closure) which defines which files
* to stage in terms of the task inputs.
* It is evaluated for each task against the task context.
*/
private Object value

ProcessFileInput(Object filePattern, Object value) {
this.filePattern = filePattern != null ? filePattern : '*'
/**
 * Creates a file input from either a file pattern or a staging closure.
 *
 * A closure that accepts exactly one parameter is stored as the staging
 * closure. Any other argument is stored as the file pattern, with
 * {@code '*'} used when the argument is null.
 *
 * @param patternOrClosure file pattern, or one-parameter staging closure
 * @param value lazy expression that defines which files to stage
 */
ProcessFileInput(Object patternOrClosure, Object value) {
// only a closure with exactly one parameter is treated as a staging closure;
// other closures (e.g. zero-parameter lazy patterns) fall through to the file pattern
if( patternOrClosure instanceof Closure && patternOrClosure.getMaximumNumberOfParameters() == 1 )
this.stagingClosure = patternOrClosure
else
this.filePattern = patternOrClosure != null ? patternOrClosure : '*'
this.value = value
}

String getFilePattern(Map ctx) {
return ctx.resolveLazy(filePattern)
return filePattern != null ? ctx.resolveLazy(filePattern) : null
}

/**
 * @return the closure that defines the stage name for a given input file,
 *         or {@code null} when no staging closure was set
 */
Closure getStagingClosure() {
return stagingClosure
}

Object resolve(Map ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,27 @@ class TaskInputResolverTest extends Specification {
and:
task.context.samples*.toString() == ['sample_1.fastq', 'sample_2.fastq']

when: 'staging a file collection with a staging closure'
task.context = new TaskContext(holder: [:])
task.inputs = [:]
and:
param = Mock(InParam) { getName() >> 'reads' }
fileInput = new ProcessFileInput({ path -> "staged_${path.name}" }, { -> reads })
value = [ Path.of('/other/foo.fq'), Path.of('/other/bar.fq')]
task.setInput(param, value)
result = resolver.resolve(fileInput, value)
task.context.put( param.name, resolver.normalizeValue(value, holdersMap(result)) )
then:
1 * executor.isForeignFile(_ as Path) >> false
1 * executor.isForeignFile(_ as Path) >> false
and:
result[0].storePath == value[0]
result[0].stageName == 'staged_foo.fq'
result[1].storePath == value[1]
result[1].stageName == 'staged_bar.fq'
and:
task.context.reads*.toString() == ['staged_foo.fq', 'staged_bar.fq']

when: 'staging a file in a record'
task.context = new TaskContext(holder: [:])
task.inputs = [:]
Expand Down Expand Up @@ -308,6 +329,31 @@ class TaskInputResolverTest extends Specification {
list2 *. stageName == ['dir/bar01/titi.fa', 'dir/bar02/toto.fa']
}

// Verifies that expandStagingClosure uses the value returned by the closure
// as the stage name of each file holder, keeping the input order.
def 'should expand staging closure'() {

when: 'empty list'
def result = TaskInputResolver.expandStagingClosure({ path -> "staged_${path.name}" }, [])
then:
result.size() == 0

when: 'single file'
result = TaskInputResolver.expandStagingClosure(
{ path -> "renamed_${path.name}" },
[FileHolder.get('sample.fastq')] )
then:
result.size() == 1
result[0].stageName == 'renamed_sample.fastq'

when: 'multiple files'
result = TaskInputResolver.expandStagingClosure(
{ path -> "prefix_${path.name}" },
[FileHolder.get('foo.txt'), FileHolder.get('bar.txt')] )
then:
result.size() == 2
result[0].stageName == 'prefix_foo.txt'
result[1].stageName == 'prefix_bar.txt'
}

@Unroll
def 'should expand wildcards rule' () {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ProcessDslV2Test extends Specification {
dsl._input_('infile', Path, false)
dsl._input_('x', String, false)
dsl._input_('y', String, false)
dsl.stageAs({ infile }, 'filename.fa')
dsl.stageAs({ -> infile }, 'filename.fa')
dsl.stdin { y }

then:
Expand Down Expand Up @@ -112,7 +112,7 @@ class ProcessDslV2Test extends Specification {
dsl.memory '10 GB'
dsl._input_('foo', String, false)
dsl._input_('sample', Path, false)
dsl.stageAs({ sample }, 'sample.txt')
dsl.stageAs({ -> sample }, 'sample.txt')
dsl._output_('result', Path, { file('$file0') })
dsl._unstage_files('$file0', 'result.txt')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,34 @@ class ProcessFileInputTest extends Specification {
input.getFilePattern([id: 'sample1']) == 'sample1.txt'
}

// Verifies how the first constructor argument is classified: only a
// one-parameter closure is exposed through getStagingClosure().
def 'should resolve staging closure'() {
given:
def stagingClosure
def input

when: 'no staging closure is set'
input = new ProcessFileInput(null, null)
then:
input.getStagingClosure() == null

when: 'a string pattern is set'
input = new ProcessFileInput('*.txt', null)
then:
input.getStagingClosure() == null

when: 'a zero-param closure is used as file pattern'
input = new ProcessFileInput({ -> 'pattern.txt' }, null)
then:
input.getStagingClosure() == null

when: 'a one-param closure is used as staging closure'
stagingClosure = { file -> "staged_${file}" }
input = new ProcessFileInput(stagingClosure, null)
then:
input.getStagingClosure() == stagingClosure
// no file pattern is stored when a staging closure is given
input.getFilePattern([:]) == null
}

def 'should resolve file value'() {
given:
def value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void visitProcessDirectives(Statement directives) {
private void visitProcessStagers(Statement directives) {
asDirectives(directives).forEach((call) -> {
var arguments = asMethodCallArguments(call).stream()
.map(arg -> sgh.transformToLazy(arg))
.map(arg -> sgh.transformToLazy(arg, false))
.toList();
call.setArguments(args(arguments));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;

import org.codehaus.groovy.ast.CodeVisitorSupport;
import org.codehaus.groovy.ast.Parameter;
import org.codehaus.groovy.ast.Variable;
import org.codehaus.groovy.ast.expr.ClosureExpression;
import org.codehaus.groovy.ast.expr.Expression;
Expand Down Expand Up @@ -119,16 +120,23 @@ private static String asPropertyChain(PropertyExpression node) {
* wrapping it in a closure if it references variables.
*
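* @param node expression to transform
* @param implicitParam when true the wrapping closure is created with an empty parameter array, otherwise with null parameters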
*/
public Expression transformToLazy(Expression node) {
public Expression transformToLazy(Expression node, boolean implicitParam) {
if( node instanceof ClosureExpression )
return node;
var vars = new VariableCollector().collect(node);
if( !vars.isEmpty() )
return closureX(stmt(node));
if( !vars.isEmpty() ) {
var parameters = implicitParam ? Parameter.EMPTY_ARRAY : null;
return closureX(parameters, stmt(node));
}
return node;
}

/**
 * Convenience overload that calls {@code transformToLazy(node, true)}.
 *
 * @param node expression to transform
 */
public Expression transformToLazy(Expression node) {
return transformToLazy(node, true);
}

private class VariableCollector extends CodeVisitorSupport {

private Set<Variable> vars;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import groovy.transform.NamedParam;
import groovy.transform.NamedParams;
Expand Down Expand Up @@ -388,6 +389,11 @@ interface StageDsl extends DslScope {
""")
void stageAs(Iterable<Path> value, String filePattern);

@Description("""
Stage a collection of files into the task directory with an alias determined by the given closure.
""")
void stageAs(Iterable<Path> value, Function<Path,String> transform);

@Description("""
Stage the given value as the standard input (i.e. `stdin`) to the task script.
""")
Expand Down
1 change: 1 addition & 0 deletions tests/checks/.IGNORE-PARSER-V2
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ output-dsl.nf
params-dsl.nf
record-types.nf
records.nf
stage-file-closure.nf
task-ext-block.nf
topic-channel-typed.nf
type-annotations.nf
Expand Down
20 changes: 20 additions & 0 deletions tests/checks/stage-file-closure.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Check script for stage-file-closure.nf: runs the pipeline twice (fresh and
# with -resume) and compares the captured output with the .expected file.
set -e

#
# run normal mode
#
echo ''
$NXF_RUN | tee stdout
# exactly two processes must have been submitted
[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process'` == 2 ]] || false

# captured output must match the expected listing byte for byte
cmp stdout .expected || false


#
# RESUME mode
#
echo ''
$NXF_RUN -resume | tee stdout
# on resume both processes must be reported as cached
[[ `grep 'INFO' .nextflow.log | grep -c 'Cached process'` == 2 ]] || false

cmp stdout .expected || false
10 changes: 10 additions & 0 deletions tests/checks/stage-file-closure.nf/.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
group1/sample1.txt
group1/sample2.txt
group1/sample3.txt
group2/sample1.txt
group2/sample2.txt
group2/sample3.txt
group3/sample1.txt
group3/sample2.txt
group3/sample3.txt

41 changes: 41 additions & 0 deletions tests/stage-file-closure.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@

nextflow.preview.types = true

// Creates `n_groups` directories (group1, group2, ...) each holding
// `group_size` empty files (sample1.txt, sample2.txt, ...) and emits
// every file matching */*.txt.
process make_files {
input:
n_groups: Integer
group_size: Integer

output:
files('*/*.txt')

script:
"""
for i in `seq 1 ${n_groups}`; do
mkdir group\${i}
for j in `seq 1 ${group_size}`; do
touch group\${i}/sample\${j}.txt
done
done
"""
}

// Stages every input file as <parent directory name>/<file name> using a
// staging closure, then prints the sorted list of staged files.
process ls {
input:
slice: Set<Path>

stage:
stageAs(slice) { file -> "${file.parent.name}/${file.name}" }

output:
stdout()

script:
"""
ls -1 */*.txt | sort
"""
}

// Generates 3 groups of 3 files, passes them to `ls` and prints its output.
workflow {
ls(make_files(3, 3)).view()
}
Loading