/*
 * Copyright 2013-2024, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.seqera.tower.plugin.dataset;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.WatchService;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
 * Minimal read-only {@link FileSystem} implementation for the {@code dataset://} scheme.
 * <p>
 * Datasets are single files resolved via the Seqera Platform API,
 * so most filesystem operations (roots, stores, path matching)
 * are either trivial or unsupported.
 *
 * @author Edmund Miller
 */
public class DatasetFileSystem extends FileSystem {

    static final String PATH_SEPARATOR = "/";

    private final DatasetFileSystemProvider provider;

    /** Configuration map supplied at creation time — never {@code null}. */
    private final Map<String, ?> env;

    /** Open flag — flipped by {@link #close()}; volatile for cross-thread visibility. */
    private volatile boolean open = true;

    /**
     * Create a dataset filesystem.
     *
     * @param provider the owning provider instance
     * @param env optional configuration map; {@code null} is treated as empty
     */
    DatasetFileSystem(DatasetFileSystemProvider provider, Map<String, ?> env) {
        this.provider = provider;
        this.env = env != null ? env : Collections.<String, Object>emptyMap();
    }

    @Override
    public FileSystemProvider provider() {
        return provider;
    }

    @Override
    public void close() throws IOException {
        open = false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    @Override
    public boolean isReadOnly() {
        // Phase 1 of the dataset integration is read-only
        return true;
    }

    @Override
    public String getSeparator() {
        return PATH_SEPARATOR;
    }

    @Override
    public Iterable<Path> getRootDirectories() {
        // Datasets are single files — there is no directory hierarchy
        return Collections.emptyList();
    }

    @Override
    public Iterable<FileStore> getFileStores() {
        return Collections.emptyList();
    }

    @Override
    public Set<String> supportedFileAttributeViews() {
        return Set.of("basic");
    }

    /**
     * Build a dataset path from string components, joining them with the
     * path separator. Empty components are skipped.
     */
    @Override
    public Path getPath(String first, String... more) {
        StringBuilder sb = new StringBuilder(first);
        for (String part : more) {
            if (!part.isEmpty()) {
                sb.append(PATH_SEPARATOR).append(part);
            }
        }
        return new DatasetPath(this, sb.toString());
    }

    @Override
    public PathMatcher getPathMatcher(String syntaxAndPattern) {
        throw new UnsupportedOperationException("Dataset filesystem does not support path matching");
    }

    @Override
    public UserPrincipalLookupService getUserPrincipalLookupService() {
        throw new UnsupportedOperationException();
    }

    @Override
    public WatchService newWatchService() throws IOException {
        throw new UnsupportedOperationException();
    }

    /** @return the configuration map supplied at creation time (possibly empty, never null) */
    Map<String, ?> getEnv() {
        return env;
    }
}
/*
 * Copyright 2013-2024, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.seqera.tower.plugin.dataset;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.AccessMode;
import java.nio.file.CopyOption;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.FileSystemAlreadyExistsException;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.ReadOnlyFileSystemException;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.spi.FileSystemProvider;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * NIO {@link FileSystemProvider} for the {@code dataset://} scheme.
 * <p>
 * Resolves dataset URIs of the form {@code dataset://name} or
 * {@code dataset://name?version=N} to their backing cloud storage
 * path via the Seqera Platform API, then delegates all I/O
 * operations to the resolved path's provider.
 * <p>
 * Phase 1: read-only — write operations throw {@link ReadOnlyFileSystemException}.
 *
 * @author Edmund Miller
 */
public class DatasetFileSystemProvider extends FileSystemProvider {

    private static final Logger log = LoggerFactory.getLogger(DatasetFileSystemProvider.class);

    /** Singleton filesystem instance — volatile for safe double-checked lazy init. */
    private volatile DatasetFileSystem fileSystem;

    @Override
    public String getScheme() {
        return "dataset";
    }

    /**
     * Create the (singleton) dataset filesystem.
     *
     * @throws FileSystemAlreadyExistsException if the filesystem was already created
     */
    @Override
    public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException {
        if (fileSystem != null) {
            throw new FileSystemAlreadyExistsException("Dataset filesystem already exists");
        }
        synchronized (this) {
            // re-check under the lock to close the race with a concurrent creator
            if (fileSystem != null) {
                throw new FileSystemAlreadyExistsException("Dataset filesystem already exists");
            }
            fileSystem = new DatasetFileSystem(this, env);
            return fileSystem;
        }
    }

    @Override
    public FileSystem getFileSystem(URI uri) {
        if (fileSystem == null) {
            throw new FileSystemNotFoundException("Dataset filesystem not yet created. Use newFileSystem() first");
        }
        return fileSystem;
    }

    @Override
    public Path getPath(URI uri) {
        if (!"dataset".equals(uri.getScheme())) {
            throw new IllegalArgumentException("URI scheme must be 'dataset': " + uri);
        }
        return new DatasetPath(getOrCreateFileSystem(), uri);
    }

    // -- read operations: delegate to resolved cloud path --

    @Override
    public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
        final Path resolved = resolvedPath(path);
        log.debug("newInputStream for dataset path {} -> {}", path, resolved);
        return resolved.getFileSystem().provider().newInputStream(resolved, options);
    }

    @Override
    public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
        final Path resolved = resolvedPath(path);
        return resolved.getFileSystem().provider().newByteChannel(resolved, options, attrs);
    }

    @Override
    public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
        // a dataset resolves to a single file, never a directory
        throw new UnsupportedOperationException("Dataset paths do not support directory listing");
    }

    @Override
    public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException {
        final Path resolved = resolvedPath(path);
        return resolved.getFileSystem().provider().readAttributes(resolved, type, options);
    }

    @Override
    public Map<String, Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException {
        final Path resolved = resolvedPath(path);
        return resolved.getFileSystem().provider().readAttributes(resolved, attributes, options);
    }

    @Override
    public <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options) {
        try {
            final Path resolved = resolvedPath(path);
            return resolved.getFileSystem().provider().getFileAttributeView(resolved, type, options);
        }
        catch (IOException e) {
            // the SPI signature does not allow IOException here — surface it unchecked,
            // preserving the cause; UncheckedIOException is still a RuntimeException
            throw new UncheckedIOException("Failed to resolve dataset path: " + path, e);
        }
    }

    @Override
    public void checkAccess(Path path, AccessMode... modes) throws IOException {
        // reject write access up-front — phase 1 is read-only
        for (AccessMode mode : modes) {
            if (mode == AccessMode.WRITE) {
                throw new ReadOnlyFileSystemException();
            }
        }
        final Path resolved = resolvedPath(path);
        resolved.getFileSystem().provider().checkAccess(resolved, modes);
    }

    // -- write operations: read-only in phase 1 --

    @Override
    public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
        throw new ReadOnlyFileSystemException();
    }

    @Override
    public void delete(Path path) throws IOException {
        throw new ReadOnlyFileSystemException();
    }

    @Override
    public void copy(Path source, Path target, CopyOption... options) throws IOException {
        throw new ReadOnlyFileSystemException();
    }

    @Override
    public void move(Path source, Path target, CopyOption... options) throws IOException {
        throw new ReadOnlyFileSystemException();
    }

    @Override
    public boolean isSameFile(Path path1, Path path2) throws IOException {
        if (path1 instanceof DatasetPath && path2 instanceof DatasetPath) {
            return path1.equals(path2);
        }
        return false;
    }

    @Override
    public boolean isHidden(Path path) throws IOException {
        return false;
    }

    @Override
    public FileStore getFileStore(Path path) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException {
        throw new ReadOnlyFileSystemException();
    }

    // -- internal helpers --

    /** Lazily create the singleton filesystem for implicit access via getPath(URI). */
    private DatasetFileSystem getOrCreateFileSystem() {
        if (fileSystem == null) {
            synchronized (this) {
                if (fileSystem == null) {
                    fileSystem = new DatasetFileSystem(this, null);
                }
            }
        }
        return fileSystem;
    }

    /**
     * Resolve a dataset path to its backing cloud storage path.
     * Caches the resolved path on the DatasetPath instance.
     *
     * @throws IllegalArgumentException if {@code path} is not a {@link DatasetPath}
     */
    private Path resolvedPath(Path path) throws IOException {
        if (!(path instanceof DatasetPath)) {
            throw new IllegalArgumentException("Path must be a DatasetPath: " + path);
        }
        return ((DatasetPath) path).getResolvedPath();
    }
}
/*
 * Copyright 2013-2024, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.seqera.tower.plugin.dataset;

import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Path} representing a Seqera Platform dataset reference.
 * <p>
 * URI format: {@code dataset://name} or {@code dataset://name?version=N}
 * <p>
 * The path lazily resolves to the backing cloud storage path
 * (S3/GCS/Azure) via the Platform API on first I/O access.
 *
 * @author Edmund Miller
 */
public class DatasetPath implements Path {

    private static final Logger log = LoggerFactory.getLogger(DatasetPath.class);

    private final DatasetFileSystem fileSystem;
    private final String datasetName;
    private final String version; // null = latest
    private final URI uri;

    /** Cached resolved cloud path — populated lazily on first I/O */
    private volatile Path resolvedPath;

    /**
     * Construct from a URI (e.g. from provider.getPath(URI))
     */
    DatasetPath(DatasetFileSystem fileSystem, URI uri) {
        this.fileSystem = fileSystem;
        this.uri = uri;
        // dataset://my-samplesheet or dataset:///my-samplesheet
        // host = dataset name, or if host is null, first path segment is the name
        String name = uri.getHost();
        if (name == null || name.isEmpty()) {
            // handle dataset:///name form
            String path = uri.getPath();
            if (path != null && path.startsWith("/")) {
                path = path.substring(1);
            }
            name = path;
        }
        this.datasetName = name;
        // parse ?version=N from query string
        this.version = parseVersion(uri.getQuery());
    }

    /**
     * Construct from string path (e.g. from fileSystem.getPath()).
     * Accepts an optional {@code name@version} suffix form.
     */
    DatasetPath(DatasetFileSystem fileSystem, String path) {
        this.fileSystem = fileSystem;
        // strip leading slash if present
        if (path.startsWith("/")) {
            path = path.substring(1);
        }
        // check for version suffix: name@version
        int atIdx = path.indexOf('@');
        if (atIdx > 0) {
            this.datasetName = path.substring(0, atIdx);
            this.version = path.substring(atIdx + 1);
        }
        else {
            this.datasetName = path;
            this.version = null;
        }
        this.uri = URI.create("dataset://" + datasetName + (version != null ? "?version=" + version : ""));
    }

