Skip to content

Commit ead2814

Browse files
committed
Make credential cache TTLs configurable via IcebergConfig
- Add iceberg.credential-cache-ttl, iceberg.coordinator-credential-prefetch, and iceberg.worker-credential-cache-ttl config properties with defaults (25min, 2min, 5min respectively) - Remove static TTL constants from IcebergUtil - Propagate configurable TTLs through IcebergMetadataFactory, IcebergMetadata, IcebergPageSourceProviderFactory, IcebergPageSinkProvider - Use io.airlift.units.Duration throughout plumbing, convert to millis at leaf sites - Broaden vended credential detection in IcebergTableCredentials.forFileIO to cover S3, GCS, and Azure ADLS (not just S3) - Handle TableNotFoundException in getOrLoadTableCredentials for CTAS queries where the table does not yet exist - Fix checkstyle: import ordering and remove unused TableChangesFunctionHandle import - Update all test files to use configurable TTL values
1 parent 8afc7bf commit ead2814

24 files changed

Lines changed: 219 additions & 151 deletions

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static java.util.Locale.ENGLISH;
4646
import static java.util.concurrent.TimeUnit.DAYS;
4747
import static java.util.concurrent.TimeUnit.HOURS;
48+
import static java.util.concurrent.TimeUnit.MINUTES;
4849
import static java.util.concurrent.TimeUnit.SECONDS;
4950

