Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/nf-commons/src/main/nextflow/file/FileHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class FileHelper {
return asPath(toPathURI(str))
}

static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure']
static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure', seqera:'nf-tower']

static final private Map<String,Boolean> SCHEME_CHECKED = new HashMap<>()

Expand All @@ -373,6 +373,7 @@ class FileHelper {
// find out the default plugin for the given scheme and try to load it
final pluginId = PLUGINS_MAP.get(scheme)
if( pluginId ) try {
log.debug "Detected required plugin '$pluginId'"
if( Plugins.startIfMissing(pluginId) ) {
log.debug "Started plugin '$pluginId' required to handle file: $str"
// return true to signal a new plugin was loaded
Expand Down
3 changes: 2 additions & 1 deletion plugins/nf-tower/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ nextflowPlugin {
'io.seqera.tower.plugin.TowerFactory',
'io.seqera.tower.plugin.TowerFusionToken',
'io.seqera.tower.plugin.auth.AuthCommandImpl',
'io.seqera.tower.plugin.launch.LaunchCommandImpl'
'io.seqera.tower.plugin.launch.LaunchCommandImpl',
'io.seqera.tower.plugin.fs.SeqeraPathFactory'
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class TowerClient implements TraceObserverV2 {
this.aggregator = new ResourcesAggregator()
this.runName = session.getRunName()
this.runId = session.getUniqueId()
this.httpClient = newHttpClient()

// send hello to verify auth
final req = makeCreateReq(session)
Expand Down Expand Up @@ -303,12 +302,12 @@ class TowerClient implements TraceObserverV2 {
reports.flowCreate(workflowId)
}

protected HxClient newHttpClient() {
protected void initHttpClient() {
final builder = HxClient.newBuilder()
// auth settings
setupClientAuth(builder, getAccessToken())
// retry settings
builder
this.httpClient = builder
.retryConfig(this.retryPolicy)
.followRedirects(HttpClient.Redirect.NORMAL)
.version(HttpClient.Version.HTTP_1_1)
Expand Down Expand Up @@ -536,14 +535,21 @@ class TowerClient implements TraceObserverV2 {
}
}

protected HttpRequest makeRequest(String url, String payload, String verb) {
assert payload, "Tower request cannot be empty"
Response sendApiRequest(String url, Map payload=null, String method='GET') {
sendHttpMessage(url, payload, method)
}

protected HttpRequest makeRequest(String url, String payload, String verb) {
final builder = HttpRequest.newBuilder(URI.create(url))
.header('Content-Type', 'application/json; charset=utf-8')
.header('User-Agent', "Nextflow/$BuildInfo.version")
.header('Traceparent', TraceUtils.rndTrace())

if( verb == 'GET' )
return builder.GET().build()

assert payload, "Tower request cannot be empty"
builder.header('Content-Type', 'application/json; charset=utf-8')

if( verb == 'PUT' )
return builder.PUT(HttpRequest.BodyPublishers.ofString(payload)).build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TowerFactory implements TraceObserverFactoryV2 {
tower.aliveInterval = aliveInterval
if( requestInterval )
tower.requestInterval = requestInterval

tower.initHttpClient()
// register auth provider
// note: this is needed to authorize access to resources via XFileSystemProvider used by NF
// it's not needed by the tower client logic
Expand All @@ -87,7 +87,7 @@ class TowerFactory implements TraceObserverFactoryV2 {
static TowerClient client(Session session, Map<String,String> env) {
final opts = session.config.tower as Map ?: Collections.emptyMap()
final config = new TowerConfig(opts, env)
Boolean isEnabled = config.enabled || env.get('TOWER_WORKFLOW_ID') || session.config.navigate('fusion.enabled') as Boolean
Boolean isEnabled = config.enabled || config.accessToken || env.get('TOWER_WORKFLOW_ID') || session.config.navigate('fusion.enabled') as Boolean
return isEnabled
? createTowerClient0(session, config, env)
: null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.seqera.tower.plugin

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.tower.plugin.fs.SeqeraFileSystemProvider
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import nextflow.cli.PluginExecAware
import org.pf4j.PluginWrapper
Expand All @@ -37,4 +39,10 @@ class TowerPlugin extends BasePlugin implements PluginExecAware {
this.delegate = new CacheCommand()
}

@Override
void start() {
super.start()
FileHelper.getOrInstallProvider(SeqeraFileSystemProvider)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.seqera.tower.plugin.dataset

import groovy.transform.CompileStatic
import groovy.transform.ToString

/**
* DTO representing a dataset entry returned by GET /datasets
*
* @author Seqera Labs
*/
@CompileStatic
@ToString(includeNames = true)
class DatasetDto {
String id
String name
String description
long version
String mediaType
long workspaceId
String dateCreated
String lastUpdated
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.seqera.tower.plugin.dataset

import groovy.transform.CompileStatic
import groovy.transform.ToString

/**
* DTO representing a dataset version entry returned by GET /datasets/{id}/versions
*
* @author Seqera Labs
*/
@CompileStatic
@ToString(includeNames = true)
class DatasetVersionDto {
String datasetId
long version
String fileName
String mediaType
boolean hasHeader
String dateCreated
boolean disabled
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.seqera.tower.plugin.dataset

import java.nio.file.AccessDeniedException
import java.nio.file.NoSuchFileException

import groovy.json.JsonSlurper
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.tower.plugin.TowerClient
import nextflow.exception.AbortOperationException

/**
* Typed client for Seqera Platform dataset API endpoints.
* Delegates HTTP execution to {@link TowerClient#sendApiRequest}, inheriting its
* authentication token management and retry policy without exposing the underlying
* HTTP client.
*
* @author Seqera Labs
*/
@Slf4j
@CompileStatic
class SeqeraDatasetClient {

private final TowerClient towerClient

SeqeraDatasetClient(TowerClient towerClient) {
this.towerClient = towerClient
}

private String getEndpoint() { towerClient.endpoint }

/**
* @return current user info (id, userName, etc.) from GET /user-info
*/
Map<String, Object> getUserInfo() {
final url = "${endpoint}/user-info"
log.debug "SeqeraDatasetClient GET $url"
final resp = towerClient.sendApiRequest(url)
checkResponse(resp, url)
final json = new JsonSlurper().parseText(resp.message) as Map
return json.user as Map<String, Object>
}

/**
* @return all orgs and workspaces accessible to the given user from GET /user/{userId}/workspaces
*/
List<WorkspaceOrgDto> listUserWorkspacesAndOrgs(long userId) {
final url = "${endpoint}/user/${userId}/workspaces"
log.debug "SeqeraDatasetClient GET $url"
final resp = towerClient.sendApiRequest(url)
checkResponse(resp, url)
final json = new JsonSlurper().parseText(resp.message) as Map
final list = json.orgsAndWorkspaces as List<Map>
return list.collect { m -> mapWorkspaceOrg(m) }
}

/**
* @return all datasets in the given workspace from GET /datasets?workspaceId={workspaceId}
*/
List<DatasetDto> listDatasets(long workspaceId) {
final url = "${endpoint}/datasets?workspaceId=${workspaceId}"
log.debug "SeqeraDatasetClient GET $url"
final resp = towerClient.sendApiRequest(url)
checkResponse(resp, url)
final json = new JsonSlurper().parseText(resp.message) as Map
final list = json.datasets as List<Map>
return list ? list.collect { m -> mapDataset(m) } : Collections.<DatasetDto>emptyList()
}

/**
* Create a new dataset in the given workspace via POST /datasets?workspaceId={workspaceId}.
* @return the created dataset DTO
*/
DatasetDto createDataset(long workspaceId, String name) {
final url = "${endpoint}/datasets?workspaceId=${workspaceId}"
log.debug "SeqeraDatasetClient POST $url name=$name"
final resp = towerClient.sendApiRequest(url, [name: name], 'POST')
checkResponse(resp, url)
final json = new JsonSlurper().parseText(resp.message) as Map
return mapDataset(json.dataset as Map)
}

/**
* @return all versions for the given dataset from GET /datasets/{datasetId}/versions
*/
List<DatasetVersionDto> listVersions(String datasetId, long workspaceId) {
final url = "${endpoint}/datasets/${datasetId}/versions?workspaceId=${workspaceId}"
log.debug "SeqeraDatasetClient GET $url"
final resp = towerClient.sendApiRequest(url)
checkResponse(resp, url)
final json = new JsonSlurper().parseText(resp.message) as Map
final list = json.versions as List<Map>
return list ? list.collect { m -> mapVersion(m) } : Collections.<DatasetVersionDto>emptyList()
}

/**
* Download a dataset version as an InputStream.
* GET /datasets/{datasetId}/v/{version}/n/{fileName}
* The fileName must exactly match DatasetVersionDto.fileName from upload time.
*/
InputStream downloadDataset(String datasetId, String version, String fileName, long workspaceId) {
final url = "${endpoint}/datasets/${datasetId}/v/${version}/n/${URLEncoder.encode(fileName, 'UTF-8')}?workspaceId=$workspaceId"
log.debug "SeqeraDatasetClient GET $url"
final resp = towerClient.sendApiRequest(url)
checkResponse(resp, url)
return new ByteArrayInputStream(resp.message.getBytes('UTF-8'))
}

/**
* Upload content as a new dataset version via multipart POST.
* POST /datasets/{datasetId}/upload?header={hasHeader}
* Requires direct HTTP access — to be implemented in US4.
*/
DatasetVersionDto uploadDataset(String datasetId, byte[] content, String fileName, boolean hasHeader) {
throw new UnsupportedOperationException("uploadDataset not yet implemented — requires US4 multipart support")
}

// ---- private helpers ----

private static void checkResponse(TowerClient.Response resp, String url) {
if (!resp.error) return
final code = resp.code
if (code == 401)
throw new AbortOperationException("Seqera authentication failed — check tower.accessToken or TOWER_ACCESS_TOKEN")
if (code == 403)
throw new AccessDeniedException(url, null, "Forbidden — check workspace permissions")
if (code == 404)
throw new NoSuchFileException(url)
throw new IOException("Seqera API error: HTTP ${code} for ${url}")
}

private static WorkspaceOrgDto mapWorkspaceOrg(Map m) {
final dto = new WorkspaceOrgDto()
dto.orgId = (m.orgId as Long) ?: 0L
dto.orgName = m.orgName as String
dto.workspaceId = (m.workspaceId as Long) ?: 0L
dto.workspaceName = m.workspaceName as String
dto.workspaceFullName = m.workspaceFullName as String
return dto
}

private static DatasetDto mapDataset(Map m) {
final dto = new DatasetDto()
dto.id = m.id as String
dto.name = m.name as String
dto.description = m.description as String
dto.version = (m.version as Long) ?: 0L
dto.mediaType = m.mediaType as String
dto.workspaceId = (m.workspaceId as Long) ?: 0L
dto.dateCreated = m.dateCreated as String
dto.lastUpdated = m.lastUpdated as String
return dto
}

private static DatasetVersionDto mapVersion(Map m) {
final dto = new DatasetVersionDto()
dto.datasetId = m.datasetId as String
dto.version = (m.version as Long) ?: 0L
dto.fileName = m.fileName as String
dto.mediaType = m.mediaType as String
dto.hasHeader = (m.hasHeader as Boolean) ?: false
dto.dateCreated = m.dateCreated as String
dto.disabled = (m.disabled as Boolean) ?: false
return dto
}
}
Loading
Loading