diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md
index cb766f92412c..63fed1e75159 100644
--- a/docs/src/main/sphinx/connector/delta-lake.md
+++ b/docs/src/main/sphinx/connector/delta-lake.md
@@ -662,6 +662,25 @@ EXECUTE `.
 ```{include} optimize.fragment
 ```
 
+Use a `WHERE` clause with [metadata columns](delta-lake-special-columns) to filter
+which files are optimized.
+
+```sql
+ALTER TABLE test_table EXECUTE optimize
+WHERE "$file_modified_time" > date_trunc('day', CURRENT_TIMESTAMP);
+```
+
+```sql
+ALTER TABLE test_table EXECUTE optimize
+WHERE "$path" <> 'skipping-file-path';
+```
+
+```sql
+-- optimize files smaller than 1MB
+ALTER TABLE test_table EXECUTE optimize
+WHERE "$file_size" <= 1024 * 1024;
+```
+
 (delta-lake-alter-table-rename-to)=
 
 #### ALTER TABLE RENAME TO
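As a usage note on the documentation examples above: the new OPTIMIZE filters can be exercised from any Trino client. A minimal sketch using the Trino JDBC driver, assuming a hypothetical cluster at localhost:8080 and a table registered as `delta.default.test_table` (endpoint, user, and table name are illustrative, not part of this change):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class OptimizeRecentFiles
{
    public static void main(String[] args)
            throws Exception
    {
        // Hypothetical connection details; adjust to your deployment
        try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080/delta/default", "admin", null);
                Statement statement = connection.createStatement()) {
            // Compact only files written today, mirroring the first documentation example
            statement.execute("""
                    ALTER TABLE test_table EXECUTE optimize
                    WHERE "$file_modified_time" > date_trunc('day', CURRENT_TIMESTAMP)
                    """);
        }
    }
}
```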
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
index 3e7996325ef6..573426de65e9 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
@@ -60,6 +60,8 @@ public record DeltaLakeColumnHandle(
             field("position", BIGINT),
             field("partition", VARCHAR));
 
+    // Note: update the `isMetadataColumnHandle` method when adding new metadata columns
+
     public static final String PATH_COLUMN_NAME = "$path";
     public static final Type PATH_TYPE = VARCHAR;
 
@@ -161,4 +163,9 @@ public static DeltaLakeColumnHandle mergeRowIdColumnHandle()
     {
         return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, OptionalInt.empty(), ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, SYNTHESIZED, Optional.empty());
     }
+
+    public static boolean isMetadataColumnHandle(DeltaLakeColumnHandle columnHandle)
+    {
+        return columnHandle.equals(fileModifiedTimeColumnHandle()) || columnHandle.equals(pathColumnHandle()) || columnHandle.equals(fileSizeColumnHandle());
+    }
 }
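For reference, a minimal sketch of what the new helper classifies, assuming the trino-delta-lake plugin and its dependencies are on the classpath; only the three synthesized file-level handles match:

```java
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileSizeColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.isMetadataColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;

public class MetadataColumnCheck
{
    public static void main(String[] args)
    {
        // All three synthesized file-level columns are classified as metadata columns
        System.out.println(isMetadataColumnHandle(pathColumnHandle()));             // true
        System.out.println(isMetadataColumnHandle(fileSizeColumnHandle()));         // true
        System.out.println(isMetadataColumnHandle(fileModifiedTimeColumnHandle())); // true
        // Any other handle, such as a regular data column, returns false
    }
}
```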
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 0b4c83da9e68..c993bb46fad4 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -219,6 +219,7 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.PATH_COLUMN_NAME;
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileModifiedTimeColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileSizeColumnHandle;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.isMetadataColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.mergeRowIdColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
@@ -236,7 +237,7 @@
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.DeltaLakeSplitManager.buildSplitPath;
 import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
 import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
 import static io.trino.plugin.deltalake.DeltaLakeTableProperties.COLUMN_MAPPING_MODE_PROPERTY;
@@ -293,6 +294,13 @@
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileModifiedTimeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileSizeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileModifiedTimeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileSizeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getPathDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.pathMatchesPredicate;
 import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
 import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
 import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
@@ -3332,11 +3340,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
 
         TupleDomain<DeltaLakeColumnHandle> newEnforcedConstraint;
         TupleDomain<DeltaLakeColumnHandle> newUnenforcedConstraint;
+        TupleDomain<DeltaLakeColumnHandle> newRemainingConstraint;
         Set<DeltaLakeColumnHandle> newConstraintColumns;
         if (predicate.isNone()) {
             // Engine does not pass none Constraint.summary. It can become none when combined with the expression and connector's domain knowledge.
             newEnforcedConstraint = TupleDomain.none();
             newUnenforcedConstraint = TupleDomain.all();
+            newRemainingConstraint = TupleDomain.all();
             newConstraintColumns = constraint.getPredicateColumns().stream()
                     .flatMap(Collection::stream)
                     .map(DeltaLakeColumnHandle.class::cast)
@@ -3348,6 +3358,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
 
             ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> enforceableDomains = ImmutableMap.builder();
             ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> unenforceableDomains = ImmutableMap.builder();
+            ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> remainingDomains = ImmutableMap.builder();
             ImmutableSet.Builder<DeltaLakeColumnHandle> constraintColumns = ImmutableSet.builder();
             // We need additional field to track partition columns used in queries as enforceDomains seem to be not catching
             // cases when partition columns is used within complex filter as 'partitionColumn % 2 = 0'
@@ -3357,9 +3368,14 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
                     .forEach(constraintColumns::add);
             for (Entry<ColumnHandle, Domain> domainEntry : constraintDomains.entrySet()) {
                 DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey();
-                if (!partitionColumns.contains(column)) {
+                if (isMetadataColumnHandle(column)) {
                     unenforceableDomains.put(column, domainEntry.getValue());
                 }
+                else if (!partitionColumns.contains(column)) {
+                    unenforceableDomains.put(column, domainEntry.getValue());
+                    // this domain cannot be fully pushed down, so the engine must also evaluate it
+                    remainingDomains.put(column, domainEntry.getValue());
+                }
                 else {
                     enforceableDomains.put(column, domainEntry.getValue());
                 }
@@ -3368,6 +3384,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
 
             newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow());
             newUnenforcedConstraint = TupleDomain.withColumnDomains(unenforceableDomains.buildOrThrow());
+            newRemainingConstraint = TupleDomain.withColumnDomains(remainingDomains.buildOrThrow());
             newConstraintColumns = constraintColumns.build();
         }
 
@@ -3405,7 +3422,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
 
         return Optional.of(new ConstraintApplicationResult<>(
                 newHandle,
-                newUnenforcedConstraint.transformKeys(ColumnHandle.class::cast),
+                newRemainingConstraint.transformKeys(ColumnHandle.class::cast),
                 extractionResult.remainingExpression(),
                 false));
     }
@@ -3831,7 +3848,7 @@ private void generateMissingFileStatistics(ConnectorSession session, DeltaLakeTa
                             || addFileEntry.getStats().get().getNullCount().isEmpty())
                     .filter(addFileEntry -> !URI.create(addFileEntry.getPath()).isAbsolute()) // TODO: Support absolute paths https://github.com/trinodb/trino/issues/18277
                     // Statistics returns whole path to file build in DeltaLakeSplitManager, so we need to create corresponding map key for AddFileEntry.
-                    .collect(toImmutableMap(addFileEntry -> DeltaLakeSplitManager.buildSplitPath(Location.of(tableHandle.getLocation()), addFileEntry).toString(), identity()));
+                    .collect(toImmutableMap(addFileEntry -> buildSplitPath(Location.of(tableHandle.getLocation()), addFileEntry).toString(), identity()));
         }
         if (addFileEntriesWithNoStats.isEmpty()) {
             return;
         }
@@ -4202,12 +4219,30 @@ private CommitDeleteOperationResult commitDeleteOperation(
         long commitVersion = currentVersion + 1;
         transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), false));
 
+        Domain pathDomain = getPathDomain(tableHandle.getNonPartitionConstraint());
+        Domain fileModifiedDomain = getFileModifiedTimeDomain(tableHandle.getNonPartitionConstraint());
+        Domain fileSizeDomain = getFileSizeDomain(tableHandle.getNonPartitionConstraint());
+
         long deletedRecords = 0L;
         boolean allDeletedFilesStatsPresent = true;
         try (Stream<AddFileEntry> activeFiles = getAddFileEntriesMatchingEnforcedPartitionConstraint(session, tableHandle)) {
             Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
             while (addFileEntryIterator.hasNext()) {
                 AddFileEntry addFileEntry = addFileEntryIterator.next();
+
+                String splitPath = buildSplitPath(Location.of(tableHandle.getLocation()), addFileEntry).toString();
+                if (!pathMatchesPredicate(pathDomain, splitPath)) {
+                    continue;
+                }
+
+                if (!fileModifiedTimeMatchesPredicate(fileModifiedDomain, addFileEntry.getModificationTime())) {
+                    continue;
+                }
+
+                if (!fileSizeMatchesPredicate(fileSizeDomain, addFileEntry.getSize())) {
+                    continue;
+                }
+
                 transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
 
                 Optional<Long> fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords);
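The behavioral core of this change is the three-way split above: metadata-column domains stay in the unenforced constraint (the table handle keeps them for file pruning) but are excluded from the remaining constraint handed back to the engine, which is what lets `isFullyPushedDown()` succeed in the tests below. A self-contained sketch of the classification, using string keys in place of `DeltaLakeColumnHandle` (column names are illustrative):

```java
import com.google.common.collect.ImmutableMap;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;
import java.util.Set;

import static io.trino.spi.type.BigintType.BIGINT;

public class ConstraintSplitSketch
{
    public static void main(String[] args)
    {
        // Toy stand-ins: "part" is a partition column, "$file_size" a metadata column, "x" a data column
        Set<String> partitionColumns = Set.of("part");
        Set<String> metadataColumns = Set.of("$file_size");

        Map<String, Domain> predicate = Map.of(
                "part", Domain.singleValue(BIGINT, 1L),
                "$file_size", Domain.singleValue(BIGINT, 42L),
                "x", Domain.singleValue(BIGINT, 7L));

        ImmutableMap.Builder<String, Domain> enforceable = ImmutableMap.builder();   // pruned via partition values
        ImmutableMap.Builder<String, Domain> unenforceable = ImmutableMap.builder(); // kept in the table handle for file pruning
        ImmutableMap.Builder<String, Domain> remaining = ImmutableMap.builder();     // returned to the engine to re-evaluate
        for (Map.Entry<String, Domain> entry : predicate.entrySet()) {
            if (metadataColumns.contains(entry.getKey())) {
                unenforceable.put(entry);
            }
            else if (!partitionColumns.contains(entry.getKey())) {
                unenforceable.put(entry);
                remaining.put(entry); // data columns still need engine-side filtering
            }
            else {
                enforceable.put(entry);
            }
        }
        // Only the data column predicate flows back to the engine
        System.out.println(TupleDomain.withColumnDomains(remaining.buildOrThrow())); // contains just "x"
    }
}
```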
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
index 180f457e4ae9..da47143f846f 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
@@ -100,11 +100,11 @@
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetIgnoreStatistics;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetVectorizedDecodingEnabled;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
 import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
 import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
 import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index 2fb372064cbf..464559058e51 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -58,14 +58,19 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
-import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileModifiedTimeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileSizeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileModifiedTimeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileSizeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getPathDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.pathMatchesPredicate;
 import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
 import static java.lang.Math.clamp;
 import static java.util.Objects.requireNonNull;
@@ -163,6 +168,8 @@ private Stream<DeltaLakeSplit> getSplits(
         TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
         TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
         Domain pathDomain = getPathDomain(nonPartitionConstraint);
+        Domain fileModifiedDomain = getFileModifiedTimeDomain(nonPartitionConstraint);
+        Domain fileSizeDomain = getFileSizeDomain(nonPartitionConstraint);
         boolean splittable =
                 // Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes.
                 // Because of this we can only have one Split/UpdatablePageSource
@@ -207,10 +214,18 @@ private Stream<DeltaLakeSplit> getSplits(
                         return Stream.empty();
                     }
 
+                    if (!fileModifiedTimeMatchesPredicate(fileModifiedDomain, addAction.getModificationTime())) {
+                        return Stream.empty();
+                    }
+
                     if (addAction.getDeletionVector().isEmpty() && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() > maxScannedFileSizeInBytes.get()) {
                         return Stream.empty();
                     }
 
+                    if (!fileSizeMatchesPredicate(fileSizeDomain, addAction.getSize())) {
+                        return Stream.empty();
+                    }
+
                     Map<DeltaLakeColumnHandle, Domain> enforcedDomains = enforcedPartitionConstraint.getDomains().orElseThrow();
                     if (!partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), enforcedDomains)) {
                         return Stream.empty();
                     }
@@ -281,30 +296,6 @@ private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandl
                 .anyMatch(DeltaLakeColumnType.REGULAR::equals);
     }
 
-    public static boolean partitionMatchesPredicate(Map<String, Optional<String>> partitionKeys, Map<DeltaLakeColumnHandle, Domain> domains)
-    {
-        for (Map.Entry<DeltaLakeColumnHandle, Domain> enforcedDomainsEntry : domains.entrySet()) {
-            DeltaLakeColumnHandle partitionColumn = enforcedDomainsEntry.getKey();
-            Domain partitionDomain = enforcedDomainsEntry.getValue();
-            if (!partitionDomain.includesNullableValue(deserializePartitionValue(partitionColumn, partitionKeys.get(partitionColumn.basePhysicalColumnName())))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private static Domain getPathDomain(TupleDomain<DeltaLakeColumnHandle> effectivePredicate)
-    {
-        return effectivePredicate.getDomains()
-                .flatMap(domains -> Optional.ofNullable(domains.get(pathColumnHandle())))
-                .orElseGet(() -> Domain.all(pathColumnHandle().baseType()));
-    }
-
-    private static boolean pathMatchesPredicate(Domain pathDomain, String path)
-    {
-        return pathDomain.includesNullableValue(utf8Slice(path));
-    }
-
     private List<DeltaLakeSplit> splitsForFile(
             ConnectorSession session,
             AddFileEntry addFileEntry,
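Split enumeration now drops whole files whose `$path`, `$file_modified_time`, or `$file_size` fall outside the pushed-down domains. A standalone sketch of the same checks against toy file entries, assuming only trino-spi and airlift `Slices` on the classpath (`FileEntry` is a hypothetical stand-in for `AddFileEntry`):

```java
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.ValueSet;

import java.util.List;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.VarcharType.VARCHAR;

public class FilePruningSketch
{
    // Toy stand-in for AddFileEntry
    record FileEntry(String path, long size, long modificationTime) {}

    public static void main(String[] args)
    {
        List<FileEntry> files = List.of(
                new FileEntry("s3://bucket/table/a.parquet", 500_000, 1_700_000_000_000L),
                new FileEntry("s3://bucket/table/b.parquet", 5_000_000, 1_700_000_100_000L));

        // "$file_size" <= 1MB
        Domain fileSizeDomain = Domain.create(ValueSet.ofRanges(Range.lessThanOrEqual(BIGINT, 1024L * 1024)), false);
        // "$file_modified_time" >= some cutoff, packed the same way the connector packs modification times
        Domain mtimeDomain = Domain.create(
                ValueSet.ofRanges(Range.greaterThanOrEqual(TIMESTAMP_TZ_MILLIS, packDateTimeWithZone(1_600_000_000_000L, UTC_KEY))),
                false);
        // No predicate on "$path", so its domain is all()
        Domain pathDomain = Domain.all(VARCHAR);

        for (FileEntry file : files) {
            boolean matches = pathDomain.includesNullableValue(utf8Slice(file.path()))
                    && fileSizeDomain.includesNullableValue(file.size())
                    && mtimeDomain.includesNullableValue(packDateTimeWithZone(file.modificationTime(), UTC_KEY));
            System.out.println(file.path() + " -> " + (matches ? "keep" : "prune")); // a.parquet kept, b.parquet pruned by size
        }
    }
}
```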
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java
index 6f1bc9c9d473..075c1665b1a9 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java
@@ -48,7 +48,7 @@
 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java
index 4221722476af..31802bf62910 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java
@@ -15,6 +15,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
+import io.trino.filesystem.Location;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
 import io.trino.plugin.deltalake.DeltaLakeTableHandle;
@@ -25,6 +26,7 @@
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
 import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.statistics.ColumnStatistics;
 import io.trino.spi.statistics.DoubleRange;
@@ -48,7 +50,14 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.DeltaLakeSplitManager.buildSplitPath;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileModifiedTimeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.fileSizeMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileModifiedTimeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getFileSizeDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.getPathDomain;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.pathMatchesPredicate;
 import static io.trino.spi.statistics.StatsUtil.toStatsRepresentation;
 import static java.lang.Double.NEGATIVE_INFINITY;
 import static java.lang.Double.NaN;
@@ -113,6 +122,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
                 .filter(column -> predicatedColumnNames.contains(column.name()))
                 .collect(toImmutableList());
 
+        Domain pathDomain = getPathDomain(tableHandle.getNonPartitionConstraint());
+        Domain fileModifiedDomain = getFileModifiedTimeDomain(tableHandle.getNonPartitionConstraint());
+        Domain fileSizeDomain = getFileSizeDomain(tableHandle.getNonPartitionConstraint());
         try (Stream<AddFileEntry> addEntries = transactionLogAccess.getActiveFiles(
                 session,
                 tableSnapshot,
@@ -133,6 +145,19 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
                     continue;
                 }
 
+                String splitPath = buildSplitPath(Location.of(tableHandle.getLocation()), addEntry).toString();
+                if (!pathMatchesPredicate(pathDomain, splitPath)) {
+                    continue;
+                }
+
+                if (!fileModifiedTimeMatchesPredicate(fileModifiedDomain, addEntry.getModificationTime())) {
+                    continue;
+                }
+
+                if (!fileSizeMatchesPredicate(fileSizeDomain, addEntry.getSize())) {
+                    continue;
+                }
+
                 TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
                         addEntry,
                         predicatedColumns,
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
index 19a730e9efe8..06632583572d 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
@@ -86,7 +86,6 @@
 import static io.trino.cache.CacheUtils.invalidateAllIf;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCheckpointFilteringEnabled;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
@@ -94,6 +93,7 @@
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
index aed0976fd917..ef1710ca588d 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
@@ -77,7 +77,6 @@
 import static com.google.common.collect.MoreCollectors.toOptional;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
-import static io.trino.plugin.deltalake.DeltaLakeSplitManager.partitionMatchesPredicate;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats;
@@ -89,6 +88,7 @@
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.SIDECAR;
 import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.TRANSACTION;
+import static io.trino.plugin.deltalake.util.DeltaLakeDomains.partitionMatchesPredicate;
 import static io.trino.plugin.hive.util.HiveTypeTranslator.toHiveType;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.type.BigintType.BIGINT;
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeDomains.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeDomains.java
new file mode 100644
index 000000000000..96a94cf2c3e6
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/util/DeltaLakeDomains.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.util;
+
+import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileModifiedTimeColumnHandle;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileSizeColumnHandle;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
+import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
+import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
+
+public final class DeltaLakeDomains
+{
+    private DeltaLakeDomains() {}
+
+    public static boolean partitionMatchesPredicate(Map<String, Optional<String>> partitionKeys, Map<DeltaLakeColumnHandle, Domain> domains)
+    {
+        for (Map.Entry<DeltaLakeColumnHandle, Domain> enforcedDomainsEntry : domains.entrySet()) {
+            DeltaLakeColumnHandle partitionColumn = enforcedDomainsEntry.getKey();
+            Domain partitionDomain = enforcedDomainsEntry.getValue();
+            if (!partitionDomain.includesNullableValue(deserializePartitionValue(partitionColumn, partitionKeys.get(partitionColumn.basePhysicalColumnName())))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static Domain getFileModifiedTimeDomain(TupleDomain<DeltaLakeColumnHandle> effectivePredicate)
+    {
+        return effectivePredicate.getDomains()
+                .flatMap(domains -> Optional.ofNullable(domains.get(fileModifiedTimeColumnHandle())))
+                .orElseGet(() -> Domain.all(fileModifiedTimeColumnHandle().baseType()));
+    }
+
+    public static boolean fileModifiedTimeMatchesPredicate(Domain fileModifiedTimeDomain, long fileModifiedTime)
+    {
+        return fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY));
+    }
+
+    public static Domain getPathDomain(TupleDomain<DeltaLakeColumnHandle> effectivePredicate)
+    {
+        return effectivePredicate.getDomains()
+                .flatMap(domains -> Optional.ofNullable(domains.get(pathColumnHandle())))
+                .orElseGet(() -> Domain.all(pathColumnHandle().baseType()));
+    }
+
+    public static boolean pathMatchesPredicate(Domain pathDomain, String path)
+    {
+        return pathDomain.includesNullableValue(utf8Slice(path));
+    }
+
+    public static Domain getFileSizeDomain(TupleDomain<DeltaLakeColumnHandle> effectivePredicate)
+    {
+        return effectivePredicate.getDomains()
+                .flatMap(domains -> Optional.ofNullable(domains.get(fileSizeColumnHandle())))
+                .orElseGet(() -> Domain.all(fileSizeColumnHandle().baseType()));
+    }
+
+    public static boolean fileSizeMatchesPredicate(Domain fileSizeDomain, long fileSize)
+    {
+        return fileSizeDomain.includesNullableValue(fileSize);
+    }
+}
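The `get*Domain` helpers all follow the same pattern: look up the column's domain in the effective predicate and fall back to `Domain.all`, so an absent predicate matches every file. A small demonstration of that fallback, using string keys instead of column handles:

```java
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;
import java.util.Optional;

import static io.trino.spi.type.BigintType.BIGINT;

public class DomainFallbackSketch
{
    public static void main(String[] args)
    {
        // Stand-in for getFileSizeDomain: string keys instead of DeltaLakeColumnHandle
        TupleDomain<String> predicate = TupleDomain.withColumnDomains(Map.of("$file_size", Domain.singleValue(BIGINT, 100L)));

        Domain fileSizeDomain = predicate.getDomains()
                .flatMap(domains -> Optional.ofNullable(domains.get("$file_size")))
                .orElseGet(() -> Domain.all(BIGINT));
        System.out.println(fileSizeDomain.includesNullableValue(100L)); // true
        System.out.println(fileSizeDomain.includesNullableValue(99L));  // false

        // With no predicate on the column, the helper falls back to Domain.all, which matches every file
        Domain absent = TupleDomain.<String>all().getDomains()
                .flatMap(domains -> Optional.ofNullable(domains.get("$file_size")))
                .orElseGet(() -> Domain.all(BIGINT));
        System.out.println(absent.includesNullableValue(123L)); // true
    }
}
```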
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
index 7c82950a9509..01f61fa8787e 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
@@ -96,6 +96,7 @@
 import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
 import static java.lang.String.format;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 import static java.util.Map.entry;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1005,18 +1006,271 @@ public void testTargetMaxFileSize()
     @Test
     public void testPathColumn()
     {
-        try (TestTable table = newTrinoTable("test_path_column", "(x VARCHAR)")) {
-            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);
+        try (TestTable table = newTrinoTable("test_path_column", "(x VARCHAR, part VARCHAR) WITH (partitioned_by = ARRAY['part'])")) {
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first', 'a#sharp'", 1);
             String firstFilePath = (String) computeScalar("SELECT \"$path\" FROM " + table.getName());
-            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second'", 1);
+            assertThat(firstFilePath.contains("a#sharp")).isFalse();
+            assertThat(firstFilePath.contains("a%23sharp")).isTrue();
+
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'a%23sharp'", 1);
             String secondFilePath = (String) computeScalar("SELECT \"$path\" FROM " + table.getName() + " WHERE x = 'second'");
+            assertThat(secondFilePath.contains("a%23sharp")).isFalse();
+            assertThat(secondFilePath.contains("a%2523sharp")).isTrue();
 
-            // Verify predicate correctness on $path column
-            assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "'", "VALUES 'first'");
-            assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" <> '" + firstFilePath + "'", "VALUES 'second'");
-            assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" IN ('" + firstFilePath + "', '" + secondFilePath + "')", "VALUES ('first'), ('second')");
-            assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" IS NOT NULL", "VALUES ('first'), ('second')");
+            assertQuery("SELECT x FROM " + table.getName() + " WHERE part = 'a#sharp'", "VALUES 'first'");
+            assertQuery("SELECT x FROM " + table.getName() + " WHERE part = 'a%23sharp'", "VALUES 'second'");
+
+            // Verify predicate correctness on $path column, and check that it is pushed down
+            assertThat(query("SELECT x FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "'"))
+                    .matches("VALUES CAST('first' AS VARCHAR)")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT x FROM " + table.getName() + " WHERE \"$path\" <> '" + firstFilePath + "'"))
+                    .matches("VALUES CAST('second' AS VARCHAR)")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT x FROM " + table.getName() + " WHERE \"$path\" IN ('" + firstFilePath + "', '" + secondFilePath + "')"))
+                    .matches("VALUES (CAST('first' AS VARCHAR)), (CAST('second' AS VARCHAR))")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT x FROM " + table.getName() + " WHERE \"$path\" IS NOT NULL"))
+                    .matches("VALUES (CAST('first' AS VARCHAR)), (CAST('second' AS VARCHAR))")
+                    .isFullyPushedDown();
             assertQueryReturnsEmptyResult("SELECT x FROM " + table.getName() + " WHERE \"$path\" IS NULL");
+
+            assertQuery("SHOW STATS FOR (SELECT x FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "')",
+                    "VALUES " +
+                            "('x', 11.0, 1.0, 0.0, null, null, null)," +
+                            "(null, null, null, null, 1.0, null, null)");
+
+            // test simple delete correctness
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$path\" = 'not exist'", 0);
+            assertQuery("SELECT x FROM " + table.getName(), "VALUES 'first', 'second'");
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "'", 1);
+            assertQuery("SELECT x FROM " + table.getName(), "VALUES 'second'");
+
+            // test simple update correctness
+            assertUpdate("UPDATE " + table.getName() + " SET x = 'update' WHERE \"$path\" = '" + secondFilePath + "'", 1);
+            assertQuery("SELECT x FROM " + table.getName(), "VALUES 'update'");
+        }
+    }
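The revised `testPathColumn` pins down the escaping behavior: a `#` in a partition value is percent-encoded in the file path, and a literal `%` is itself re-encoded. The exact escaping applied by the writer is an implementation detail; for these particular characters it coincides with standard URL encoding, which the following sketch illustrates:

```java
import java.net.URLEncoder;

import static java.nio.charset.StandardCharsets.UTF_8;

public class PartitionPathEncoding
{
    public static void main(String[] args)
    {
        // '#' is percent-encoded in the partition directory name
        System.out.println(URLEncoder.encode("a#sharp", UTF_8));   // a%23sharp
        // a literal '%' in the value is itself encoded, so 'a%23sharp' becomes 'a%2523sharp'
        System.out.println(URLEncoder.encode("a%23sharp", UTF_8)); // a%2523sharp
    }
}
```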
assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$path\" = 'not exist'", 0); + assertQuery("SELECT x FROM " + table.getName(), "VALUES 'first', 'second'"); + assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "'", 1); + assertQuery("SELECT x FROM " + table.getName(), "VALUES 'second'"); + + // test simple update correctness + assertUpdate("UPDATE " + table.getName() + " SET x = 'update' WHERE \"$path\" = '" + secondFilePath + "'", 1); + assertQuery("SELECT x FROM " + table.getName(), "VALUES 'update'"); + } + } + + @Test + public void testOptimizeWithPathColumn() + { + try (TestTable table = newTrinoTable("test_optimize_with_path_column", "(id integer)")) { + String tableName = table.getName(); + + assertUpdate("INSERT INTO " + tableName + " VALUES 1", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES 3", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES 4", 1); + + String firstPath = (String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 1"); + String secondPath = (String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 2"); + String thirdPath = (String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 3"); + String fourthPath = (String) computeScalar("SELECT \"$path\" FROM " + tableName + " WHERE id = 4"); + + Set initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(4); + + // For optimize we need to set task_min_writer_count to 1, otherwise it will create more than one file. + Session singleWriterSession = Session.builder(getSession()) + .setSystemProperty("task_min_writer_count", "1") + .build(); + assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE \"$path\" = '" + firstPath + "' OR \"$path\" = '" + secondPath + "'"); + assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE \"$path\" = '" + thirdPath + "' OR \"$path\" = '" + fourthPath + "'"); + + Set updatedFiles = getActiveFiles(tableName); + assertThat(updatedFiles) + .hasSize(2) + .doesNotContainAnyElementsOf(initialFiles); + } + } + + @Test + public void testFileModifiedTimeHiddenColumn() + throws Exception + { + ZonedDateTime beforeTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)"); + MILLISECONDS.sleep(1); + try (TestTable table = newTrinoTable("test_file_modified_time_", "(col) AS VALUES 1")) { + // Describe output should not have the $file_modified_time hidden column + assertThat(query("DESCRIBE " + table.getName())) + .skippingTypesCheck() + .matches("VALUES ('col', 'integer', '', '')"); + + ZonedDateTime fileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName()); + ZonedDateTime afterTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)"); + assertThat(fileModifiedTime).isBetween(beforeTime, afterTime); + + MILLISECONDS.sleep(1); + assertUpdate("INSERT INTO " + table.getName() + " VALUES (2)", 1); + ZonedDateTime anotherFileModifiedTime = (ZonedDateTime) computeScalar("SELECT max(\"$file_modified_time\") FROM " + table.getName()); + assertThat(fileModifiedTime) + .isNotEqualTo(anotherFileModifiedTime); + assertThat(anotherFileModifiedTime).isAfter(fileModifiedTime); // to detect potential clock backward adjustment + + assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + 
+
+    @Test
+    public void testFileModifiedTimeHiddenColumn()
+            throws Exception
+    {
+        ZonedDateTime beforeTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
+        MILLISECONDS.sleep(1);
+        try (TestTable table = newTrinoTable("test_file_modified_time_", "(col) AS VALUES 1")) {
+            // Describe output should not have the $file_modified_time hidden column
+            assertThat(query("DESCRIBE " + table.getName()))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('col', 'integer', '', '')");
+
+            ZonedDateTime fileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + table.getName());
+            ZonedDateTime afterTime = (ZonedDateTime) computeScalar("SELECT current_timestamp(3)");
+            assertThat(fileModifiedTime).isBetween(beforeTime, afterTime);
+
+            MILLISECONDS.sleep(1);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES (2)", 1);
+            ZonedDateTime anotherFileModifiedTime = (ZonedDateTime) computeScalar("SELECT max(\"$file_modified_time\") FROM " + table.getName());
+            assertThat(fileModifiedTime)
+                    .isNotEqualTo(anotherFileModifiedTime);
+            assertThat(anotherFileModifiedTime).isAfter(fileModifiedTime); // to detect potential clock backward adjustment
+
+            assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')"))
+                    .matches("VALUES 1")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" IN (from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "'), from_iso8601_timestamp('" + anotherFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "'))"))
+                    .matches("VALUES 1, 2")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" <> from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')"))
+                    .matches("VALUES 2")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" IS NOT NULL"))
+                    .matches("VALUES 1, 2")
+                    .isFullyPushedDown();
+            assertThat(query("SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" IS NULL"))
+                    .returnsEmptyResult()
+                    .isFullyPushedDown();
+
+            assertQuery("SHOW STATS FOR (SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "'))",
+                    "VALUES " +
+                            "('col', null, 1.0, 0.0, null, 1, 1), " +
+                            "(null, null, null, null, 1.0, null, null)");
+
+            // test simple delete correctness
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + beforeTime.format(ISO_OFFSET_DATE_TIME) + "')", 0);
+            assertQuery("SELECT col FROM " + table.getName(), "VALUES 1, 2");
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')", 1);
+            assertQuery("SELECT col FROM " + table.getName(), "VALUES 2");
+
+            // test simple update correctness
+            assertUpdate("UPDATE " + table.getName() + " SET col = 100 WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + anotherFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')", 1);
+            assertQuery("SELECT col FROM " + table.getName(), "VALUES 100");
+
+            // EXPLAIN triggers stats calculation and also rendering
+            assertQuerySucceeds("EXPLAIN SELECT col FROM " + table.getName() + " WHERE \"$file_modified_time\" = from_iso8601_timestamp('" + fileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')");
+        }
+    }
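`$file_modified_time` is a `timestamp(3) with time zone` column, so predicates compare values packed from the file's epoch-millis modification time with `UTC_KEY`; that millisecond granularity is why the tests only need `MILLISECONDS.sleep(1)` between inserts to obtain distinct values. A round-trip sketch of the packing used:

```java
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateTimeEncoding.unpackZoneKey;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;

public class FileModifiedTimePacking
{
    public static void main(String[] args)
    {
        long modificationTime = 1_700_000_000_123L; // epoch millis, as stored in the transaction log

        // The packed value encodes the millis plus the zone key in a single long
        long packed = packDateTimeWithZone(modificationTime, UTC_KEY);

        System.out.println(unpackMillisUtc(packed) == modificationTime); // true: millisecond precision survives
        System.out.println(unpackZoneKey(packed).equals(UTC_KEY));       // true
    }
}
```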
+
+    @Test
+    public void testOptimizeWithFileModifiedTimeColumn()
+            throws Exception
+    {
+        try (TestTable table = newTrinoTable("test_optimize_with_file_modified_time_", "(id INT)")) {
+            String tableName = table.getName();
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES 1", 1);
+            MILLISECONDS.sleep(1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1);
+            MILLISECONDS.sleep(1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES 3", 1);
+            MILLISECONDS.sleep(1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES 4", 1);
+
+            ZonedDateTime firstFileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + tableName + " WHERE id = 1");
+            ZonedDateTime secondFileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + tableName + " WHERE id = 2");
+            ZonedDateTime thirdFileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + tableName + " WHERE id = 3");
+            ZonedDateTime fourthFileModifiedTime = (ZonedDateTime) computeScalar("SELECT \"$file_modified_time\" FROM " + tableName + " WHERE id = 4");
+            // Sanity check
+            assertThat(List.of(firstFileModifiedTime, secondFileModifiedTime, thirdFileModifiedTime, fourthFileModifiedTime))
+                    .doesNotHaveDuplicates();
+
+            Set<String> initialFiles = getActiveFiles(tableName);
+            assertThat(initialFiles).hasSize(4);
+
+            MILLISECONDS.sleep(1);
+
+            // For optimize we need to set task_min_writer_count to 1; otherwise it will create more than one file.
+            Session singleWriterSession = Session.builder(getSession())
+                    .setSystemProperty("task_min_writer_count", "1")
+                    .build();
+            assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE " +
+                    "\"$file_modified_time\" = from_iso8601_timestamp('" + firstFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "') OR " +
+                    "\"$file_modified_time\" = from_iso8601_timestamp('" + secondFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')");
+            assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE " +
+                    "\"$file_modified_time\" = from_iso8601_timestamp('" + thirdFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "') OR " +
+                    "\"$file_modified_time\" = from_iso8601_timestamp('" + fourthFileModifiedTime.format(ISO_OFFSET_DATE_TIME) + "')");
+
+            Set<String> updatedFiles = getActiveFiles(tableName);
+            assertThat(updatedFiles)
+                    .hasSize(2)
+                    .doesNotContainAnyElementsOf(initialFiles);
+        }
+    }
+
+    @Test
+    public void testFileSizeHiddenColumn()
+    {
+        try (TestTable table = newTrinoTable("test_file_size_column", "(val VARCHAR)")) {
+            String tableName = table.getName();
+
+            // Describe output should not have the $file_size hidden column
+            assertThat(query("DESCRIBE " + table.getName()))
+                    .skippingTypesCheck()
+                    .matches("VALUES ('val', 'varchar', '', '')");
+
+            assertUpdate("INSERT INTO " + tableName + " VALUES '1'", 1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES '12345'", 1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES '1234567890'", 1);
+            assertUpdate("INSERT INTO " + tableName + " VALUES '12345678901234567890'", 1);
+
+            long firstFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '1'");
+            long secondFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '12345'");
+            long thirdFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '1234567890'");
+            long fourthFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '12345678901234567890'");
+
+            assertThat(query("SELECT val FROM " + table.getName() + " WHERE \"$file_size\" = " + firstFileSize))
+                    .matches("VALUES CAST('1' AS VARCHAR)")
+                    .isFullyPushedDown();
+
+            assertThat(query("SELECT val FROM " + table.getName() + " WHERE \"$file_size\" > " + firstFileSize + " AND \"$file_size\" <= " + thirdFileSize))
+                    .matches("VALUES CAST('12345' AS VARCHAR), CAST('1234567890' AS VARCHAR)")
+                    .isFullyPushedDown();
+
+            assertThat(query("SELECT val FROM " + table.getName() + " WHERE \"$file_size\" > " + secondFileSize + " AND \"$file_size\" <= " + fourthFileSize))
+                    .matches("VALUES CAST('1234567890' AS VARCHAR), CAST('12345678901234567890' AS VARCHAR)")
+                    .isFullyPushedDown();
+
+            assertQuery("SHOW STATS FOR (SELECT val FROM " + table.getName() + " WHERE \"$file_size\" > " + thirdFileSize + ")",
+                    "VALUES " +
+                            "('val', 36.0, 1.0, 0.0, null, null, null), " +
+                            "(null, null, null, null, 1.0, null, null)");
+
+            // test simple delete correctness
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_size\" = 0", 0);
+            assertQuery("SELECT val FROM " + table.getName(), "VALUES '1', '12345', '1234567890', '12345678901234567890'");
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE \"$file_size\" = " + firstFileSize, 1);
+            assertQuery("SELECT val FROM " + table.getName(), "VALUES '12345', '1234567890', '12345678901234567890'");
+
+            // test simple update correctness
+            assertUpdate("UPDATE " + table.getName() + " SET val = 'update' WHERE \"$file_size\" = " + secondFileSize, 1);
+            assertQuery("SELECT val FROM " + table.getName(), "VALUES 'update', '1234567890', '12345678901234567890'");
+        }
+    }
\"$file_size\" = " + firstFileSize, 1); + assertQuery("SELECT val FROM " + table.getName(), "VALUES '12345', '1234567890', '12345678901234567890'"); + + // test simple update correctness + assertUpdate("UPDATE " + table.getName() + " SET val = 'update' WHERE \"$file_size\" = " + secondFileSize, 1); + assertQuery("SELECT val FROM " + table.getName(), "VALUES 'update', '1234567890', '12345678901234567890'"); + } + } + + @Test + public void testOptimizeWithFileSizeColumn() + throws Exception + { + try (TestTable table = newTrinoTable("test_optimize_with_file_size_", "(val VARCHAR)")) { + String tableName = table.getName(); + + assertUpdate("INSERT INTO " + tableName + " VALUES '1'", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES '12345'", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES '1234567890'", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES '12345678901234567890'", 1); + + long secondFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '12345'"); + long thirdFileSize = (long) computeScalar("SELECT \"$file_size\" FROM " + tableName + " WHERE val = '1234567890'"); + + Set initialFiles = getActiveFiles(tableName); + assertThat(initialFiles).hasSize(4); + + MILLISECONDS.sleep(1); + + // For optimize we need to set task_min_writer_count to 1, otherwise it will create more than one file. + Session singleWriterSession = Session.builder(getSession()) + .setSystemProperty("task_min_writer_count", "1") + .build(); + assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE " + "\"$file_size\" <= " + secondFileSize); + assertQuerySucceeds(singleWriterSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE " + "\"$file_size\" >= " + thirdFileSize); + + Set updatedFiles = getActiveFiles(tableName); + assertThat(updatedFiles) + .hasSize(2) + .doesNotContainAnyElementsOf(initialFiles); } } @@ -3541,7 +3795,7 @@ public void testDeleteWithFilter() // delete filter applied on partitioned field and on synthesized field "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])", "address = 'Antioch' AND \"$file_size\" > 0", - false); + true); testDeleteWithFilter( // delete filter applied on function over partitioned field "CREATE TABLE %s (customer VARCHAR, address VARCHAR, purchases INT) WITH (location = 's3://%s/%s', partitioned_by = ARRAY['address'])",