    public String getDatasetName() {
        return datasetName;
    }

    public String getVersion() {
        return version;
    }

    /**
     * Resolve this dataset reference to the backing cloud storage path.
     * Lazily initialized and cached (double-checked locking on the volatile field).
     */
    Path getResolvedPath() throws IOException {
        if (resolvedPath == null) {
            synchronized (this) {
                if (resolvedPath == null) {
                    log.debug("Resolving dataset '{}' version={}", datasetName, version != null ? version : "latest");
                    resolvedPath = DatasetResolver.resolve(datasetName, version);
                    log.debug("Resolved dataset '{}' -> {}", datasetName, resolvedPath);
                }
            }
        }
        return resolvedPath;
    }

    // -- Path interface --

    @Override
    public FileSystem getFileSystem() {
        return fileSystem;
    }

    @Override
    public boolean isAbsolute() {
        return true;
    }

    @Override
    public Path getRoot() {
        return null;
    }

    @Override
    public Path getFileName() {
        // A dataset path has exactly one name element, so the path itself is
        // the file name; returning `this` also preserves the version qualifier
        // (constructing a new path from `datasetName` alone would drop it)
        return this;
    }

    @Override
    public Path getParent() {
        return null;
    }

    @Override
    public int getNameCount() {
        return 1;
    }

    @Override
    public Path getName(int index) {
        if (index != 0) {
            throw new IllegalArgumentException("Invalid name index: " + index);
        }
        return this;
    }

    @Override
    public Path subpath(int beginIndex, int endIndex) {
        if (beginIndex != 0 || endIndex != 1) {
            throw new IllegalArgumentException("Invalid subpath range");
        }
        return this;
    }

    @Override
    public boolean startsWith(Path other) {
        // single-element path: only equal paths can be a prefix
        return equals(other);
    }

    @Override
    public boolean endsWith(Path other) {
        return equals(other);
    }

    @Override
    public Path normalize() {
        return this;
    }

    @Override
    public Path resolve(Path other) {
        // dataset paths are leaf nodes, cannot resolve children
        throw new UnsupportedOperationException("Cannot resolve against a dataset path");
    }

    @Override
    public Path relativize(Path other) {
        throw new UnsupportedOperationException("Cannot relativize dataset paths");
    }

    @Override
    public URI toUri() {
        return uri;
    }

    @Override
    public Path toAbsolutePath() {
        return this;
    }

    @Override
    public Path toRealPath(LinkOption... options) throws IOException {
        // Return the resolved cloud path as the "real" path
        return getResolvedPath();
    }

    @Override
    public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events, WatchEvent.Modifier... modifiers) throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Order by dataset name, then by version (null/latest sorts first).
     */
    @Override
    public int compareTo(Path other) {
        if (other instanceof DatasetPath) {
            DatasetPath o = (DatasetPath) other;
            int cmp = datasetName.compareTo(o.datasetName);
            if (cmp != 0) return cmp;
            if (version == null && o.version == null) return 0;
            if (version == null) return -1;
            if (o.version == null) return 1;
            return version.compareTo(o.version);
        }
        return toString().compareTo(other.toString());
    }

    @Override
    public Iterator<Path> iterator() {
        return List.<Path>of(this).iterator();
    }

    @Override
    public String toString() {
        return "dataset://" + datasetName + (version != null ? "?version=" + version : "");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DatasetPath)) return false;
        DatasetPath that = (DatasetPath) o;
        return Objects.equals(datasetName, that.datasetName) && Objects.equals(version, that.version);
    }

    @Override
    public int hashCode() {
        return Objects.hash(datasetName, version);
    }

    // -- helpers --

    /** Extract the {@code version} parameter from a URI query string, or null. */
    private static String parseVersion(String query) {
        if (query == null || query.isEmpty()) return null;
        for (String param : query.split("&")) {
            String[] kv = param.split("=", 2);
            if (kv.length == 2 && "version".equals(kv[0])) {
                return kv[1];
            }
        }
        return null;
    }
}
/*
 * Copyright 2013-2024, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.seqera.tower.plugin.dataset

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.file.Path

import groovy.json.JsonSlurper
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.Session
import nextflow.exception.AbortOperationException
import nextflow.file.FileHelper
import nextflow.platform.PlatformHelper

/**
 * Resolves a Seqera Platform dataset reference to its backing cloud storage path.
 * <p>
 * Resolution chain:
 * 1. Dataset name -> GET /datasets?workspaceId=X -> DatasetDto.id
 * 2. Dataset id + version -> GET /datasets/{id}/versions -> DatasetVersionDto.url
 * 3. Cloud URL string -> FileHelper.asPath() -> concrete cloud Path (S3/GCS/Azure)
 *
 * @author Edmund Miller
 */
@Slf4j
@CompileStatic
class DatasetResolver {

    // HttpClient is immutable and thread-safe -- share one instance instead of
    // creating a new client (and connection pool) on every resolution
    private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient()

    /**
     * Resolve a dataset name (and optional version) to the backing cloud storage Path.
     *
     * @param datasetName The dataset name as shown in Seqera Platform
     * @param version The version number (null = latest)
     * @return A concrete cloud storage Path (e.g. S3Path, GcsPath)
     * @throws AbortOperationException when the token is missing, the dataset or
     *         version cannot be found, or the Platform API returns an error
     */
    static Path resolve(String datasetName, String version) {
        if (!datasetName)
            throw new IllegalArgumentException("Dataset name cannot be null or empty")

        final String endpoint = getEndpoint()
        final String accessToken = getAccessToken()
        final String workspaceId = getWorkspaceId()

        if (!accessToken)
            throw new AbortOperationException("Missing Seqera Platform access token -- set TOWER_ACCESS_TOKEN or tower.accessToken in config")

        // Step 1: Resolve dataset name -> dataset ID
        final String datasetId = resolveDatasetId(HTTP_CLIENT, endpoint, accessToken, workspaceId, datasetName)

        // Step 2: Resolve dataset ID + version -> cloud storage URL
        final String cloudUrl = resolveCloudUrl(HTTP_CLIENT, endpoint, accessToken, workspaceId, datasetId, version)

        log.debug "Dataset '{}' resolved to cloud URL: {}", datasetName, cloudUrl

        // Step 3: Convert cloud URL -> Path via Nextflow's FileHelper
        return FileHelper.asPath(cloudUrl)
    }

    /**
     * Look up a dataset by name, return its ID.
     */
    static private String resolveDatasetId(HttpClient httpClient, String endpoint, String accessToken, String workspaceId, String datasetName) {
        String url = "${endpoint}/datasets"
        if (workspaceId) {
            url += "?workspaceId=${workspaceId}"
        }

        log.debug "Listing datasets from: {}", url

        final Map json = httpGet(httpClient, url, accessToken)
        final List datasets = json.datasets as List

        if (!datasets) {
            throw new AbortOperationException("No datasets found in workspace")
        }

        // Find dataset by name (case-sensitive match)
        final Map dataset = datasets.find { Map it -> it.name == datasetName }
        if (!dataset) {
            final String available = datasets.collect { Map it -> it.name }.join(', ')
            throw new AbortOperationException("Dataset '${datasetName}' not found. Available: ${available}")
        }

        return dataset.id as String
    }

    /**
     * Look up the dataset version's backing cloud URL.
     * If version is null, uses the latest version.
     */
    static private String resolveCloudUrl(HttpClient httpClient, String endpoint, String accessToken, String workspaceId, String datasetId, String version) {
        String url = "${endpoint}/datasets/${datasetId}/versions"
        if (workspaceId) {
            url += "?workspaceId=${workspaceId}"
        }

        log.debug "Listing dataset versions from: {}", url

        final Map json = httpGet(httpClient, url, accessToken)
        final List versions = json.versions as List

        if (!versions) {
            throw new AbortOperationException("No versions found for dataset ID: ${datasetId}")
        }

        Map targetVersion
        if (version) {
            // Find specific version
            targetVersion = versions.find { Map it -> it.version?.toString() == version }
            if (!targetVersion) {
                throw new AbortOperationException("Version '${version}' not found for dataset ID: ${datasetId}")
            }
        }
        else {
            // Use the latest version (highest version number)
            targetVersion = versions.max { Map it -> (it.version as Integer) ?: 0 } as Map
        }

        final String cloudUrl = targetVersion.url as String
        if (!cloudUrl) {
            throw new AbortOperationException("Dataset version has no backing storage URL -- dataset ID: ${datasetId}, version: ${targetVersion.version}")
        }

        return cloudUrl
    }

    /**
     * Execute a GET request against the Platform API and parse the JSON response.
     * Maps 401/403 to a friendlier "access denied" message.
     */
    static private Map httpGet(HttpClient httpClient, String url, String accessToken) {
        final request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("Authorization", "Bearer ${accessToken}")
            .GET()
            .build()

        final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString())

        if (response.statusCode() < 200 || response.statusCode() >= 300) {
            if (response.statusCode() == 401 || response.statusCode() == 403) {
                throw new AbortOperationException("Access denied to Seqera Platform API -- check your access token")
            }
            throw new AbortOperationException("Seqera Platform API error -- HTTP ${response.statusCode()}: ${response.body()}")
        }

        return new JsonSlurper().parseText(response.body()) as Map
    }

    // -- config helpers --

    static private String getEndpoint() {
        final Map opts = getTowerOpts()
        return PlatformHelper.getEndpoint(opts, System.getenv())
    }

    static private String getAccessToken() {
        final Map opts = getTowerOpts()
        return PlatformHelper.getAccessToken(opts, System.getenv())
    }

    static private String getWorkspaceId() {
        final Map opts = getTowerOpts()
        return PlatformHelper.getWorkspaceId(opts, System.getenv())
    }

    /** Fetch the `tower.*` config scope from the active Nextflow session. */
    static private Map getTowerOpts() {
        final session = Global.session as Session
        if (!session)
            throw new AbortOperationException("Nextflow session not initialized -- dataset:// URIs require an active session")

        return session.config?.navigate('tower') as Map ?: Collections.emptyMap()
    }
}
/*
 * Copyright 2013-2024, Seqera Labs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.seqera.tower.plugin.dataset

import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.file.FileHelper
import nextflow.file.FileSystemPathFactory

/**
 * Factory that intercepts {@code dataset://} URI strings and returns
 * a {@link DatasetPath} via the NIO FileSystem provider.
 * <p>
 * Registered as a Nextflow {@code ExtensionPoint} so that
 * {@code FileHelper.asPath("dataset://my-samplesheet")} works
 * transparently -- no pipeline code changes needed.
 *
 * @author Edmund Miller
 */
