diff --git a/docs/executor.md b/docs/executor.md index c62f0e21f3..011cc84eca 100644 --- a/docs/executor.md +++ b/docs/executor.md @@ -24,6 +24,7 @@ Resource requests and other job characteristics can be controlled via the follow - {ref}`process-accelerator` - {ref}`process-arch` (only when using Fargate platform type for AWS Batch) +- {ref}`process-hints` - {ref}`process-container` - {ref}`process-containerOptions` - {ref}`process-cpus` diff --git a/docs/reference/process.md b/docs/reference/process.md index 18bad7863c..90149e5edb 100644 --- a/docs/reference/process.md +++ b/docs/reference/process.md @@ -582,6 +582,55 @@ Containers are a very useful way to execute your scripts in a reproducible self- This directive is ignored for processes that are {ref}`executed natively `. ::: +(process-hints)= + +### hints + +:::{versionadded} 25.04.0-edge +::: + +The `hints` directive allows you to specify executor-specific scheduling hints using namespaced keys. Executors consume the hints they understand and silently ignore the rest. This directive is repeatable — multiple calls are merged. + +Currently supported hint namespaces: + +**`consumable-resource:`** — Maps to [AWS Batch consumable resources](https://docs.aws.amazon.com/batch/latest/userguide/resource-aware-scheduling.html) for managing limited resources such as software license seats. AWS Batch will hold jobs in `RUNNABLE` until the required resources are available, then lock them for the duration of the job. + +For example, to require one license seat per task: + +```nextflow +process runDragen { + hints 'consumable-resource:my-dragen-license': 1 + cpus 4 + memory '16 GB' + + script: + """ + dragen --ref-dir /ref ... + """ +} +``` + +Multiple hints can be specified in a single call or across multiple calls: + +```nextflow +process runMultiLicense { + hints 'consumable-resource:license-a': 1, 'consumable-resource:license-b': 3 + + script: + """ + multi-tool ... + """ +} +``` + +:::{note} +Consumable resources must be created in your AWS account before using this hint. See the [AWS Batch documentation](https://docs.aws.amazon.com/batch/latest/userguide/resource-aware-scheduling-how-to-create.html) for details. +::: + +:::{warning} +AWS Batch job queues use FIFO ordering by default. A job waiting for a consumable resource will block all subsequent jobs in the same queue — even those that don't require the resource. To avoid this, use a dedicated job queue (via the `queue` directive) for processes that require consumable resources, or use a fair-share scheduling policy on the job queue. +::: + (process-containeroptions)= ### containerOptions diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index 27bdd95f98..f09192ff0c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -529,6 +529,15 @@ class TaskConfig extends LazyMap implements Cloneable { return CmdLineOptionMap.emptyOption() } + Map getHints() { + def value = get('hints') + if( value instanceof Map ) + return (Map) value + if( value != null ) + throw new IllegalArgumentException("Invalid `hints` directive value: $value [${value.getClass().getName()}]") + return null + } + Map getResourceLabels() { return get('resourceLabels') as Map ?: Collections.emptyMap() } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy index 2f63c30d62..f4595d1f19 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy @@ -50,6 +50,7 @@ class ProcessBuilder { 'cache', 'clusterOptions', 'conda', + 'hints', 'container', 'containerOptions', 'cpus', @@ -416,6 +417,27 @@ class ProcessBuilder { allSecrets.add(name) } + /** + * Implements the {@code hints} directive. + * + * Example: hints 'consumable-resource:my-license': 1, 'consumable-resource:other': 2 + * + * This directive can be specified (invoked) multiple times in + * the process definition. Multiple calls are merged. + * + * @param map A map of namespaced hint keys to values + */ + void hints(Map map) { + if( !map ) return + + def allHints = (Map)config.get('hints') + if( !allHints ) { + allHints = [:] + } + allHints += map + config.put('hints', allHints) + } + /// SCRIPT ProcessBuilder withBody(Closure closure, String section, String source='', List values=null) { diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy index 5703eecab4..88aad9cd23 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy @@ -586,6 +586,45 @@ class TaskConfigTest extends Specification { res.type == 'nvidia' } + def 'should get hints'() { + given: + def script = Mock(BaseScript) + + // from DSL: merged map + when: + def process = new ProcessConfig(script) + def dsl = new ProcessBuilder(process) + dsl.hints 'consumable-resource:my-license': 1 + def res = process.createTaskConfig().getHints() + then: + res == ['consumable-resource:my-license': 1] + + // from DSL: multiple calls merge + when: + process = new ProcessConfig(script) + dsl = new ProcessBuilder(process) + dsl.hints 'consumable-resource:license-a': 2 + dsl.hints 'consumable-resource:license-b': 3 + res = process.createTaskConfig().getHints() + then: + res == ['consumable-resource:license-a': 2, 'consumable-resource:license-b': 3] + + // from config: map syntax + when: + def config = new TaskConfig() + config.put('hints', ['consumable-resource:my-license': 1, 'other-hint': 'value']) + res = config.getHints() + then: + res == ['consumable-resource:my-license': 1, 'other-hint': 'value'] + + // absent directive returns null + when: + config = new TaskConfig() + res = config.getHints() + then: + res == null + } + def 'should configure secrets'() { given: def script = Mock(BaseScript) diff --git a/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessBuilderTest.groovy index a25a8144a0..40f1314870 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/dsl/ProcessBuilderTest.groovy @@ -134,6 +134,31 @@ class ProcessBuilderTest extends Specification { config.getSecret() == ['foo', 'bar'] } + def 'should set hints'() { + given: + def builder = createBuilder() + def config = builder.getConfig() + expect: + config.get('hints') == null + + when: + builder.hints 'consumable-resource:my-license': 1 + then: + config.get('hints') == ['consumable-resource:my-license': 1] + + // multiple calls merge + when: + builder.hints 'consumable-resource:other-license': 3 + then: + config.get('hints') == ['consumable-resource:my-license': 1, 'consumable-resource:other-license': 3] + + // multi-key map in one call + when: + builder.hints 'consumable-resource:license-a': 2, 'some-other-hint': 'value' + then: + config.get('hints') == ['consumable-resource:my-license': 1, 'consumable-resource:other-license': 3, 'consumable-resource:license-a': 2, 'some-other-hint': 'value'] + } + def 'should set process labels'() { when: def builder = createBuilder() diff --git a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java index a75a875b3a..9a8d73781b 100644 --- a/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java +++ b/modules/nf-lang/src/main/java/nextflow/script/dsl/ProcessDsl.java @@ -122,6 +122,13 @@ void arch( """) void conda(String value); + @Description(""" + The `hints` directive allows you to specify executor-specific scheduling hints using namespaced keys. Executors consume the hints they understand and ignore the rest. + + [Read more](https://nextflow.io/docs/latest/reference/process.html#hints) + """) + void hints(Map value); + @Description(""" The `container` directive allows you to execute the process script in a container. diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy index 3d6cb2810d..806d8e53bd 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy @@ -82,6 +82,8 @@ import software.amazon.awssdk.services.batch.model.SubmitJobRequest import software.amazon.awssdk.services.batch.model.SubmitJobResponse import software.amazon.awssdk.services.batch.model.TerminateJobRequest import software.amazon.awssdk.services.batch.model.Volume +import software.amazon.awssdk.services.batch.model.ConsumableResourceProperties +import software.amazon.awssdk.services.batch.model.ConsumableResourceRequirement /** * Implements a task handler for AWS Batch jobs */ @@ -623,12 +625,39 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler resourceList = new ArrayList<>() + for( Map.Entry entry : hints.entrySet() ) { + if( entry.key.startsWith(consumablePrefix) ) { + final resourceName = entry.key.substring(consumablePrefix.length()) + resourceList.add( + ConsumableResourceRequirement.builder() + .consumableResource(resourceName) + .quantity(entry.value as Long) + .build() + ) + } + } + if( resourceList ) { + result.consumableResourceProperties( + ConsumableResourceProperties.builder() + .consumableResourceList(resourceList) + .build() + ) + } + } + // add to this list all values that has to contribute to the // job definition unique name creation hashingTokens.add(name) hashingTokens.add(container.toString()) if( containerOpts ) hashingTokens.add(containerOpts) + if( hints ) + hashingTokens.add(hints.toString()) return result } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/model/RegisterJobDefinitionModel.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/model/RegisterJobDefinitionModel.groovy index 2ef91878d6..c11a141c4b 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/model/RegisterJobDefinitionModel.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/model/RegisterJobDefinitionModel.groovy @@ -20,6 +20,7 @@ package nextflow.cloud.aws.batch.model import groovy.transform.CompileStatic import software.amazon.awssdk.services.batch.model.JobDefinitionType import software.amazon.awssdk.services.batch.model.PlatformCapability +import software.amazon.awssdk.services.batch.model.ConsumableResourceProperties import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest /** @@ -45,6 +46,8 @@ class RegisterJobDefinitionModel { private Map tags + private ConsumableResourceProperties consumableResourceProperties + RegisterJobDefinitionModel jobDefinitionName(String value) { this.jobDefinitionName = value return this @@ -82,6 +85,11 @@ class RegisterJobDefinitionModel { return this } + RegisterJobDefinitionModel consumableResourceProperties(ConsumableResourceProperties value) { + this.consumableResourceProperties = value + return this + } + String getJobDefinitionName() { return jobDefinitionName } @@ -106,6 +114,10 @@ class RegisterJobDefinitionModel { return tags } + ConsumableResourceProperties getConsumableResourceProperties() { + return consumableResourceProperties + } + RegisterJobDefinitionRequest toBatchRequest() { final builder = RegisterJobDefinitionRequest.builder() @@ -121,6 +133,8 @@ class RegisterJobDefinitionModel { builder.parameters(parameters) if (tags) builder.tags(tags) + if (consumableResourceProperties) + builder.consumableResourceProperties(consumableResourceProperties) return (RegisterJobDefinitionRequest) builder.build() } @@ -134,6 +148,7 @@ class RegisterJobDefinitionModel { ", containerProperties=" + containerProperties + ", parameters=" + parameters + ", tags=" + tags + + ", consumableResourceProperties=" + consumableResourceProperties + '}'; } } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy index d5b9bee22f..4d75ff6296 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy @@ -61,6 +61,8 @@ import software.amazon.awssdk.services.batch.model.ResourceType import software.amazon.awssdk.services.batch.model.RetryStrategy import software.amazon.awssdk.services.batch.model.SubmitJobRequest import software.amazon.awssdk.services.batch.model.SubmitJobResponse +import software.amazon.awssdk.services.batch.model.ConsumableResourceProperties +import software.amazon.awssdk.services.batch.model.ConsumableResourceRequirement import spock.lang.Specification import spock.lang.Unroll /** @@ -600,6 +602,113 @@ class AwsBatchTaskHandlerTest extends Specification { result.containerProperties.volumes[0].name() == 'aws-cli' } + def 'should create a job definition with consumable resource hints' () { + given: + def IMAGE = 'foo/bar:1.0' + def JOB_NAME = 'nf-foo-bar-1-0' + def HINTS = ['consumable-resource:my-license': 1] + def task = Mock(TaskRun) { + getContainer() >> IMAGE + getConfig() >> Mock(TaskConfig) { + getHints() >> HINTS + } + } + def handler = Spy(AwsBatchTaskHandler) { + getTask() >> task + fusionEnabled() >> false + } + handler.@executor = Mock(AwsBatchExecutor) + + when: + def result = handler.makeJobDefRequest(task) + then: + 1 * handler.normalizeJobDefinitionName(IMAGE) >> JOB_NAME + 1 * handler.getAwsOptions() >> new AwsOptions() + result.consumableResourceProperties != null + result.consumableResourceProperties.consumableResourceList().size() == 1 + result.consumableResourceProperties.consumableResourceList()[0].consumableResource() == 'my-license' + result.consumableResourceProperties.consumableResourceList()[0].quantity() == 1 + } + + def 'should create a job definition with multiple consumable resource hints' () { + given: + def IMAGE = 'foo/bar:1.0' + def JOB_NAME = 'nf-foo-bar-1-0' + def HINTS = ['consumable-resource:license-a': 2, 'consumable-resource:license-b': 3] + def task = Mock(TaskRun) { + getContainer() >> IMAGE + getConfig() >> Mock(TaskConfig) { + getHints() >> HINTS + } + } + def handler = Spy(AwsBatchTaskHandler) { + getTask() >> task + fusionEnabled() >> false + } + handler.@executor = Mock(AwsBatchExecutor) + + when: + def result = handler.makeJobDefRequest(task) + then: + 1 * handler.normalizeJobDefinitionName(IMAGE) >> JOB_NAME + 1 * handler.getAwsOptions() >> new AwsOptions() + result.consumableResourceProperties != null + result.consumableResourceProperties.consumableResourceList().size() == 2 + result.consumableResourceProperties.consumableResourceList()[0].consumableResource() == 'license-a' + result.consumableResourceProperties.consumableResourceList()[0].quantity() == 2 + result.consumableResourceProperties.consumableResourceList()[1].consumableResource() == 'license-b' + result.consumableResourceProperties.consumableResourceList()[1].quantity() == 3 + } + + def 'should ignore non-consumable hints in job definition' () { + given: + def IMAGE = 'foo/bar:1.0' + def JOB_NAME = 'nf-foo-bar-1-0' + def HINTS = ['some-other-hint': 'value'] + def task = Mock(TaskRun) { + getContainer() >> IMAGE + getConfig() >> Mock(TaskConfig) { + getHints() >> HINTS + } + } + def handler = Spy(AwsBatchTaskHandler) { + getTask() >> task + fusionEnabled() >> false + } + handler.@executor = Mock(AwsBatchExecutor) + + when: + def result = handler.makeJobDefRequest(task) + then: + 1 * handler.normalizeJobDefinitionName(IMAGE) >> JOB_NAME + 1 * handler.getAwsOptions() >> new AwsOptions() + result.consumableResourceProperties == null + } + + def 'should not set consumable resources when hints absent' () { + given: + def IMAGE = 'foo/bar:1.0' + def JOB_NAME = 'nf-foo-bar-1-0' + def task = Mock(TaskRun) { + getContainer() >> IMAGE + getConfig() >> Mock(TaskConfig) { + getHints() >> null + } + } + def handler = Spy(AwsBatchTaskHandler) { + getTask() >> task + fusionEnabled() >> false + } + handler.@executor = Mock(AwsBatchExecutor) + + when: + def result = handler.makeJobDefRequest(task) + then: + 1 * handler.normalizeJobDefinitionName(IMAGE) >> JOB_NAME + 1 * handler.getAwsOptions() >> new AwsOptions() + result.consumableResourceProperties == null + } + def 'should create a fargate job definition' () { given: def ARM64 = new Architecture('linux/arm64')