Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Resource requests and other job characteristics can be controlled via the follow

- {ref}`process-accelerator`
- {ref}`process-arch` (only when using Fargate platform type for AWS Batch)
- {ref}`process-hints`
- {ref}`process-container`
- {ref}`process-containerOptions`
- {ref}`process-cpus`
Expand Down
49 changes: 49 additions & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,55 @@ Containers are a very useful way to execute your scripts in a reproducible self-
This directive is ignored for processes that are {ref}`executed natively <process-native>`.
:::

(process-hints)=

### hints

:::{versionadded} 25.04.0-edge
:::

The `hints` directive allows you to specify executor-specific scheduling hints using namespaced keys. Executors consume the hints they understand and silently ignore the rest. This directive is repeatable — multiple calls are merged.

Currently supported hint namespaces:

**`consumable-resource:`** — Maps to [AWS Batch consumable resources](https://docs.aws.amazon.com/batch/latest/userguide/resource-aware-scheduling.html) for managing limited resources such as software license seats. AWS Batch will hold jobs in `RUNNABLE` until the required resources are available, then lock them for the duration of the job.

For example, to require one license seat per task:

```nextflow
process runDragen {
hints 'consumable-resource:my-dragen-license': 1
cpus 4
memory '16 GB'

script:
"""
dragen --ref-dir /ref ...
"""
}
```

Multiple hints can be specified in a single call or across multiple calls:

```nextflow
process runMultiLicense {
hints 'consumable-resource:license-a': 1, 'consumable-resource:license-b': 3

script:
"""
multi-tool ...
"""
}
```

:::{note}
Consumable resources must be created in your AWS account before using this hint. See the [AWS Batch documentation](https://docs.aws.amazon.com/batch/latest/userguide/resource-aware-scheduling-how-to-create.html) for details.
:::

:::{warning}
AWS Batch job queues use FIFO ordering by default. A job waiting for a consumable resource will block all subsequent jobs in the same queue — even those that don't require the resource. To avoid this, use a dedicated job queue (via the `queue` directive) for processes that require consumable resources, or use a fair-share scheduling policy on the job queue.
:::

(process-containeroptions)=

### containerOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,15 @@ class TaskConfig extends LazyMap implements Cloneable {
return CmdLineOptionMap.emptyOption()
}

Map<String, Object> getHints() {
def value = get('hints')
if( value instanceof Map )
return (Map<String, Object>) value
if( value != null )
throw new IllegalArgumentException("Invalid `hints` directive value: $value [${value.getClass().getName()}]")
return null
}

Map<String, String> getResourceLabels() {
return get('resourceLabels') as Map<String, String> ?: Collections.<String,String>emptyMap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ProcessBuilder {
'cache',
'clusterOptions',
'conda',
'hints',
'container',
'containerOptions',
'cpus',
Expand Down Expand Up @@ -416,6 +417,27 @@ class ProcessBuilder {
allSecrets.add(name)
}

/**
* Implements the {@code hints} directive.
*
* Example: hints 'consumable-resource:my-license': 1, 'consumable-resource:other': 2
*
* This directive can be specified (invoked) multiple times in
* the process definition. Multiple calls are merged.
*
* @param map A map of namespaced hint keys to values
*/
void hints(Map<String, Object> map) {
if( !map ) return

def allHints = (Map)config.get('hints')
if( !allHints ) {
allHints = [:]
}
allHints += map
config.put('hints', allHints)
}

/// SCRIPT

ProcessBuilder withBody(Closure closure, String section, String source='', List values=null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,45 @@ class TaskConfigTest extends Specification {
res.type == 'nvidia'
}

def 'should get hints'() {
given:
def script = Mock(BaseScript)

// from DSL: merged map
when:
def process = new ProcessConfig(script)
def dsl = new ProcessBuilder(process)
dsl.hints 'consumable-resource:my-license': 1
def res = process.createTaskConfig().getHints()
then:
res == ['consumable-resource:my-license': 1]

// from DSL: multiple calls merge
when:
process = new ProcessConfig(script)
dsl = new ProcessBuilder(process)
dsl.hints 'consumable-resource:license-a': 2
dsl.hints 'consumable-resource:license-b': 3
res = process.createTaskConfig().getHints()
then:
res == ['consumable-resource:license-a': 2, 'consumable-resource:license-b': 3]

// from config: map syntax
when:
def config = new TaskConfig()
config.put('hints', ['consumable-resource:my-license': 1, 'other-hint': 'value'])
res = config.getHints()
then:
res == ['consumable-resource:my-license': 1, 'other-hint': 'value']

// absent directive returns null
when:
config = new TaskConfig()
res = config.getHints()
then:
res == null
}

def 'should configure secrets'() {
given:
def script = Mock(BaseScript)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ class ProcessBuilderTest extends Specification {
config.getSecret() == ['foo', 'bar']
}

def 'should set hints'() {
given:
def builder = createBuilder()
def config = builder.getConfig()
expect:
config.get('hints') == null

when:
builder.hints 'consumable-resource:my-license': 1
then:
config.get('hints') == ['consumable-resource:my-license': 1]

// multiple calls merge
when:
builder.hints 'consumable-resource:other-license': 3
then:
config.get('hints') == ['consumable-resource:my-license': 1, 'consumable-resource:other-license': 3]

// multi-key map in one call
when:
builder.hints 'consumable-resource:license-a': 2, 'some-other-hint': 'value'
then:
config.get('hints') == ['consumable-resource:my-license': 1, 'consumable-resource:other-license': 3, 'consumable-resource:license-a': 2, 'some-other-hint': 'value']
}

def 'should set process labels'() {
when:
def builder = createBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ void arch(
""")
void conda(String value);

@Description("""
The `hints` directive allows you to specify executor-specific scheduling hints using namespaced keys. Executors consume the hints they understand and ignore the rest.

[Read more](https://nextflow.io/docs/latest/reference/process.html#hints)
""")
void hints(Map<String,?> value);

@Description("""
The `container` directive allows you to execute the process script in a container.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ import software.amazon.awssdk.services.batch.model.SubmitJobRequest
import software.amazon.awssdk.services.batch.model.SubmitJobResponse
import software.amazon.awssdk.services.batch.model.TerminateJobRequest
import software.amazon.awssdk.services.batch.model.Volume
import software.amazon.awssdk.services.batch.model.ConsumableResourceProperties
import software.amazon.awssdk.services.batch.model.ConsumableResourceRequirement
/**
* Implements a task handler for AWS Batch jobs
*/
Expand Down Expand Up @@ -623,12 +625,39 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
// finally set the container options
result.containerProperties(container)

// set consumable resource properties from hints
final hints = task.config.getHints()
final consumablePrefix = 'consumable-resource:'
if( hints ) {
final List<ConsumableResourceRequirement> resourceList = new ArrayList<>()
for( Map.Entry<String, Object> entry : hints.entrySet() ) {
if( entry.key.startsWith(consumablePrefix) ) {
final resourceName = entry.key.substring(consumablePrefix.length())
resourceList.add(
ConsumableResourceRequirement.builder()
.consumableResource(resourceName)
.quantity(entry.value as Long)
.build()
)
}
}
if( resourceList ) {
result.consumableResourceProperties(
ConsumableResourceProperties.builder()
.consumableResourceList(resourceList)
.build()
)
}
}

// add to this list all values that has to contribute to the
// job definition unique name creation
hashingTokens.add(name)
hashingTokens.add(container.toString())
if( containerOpts )
hashingTokens.add(containerOpts)
if( hints )
hashingTokens.add(hints.toString())

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package nextflow.cloud.aws.batch.model
import groovy.transform.CompileStatic
import software.amazon.awssdk.services.batch.model.JobDefinitionType
import software.amazon.awssdk.services.batch.model.PlatformCapability
import software.amazon.awssdk.services.batch.model.ConsumableResourceProperties
import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest

/**
Expand All @@ -45,6 +46,8 @@ class RegisterJobDefinitionModel {

private Map<String,String> tags

private ConsumableResourceProperties consumableResourceProperties

RegisterJobDefinitionModel jobDefinitionName(String value) {
this.jobDefinitionName = value
return this
Expand Down Expand Up @@ -82,6 +85,11 @@ class RegisterJobDefinitionModel {
return this
}

RegisterJobDefinitionModel consumableResourceProperties(ConsumableResourceProperties value) {
this.consumableResourceProperties = value
return this
}

String getJobDefinitionName() {
return jobDefinitionName
}
Expand All @@ -106,6 +114,10 @@ class RegisterJobDefinitionModel {
return tags
}

ConsumableResourceProperties getConsumableResourceProperties() {
return consumableResourceProperties
}

RegisterJobDefinitionRequest toBatchRequest() {
final builder = RegisterJobDefinitionRequest.builder()

Expand All @@ -121,6 +133,8 @@ class RegisterJobDefinitionModel {
builder.parameters(parameters)
if (tags)
builder.tags(tags)
if (consumableResourceProperties)
builder.consumableResourceProperties(consumableResourceProperties)

return (RegisterJobDefinitionRequest) builder.build()
}
Expand All @@ -134,6 +148,7 @@ class RegisterJobDefinitionModel {
", containerProperties=" + containerProperties +
", parameters=" + parameters +
", tags=" + tags +
", consumableResourceProperties=" + consumableResourceProperties +
'}';
}
}
Loading
Loading