@Slf4j
@CompileStatic
class DatasetPathFactory extends FileSystemPathFactory {

    private static final String SCHEME_PREFIX = 'dataset://'

    @Override
    protected Path parseUri(String str) {
        if (!str.startsWith(SCHEME_PREFIX))
            return null

        log.debug "Parsing dataset URI: {}", str

        // Split the optional '?version=N' query off the dataset name, so the
        // query is not glued into the URI path component, and build a proper
        // 'dataset' scheme URI. (Passing the whole string as the *path* of a
        // scheme-less URI would lose the scheme and break provider lookup.)
        String name = str.substring(SCHEME_PREFIX.length())
        String query = null
        final int q = name.indexOf('?')
        if (q >= 0) {
            query = name.substring(q + 1)
            name = name.substring(0, q)
        }
        // normalise 'dataset:///name' form: drop any leading slashes
        while (name.startsWith('/'))
            name = name.substring(1)

        final uri = new URI('dataset', null, '/' + name, query, null)
        return FileHelper.getOrCreateFileSystemFor(uri).provider().getPath(uri)
    }

    @Override
    protected String toUriString(Path path) {
        if (path instanceof DatasetPath) {
            // DatasetPath.toString() renders the canonical dataset:// form
            return path.toString()
        }
        return null
    }

    @Override
    protected String getBashLib(Path target) {
        // dataset:// paths are resolved to cloud paths before execution,
        // no special bash lib needed
        return null
    }