5051
@DefunctConfig({
@@ -104,6 +105,8 @@ public class IcebergConfig
104105
private int metadataParallelism = 8;
105106
private boolean bucketExecutionEnabled = true;
106107
private boolean fileBasedConflictDetectionEnabled = true;
108+
private Duration credentialCacheTtl = new Duration(25, MINUTES);
109+
private Duration workerCredentialCacheTtl = new Duration(5, MINUTES);
107110

108111
public CatalogType getCatalogType()
109112
{
@@ -695,4 +698,32 @@ public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConfl
695698
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
696699
return this;
697700
}
701+
702+
@NotNull
703+
public Duration getCredentialCacheTtl()
704+
{
705+
return credentialCacheTtl;
706+
}
707+
708+
@Config("iceberg.credential-cache-ttl")
709+
@ConfigDescription("How long vended credentials are assumed to be valid. Must be shorter than the minimum STS token lifetime.")
710+
public IcebergConfig setCredentialCacheTtl(Duration credentialCacheTtl)
711+
{
712+
this.credentialCacheTtl = credentialCacheTtl;
713+
return this;
714+
}
715+
716+
@NotNull
717+
public Duration getWorkerCredentialCacheTtl()
718+
{
719+
return workerCredentialCacheTtl;
720+
}
721+
722+
@Config("iceberg.worker-credential-cache-ttl")
723+
@ConfigDescription("How long a worker caches refreshed credentials to coalesce concurrent refresh requests")
724+
public IcebergConfig setWorkerCredentialCacheTtl(Duration workerCredentialCacheTtl)
725+
{
726+
this.workerCredentialCacheTtl = workerCredentialCacheTtl;
727+
return this;
728+
}
698729
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,20 @@
1818
import com.google.common.base.Splitter.MapSplitter;
1919
import com.google.common.base.Suppliers;
2020
import com.google.common.base.VerifyException;
21-
import com.google.common.cache.Cache;
2221
import com.google.common.collect.ImmutableList;
2322
import com.google.common.collect.ImmutableMap;
2423
import com.google.common.collect.ImmutableSet;
2524
import com.google.common.collect.Iterables;
2625
import com.google.common.collect.Lists;
2726
import com.google.common.collect.Sets;
2827
import com.google.common.collect.Streams;
29-
import com.google.common.util.concurrent.UncheckedExecutionException;
3028
import io.airlift.concurrent.MoreFutures;
3129
import io.airlift.json.JsonCodec;
3230
import io.airlift.log.Logger;
3331
import io.airlift.slice.Slice;
3432
import io.airlift.slice.Slices;
3533
import io.airlift.units.DataSize;
3634
import io.airlift.units.Duration;
37-
import io.trino.cache.EvictableCacheBuilder;
3835
import io.trino.filesystem.FileEntry;
3936
import io.trino.filesystem.FileIterator;
4037
import io.trino.filesystem.Location;
@@ -55,7 +52,6 @@
5552
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
5653
import io.trino.plugin.iceberg.delete.DeletionVectorWriter.DeletionVectorInfo;
5754
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
58-
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
5955
import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
6056
import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
6157
import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle;
@@ -268,7 +264,6 @@
268264

269265
import static com.google.common.base.Preconditions.checkArgument;
270266
import static com.google.common.base.Preconditions.checkState;
271-
import static com.google.common.base.Throwables.throwIfUnchecked;
272267
import static com.google.common.base.Verify.verify;
273268
import static com.google.common.base.Verify.verifyNotNull;
274269
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -279,7 +274,6 @@
279274
import static com.google.common.collect.Maps.transformValues;
280275
import static com.google.common.collect.Sets.difference;
281276
import static io.airlift.units.Duration.ZERO;
282-
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
283277
import static io.trino.filesystem.Locations.isS3Tables;
284278
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
285279
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
@@ -359,8 +353,6 @@
359353
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
360354
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
361355
import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
362-
import static io.trino.plugin.iceberg.IcebergUtil.COORDINATOR_CREDENTIAL_PREFETCH;
363-
import static io.trino.plugin.iceberg.IcebergUtil.CREDENTIAL_CACHE_TTL;
364356
import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
365357
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
366358
import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -538,7 +530,8 @@ public class IcebergMetadata
538530
private final int materializedViewRefreshMaxSnapshotsToExpire;
539531
private final Duration materializedViewRefreshSnapshotRetentionPeriod;
540532
private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();
541-
private final Cache<SchemaTableName, IcebergTableCredentials> tableCredentialsCache;
533+
private final ConcurrentHashMap<SchemaTableName, IcebergTableCredentials> tableCredentialsCache = new ConcurrentHashMap<>();
534+
private final Duration credentialCacheTtl;
542535
private final DeletionVectorWriter deletionVectorWriter;
543536

544537
private Transaction transaction;
@@ -560,7 +553,8 @@ public IcebergMetadata(
560553
ExecutorService icebergPlanningExecutor,
561554
ExecutorService icebergFileDeleteExecutor,
562555
int materializedViewRefreshMaxSnapshotsToExpire,
563-
Duration materializedViewRefreshSnapshotRetentionPeriod)
556+
Duration materializedViewRefreshSnapshotRetentionPeriod,
557+
Duration credentialCacheTtl)
564558
{
565559
this.typeManager = requireNonNull(typeManager, "typeManager is null");
566560
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
@@ -578,15 +572,7 @@ public IcebergMetadata(
578572
this.deletionVectorWriter = requireNonNull(deletionVectorWriter, "deletionVectorWriter is null");
579573
this.materializedViewRefreshMaxSnapshotsToExpire = materializedViewRefreshMaxSnapshotsToExpire;
580574
this.materializedViewRefreshSnapshotRetentionPeriod = materializedViewRefreshSnapshotRetentionPeriod;
581-
// Credentials are cached with a TTL so that the coordinator will re-fetch fresh credentials
582-
// when getTableCredentials() is called for new tasks in long-running queries.
583-
// The cache evicts COORDINATOR_CREDENTIAL_PREFETCH (2 min) before the token expires,
584-
// ensuring the next getTableCredentials() call re-fetches while the old token is still valid.
585-
// EvictableCache guarantees that invalidate() is immediately visible to subsequent get() calls.
586-
this.tableCredentialsCache = EvictableCacheBuilder.newBuilder()
587-
.expireAfterWrite(CREDENTIAL_CACHE_TTL.minus(COORDINATOR_CREDENTIAL_PREFETCH))
588-
.shareNothingWhenDisabled()
589-
.build();
575+
this.credentialCacheTtl = requireNonNull(credentialCacheTtl, "credentialCacheTtl is null");
590576
}
591577

592578
@Override
@@ -604,20 +590,31 @@ public Optional<ConnectorTableCredentials> getTableCredentials(ConnectorSession
604590
private Optional<ConnectorTableCredentials> getOrLoadTableCredentials(ConnectorSession session, SchemaTableName schemaTableName)
605591
{
606592
try {
607-
return Optional.of(uncheckedCacheGet(
608-
tableCredentialsCache,
609-
schemaTableName,
610-
() -> {
611-
BaseTable baseTable = catalog.loadTable(session, schemaTableName);
612-
return IcebergTableCredentials.forFileIO(baseTable.io());
613-
}));
593+
// Check if existing cached entry is still fresh (per-entry expiry).
594+
// Each IcebergTableCredentials carries its own expiresAt, so entries with
595+
// different token lifetimes are handled correctly without a cache-wide static TTL.
596+
IcebergTableCredentials cached = tableCredentialsCache.get(schemaTableName);
597+
if (cached != null && !cached.shouldRefresh()) {
598+
return Optional.of(cached);
599+
}
600+
// Entry is missing or approaching expiry — reload from catalog
601+
IcebergTableCredentials fresh = loadTableCredentials(session, schemaTableName);
602+
tableCredentialsCache.put(schemaTableName, fresh);
603+
return Optional.of(fresh);
614604
}
615-
catch (UncheckedExecutionException e) {
616-
throwIfUnchecked(e.getCause());
617-
throw e;
605+
catch (TableNotFoundException e) {
606+
// Table may not exist yet (e.g. during CREATE TABLE AS SELECT);
607+
// no credentials to vend in that case.
608+
return Optional.empty();
618609
}
619610
}
620611

612+
private IcebergTableCredentials loadTableCredentials(ConnectorSession session, SchemaTableName schemaTableName)
613+
{
614+
BaseTable baseTable = catalog.loadTable(session, schemaTableName);
615+
return IcebergTableCredentials.forFileIO(baseTable.io(), credentialCacheTtl.toMillis());
616+
}
617+
621618
private static SchemaTableName getSchemaTableName(ConnectorTableHandle tableHandle)
622619
{
623620
if (tableHandle instanceof IcebergTableHandle handle) {
@@ -1621,10 +1618,9 @@ private List<String> getChildNamespaces(ConnectorSession session, String parentN
16211618

16221619
private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table)
16231620
{
1624-
// Invalidate so the next getOrLoadTableCredentials() call re-loads fresh credentials
1625-
// from the catalog. EvictableCache does not support put(); invalidate + lazy reload is
1626-
// equivalent and avoids races with concurrent get() calls.
1627-
tableCredentialsCache.invalidate(name);
1621+
// Remove so the next getOrLoadTableCredentials() call re-loads fresh credentials
1622+
// from the catalog for the newly created writable table handle.
1623+
tableCredentialsCache.remove(name);
16281624
SortFieldInfo sortInfo = getSupportedSortFields(table.schema(), table.sortOrder());
16291625
return new IcebergWritableTableHandle(
16301626
name,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class IcebergMetadataFactory
5252
private final DeletionVectorWriter deletionVectorWriter;
5353
private final int materializedViewRefreshMaxSnapshotsToExpire;
5454
private final Duration materializedViewRefreshSnapshotRetentionPeriod;
55+
private final Duration credentialCacheTtl;
5556

5657
@Inject
5758
public IcebergMetadataFactory(
@@ -96,6 +97,7 @@ public IcebergMetadataFactory(
9697
this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null");
9798
this.materializedViewRefreshMaxSnapshotsToExpire = config.getMaterializedViewRefreshMaxSnapshotsToExpire();
9899
this.materializedViewRefreshSnapshotRetentionPeriod = config.getMaterializedViewRefreshSnapshotRetentionPeriod();
100+
this.credentialCacheTtl = config.getCredentialCacheTtl();
99101
}
100102

101103
public IcebergMetadata create(ConnectorIdentity identity)
@@ -116,6 +118,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
116118
icebergPlanningExecutor,
117119
icebergFileDeleteExecutor,
118120
materializedViewRefreshMaxSnapshotsToExpire,
119-
materializedViewRefreshSnapshotRetentionPeriod);
121+
materializedViewRefreshSnapshotRetentionPeriod,
122+
credentialCacheTtl);
120123
}
121124
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@
5454

5555
import static com.google.common.collect.Maps.transformValues;
5656
import static io.trino.plugin.iceberg.IcebergSessionProperties.maxPartitionsPerWriter;
57-
import static io.trino.plugin.iceberg.IcebergUtil.WORKER_CREDENTIAL_CACHE_TTL;
5857
import static io.trino.plugin.iceberg.IcebergUtil.getFileIoProperties;
5958
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
6059
import static io.trino.plugin.iceberg.IcebergUtil.maybeRefreshVendedCredentials;
6160
import static java.util.Objects.requireNonNull;
61+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
6262

6363
public class IcebergPageSinkProvider
6464
implements ConnectorPageSinkProvider
@@ -73,12 +73,8 @@ public class IcebergPageSinkProvider
7373
private final TypeManager typeManager;
7474
private final PageSorter pageSorter;
7575
private final TrinoCatalogFactory catalogFactory;
76-
private final Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache =
77-
EvictableCacheBuilder.newBuilder()
78-
.maximumSize(1_000)
79-
.expireAfterWrite(WORKER_CREDENTIAL_CACHE_TTL)
80-
.shareNothingWhenDisabled()
81-
.build();
76+
private final long credentialCacheTtlMillis;
77+
private final Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache;
8278

8379
@Inject
8480
public IcebergPageSinkProvider(
@@ -102,6 +98,12 @@ public IcebergPageSinkProvider(
10298
this.typeManager = requireNonNull(typeManager, "typeManager is null");
10399
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
104100
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
101+
this.credentialCacheTtlMillis = icebergConfig.getCredentialCacheTtl().toMillis();
102+
this.refreshedCredentialCache = EvictableCacheBuilder.newBuilder()
103+
.maximumSize(1_000)
104+
.expireAfterWrite(icebergConfig.getWorkerCredentialCacheTtl().toMillis(), MILLISECONDS)
105+
.shareNothingWhenDisabled()
106+
.build();
105107
}
106108

107109
@Override
@@ -246,7 +248,7 @@ private Optional<ConnectorTableCredentials> maybeRefreshCredentials(
246248
() -> {
247249
TrinoCatalog catalog = catalogFactory.create(session.getIdentity());
248250
BaseTable freshTable = catalog.loadTable(session, schemaTableName);
249-
return IcebergTableCredentials.forFileIO(freshTable.io());
251+
return IcebergTableCredentials.forFileIO(freshTable.io(), credentialCacheTtlMillis);
250252
});
251253
}
252254
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public class IcebergPageSourceProvider
238238
// Owned by the factory so it is shared across all providers created on this worker,
239239
// preventing many concurrent tasks from all calling the REST catalog simultaneously.
240240
private final Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache;
241+
private final long credentialCacheTtlMillis;
241242
private final DeleteManager unpartitionedTableDeleteManager;
242243
private final Map<Integer, Function<PartitionData, PartitionKey>> partitionKeyFactories = new ConcurrentHashMap<>();
243244
private final Map<PartitionKey, DeleteManager> partitionedDeleteManagers = new ConcurrentHashMap<>();
@@ -250,7 +251,8 @@ public IcebergPageSourceProvider(
250251
ParquetReaderOptions parquetReaderOptions,
251252
TypeManager typeManager,
252253
TrinoCatalogFactory catalogFactory,
253-
Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache)
254+
Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache,
255+
long credentialCacheTtlMillis)
254256
{
255257
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
256258
this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
@@ -260,6 +262,7 @@ public IcebergPageSourceProvider(
260262
this.typeManager = requireNonNull(typeManager, "typeManager is null");
261263
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
262264
this.refreshedCredentialCache = requireNonNull(refreshedCredentialCache, "refreshedCredentialCache is null");
265+
this.credentialCacheTtlMillis = credentialCacheTtlMillis;
263266
this.unpartitionedTableDeleteManager = new DeleteManager(typeManager);
264267
}
265268

@@ -332,7 +335,7 @@ private Optional<ConnectorTableCredentials> maybeRefreshCredentials(
332335
() -> {
333336
TrinoCatalog catalog = catalogFactory.create(session.getIdentity());
334337
BaseTable freshTable = catalog.loadTable(session, schemaTableName);
335-
return IcebergTableCredentials.forFileIO(freshTable.io());
338+
return IcebergTableCredentials.forFileIO(freshTable.io(), credentialCacheTtlMillis);
336339
});
337340
}
338341

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProviderFactory.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
2727
import io.trino.spi.type.TypeManager;
2828

29-
import static io.trino.plugin.iceberg.IcebergUtil.WORKER_CREDENTIAL_CACHE_TTL;
3029
import static java.util.Objects.requireNonNull;
30+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3131

3232
public class IcebergPageSourceProviderFactory
3333
implements ConnectorPageSourceProviderFactory
@@ -39,15 +39,11 @@ public class IcebergPageSourceProviderFactory
3939
private final ParquetReaderOptions parquetReaderOptions;
4040
private final TypeManager typeManager;
4141
private final TrinoCatalogFactory catalogFactory;
42+
private final long credentialCacheTtlMillis;
4243
// Shared across all providers created by this factory so that concurrent tasks on the
4344
// same worker do not all call the REST catalog when credentials need refreshing.
4445
// Size-bounded to cap memory even when many tables are being scanned simultaneously.
45-
private final Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache =
46-
EvictableCacheBuilder.newBuilder()
47-
.maximumSize(10_000)
48-
.expireAfterWrite(WORKER_CREDENTIAL_CACHE_TTL)
49-
.shareNothingWhenDisabled()
50-
.build();
46+
private final Cache<CredentialCacheKey, IcebergTableCredentials> refreshedCredentialCache;
5147

5248
@Inject
5349
public IcebergPageSourceProviderFactory(
@@ -57,7 +53,8 @@ public IcebergPageSourceProviderFactory(
5753
OrcReaderConfig orcReaderConfig,
5854
ParquetReaderConfig parquetReaderConfig,
5955
TypeManager typeManager,
60-
TrinoCatalogFactory catalogFactory)
56+
TrinoCatalogFactory catalogFactory,
57+
IcebergConfig config)
6158
{
6259
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
6360
this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
@@ -66,11 +63,17 @@ public IcebergPageSourceProviderFactory(
6663
this.parquetReaderOptions = parquetReaderConfig.toParquetReaderOptions();
6764
this.typeManager = requireNonNull(typeManager, "typeManager is null");
6865
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
66+
this.credentialCacheTtlMillis = config.getCredentialCacheTtl().toMillis();
67+
this.refreshedCredentialCache = EvictableCacheBuilder.newBuilder()
68+
.maximumSize(10_000)
69+
.expireAfterWrite(config.getWorkerCredentialCacheTtl().toMillis(), MILLISECONDS)
70+
.shareNothingWhenDisabled()
71+
.build();
6972
}
7073

7174
@Override
7275
public IcebergPageSourceProvider createPageSourceProvider()
7376
{
74-
return new IcebergPageSourceProvider(fileSystemFactory, fileIoFactory, fileFormatDataSourceStats, orcReaderOptions, parquetReaderOptions, typeManager, catalogFactory, refreshedCredentialCache);
77+
return new IcebergPageSourceProvider(fileSystemFactory, fileIoFactory, fileFormatDataSourceStats, orcReaderOptions, parquetReaderOptions, typeManager, catalogFactory, refreshedCredentialCache, credentialCacheTtlMillis);
7578
}
7679
}

0 commit comments

Comments
 (0)