Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,26 @@ In most cases, you won't need to use dynamic file names, because each task is ex
An example of when you may have to deal with that is when you have many input files in a task, and some of these files may have the same filename. In this case, a solution would be to use the `stageAs` option.
:::

### Dynamic input multiple file names

In some cases, it might be necessary to stage multiple files while keeping a folder hierarchy or changing the
name of each file individually.
To do so, you can access the `sourceObj` and `storePath` of each input file within the file name closure:

```groovy
fasta = Channel.fromPath( "/root/*/*.fa" ).buffer(size:10, remainder: true)
process blastThemAll {

input:
file {"${sourceObj.parent}/${sourceObj.name}.fa"} from fasta

"""
find . -name "*"
"""

}
```

### Input type `env`

The `env` qualifier allows you to define an environment variable in the process execution context based on the input value. For example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1868,30 +1868,45 @@ class TaskProcessor {
* @return
*/
@CompileStatic
protected List<FileHolder> expandWildcards( String name, List<FileHolder> files ) {
protected List<FileHolder> expandWildcards( FileInParam fileInParam, List<FileHolder> files, TaskContext ctx = null ) {
assert files != null

// use an unordered so that cache hash key is not affected by file entries order
final result = new ArrayBag(files.size())
if( files.size()==0 ) { return result }

if( !name || name == '*' ) {
result.addAll(files)
return result
}

if( !name.contains('*') && !name.contains('?') && files.size()>1 ) {
/*
* When name do not contain any wildcards *BUT* multiple files are provide
* it is managed like having a 'star' at the end of the file name
*/
name += '*'
List<String> names = []
Map<String,Integer> namesMap = [:]
for( int i=0; i<files.size(); i++ ) {
FileHolder holder = files[i]
Map tmp = [storePath:holder.storePath,sourceObj:holder.sourceObj]
String newName = fileInParam.getFilePattern( ctx ? ctx as Map + tmp : tmp ) ?: '*'
//Count occurrence
namesMap.put( newName, (namesMap.get( newName ) ?: 0) + 1 )
names << newName
}

//replace for every name
Map<String,Integer> namesMapIndex = [:]
for( int i=0; i<files.size(); i++ ) {
def holder = files[i]
def newName = expandWildcards0(name, holder.stageName, i+1, files.size())
result << holder.withName( newName )
FileHolder holder = files[i]
String newName = names[i]
String newNameWithoutStar = newName
if ( newName != '*' ) {
if (!newName.contains('*') && !newName.contains('?') && namesMap.get(newName) > 1) {
/*
* When name do not contain any wildcards *BUT* multiple files are provide
* it is managed like having a 'star' at the end of the file name
*/
newName += '*'
}
int cindex = namesMapIndex.getOrDefault(newNameWithoutStar, 0) + 1
newName = expandWildcards0(newName, holder.stageName, cindex, namesMap.get(newNameWithoutStar))
namesMapIndex.put( newNameWithoutStar, cindex )
result << holder.withName( newName )
} else {
result << holder
}
}

return result
Expand Down Expand Up @@ -2030,7 +2045,7 @@ class TaskProcessor {
final val = entry.getValue()
final fileParam = param as FileInParam
final normalized = normalizeInputToFiles(val, count, fileParam.isPathQualifier(), batch)
final resolved = expandWildcards( fileParam.getFilePattern(ctx), normalized )
final resolved = expandWildcards( fileParam, normalized, ctx )
ctx.put( param.name, singleItemOrList(resolved, task.type) )
count += resolved.size()
for( FileHolder item : resolved ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ class FileInParam extends BaseInParam implements PathQualifier {
return this
}

/**
 * Tells whether the stage name of this input is declared through a closure.
 *
 * The candidate value is looked up in this order: the explicit {@code filePattern},
 * the value of the first entry when {@code bindObject} is a map, and finally
 * {@code bindObject} itself.
 *
 * @return {@code true} when the selected value is a {@link Closure}, {@code false} otherwise
 *      (including when neither a pattern nor a bind object is set)
 */
boolean stageAsByClosure(){
    // an explicit pattern always takes precedence over the bind object
    if( filePattern != null )
        return filePattern instanceof Closure

    if( bindObject instanceof Map ) {
        // the map form is not expected together with the `path` qualifier
        assert !pathQualifier
        def firstEntry = bindObject.entrySet().first()
        return firstEntry?.value instanceof Closure
    }

    // a null bind object is never a closure, hence no separate null check
    return bindObject instanceof Closure
}

String getFilePattern(Map ctx = null) {

if( filePattern != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.processor

import nextflow.script.params.FileInParam

import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -59,6 +61,12 @@ class TaskProcessorTest extends Specification {
@Override protected void createOperator() { }
}

/**
 * Test helper: creates a {@link FileInParam} backed by a mocked {@code Binding}
 * and an empty holder list, then binds the given value to it.
 *
 * @param value the object to bind, e.g. a name pattern string or a closure
 * @return the bound input parameter
 */
FileInParam getInputParam( Object value ){
    final FileInParam param = new FileInParam(Mock(Binding), [])
    param.bind( value )
    param
}


def 'should filter hidden files'() {

Expand Down Expand Up @@ -208,8 +216,8 @@ class TaskProcessorTest extends Specification {
* an index number is added to the specified name
*/
when:
def list1 = processor.expandWildcards('file_name', [FileHolder.get('x')])
def list2 = processor.expandWildcards('file_name', [FileHolder.get('x'), FileHolder.get('y')] )
def list1 = processor.expandWildcards( getInputParam('file_name'), [FileHolder.get('x')])
def list2 = processor.expandWildcards( getInputParam('file_name'), [FileHolder.get('x'), FileHolder.get('y')] )
then:
list1 *. stageName == ['file_name']
list2 *. stageName == ['file_name1', 'file_name2']
Expand All @@ -220,8 +228,8 @@ class TaskProcessorTest extends Specification {
* When a collection of files is provided, the name is expanded to the index number
*/
when:
list1 = processor.expandWildcards('file*.fa', [FileHolder.get('x')])
list2 = processor.expandWildcards('file_*.fa', [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
list1 = processor.expandWildcards( getInputParam('file*.fa'), [FileHolder.get('x')])
list2 = processor.expandWildcards( getInputParam('file_*.fa'), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
Expand All @@ -235,9 +243,9 @@ class TaskProcessorTest extends Specification {
def p0 = [FileHolder.get('0')]
def p1_p4 = (1..4).collect { FileHolder.get(it.toString()) }
def p1_p12 = (1..12).collect { FileHolder.get(it.toString()) }
list1 = processor.expandWildcards('file?.fa', p0 )
list2 = processor.expandWildcards('file_???.fa', p1_p4 )
def list3 = processor.expandWildcards('file_?.fa', p1_p12 )
list1 = processor.expandWildcards( getInputParam('file?.fa'), p0 )
list2 = processor.expandWildcards( getInputParam('file_???.fa'), p1_p4 )
def list3 = processor.expandWildcards( getInputParam('file_?.fa'), p1_p12 )
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
Expand All @@ -247,66 +255,201 @@ class TaskProcessorTest extends Specification {
list3 *. stageName == ['file_1.fa', 'file_2.fa', 'file_3.fa', 'file_4.fa', 'file_5.fa', 'file_6.fa', 'file_7.fa', 'file_8.fa', 'file_9.fa', 'file_10.fa', 'file_11.fa', 'file_12.fa']

when:
list1 = processor.expandWildcards('*', [FileHolder.get('a')])
list2 = processor.expandWildcards('*', [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
list1 = processor.expandWildcards( getInputParam('*'), [FileHolder.get('a')])
list2 = processor.expandWildcards( getInputParam('*'), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['a']
list2 *. stageName == ['x','y','z']

when:
list1 = processor.expandWildcards('dir1/*', [FileHolder.get('a')])
list2 = processor.expandWildcards('dir2/*', [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
list1 = processor.expandWildcards( getInputParam('dir1/*'), [FileHolder.get('a')])
list2 = processor.expandWildcards( getInputParam('dir2/*'), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['dir1/a']
list2 *. stageName == ['dir2/x','dir2/y','dir2/z']

when:
list1 = processor.expandWildcards('/dir/file*.fa', [FileHolder.get('x')])
list2 = processor.expandWildcards('dir/file_*.fa', [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
list1 = processor.expandWildcards( getInputParam('/dir/file*.fa'), [FileHolder.get('x')])
list2 = processor.expandWildcards( getInputParam('dir/file_*.fa'), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['dir/file.fa']
list2 *. stageName == ['dir/file_1.fa', 'dir/file_2.fa', 'dir/file_3.fa']

when:
list1 = processor.expandWildcards('dir/*', [FileHolder.get('file.fa')])
list2 = processor.expandWildcards('dir/*', [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
list1 = processor.expandWildcards( getInputParam('dir/*'), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam('dir/*'), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/file.fa']
list2 *. stageName == ['dir/titi.fa', 'dir/toto.fa']

when:
list1 = processor.expandWildcards('dir/*/*', [FileHolder.get('file.fa')])
list2 = processor.expandWildcards('dir/*/*', [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
list1 = processor.expandWildcards( getInputParam('dir/*/*'), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam('dir/*/*'), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/1/file.fa']
list2 *. stageName == ['dir/1/titi.fa', 'dir/2/toto.fa']

when:
list1 = processor.expandWildcards('dir/foo*/*', [FileHolder.get('file.fa')])
list2 = processor.expandWildcards('dir/foo*/*', [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
list1 = processor.expandWildcards( getInputParam('dir/foo*/*'), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam('dir/foo*/*'), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/foo1/file.fa']
list2 *. stageName == ['dir/foo1/titi.fa', 'dir/foo2/toto.fa']

when:
list1 = processor.expandWildcards('dir/??/*', [FileHolder.get('file.fa')])
list2 = processor.expandWildcards('dir/??/*', [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
list1 = processor.expandWildcards( getInputParam('dir/??/*'), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam('dir/??/*'), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/01/file.fa']
list2 *. stageName == ['dir/01/titi.fa', 'dir/02/toto.fa']

when:
list1 = processor.expandWildcards('dir/bar??/*', [FileHolder.get('file.fa')])
list2 = processor.expandWildcards('dir/bar??/*', [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
list1 = processor.expandWildcards( getInputParam('dir/bar??/*'), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam('dir/bar??/*'), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/bar01/file.fa']
list2 *. stageName == ['dir/bar01/titi.fa', 'dir/bar02/toto.fa']

}

/*
* Verifies that a stage name supplied as a closure returning a constant pattern
* is expanded to the stage names asserted below, one scenario per `when`/`then` pair.
* NOTE(review): the scenarios appear to mirror the string-pattern feature of this spec
* one by one -- keep both in sync when changing either of them.
*/
def 'should expand wildcards Closures same behavior'() {

setup:
def processor = [:] as TaskProcessor

/*
* The name does not contain any wildcards *BUT* when multiple files are provided
* an index number is added to the specified name
*/
when:
def list1 = processor.expandWildcards( getInputParam({ 'file_name' }), [FileHolder.get('x')])
def list2 = processor.expandWildcards( getInputParam({ 'file_name' }), [FileHolder.get('x'), FileHolder.get('y')] )
then:
list1 *. stageName == ['file_name']
list2 *. stageName == ['file_name1', 'file_name2']


/*
* The star wildcard: when a single item is provided, it is simply ignored
* When a collection of files is provided, the name is expanded to the index number
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'file*.fa' }), [FileHolder.get('x')])
list2 = processor.expandWildcards( getInputParam({ 'file_*.fa' }), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['file.fa']
list2 *. stageName == ['file_1.fa', 'file_2.fa', 'file_3.fa']

/*
* The question mark wildcards *always* expand to an index number
*/
when:
def p0 = [FileHolder.get('0')]
def p1_p4 = (1..4).collect { FileHolder.get(it.toString()) }
def p1_p12 = (1..12).collect { FileHolder.get(it.toString()) }
list1 = processor.expandWildcards( getInputParam({ 'file?.fa' }), p0 )
list2 = processor.expandWildcards( getInputParam({ 'file_???.fa' }), p1_p4 )
def list3 = processor.expandWildcards( getInputParam({ 'file_?.fa' }), p1_p12 )
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list3 instanceof ArrayBag
list1 *. stageName == ['file1.fa']
list2 *. stageName == ['file_001.fa', 'file_002.fa', 'file_003.fa', 'file_004.fa']
list3 *. stageName == ['file_1.fa', 'file_2.fa', 'file_3.fa', 'file_4.fa', 'file_5.fa', 'file_6.fa', 'file_7.fa', 'file_8.fa', 'file_9.fa', 'file_10.fa', 'file_11.fa', 'file_12.fa']

/*
* A lone star: every file keeps its own stage name
*/
when:
list1 = processor.expandWildcards( getInputParam({ '*' }), [FileHolder.get('a')])
list2 = processor.expandWildcards( getInputParam({ '*' }), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['a']
list2 *. stageName == ['x','y','z']

/*
* A directory followed by a star: the files keep their names and are placed in that directory
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir1/*' }), [FileHolder.get('a')])
list2 = processor.expandWildcards( getInputParam({ 'dir2/*' }), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['dir1/a']
list2 *. stageName == ['dir2/x','dir2/y','dir2/z']

/*
* A star inside the file name of a path pattern expands to the index number;
* note the leading slash of '/dir/file*.fa' is not part of the expected stage name
*/
when:
list1 = processor.expandWildcards( getInputParam({ '/dir/file*.fa' }), [FileHolder.get('x')])
list2 = processor.expandWildcards( getInputParam({ 'dir/file_*.fa' }), [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')])
then:
list1 instanceof ArrayBag
list2 instanceof ArrayBag
list1 *. stageName == ['dir/file.fa']
list2 *. stageName == ['dir/file_1.fa', 'dir/file_2.fa', 'dir/file_3.fa']

/*
* The trailing star is replaced by the holder stage name
* NOTE(review): the second argument of FileHolder.get looks like an explicit stage name
* ('toto.fa' is what shows up in the result) -- confirm against FileHolder
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir/*' }), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam({ 'dir/*' }), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/file.fa']
list2 *. stageName == ['dir/titi.fa', 'dir/toto.fa']

/*
* Star as a directory component: it expands to the index number,
* while the trailing star still resolves to the file stage name
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir/*/*' }), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam({ 'dir/*/*' }), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/1/file.fa']
list2 *. stageName == ['dir/1/titi.fa', 'dir/2/toto.fa']

/*
* Same as above, with a fixed prefix before the star in the directory component
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir/foo*/*' }), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam({ 'dir/foo*/*' }), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/foo1/file.fa']
list2 *. stageName == ['dir/foo1/titi.fa', 'dir/foo2/toto.fa']

/*
* Question marks as a directory component expand to a zero-padded index number
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir/??/*' }), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam({ 'dir/??/*' }), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/01/file.fa']
list2 *. stageName == ['dir/01/titi.fa', 'dir/02/toto.fa']

/*
* Same as above, with a fixed prefix before the question marks
*/
when:
list1 = processor.expandWildcards( getInputParam({ 'dir/bar??/*' }), [FileHolder.get('file.fa')])
list2 = processor.expandWildcards( getInputParam({ 'dir/bar??/*' }), [FileHolder.get('titi.fa'), FileHolder.get('file.fq', 'toto.fa')])
then:
list1 *. stageName == ['dir/bar01/file.fa']
list2 *. stageName == ['dir/bar01/titi.fa', 'dir/bar02/toto.fa']
}

/*
 * Checks stage names computed by a closure that reads the `storePath` of each file,
 * so that every file of the same input can receive its own name.
 */
def 'should expand wildcards Closures for naming'() {

    given:
    def processor = [:] as TaskProcessor

    /*
     * The closure result is used as the stage name of the file it was evaluated for
     */
    when:
    def plainFiles = [FileHolder.get('x'), FileHolder.get('y'), FileHolder.get('z')]
    def doubled = processor.expandWildcards(getInputParam({ storePath.name * 2 }), plainFiles)
    then:
    doubled*.stageName == ['xx','yy','zz']

    /*
     * Keeping the parent directory name in the closure result gives files sharing
     * the same file name distinct stage names
     */
    when:
    def nestedFiles = [FileHolder.get('a/x/a.txt'), FileHolder.get('b/y/a.txt'), FileHolder.get('c/z/a.txt')]
    def withParent = processor.expandWildcards(getInputParam({ storePath.parent.name + "/" + storePath.name }), nestedFiles)
    then:
    withParent*.stageName == ['x/a.txt','y/a.txt','z/a.txt']

    /*
     * For comparison: the plain star pattern keeps only the file name
     */
    when:
    def sameNameFiles = [FileHolder.get('x/a.txt'), FileHolder.get('y/a.txt'), FileHolder.get('z/a.txt')]
    def starOnly = processor.expandWildcards(getInputParam('*'), sameNameFiles)
    then:
    starOnly*.stageName == ['a.txt','a.txt','a.txt']

    /*
     * When the closure yields the same name for several files, an index number
     * counted separately for each name is appended
     */
    when:
    def duplicates = [FileHolder.get('a.txt'), FileHolder.get('a.txt'), FileHolder.get('b.txt'), FileHolder.get('b.txt')]
    def indexed = processor.expandWildcards(getInputParam({ storePath.name }), duplicates)
    then:
    indexed*.stageName == ['a.txt1','a.txt2','b.txt1','b.txt2']
}

@Unroll
Expand Down