    @Override
    protected String getUploadCmd(String source, Path target) {
        // read-only -- no upload support
        return null
    }
}
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.seqera.tower.plugin.dataset.DatasetFileSystemProvider From 8f6501f9a0ff09290ad331469f53aedfc7f52a46 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 25 Feb 2026 16:52:40 -0600 Subject: [PATCH 4/8] test(nf-tower): unit tests for dataset:// provider DatasetPathTest: URI/string parsing, Path interface, equality DatasetFileSystemProviderTest: scheme, FS creation, read-only enforcement DatasetPathFactoryTest: URI matching, toUriString DatasetResolverTest: WireMock API error cases, auth, workspace param Signed-off-by: Edmund Miller --- .../DatasetFileSystemProviderTest.groovy | 178 ++++++++++++++++ .../dataset/DatasetPathFactoryTest.groovy | 74 +++++++ .../plugin/dataset/DatasetPathTest.groovy | 181 +++++++++++++++++ .../plugin/dataset/DatasetResolverTest.groovy | 191 ++++++++++++++++++ 4 files changed, 624 insertions(+) create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy new file mode 100644 index 0000000000..6231f979e1 --- /dev/null +++ 
b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy @@ -0,0 +1,178 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin.dataset + +import java.nio.file.AccessMode +import java.nio.file.FileSystemAlreadyExistsException +import java.nio.file.ReadOnlyFileSystemException + +import spock.lang.Specification + +/** + * @author Edmund Miller + */ +class DatasetFileSystemProviderTest extends Specification { + + def 'should return dataset scheme'() { + given: + def provider = new DatasetFileSystemProvider() + + expect: + provider.getScheme() == 'dataset' + } + + def 'should create filesystem'() { + given: + def provider = new DatasetFileSystemProvider() + + when: + def fs = provider.newFileSystem(new URI('dataset:///'), [:]) + + then: + fs instanceof DatasetFileSystem + fs.isOpen() + fs.isReadOnly() + } + + def 'should throw on duplicate filesystem creation'() { + given: + def provider = new DatasetFileSystemProvider() + provider.newFileSystem(new URI('dataset:///'), [:]) + + when: + provider.newFileSystem(new URI('dataset:///'), [:]) + + then: + thrown(FileSystemAlreadyExistsException) + } + + def 'should get path from URI'() { + given: + def provider = new DatasetFileSystemProvider() + + when: + def path = provider.getPath(new URI('dataset://my-data')) + + then: + path instanceof DatasetPath + (path as DatasetPath).datasetName == 'my-data' + } + 
+ def 'should reject non-dataset URI'() { + given: + def provider = new DatasetFileSystemProvider() + + when: + provider.getPath(new URI('s3://bucket/key')) + + then: + thrown(IllegalArgumentException) + } + + // -- read-only enforcement -- + + def 'should throw ReadOnlyFileSystemException on createDirectory'() { + given: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test')) + + when: + provider.createDirectory(path) + + then: + thrown(ReadOnlyFileSystemException) + } + + def 'should throw ReadOnlyFileSystemException on delete'() { + given: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test')) + + when: + provider.delete(path) + + then: + thrown(ReadOnlyFileSystemException) + } + + def 'should throw ReadOnlyFileSystemException on copy'() { + given: + def provider = new DatasetFileSystemProvider() + def src = provider.getPath(new URI('dataset://src')) + def dst = provider.getPath(new URI('dataset://dst')) + + when: + provider.copy(src, dst) + + then: + thrown(ReadOnlyFileSystemException) + } + + def 'should throw ReadOnlyFileSystemException on move'() { + given: + def provider = new DatasetFileSystemProvider() + def src = provider.getPath(new URI('dataset://src')) + def dst = provider.getPath(new URI('dataset://dst')) + + when: + provider.move(src, dst) + + then: + thrown(ReadOnlyFileSystemException) + } + + def 'should throw ReadOnlyFileSystemException on write access check'() { + given: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test')) + + when: + provider.checkAccess(path, AccessMode.WRITE) + + then: + thrown(ReadOnlyFileSystemException) + } + + def 'should not be hidden'() { + given: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test')) + + expect: + !provider.isHidden(path) + } + + def 'should detect same file for equal dataset paths'() { + given: + def 
provider = new DatasetFileSystemProvider() + def a = provider.getPath(new URI('dataset://test')) + def b = provider.getPath(new URI('dataset://test')) + + expect: + provider.isSameFile(a, b) + } + + def 'should detect different files for different dataset paths'() { + given: + def provider = new DatasetFileSystemProvider() + def a = provider.getPath(new URI('dataset://test1')) + def b = provider.getPath(new URI('dataset://test2')) + + expect: + !provider.isSameFile(a, b) + } +} diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy new file mode 100644 index 0000000000..4e2b90004e --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy @@ -0,0 +1,74 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.seqera.tower.plugin.dataset + +import spock.lang.Specification + +/** + * @author Edmund Miller + */ +class DatasetPathFactoryTest extends Specification { + + def 'should return null for non-dataset URIs'() { + given: + def factory = new DatasetPathFactory() + + expect: + factory.parseUri('s3://bucket/key') == null + factory.parseUri('/local/path') == null + factory.parseUri('gs://bucket/key') == null + } + + def 'should return null for toUriString with non-DatasetPath'() { + given: + def factory = new DatasetPathFactory() + def localPath = java.nio.file.Paths.get('/tmp/test') + + expect: + factory.toUriString(localPath) == null + } + + def 'should return uri string for DatasetPath'() { + given: + def factory = new DatasetPathFactory() + def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) + def path = new DatasetPath(fs, 'my-data') + + expect: + factory.toUriString(path) == 'dataset://my-data' + } + + def 'should return null for getBashLib'() { + given: + def factory = new DatasetPathFactory() + def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) + def path = new DatasetPath(fs, 'my-data') + + expect: + factory.getBashLib(path) == null + } + + def 'should return null for getUploadCmd'() { + given: + def factory = new DatasetPathFactory() + def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) + def path = new DatasetPath(fs, 'my-data') + + expect: + factory.getUploadCmd('/tmp/file', path) == null + } +} diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy new file mode 100644 index 0000000000..57c880d43a --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy @@ -0,0 +1,181 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin.dataset + +import spock.lang.Specification + +/** + * @author Edmund Miller + */ +class DatasetPathTest extends Specification { + + DatasetFileSystem makeFs() { + def provider = new DatasetFileSystemProvider() + new DatasetFileSystem(provider, null) + } + + // -- URI construction -- + + def 'should parse dataset URI with host-style name'() { + given: + def uri = new URI('dataset://my-samplesheet') + def path = new DatasetPath(makeFs(), uri) + + expect: + path.datasetName == 'my-samplesheet' + path.version == null + path.toString() == 'dataset://my-samplesheet' + } + + def 'should parse dataset URI with triple-slash form'() { + given: + def uri = new URI('dataset:///my-samplesheet') + def path = new DatasetPath(makeFs(), uri) + + expect: + path.datasetName == 'my-samplesheet' + path.version == null + } + + def 'should parse dataset URI with version query param'() { + given: + def uri = new URI('dataset://my-samplesheet?version=3') + def path = new DatasetPath(makeFs(), uri) + + expect: + path.datasetName == 'my-samplesheet' + path.version == '3' + path.toString() == 'dataset://my-samplesheet?version=3' + } + + // -- string construction -- + + def 'should parse string path'() { + given: + def path = new DatasetPath(makeFs(), 'my-samplesheet') + + expect: + path.datasetName == 'my-samplesheet' + path.version == null + } + + def 'should parse string path with leading slash'() { + given: + def path = new DatasetPath(makeFs(), '/my-samplesheet') + + expect: + path.datasetName == 
'my-samplesheet' + path.version == null + } + + def 'should parse string path with version suffix'() { + given: + def path = new DatasetPath(makeFs(), 'my-samplesheet@2') + + expect: + path.datasetName == 'my-samplesheet' + path.version == '2' + } + + // -- Path interface -- + + def 'should be absolute'() { + expect: + new DatasetPath(makeFs(), 'test').isAbsolute() + } + + def 'should have name count of 1'() { + expect: + new DatasetPath(makeFs(), 'test').getNameCount() == 1 + } + + def 'should return self for getName(0)'() { + given: + def path = new DatasetPath(makeFs(), 'test') + + expect: + path.getName(0) == path + } + + def 'should throw for getName with invalid index'() { + when: + new DatasetPath(makeFs(), 'test').getName(1) + + then: + thrown(IllegalArgumentException) + } + + def 'should return correct URI'() { + given: + def path = new DatasetPath(makeFs(), 'my-data') + + expect: + path.toUri() == new URI('dataset://my-data') + } + + def 'should return self for toAbsolutePath'() { + given: + def path = new DatasetPath(makeFs(), 'test') + + expect: + path.toAbsolutePath().is(path) + } + + // -- equality -- + + def 'should be equal for same name and version'() { + given: + def a = new DatasetPath(makeFs(), 'data') + def b = new DatasetPath(makeFs(), 'data') + + expect: + a == b + a.hashCode() == b.hashCode() + } + + def 'should not be equal for different names'() { + given: + def a = new DatasetPath(makeFs(), 'data1') + def b = new DatasetPath(makeFs(), 'data2') + + expect: + a != b + } + + def 'should not be equal for different versions'() { + given: + def a = new DatasetPath(makeFs(), 'data@1') + def b = new DatasetPath(makeFs(), 'data@2') + + expect: + a != b + } + + // -- compareTo -- + + def 'should compare by name then version'() { + given: + def a = new DatasetPath(makeFs(), 'alpha') + def b = new DatasetPath(makeFs(), 'beta') + def c = new DatasetPath(makeFs(), 'alpha@2') + + expect: + a.compareTo(b) < 0 + b.compareTo(a) > 0 + a.compareTo(c) < 0 // 
null version < "2" + } +} diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy new file mode 100644 index 0000000000..8b97218280 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy @@ -0,0 +1,191 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin.dataset + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock +import nextflow.Global +import nextflow.Session +import nextflow.exception.AbortOperationException +import spock.lang.AutoCleanup +import spock.lang.Specification + +import static com.github.tomakehurst.wiremock.client.WireMock.* + +/** + * @author Edmund Miller + */ +class DatasetResolverTest extends Specification { + + @AutoCleanup('stop') + WireMockServer wireMock + + def setup() { + wireMock = new WireMockServer(0) + wireMock.start() + WireMock.configureFor(wireMock.port()) + } + + def cleanup() { + Global.session = null + } + + private void mockSession(Map extra = [:]) { + def endpoint = "http://localhost:${wireMock.port()}" + def config = [tower: [endpoint: endpoint, accessToken: 'test-token', workspaceId: '12345'] + extra] + Global.session = Mock(Session) { + getConfig() >> config + } + } + + def 'should throw when no session'() { 
+ given: + Global.session = null + + when: + DatasetResolver.resolve('my-data', null) + + then: + thrown(AbortOperationException) + } + + def 'should throw when dataset name is empty'() { + when: + DatasetResolver.resolve('', null) + + then: + thrown(IllegalArgumentException) + } + + def 'should throw when dataset not found'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "1", "name": "other-data"}]}'))) + + when: + DatasetResolver.resolve('my-data', null) + + then: + def e = thrown(AbortOperationException) + e.message.contains("not found") + e.message.contains("other-data") + } + + def 'should throw when no datasets in workspace'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": []}'))) + + when: + DatasetResolver.resolve('my-data', null) + + then: + thrown(AbortOperationException) + } + + def 'should throw when API returns 401'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(unauthorized())) + + when: + DatasetResolver.resolve('my-data', null) + + then: + def e = thrown(AbortOperationException) + e.message.contains("Access denied") + } + + def 'should throw when version has no backing URL'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) + .willReturn(okJson('{"versions": [{"version": 1, "url": null}]}'))) + + when: + DatasetResolver.resolve('my-data', null) + + then: + def e = thrown(AbortOperationException) + e.message.contains("no backing storage URL") + } + + def 'should throw when specific version not found'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) + 
wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) + .willReturn(okJson('{"versions": [{"version": 1, "url": "s3://bucket/v1.csv"}]}'))) + + when: + DatasetResolver.resolve('my-data', '99') + + then: + def e = thrown(AbortOperationException) + e.message.contains("Version '99' not found") + } + + def 'should pass workspace ID as query param'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .withQueryParam('workspaceId', equalTo('12345')) + .willReturn(okJson('{"datasets": []}'))) + + when: + DatasetResolver.resolve('my-data', null) + + then: + // will throw because no datasets, but the important thing + // is the request was made with correct query param + thrown(AbortOperationException) + + and: + wireMock.verify(getRequestedFor(urlPathEqualTo('/datasets')) + .withQueryParam('workspaceId', equalTo('12345'))) + } + + def 'should send bearer token in Authorization header'() { + given: + mockSession() + + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": []}'))) + + when: + DatasetResolver.resolve('my-data', null) + + then: + thrown(AbortOperationException) + + and: + wireMock.verify(getRequestedFor(urlPathEqualTo('/datasets')) + .withHeader('Authorization', equalTo('Bearer test-token'))) + } +} From 8e16e49396cd159ba355d31dd02fec4dfe9899f3 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 25 Feb 2026 16:52:44 -0600 Subject: [PATCH 5/8] test(nf-tower): integration tests for dataset:// end-to-end flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WireMock Platform API + local file:// as resolved storage. 
Tests: - Full resolve → read file contents - Specific version selection - Latest version selection (picks highest) - Provider newInputStream/readAttributes delegation - Resolved path caching (API called once across multiple reads) Signed-off-by: Edmund Miller --- .../dataset/DatasetIntegrationTest.groovy | 230 ++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy new file mode 100644 index 0000000000..97557ef1c6 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy @@ -0,0 +1,230 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.seqera.tower.plugin.dataset + +import java.nio.file.Files + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock +import nextflow.Global +import nextflow.Session +import spock.lang.AutoCleanup +import spock.lang.Specification +import spock.lang.TempDir + +import static com.github.tomakehurst.wiremock.client.WireMock.* + +/** + * Integration tests that exercise the full dataset:// flow: + * URI string → DatasetPathFactory → DatasetPath → DatasetResolver → resolved Path → I/O + * + * Uses WireMock for Platform API and local file:// paths as the resolved "cloud" storage. + * + * @author Edmund Miller + */ +class DatasetIntegrationTest extends Specification { + + @AutoCleanup('stop') + WireMockServer wireMock + + @TempDir + File tempDir + + def setup() { + wireMock = new WireMockServer(0) + wireMock.start() + WireMock.configureFor(wireMock.port()) + } + + def cleanup() { + Global.session = null + } + + private void mockSession() { + def endpoint = "http://localhost:${wireMock.port()}" + def config = [tower: [endpoint: endpoint, accessToken: 'test-token', workspaceId: '100']] + Global.session = Mock(Session) { + getConfig() >> config + } + } + + def 'should resolve dataset path and read file contents'() { + given: 'a local file simulating cloud storage' + def csvFile = new File(tempDir, 'samples.csv') + csvFile.text = 'sample,fastq_1,fastq_2\nSAMPLE1,s1_R1.fq.gz,s1_R2.fq.gz\n' + def fileUri = csvFile.toURI().toString() + + and: 'Platform API mocks' + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .withQueryParam('workspaceId', equalTo('100')) + .willReturn(okJson("""{"datasets": [ + {"id": "42", "name": "my-samplesheet", "mediaType": "text/csv"} + ]}"""))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) + .withQueryParam('workspaceId', equalTo('100')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${fileUri}", "fileName": "samples.csv"} + 
]}"""))) + + when: 'resolving the dataset path' + def path = DatasetResolver.resolve('my-samplesheet', null) + + then: 'resolved to the local file and readable' + path != null + Files.exists(path) + Files.readString(path).contains('SAMPLE1') + + and: 'correct API calls were made' + wireMock.verify(getRequestedFor(urlPathEqualTo('/datasets')) + .withHeader('Authorization', equalTo('Bearer test-token')) + .withQueryParam('workspaceId', equalTo('100'))) + wireMock.verify(getRequestedFor(urlPathEqualTo('/datasets/42/versions')) + .withQueryParam('workspaceId', equalTo('100'))) + } + + def 'should resolve specific version'() { + given: + def v1File = new File(tempDir, 'v1.csv') + v1File.text = 'version1' + def v2File = new File(tempDir, 'v2.csv') + v2File.text = 'version2' + + and: + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${v1File.toURI()}"}, + {"version": 2, "url": "${v2File.toURI()}"} + ]}"""))) + + when: 'requesting version 1' + def path = DatasetResolver.resolve('my-data', '1') + + then: + Files.readString(path) == 'version1' + } + + def 'should resolve latest version when multiple exist'() { + given: + def v1File = new File(tempDir, 'v1.csv') + v1File.text = 'old' + def v3File = new File(tempDir, 'v3.csv') + v3File.text = 'latest' + + and: + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${v1File.toURI()}"}, + {"version": 3, "url": "${v3File.toURI()}"} + ]}"""))) + + when: 'requesting latest (no version specified)' + def path = DatasetResolver.resolve('my-data', null) + + then: 'gets version 3 (highest)' + 
Files.readString(path) == 'latest' + } + + def 'should read dataset file via provider newInputStream'() { + given: + def csvFile = new File(tempDir, 'data.csv') + csvFile.text = 'col1,col2\na,b\n' + + and: + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${csvFile.toURI()}"} + ]}"""))) + + and: 'a DatasetPath created via the provider' + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test-ds')) + + when: + def content = provider.newInputStream(path).text + + then: + content == 'col1,col2\na,b\n' + } + + def 'should read attributes via provider'() { + given: + def csvFile = new File(tempDir, 'data.csv') + csvFile.text = 'hello' + + and: + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${csvFile.toURI()}"} + ]}"""))) + + and: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test-ds')) + + when: + def attrs = provider.readAttributes(path, java.nio.file.attribute.BasicFileAttributes, new java.nio.file.LinkOption[0]) + + then: + attrs.size() == 5 + !attrs.isDirectory() + attrs.isRegularFile() + } + + def 'should cache resolved path across multiple reads'() { + given: + def csvFile = new File(tempDir, 'data.csv') + csvFile.text = 'cached' + + and: + mockSession() + wireMock.stubFor(get(urlPathEqualTo('/datasets')) + .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) + wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) + .willReturn(okJson("""{"versions": [ + {"version": 1, "url": "${csvFile.toURI()}"} + ]}"""))) + + 
and: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://test-ds')) + + when: 'reading twice' + def content1 = provider.newInputStream(path).text + def content2 = provider.newInputStream(path).text + + then: 'both reads succeed' + content1 == 'cached' + content2 == 'cached' + + and: 'API was only called once (path cached on DatasetPath)' + wireMock.verify(1, getRequestedFor(urlPathEqualTo('/datasets'))) + wireMock.verify(1, getRequestedFor(urlPathEqualTo('/datasets/7/versions'))) + } +} From 11a400eefee8d6e864f4a04c46c5c50637dbd486 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 25 Feb 2026 17:01:16 -0600 Subject: [PATCH 6/8] test: simplify dataset specs w spock tables Signed-off-by: Edmund Miller --- .../DatasetFileSystemProviderTest.groovy | 98 +++------- .../dataset/DatasetIntegrationTest.groovy | 176 +++++------------ .../dataset/DatasetPathFactoryTest.groovy | 64 +++---- .../plugin/dataset/DatasetPathTest.groovy | 177 ++++++------------ .../plugin/dataset/DatasetResolverTest.groovy | 127 +++---------- .../plugin/dataset/DatasetWireMockSpec.groovy | 76 ++++++++ 6 files changed, 270 insertions(+), 448 deletions(-) create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetWireMockSpec.groovy diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy index 6231f979e1..22d398fdc1 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetFileSystemProviderTest.groovy @@ -20,6 +20,7 @@ import java.nio.file.AccessMode import java.nio.file.FileSystemAlreadyExistsException import java.nio.file.ReadOnlyFileSystemException +import spock.lang.Unroll import spock.lang.Specification /** @@ -28,11 +29,8 @@ import spock.lang.Specification 
class DatasetFileSystemProviderTest extends Specification { def 'should return dataset scheme'() { - given: - def provider = new DatasetFileSystemProvider() - expect: - provider.getScheme() == 'dataset' + new DatasetFileSystemProvider().getScheme() == 'dataset' } def 'should create filesystem'() { @@ -72,79 +70,39 @@ class DatasetFileSystemProviderTest extends Specification { (path as DatasetPath).datasetName == 'my-data' } - def 'should reject non-dataset URI'() { + @Unroll + def 'should reject non-dataset URI #uriString'() { given: def provider = new DatasetFileSystemProvider() when: - provider.getPath(new URI('s3://bucket/key')) + provider.getPath(new URI(uriString)) then: thrown(IllegalArgumentException) - } - - // -- read-only enforcement -- - - def 'should throw ReadOnlyFileSystemException on createDirectory'() { - given: - def provider = new DatasetFileSystemProvider() - def path = provider.getPath(new URI('dataset://test')) - - when: - provider.createDirectory(path) - - then: - thrown(ReadOnlyFileSystemException) - } - - def 'should throw ReadOnlyFileSystemException on delete'() { - given: - def provider = new DatasetFileSystemProvider() - def path = provider.getPath(new URI('dataset://test')) - - when: - provider.delete(path) - - then: - thrown(ReadOnlyFileSystemException) - } - - def 'should throw ReadOnlyFileSystemException on copy'() { - given: - def provider = new DatasetFileSystemProvider() - def src = provider.getPath(new URI('dataset://src')) - def dst = provider.getPath(new URI('dataset://dst')) - - when: - provider.copy(src, dst) - then: - thrown(ReadOnlyFileSystemException) + where: + uriString << ['s3://bucket/key', 'gs://bucket/key', 'file:///tmp/data.csv'] } - def 'should throw ReadOnlyFileSystemException on move'() { + @Unroll + def 'should throw ReadOnlyFileSystemException on #operation'() { given: def provider = new DatasetFileSystemProvider() - def src = provider.getPath(new URI('dataset://src')) - def dst = provider.getPath(new 
URI('dataset://dst')) when: - provider.move(src, dst) + invoke.call(provider) then: thrown(ReadOnlyFileSystemException) - } - def 'should throw ReadOnlyFileSystemException on write access check'() { - given: - def provider = new DatasetFileSystemProvider() - def path = provider.getPath(new URI('dataset://test')) - - when: - provider.checkAccess(path, AccessMode.WRITE) - - then: - thrown(ReadOnlyFileSystemException) + where: + operation | invoke + 'createDirectory' | { DatasetFileSystemProvider p -> p.createDirectory(p.getPath(new URI('dataset://test'))) } + 'delete' | { DatasetFileSystemProvider p -> p.delete(p.getPath(new URI('dataset://test'))) } + 'copy' | { DatasetFileSystemProvider p -> p.copy(p.getPath(new URI('dataset://src')), p.getPath(new URI('dataset://dst'))) } + 'move' | { DatasetFileSystemProvider p -> p.move(p.getPath(new URI('dataset://src')), p.getPath(new URI('dataset://dst'))) } + 'write access check' | { DatasetFileSystemProvider p -> p.checkAccess(p.getPath(new URI('dataset://test')), AccessMode.WRITE) } } def 'should not be hidden'() { @@ -156,23 +114,19 @@ class DatasetFileSystemProviderTest extends Specification { !provider.isHidden(path) } - def 'should detect same file for equal dataset paths'() { + @Unroll + def 'should report isSameFile=#expected for #left vs #right'() { given: def provider = new DatasetFileSystemProvider() - def a = provider.getPath(new URI('dataset://test')) - def b = provider.getPath(new URI('dataset://test')) + def a = provider.getPath(new URI(left)) + def b = provider.getPath(new URI(right)) expect: - provider.isSameFile(a, b) - } + provider.isSameFile(a, b) == expected - def 'should detect different files for different dataset paths'() { - given: - def provider = new DatasetFileSystemProvider() - def a = provider.getPath(new URI('dataset://test1')) - def b = provider.getPath(new URI('dataset://test2')) - - expect: - !provider.isSameFile(a, b) + where: + left | right | expected + 'dataset://test' | 'dataset://test' 
| true + 'dataset://test1' | 'dataset://test2' | false } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy index 97557ef1c6..7d1026a12e 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy @@ -17,14 +17,11 @@ package io.seqera.tower.plugin.dataset import java.nio.file.Files +import java.nio.file.LinkOption +import java.nio.file.attribute.BasicFileAttributes -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock -import nextflow.Global -import nextflow.Session -import spock.lang.AutoCleanup -import spock.lang.Specification import spock.lang.TempDir +import spock.lang.Unroll import static com.github.tomakehurst.wiremock.client.WireMock.* @@ -36,50 +33,19 @@ import static com.github.tomakehurst.wiremock.client.WireMock.* * * @author Edmund Miller */ -class DatasetIntegrationTest extends Specification { - - @AutoCleanup('stop') - WireMockServer wireMock +class DatasetIntegrationTest extends DatasetWireMockSpec { @TempDir File tempDir - def setup() { - wireMock = new WireMockServer(0) - wireMock.start() - WireMock.configureFor(wireMock.port()) - } - - def cleanup() { - Global.session = null - } - - private void mockSession() { - def endpoint = "http://localhost:${wireMock.port()}" - def config = [tower: [endpoint: endpoint, accessToken: 'test-token', workspaceId: '100']] - Global.session = Mock(Session) { - getConfig() >> config - } - } - def 'should resolve dataset path and read file contents'() { given: 'a local file simulating cloud storage' - def csvFile = new File(tempDir, 'samples.csv') - csvFile.text = 'sample,fastq_1,fastq_2\nSAMPLE1,s1_R1.fq.gz,s1_R2.fq.gz\n' - def fileUri = csvFile.toURI().toString() + def csvFile = 
makeFile('samples.csv', 'sample,fastq_1,fastq_2\nSAMPLE1,s1_R1.fq.gz,s1_R2.fq.gz\n') and: 'Platform API mocks' - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .withQueryParam('workspaceId', equalTo('100')) - .willReturn(okJson("""{"datasets": [ - {"id": "42", "name": "my-samplesheet", "mediaType": "text/csv"} - ]}"""))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) - .withQueryParam('workspaceId', equalTo('100')) - .willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${fileUri}", "fileName": "samples.csv"} - ]}"""))) + mockSession(workspaceId: '100') + stubDatasets([[id: '42', name: 'my-samplesheet', mediaType: 'text/csv']], '100') + stubDatasetVersions('42', [[version: 1, url: csvFile.toURI().toString(), fileName: 'samples.csv']], '100') when: 'resolving the dataset path' def path = DatasetResolver.resolve('my-samplesheet', null) @@ -97,119 +63,65 @@ class DatasetIntegrationTest extends Specification { .withQueryParam('workspaceId', equalTo('100'))) } - def 'should resolve specific version'() { + @Unroll + def 'should resolve #scenario'() { given: - def v1File = new File(tempDir, 'v1.csv') - v1File.text = 'version1' - def v2File = new File(tempDir, 'v2.csv') - v2File.text = 'version2' + mockSession(workspaceId: '100') + stubDatasets([[id: '42', name: 'my-data']]) and: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) - .willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${v1File.toURI()}"}, - {"version": 2, "url": "${v2File.toURI()}"} - ]}"""))) - - when: 'requesting version 1' - def path = DatasetResolver.resolve('my-data', '1') - - then: - Files.readString(path) == 'version1' - } - - def 'should resolve latest version when multiple exist'() { - given: - def v1File = new File(tempDir, 'v1.csv') - v1File.text = 'old' - def v3File = new File(tempDir, 
'v3.csv') - v3File.text = 'latest' - - and: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) - .willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${v1File.toURI()}"}, - {"version": 3, "url": "${v3File.toURI()}"} - ]}"""))) - - when: 'requesting latest (no version specified)' - def path = DatasetResolver.resolve('my-data', null) - - then: 'gets version 3 (highest)' - Files.readString(path) == 'latest' - } - - def 'should read dataset file via provider newInputStream'() { - given: - def csvFile = new File(tempDir, 'data.csv') - csvFile.text = 'col1,col2\na,b\n' - - and: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) - .willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${csvFile.toURI()}"} - ]}"""))) - - and: 'a DatasetPath created via the provider' - def provider = new DatasetFileSystemProvider() - def path = provider.getPath(new URI('dataset://test-ds')) + def versions = versionRows.collect { row -> + def file = makeFile(row.fileName as String, row.content as String) + [version: row.version, url: file.toURI().toString()] + } + stubDatasetVersions('42', versions) when: - def content = provider.newInputStream(path).text + def path = DatasetResolver.resolve('my-data', requestedVersion) then: - content == 'col1,col2\na,b\n' + Files.readString(path) == expectedContent + + where: + scenario | versionRows | requestedVersion | expectedContent + 'specific dataset version' | [[version: 1, fileName: 'v1.csv', content: 'version1'], [version: 2, fileName: 'v2.csv', content: 'version2']] | '1' | 'version1' + 'latest dataset version' | [[version: 1, fileName: 'v1.csv', content: 'old'], [version: 3, fileName: 'v3.csv', content: 'latest']] | null | 
'latest' } - def 'should read attributes via provider'() { + @Unroll + def 'should delegate #operation through provider'() { given: - def csvFile = new File(tempDir, 'data.csv') - csvFile.text = 'hello' + def dataFile = makeFile('data.csv', fileContent) and: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) - .willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${csvFile.toURI()}"} - ]}"""))) + mockSession(workspaceId: '100') + stubDatasets([[id: '7', name: 'test-ds']]) + stubDatasetVersions('7', [[version: 1, url: dataFile.toURI().toString()]]) and: def provider = new DatasetFileSystemProvider() def path = provider.getPath(new URI('dataset://test-ds')) when: - def attrs = provider.readAttributes(path, java.nio.file.attribute.BasicFileAttributes, new java.nio.file.LinkOption[0]) + def result = operationFn.call(provider, path) then: - attrs.size() == 5 - !attrs.isDirectory() - attrs.isRegularFile() + assert assertFn.call(result) + + where: + operation | fileContent | operationFn | assertFn + 'newInputStream' | 'col1,col2\na,b\n' | { DatasetFileSystemProvider providerRef, dsPath -> providerRef.newInputStream(dsPath).text } | { value -> value == 'col1,col2\na,b\n' } + 'readAttributes' | 'hello' | { DatasetFileSystemProvider providerRef, dsPath -> providerRef.readAttributes(dsPath, BasicFileAttributes, new LinkOption[0]) } | { attrs -> attrs.size() == 5 && !attrs.isDirectory() && attrs.isRegularFile() } } def 'should cache resolved path across multiple reads'() { given: - def csvFile = new File(tempDir, 'data.csv') - csvFile.text = 'cached' + def dataFile = makeFile('data.csv', 'cached') and: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "7", "name": "test-ds"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/7/versions')) - 
.willReturn(okJson("""{"versions": [ - {"version": 1, "url": "${csvFile.toURI()}"} - ]}"""))) + mockSession(workspaceId: '100') + stubDatasets([[id: '7', name: 'test-ds']]) + stubDatasetVersions('7', [[version: 1, url: dataFile.toURI().toString()]]) and: def provider = new DatasetFileSystemProvider() @@ -227,4 +139,10 @@ class DatasetIntegrationTest extends Specification { wireMock.verify(1, getRequestedFor(urlPathEqualTo('/datasets'))) wireMock.verify(1, getRequestedFor(urlPathEqualTo('/datasets/7/versions'))) } + + private File makeFile(String name, String content) { + def file = new File(tempDir, name) + file.text = content + return file + } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy index 4e2b90004e..85842ccb65 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathFactoryTest.groovy @@ -16,59 +16,57 @@ package io.seqera.tower.plugin.dataset +import java.nio.file.Paths + +import spock.lang.Shared import spock.lang.Specification +import spock.lang.Unroll /** * @author Edmund Miller */ class DatasetPathFactoryTest extends Specification { - def 'should return null for non-dataset URIs'() { - given: - def factory = new DatasetPathFactory() - - expect: - factory.parseUri('s3://bucket/key') == null - factory.parseUri('/local/path') == null - factory.parseUri('gs://bucket/key') == null - } + @Shared + DatasetPathFactory factory = new DatasetPathFactory() - def 'should return null for toUriString with non-DatasetPath'() { - given: - def factory = new DatasetPathFactory() - def localPath = java.nio.file.Paths.get('/tmp/test') + @Shared + DatasetFileSystem datasetFs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) + @Unroll + def 'parseUri should return null for non-dataset URI #value'() { 
expect: - factory.toUriString(localPath) == null - } - - def 'should return uri string for DatasetPath'() { - given: - def factory = new DatasetPathFactory() - def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) - def path = new DatasetPath(fs, 'my-data') + factory.parseUri(value) == null - expect: - factory.toUriString(path) == 'dataset://my-data' + where: + value << ['s3://bucket/key', '/local/path', 'gs://bucket/key'] } - def 'should return null for getBashLib'() { + @Unroll + def 'toUriString should return #expected for #scenario'() { given: - def factory = new DatasetPathFactory() - def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) - def path = new DatasetPath(fs, 'my-data') + def path = pathFactory.call() expect: - factory.getBashLib(path) == null + factory.toUriString(path) == expected + + where: + scenario | pathFactory | expected + 'DatasetPath input' | { new DatasetPath(datasetFs, 'my-data') } | 'dataset://my-data' + 'non-DatasetPath' | { Paths.get('/tmp/test') } | null } - def 'should return null for getUploadCmd'() { + @Unroll + def '#methodName should return null for dataset path'() { given: - def factory = new DatasetPathFactory() - def fs = new DatasetFileSystem(new DatasetFileSystemProvider(), null) - def path = new DatasetPath(fs, 'my-data') + def path = new DatasetPath(datasetFs, 'my-data') expect: - factory.getUploadCmd('/tmp/file', path) == null + invoke.call(factory, path) == null + + where: + methodName | invoke + 'getBashLib' | { DatasetPathFactory f, target -> f.getBashLib(target) } + 'getUploadCmd' | { DatasetPathFactory f, target -> f.getUploadCmd('/tmp/file', target) } } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy index 57c880d43a..fd18706130 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy +++ 
b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetPathTest.groovy @@ -16,166 +16,109 @@ package io.seqera.tower.plugin.dataset +import spock.lang.Shared import spock.lang.Specification +import spock.lang.Unroll /** * @author Edmund Miller */ class DatasetPathTest extends Specification { - DatasetFileSystem makeFs() { - def provider = new DatasetFileSystemProvider() - new DatasetFileSystem(provider, null) - } - - // -- URI construction -- - - def 'should parse dataset URI with host-style name'() { - given: - def uri = new URI('dataset://my-samplesheet') - def path = new DatasetPath(makeFs(), uri) - - expect: - path.datasetName == 'my-samplesheet' - path.version == null - path.toString() == 'dataset://my-samplesheet' - } - - def 'should parse dataset URI with triple-slash form'() { - given: - def uri = new URI('dataset:///my-samplesheet') - def path = new DatasetPath(makeFs(), uri) - - expect: - path.datasetName == 'my-samplesheet' - path.version == null - } + @Shared + DatasetFileSystem fileSystem = new DatasetFileSystem(new DatasetFileSystemProvider(), null) - def 'should parse dataset URI with version query param'() { + @Unroll + def 'should parse dataset URI #uriString'() { given: - def uri = new URI('dataset://my-samplesheet?version=3') - def path = new DatasetPath(makeFs(), uri) + def path = new DatasetPath(fileSystem, new URI(uriString)) expect: - path.datasetName == 'my-samplesheet' - path.version == '3' - path.toString() == 'dataset://my-samplesheet?version=3' - } - - // -- string construction -- - - def 'should parse string path'() { - given: - def path = new DatasetPath(makeFs(), 'my-samplesheet') - - expect: - path.datasetName == 'my-samplesheet' - path.version == null - } - - def 'should parse string path with leading slash'() { - given: - def path = new DatasetPath(makeFs(), '/my-samplesheet') + path.datasetName == expectedName + path.version == expectedVersion + path.toString() == expectedToString - expect: - path.datasetName == 
'my-samplesheet' - path.version == null + where: + uriString | expectedName | expectedVersion | expectedToString + 'dataset://my-samplesheet' | 'my-samplesheet' | null | 'dataset://my-samplesheet' + 'dataset:///my-samplesheet' | 'my-samplesheet' | null | 'dataset://my-samplesheet' + 'dataset://my-samplesheet?version=3' | 'my-samplesheet' | '3' | 'dataset://my-samplesheet?version=3' } - def 'should parse string path with version suffix'() { + @Unroll + def 'should parse string path #rawPath'() { given: - def path = new DatasetPath(makeFs(), 'my-samplesheet@2') - - expect: - path.datasetName == 'my-samplesheet' - path.version == '2' - } - - // -- Path interface -- + def path = new DatasetPath(fileSystem, rawPath) - def 'should be absolute'() { expect: - new DatasetPath(makeFs(), 'test').isAbsolute() - } + path.datasetName == expectedName + path.version == expectedVersion - def 'should have name count of 1'() { - expect: - new DatasetPath(makeFs(), 'test').getNameCount() == 1 + where: + rawPath | expectedName | expectedVersion + 'my-samplesheet' | 'my-samplesheet' | null + '/my-samplesheet' | 'my-samplesheet' | null + 'my-samplesheet@2' | 'my-samplesheet' | '2' } - def 'should return self for getName(0)'() { + def 'should expose basic path semantics'() { given: - def path = new DatasetPath(makeFs(), 'test') + def path = new DatasetPath(fileSystem, 'test') expect: + path.isAbsolute() + path.getNameCount() == 1 path.getName(0) == path + path.toUri() == new URI('dataset://test') + path.toAbsolutePath().is(path) } - def 'should throw for getName with invalid index'() { + @Unroll + def 'should throw for getName(#index)'() { when: - new DatasetPath(makeFs(), 'test').getName(1) + new DatasetPath(fileSystem, 'test').getName(index) then: thrown(IllegalArgumentException) - } - - def 'should return correct URI'() { - given: - def path = new DatasetPath(makeFs(), 'my-data') - - expect: - path.toUri() == new URI('dataset://my-data') - } - - def 'should return self for 
toAbsolutePath'() { - given: - def path = new DatasetPath(makeFs(), 'test') - expect: - path.toAbsolutePath().is(path) + where: + index << [-1, 1] } - // -- equality -- - - def 'should be equal for same name and version'() { + @Unroll + def 'should compare equality for #left vs #right'() { given: - def a = new DatasetPath(makeFs(), 'data') - def b = new DatasetPath(makeFs(), 'data') + def a = new DatasetPath(fileSystem, left) + def b = new DatasetPath(fileSystem, right) expect: - a == b - a.hashCode() == b.hashCode() - } + (a == b) == expectedEqual - def 'should not be equal for different names'() { - given: - def a = new DatasetPath(makeFs(), 'data1') - def b = new DatasetPath(makeFs(), 'data2') + and: + if (expectedEqual) { + assert a.hashCode() == b.hashCode() + } - expect: - a != b + where: + left | right | expectedEqual + 'data' | 'data' | true + 'data1' | 'data2' | false + 'data@1' | 'data@2' | false } - def 'should not be equal for different versions'() { + @Unroll + def 'should compare order for #left compared to #right'() { given: - def a = new DatasetPath(makeFs(), 'data@1') - def b = new DatasetPath(makeFs(), 'data@2') + def a = new DatasetPath(fileSystem, left) + def b = new DatasetPath(fileSystem, right) expect: - a != b - } - - // -- compareTo -- + Integer.signum(a.compareTo(b)) == expectedSign - def 'should compare by name then version'() { - given: - def a = new DatasetPath(makeFs(), 'alpha') - def b = new DatasetPath(makeFs(), 'beta') - def c = new DatasetPath(makeFs(), 'alpha@2') - - expect: - a.compareTo(b) < 0 - b.compareTo(a) > 0 - a.compareTo(c) < 0 // null version < "2" + where: + left | right | expectedSign + 'alpha' | 'beta' | -1 + 'beta' | 'alpha' | 1 + 'alpha' | 'alpha@2' | -1 } } diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy index 8b97218280..f45e01a1d3 100644 --- 
a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetResolverTest.groovy @@ -16,41 +16,16 @@ package io.seqera.tower.plugin.dataset -import com.github.tomakehurst.wiremock.WireMockServer -import com.github.tomakehurst.wiremock.client.WireMock import nextflow.Global -import nextflow.Session import nextflow.exception.AbortOperationException -import spock.lang.AutoCleanup -import spock.lang.Specification +import spock.lang.Unroll import static com.github.tomakehurst.wiremock.client.WireMock.* /** * @author Edmund Miller */ -class DatasetResolverTest extends Specification { - - @AutoCleanup('stop') - WireMockServer wireMock - - def setup() { - wireMock = new WireMockServer(0) - wireMock.start() - WireMock.configureFor(wireMock.port()) - } - - def cleanup() { - Global.session = null - } - - private void mockSession(Map extra = [:]) { - def endpoint = "http://localhost:${wireMock.port()}" - def config = [tower: [endpoint: endpoint, accessToken: 'test-token', workspaceId: '12345'] + extra] - Global.session = Mock(Session) { - getConfig() >> config - } - } +class DatasetResolverTest extends DatasetWireMockSpec { def 'should throw when no session'() { given: @@ -63,107 +38,67 @@ class DatasetResolverTest extends Specification { thrown(AbortOperationException) } - def 'should throw when dataset name is empty'() { + @Unroll + def 'should reject invalid dataset name: #datasetName'() { when: - DatasetResolver.resolve('', null) + DatasetResolver.resolve(datasetName, null) then: thrown(IllegalArgumentException) - } - - def 'should throw when dataset not found'() { - given: - mockSession() - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "1", "name": "other-data"}]}'))) - - when: - DatasetResolver.resolve('my-data', null) - - then: - def e = thrown(AbortOperationException) - e.message.contains("not found") - 
e.message.contains("other-data") + where: + datasetName << ['', null] } - def 'should throw when no datasets in workspace'() { + @Unroll + def 'should fail dataset lookup when #scenario'() { given: mockSession() - - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": []}'))) - - when: - DatasetResolver.resolve('my-data', null) - - then: - thrown(AbortOperationException) - } - - def 'should throw when API returns 401'() { - given: - mockSession() - - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(unauthorized())) + stubLookup.call() when: DatasetResolver.resolve('my-data', null) then: def e = thrown(AbortOperationException) - e.message.contains("Access denied") - } - - def 'should throw when version has no backing URL'() { - given: - mockSession() + messageParts.each { part -> assert e.message.contains(part) } - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) - .willReturn(okJson('{"versions": [{"version": 1, "url": null}]}'))) - - when: - DatasetResolver.resolve('my-data', null) - - then: - def e = thrown(AbortOperationException) - e.message.contains("no backing storage URL") + where: + scenario | stubLookup | messageParts + 'dataset does not exist' | { stubDatasets([[id: '1', name: 'other-data']]) } | ['not found', 'other-data'] + 'workspace has no datasets' | { stubDatasets([]) } | ['No datasets found in workspace'] + 'api returns unauthorized' | { wireMock.stubFor(get(urlPathEqualTo('/datasets')).willReturn(unauthorized())) } | ['Access denied'] } - def 'should throw when specific version not found'() { + @Unroll + def 'should fail version lookup when #scenario'() { given: mockSession() - - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": [{"id": "42", "name": "my-data"}]}'))) - wireMock.stubFor(get(urlPathEqualTo('/datasets/42/versions')) 
- .willReturn(okJson('{"versions": [{"version": 1, "url": "s3://bucket/v1.csv"}]}'))) + stubDatasets([[id: '42', name: 'my-data']]) + stubDatasetVersions('42', versions) when: - DatasetResolver.resolve('my-data', '99') + DatasetResolver.resolve('my-data', requestedVersion) then: def e = thrown(AbortOperationException) - e.message.contains("Version '99' not found") + e.message.contains(expectedMessage) + + where: + scenario | versions | requestedVersion | expectedMessage + 'selected version has no backing URL' | [[version: 1, url: null]] | null | 'no backing storage URL' + 'requested version does not exist' | [[version: 1, url: 's3://bucket/v1.csv']] | '99' | "Version '99' not found" } def 'should pass workspace ID as query param'() { given: - mockSession() - - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .withQueryParam('workspaceId', equalTo('12345')) - .willReturn(okJson('{"datasets": []}'))) + mockSession(workspaceId: '12345') + stubDatasets([], '12345') when: DatasetResolver.resolve('my-data', null) then: - // will throw because no datasets, but the important thing - // is the request was made with correct query param thrown(AbortOperationException) and: @@ -174,9 +109,7 @@ class DatasetResolverTest extends Specification { def 'should send bearer token in Authorization header'() { given: mockSession() - - wireMock.stubFor(get(urlPathEqualTo('/datasets')) - .willReturn(okJson('{"datasets": []}'))) + stubDatasets([]) when: DatasetResolver.resolve('my-data', null) diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetWireMockSpec.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetWireMockSpec.groovy new file mode 100644 index 0000000000..7555ddf321 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetWireMockSpec.groovy @@ -0,0 +1,76 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin.dataset + +import groovy.json.JsonOutput +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock +import nextflow.Global +import nextflow.Session +import spock.lang.AutoCleanup +import spock.lang.Shared +import spock.lang.Specification + +import static com.github.tomakehurst.wiremock.client.WireMock.* + +/** + * Shared WireMock/session fixture for dataset resolver specs. + */ +abstract class DatasetWireMockSpec extends Specification { + + @Shared + @AutoCleanup('stop') + WireMockServer wireMock = new WireMockServer(0) + + def setupSpec() { + wireMock.start() + WireMock.configureFor('localhost', wireMock.port()) + } + + def setup() { + wireMock.resetAll() + } + + def cleanup() { + Global.session = null + } + + protected void mockSession(Map towerOverrides = [:]) { + def endpoint = "http://localhost:${wireMock.port()}" + def towerConfig = [endpoint: endpoint, accessToken: 'test-token', workspaceId: '12345'] + towerOverrides + + Global.session = Mock(Session) { + getConfig() >> [tower: towerConfig] + } + } + + protected void stubDatasets(List datasets, String workspaceId = null) { + def request = get(urlPathEqualTo('/datasets')) + if (workspaceId) + request = request.withQueryParam('workspaceId', equalTo(workspaceId)) + + wireMock.stubFor(request.willReturn(okJson(JsonOutput.toJson([datasets: datasets])))) + } + + protected void stubDatasetVersions(String datasetId, List versions, String workspaceId = null) { + def 
request = get(urlPathEqualTo("/datasets/${datasetId}/versions")) + if (workspaceId) + request = request.withQueryParam('workspaceId', equalTo(workspaceId)) + + wireMock.stubFor(request.willReturn(okJson(JsonOutput.toJson([versions: versions])))) + } +} From caf25e663c09bc4ab93f85dd3c19d6f7382512a1 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 25 Feb 2026 18:14:52 -0600 Subject: [PATCH 7/8] test: add pending live auth regression Signed-off-by: Edmund Miller --- .../DatasetLiveAuthRegressionTest.groovy | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy new file mode 100644 index 0000000000..d02b988f18 --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.seqera.tower.plugin.dataset + +import nextflow.Global +import nextflow.Session +import spock.lang.PendingFeature +import spock.lang.Requires +import spock.lang.Specification +import spock.lang.Unroll + +/** + * Live regression for dataset file reads backed by Seqera API URLs. 
+ * + * Requires env vars: + * - TOWER_ACCESS_TOKEN + * - TOWER_WORKSPACE_ID + * - DATASET_LIVE_NAMES (comma-separated) + */ +@Requires({ env['TOWER_ACCESS_TOKEN'] && env['TOWER_WORKSPACE_ID'] && env['DATASET_LIVE_NAMES'] }) +class DatasetLiveAuthRegressionTest extends Specification { + + def cleanup() { + Global.session = null + } + + @PendingFeature(reason = 'Dataset provider does not yet forward Tower bearer auth to resolved HTTP dataset URLs') + @Unroll + def 'should read live dataset via provider using bearer auth - #datasetName'() { + given: + Global.session = Mock(Session) { + getConfig() >> [tower: [ + endpoint: System.getenv('TOWER_ENDPOINT') ?: 'https://api.cloud.seqera.io', + accessToken: System.getenv('TOWER_ACCESS_TOKEN'), + workspaceId: System.getenv('TOWER_WORKSPACE_ID') + ]] + } + + and: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI("dataset:///${datasetName}")) + + when: + def bytes = provider.newInputStream(path).readNBytes(64) + + then: + bytes.length > 0 + + where: + datasetName << datasetNames() + } + + private static List datasetNames() { + System.getenv('DATASET_LIVE_NAMES') + .split(',') + .collect { it.trim() } + .findAll { it } + } +} From 475c1c1d4547009b215e8ea9290f70892ce69c29 Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Wed, 25 Feb 2026 18:45:01 -0600 Subject: [PATCH 8/8] fix: forward tower auth for dataset http reads Signed-off-by: Edmund Miller --- .../dataset/DatasetFileSystemProvider.java | 138 +++++++++++++++++- .../plugin/dataset/DatasetResolver.groovy | 8 + .../dataset/DatasetIntegrationTest.groovy | 64 ++++++++ .../DatasetLiveAuthRegressionTest.groovy | 2 - 4 files changed, 204 insertions(+), 8 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetFileSystemProvider.java b/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetFileSystemProvider.java index 0d1e6efc31..786a9c605d 100644 --- 
a/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetFileSystemProvider.java +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetFileSystemProvider.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URLConnection; import java.nio.channels.SeekableByteChannel; import java.nio.file.AccessMode; import java.nio.file.CopyOption; @@ -36,6 +37,10 @@ import java.nio.file.spi.FileSystemProvider; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; + +import nextflow.file.http.XAuthProvider; +import nextflow.file.http.XAuthRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +60,7 @@ public class DatasetFileSystemProvider extends FileSystemProvider { private static final Logger log = LoggerFactory.getLogger(DatasetFileSystemProvider.class); + private static final Pattern PLATFORM_DATASET_PATH = Pattern.compile(".*?/workspaces/[^/]+/datasets/[^/]+/v/[^/]+/n/.+"); private volatile DatasetFileSystem fileSystem; @@ -99,13 +105,13 @@ public Path getPath(URI uri) { public InputStream newInputStream(Path path, OpenOption... options) throws IOException { final Path resolved = resolvedPath(path); log.debug("newInputStream for dataset path {} -> {}", path, resolved); - return resolved.getFileSystem().provider().newInputStream(resolved, options); + return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().newInputStream(resolved, options)); } @Override public SeekableByteChannel newByteChannel(Path path, Set options, FileAttribute... 
attrs) throws IOException { final Path resolved = resolvedPath(path); - return resolved.getFileSystem().provider().newByteChannel(resolved, options, attrs); + return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().newByteChannel(resolved, options, attrs)); } @Override @@ -117,20 +123,20 @@ public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter @SuppressWarnings("unchecked") public A readAttributes(Path path, Class type, LinkOption... options) throws IOException { final Path resolved = resolvedPath(path); - return resolved.getFileSystem().provider().readAttributes(resolved, type, options); + return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().readAttributes(resolved, type, options)); } @Override public Map readAttributes(Path path, String attributes, LinkOption... options) throws IOException { final Path resolved = resolvedPath(path); - return resolved.getFileSystem().provider().readAttributes(resolved, attributes, options); + return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().readAttributes(resolved, attributes, options)); } @Override public V getFileAttributeView(Path path, Class type, LinkOption... options) { try { final Path resolved = resolvedPath(path); - return resolved.getFileSystem().provider().getFileAttributeView(resolved, type, options); + return withPlatformDatasetAuth(resolved, () -> resolved.getFileSystem().provider().getFileAttributeView(resolved, type, options)); } catch (IOException e) { throw new RuntimeException("Failed to resolve dataset path: " + path, e); @@ -145,7 +151,10 @@ public void checkAccess(Path path, AccessMode... 
modes) throws IOException { } } final Path resolved = resolvedPath(path); - resolved.getFileSystem().provider().checkAccess(resolved, modes); + withPlatformDatasetAuth(resolved, () -> { + resolved.getFileSystem().provider().checkAccess(resolved, modes); + return null; + }); } // -- write operations: read-only in phase 1 -- @@ -206,6 +215,86 @@ private DatasetFileSystem getOrCreateFileSystem() { return fileSystem; } + private T withPlatformDatasetAuth(Path resolved, IoCallable action) throws IOException { + final XAuthProvider authProvider = authProviderFor(resolved); + if (authProvider == null) { + return action.call(); + } + + final XAuthRegistry registry = XAuthRegistry.getInstance(); + registry.register(authProvider); + try { + return action.call(); + } + finally { + registry.unregister(authProvider); + } + } + + private XAuthProvider authProviderFor(Path resolved) { + final URI targetUri = resolved.toUri(); + if (targetUri == null || !isHttpScheme(targetUri.getScheme())) { + return null; + } + if (!isPlatformDatasetDownloadPath(targetUri.getPath())) { + return null; + } + + final String endpoint = DatasetResolver.towerEndpoint(); + final String accessToken = DatasetResolver.towerAccessToken(); + if (isBlank(endpoint) || isBlank(accessToken)) { + return null; + } + + final URI endpointUri = URI.create(endpoint); + if (!isHttpScheme(endpointUri.getScheme()) || !isSameOrigin(endpointUri, targetUri)) { + return null; + } + + return new DatasetBearerAuthProvider(endpointUri, accessToken); + } + + private boolean isHttpScheme(String scheme) { + return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme); + } + + private boolean isPlatformDatasetDownloadPath(String path) { + return path != null && PLATFORM_DATASET_PATH.matcher(path).matches(); + } + + private static boolean isSameOrigin(URI left, URI right) { + if (left.getScheme() == null || right.getScheme() == null) { + return false; + } + if (!left.getScheme().equalsIgnoreCase(right.getScheme())) { 
+ return false; + } + if (left.getHost() == null || right.getHost() == null) { + return false; + } + if (!left.getHost().equalsIgnoreCase(right.getHost())) { + return false; + } + return defaultPort(left) == defaultPort(right); + } + + private static int defaultPort(URI uri) { + if (uri.getPort() != -1) { + return uri.getPort(); + } + if ("https".equalsIgnoreCase(uri.getScheme())) { + return 443; + } + if ("http".equalsIgnoreCase(uri.getScheme())) { + return 80; + } + return -1; + } + + private boolean isBlank(String value) { + return value == null || value.trim().isEmpty(); + } + /** * Resolve a dataset path to its backing cloud storage path. * Caches the resolved path on the DatasetPath instance. @@ -216,4 +305,41 @@ private Path resolvedPath(Path path) throws IOException { } return ((DatasetPath) path).getResolvedPath(); } + + @FunctionalInterface + private interface IoCallable { + T call() throws IOException; + } + + private static final class DatasetBearerAuthProvider implements XAuthProvider { + private final URI endpoint; + private final String accessToken; + + private DatasetBearerAuthProvider(URI endpoint, String accessToken) { + this.endpoint = endpoint; + this.accessToken = accessToken; + } + + @Override + public boolean authorize(URLConnection connection) { + final URI target = URI.create(connection.getURL().toString()); + if (!isSameOrigin(endpoint, target)) { + return false; + } + if (!PLATFORM_DATASET_PATH.matcher(target.getPath()).matches()) { + return false; + } + if (connection.getRequestProperty("Authorization") != null) { + return false; + } + + connection.setRequestProperty("Authorization", "Bearer " + accessToken); + return true; + } + + @Override + public boolean refreshToken(URLConnection connection) { + return false; + } + } } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetResolver.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetResolver.groovy index d90a38d979..f77b8c9d9a 100644 --- 
a/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetResolver.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/dataset/DatasetResolver.groovy @@ -44,6 +44,14 @@ import nextflow.platform.PlatformHelper @CompileStatic class DatasetResolver { + static String towerEndpoint() { + return getEndpoint() + } + + static String towerAccessToken() { + return getAccessToken() + } + /** * Resolve a dataset name (and optional version) to the backing cloud storage Path. * diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy index 7d1026a12e..8c0434cbfc 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetIntegrationTest.groovy @@ -20,6 +20,7 @@ import java.nio.file.Files import java.nio.file.LinkOption import java.nio.file.attribute.BasicFileAttributes +import com.github.tomakehurst.wiremock.WireMockServer import spock.lang.TempDir import spock.lang.Unroll @@ -114,6 +115,69 @@ class DatasetIntegrationTest extends DatasetWireMockSpec { 'readAttributes' | 'hello' | { DatasetFileSystemProvider providerRef, dsPath -> providerRef.readAttributes(dsPath, BasicFileAttributes, new LinkOption[0]) } | { attrs -> attrs.size() == 5 && !attrs.isDirectory() && attrs.isRegularFile() } } + def 'should forward bearer auth when reading platform dataset download URLs'() { + given: + mockSession(workspaceId: '100') + stubDatasets([[id: '7', name: 'secure-ds']], '100') + + and: + def downloadPath = '/workspaces/100/datasets/7/v/1/n/data.csv' + def downloadUrl = "http://localhost:${wireMock.port()}${downloadPath}" + stubDatasetVersions('7', [[version: 1, url: downloadUrl, fileName: 'data.csv']], '100') + wireMock.stubFor(get(urlPathEqualTo(downloadPath)) + .withHeader('Authorization', equalTo('Bearer test-token')) + 
.willReturn(ok('secure,data\na,b\n'))) + + and: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://secure-ds')) + + when: + def content = provider.newInputStream(path).text + + then: + content == 'secure,data\na,b\n' + + and: + wireMock.verify(1, getRequestedFor(urlPathEqualTo(downloadPath)) + .withHeader('Authorization', equalTo('Bearer test-token'))) + } + + def 'should not forward bearer auth to non-platform hosts'() { + given: + def externalHost = new WireMockServer(0) + externalHost.start() + + and: + mockSession(workspaceId: '100') + stubDatasets([[id: '7', name: 'external-ds']], '100') + + and: + def downloadPath = '/workspaces/100/datasets/7/v/1/n/data.csv' + def downloadUrl = "http://localhost:${externalHost.port()}${downloadPath}" + stubDatasetVersions('7', [[version: 1, url: downloadUrl, fileName: 'data.csv']], '100') + externalHost.stubFor(get(urlPathEqualTo(downloadPath)) + .withHeader('Authorization', matching('.+')) + .atPriority(1) + .willReturn(unauthorized())) + externalHost.stubFor(get(urlPathEqualTo(downloadPath)) + .atPriority(10) + .willReturn(ok('public,data\nx,y\n'))) + + and: + def provider = new DatasetFileSystemProvider() + def path = provider.getPath(new URI('dataset://external-ds')) + + when: + def content = provider.newInputStream(path).text + + then: + content == 'public,data\nx,y\n' + + cleanup: + externalHost.stop() + } + def 'should cache resolved path across multiple reads'() { given: def dataFile = makeFile('data.csv', 'cached') diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy index d02b988f18..b2fef81d4d 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/dataset/DatasetLiveAuthRegressionTest.groovy @@ -18,7 +18,6 @@ 
package io.seqera.tower.plugin.dataset import nextflow.Global import nextflow.Session -import spock.lang.PendingFeature import spock.lang.Requires import spock.lang.Specification import spock.lang.Unroll @@ -38,7 +37,6 @@ class DatasetLiveAuthRegressionTest extends Specification { Global.session = null } - @PendingFeature(reason = 'Dataset provider does not yet forward Tower bearer auth to resolved HTTP dataset URLs') @Unroll def 'should read live dataset via provider using bearer auth - #datasetName'() { given: