diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 7fb84dcef8e0..b8e04d63dd70 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -2398,6 +2398,9 @@ else if (shouldFailWhenStale(materializedViewDefinition)) { // This could be a reference to a logical view or a table Optional optionalView = metadata.getView(session, name); if (optionalView.isPresent()) { + if (table.getQueryPeriod().isPresent()) { + throw new TrinoException(NOT_SUPPORTED, "Views do not support currently versioning"); + } analysis.addEmptyColumnReferencesForTable(accessControl, session.getIdentity(), name, getBranchName(table)); return createScopeForView(table, name, scope, optionalView.get()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index c947ce3712cc..996b7e34369a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -70,7 +70,7 @@ import io.trino.plugin.iceberg.system.HistoryTable; import io.trino.plugin.iceberg.system.ManifestsTable; import io.trino.plugin.iceberg.system.MetadataLogEntriesTable; -import io.trino.plugin.iceberg.system.PartitionsTable; +import io.trino.plugin.iceberg.system.PartitionsView; import io.trino.plugin.iceberg.system.PropertiesTable; import io.trino.plugin.iceberg.system.RefsTable; import io.trino.plugin.iceberg.system.SnapshotsTable; @@ -79,6 +79,7 @@ import io.trino.spi.RefreshType; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; +import io.trino.spi.catalog.CatalogName; import io.trino.spi.connector.Assignment; import 
io.trino.spi.connector.BeginTableExecuteResult; import io.trino.spi.connector.CatalogSchemaTableName; @@ -496,6 +497,7 @@ public class IcebergMetadata // Value should be ISO-8601 formatted time instant private static final String TRINO_QUERY_START_TIME = "trino-query-start-time"; + private final CatalogName catalogName; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; private final TrinoCatalog catalog; @@ -519,6 +521,7 @@ public class IcebergMetadata private OptionalLong fromSnapshotForRefresh = OptionalLong.empty(); public IcebergMetadata( + CatalogName catalogName, TypeManager typeManager, JsonCodec commitTaskCodec, TrinoCatalog catalog, @@ -536,6 +539,7 @@ public IcebergMetadata( int materializedViewRefreshMaxSnapshotsToExpire, Duration materializedViewRefreshSnapshotRetentionPeriod) { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.catalog = requireNonNull(catalog, "catalog is null"); @@ -892,7 +896,7 @@ public Optional getSystemTable(ConnectorSession session, SchemaTabl .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader())); } - private Optional getRawSystemTable(ConnectorSession session, SchemaTableName tableName) + private Optional getRawBaseTable(ConnectorSession session, SchemaTableName tableName) { if (!isIcebergTableName(tableName.getTableName()) || isDataTable(tableName.getTableName()) || isMaterializedViewStorage(tableName.getTableName())) { return Optional.empty(); @@ -900,9 +904,8 @@ private Optional getRawSystemTable(ConnectorSession session, Schema // Only when dealing with an actual system table proceed to retrieve the base table for the system table String name = tableNameFrom(tableName.getTableName()); - BaseTable table; try { - table = catalog.loadTable(session, new 
SchemaTableName(tableName.getSchemaName(), name)); + return Optional.of(catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name))); } catch (TableNotFoundException e) { return Optional.empty(); @@ -911,14 +914,21 @@ private Optional getRawSystemTable(ConnectorSession session, Schema // avoid dealing with non Iceberg tables return Optional.empty(); } + } + private Optional getRawSystemTable(ConnectorSession session, SchemaTableName tableName) + { + Optional baseTable = getRawBaseTable(session, tableName); + if (baseTable.isEmpty()) { + return Optional.empty(); + } + BaseTable table = baseTable.get(); TableType tableType = IcebergTableName.tableTypeFrom(tableName.getTableName()); return switch (tableType) { case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above. case HISTORY -> Optional.of(new HistoryTable(tableName, table)); case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, icebergScanExecutor)); case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, icebergScanExecutor)); - case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor)); case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor)); case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table))); case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table))); @@ -926,6 +936,21 @@ private Optional getRawSystemTable(ConnectorSession session, Schema case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor)); case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table)); case REFS -> Optional.of(new RefsTable(tableName, table, icebergScanExecutor)); + default -> Optional.empty(); + }; + } + + private Optional 
getRawSystemView(ConnectorSession session, SchemaTableName tableName) + { + Optional baseTable = getRawBaseTable(session, tableName); + if (baseTable.isEmpty()) { + return Optional.empty(); + } + BaseTable table = baseTable.get(); + TableType tableType = IcebergTableName.tableTypeFrom(tableName.getTableName()); + return switch (tableType) { + case PARTITIONS -> Optional.ofNullable(new PartitionsView(typeManager, table, catalogName.toString(), tableName.getSchemaName(), tableNameFrom(tableName.getTableName())).get()); + default -> Optional.empty(); }; } @@ -3491,6 +3516,12 @@ public Map getViews(ConnectorSession s @Override public boolean isView(ConnectorSession session, SchemaTableName viewName) { + Optional systemView = getRawSystemView(session, viewName); + + if (systemView.isPresent()) { + return true; + } + try { return catalog.getView(session, viewName).isPresent(); } @@ -3505,6 +3536,12 @@ public boolean isView(ConnectorSession session, SchemaTableName viewName) @Override public Optional getView(ConnectorSession session, SchemaTableName viewName) { + Optional systemView = getRawSystemView(session, viewName); + + if (systemView.isPresent()) { + return systemView; + } + return catalog.getView(session, viewName); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index f9dabb374868..f1de66665429 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -23,6 +23,7 @@ import io.trino.metastore.RawHiveMetastoreFactory; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.plugin.iceberg.delete.DeletionVectorWriter; +import io.trino.spi.catalog.CatalogName; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; @@ -36,6 +37,7 @@ public 
class IcebergMetadataFactory { + private final CatalogName catalogName; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; private final TrinoCatalogFactory catalogFactory; @@ -55,6 +57,7 @@ public class IcebergMetadataFactory @Inject public IcebergMetadataFactory( + CatalogName catalogName, TypeManager typeManager, JsonCodec commitTaskCodec, TrinoCatalogFactory catalogFactory, @@ -69,6 +72,7 @@ public IcebergMetadataFactory( @ForIcebergFileDelete ExecutorService icebergFileDeleteExecutor, IcebergConfig config) { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); @@ -101,6 +105,7 @@ public IcebergMetadataFactory( public IcebergMetadata create(ConnectorIdentity identity) { return new IcebergMetadata( + catalogName, typeManager, commitTaskCodec, catalogFactory.create(identity), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java index 63790f48bb05..ba295250d6fd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java @@ -30,15 +30,21 @@ public final class IcebergTableName private IcebergTableName() {} private static final Pattern TABLE_PATTERN; + private static final Pattern SYSTEM_VIEW_PATTERN; static { String referencableTableTypes = Stream.of(TableType.values()) .filter(tableType -> tableType != DATA) .map(tableType -> tableType.name().toLowerCase(ENGLISH)) .collect(Collectors.joining("|")); + TABLE_PATTERN = Pattern.compile("" + "(?[^$@]+)" + "(?:\\$(?(?i:" + referencableTableTypes + ")))?"); + + SYSTEM_VIEW_PATTERN = Pattern.compile("" + 
+ "(?
[^$@]+)" + + "(?:\\$(?(?i:partitions)))?"); } public static boolean isIcebergTableName(String tableName) @@ -46,6 +52,11 @@ public static boolean isIcebergTableName(String tableName) return TABLE_PATTERN.matcher(tableName).matches(); } + public static boolean isIcebergSystemViewName(String tableName) + { + return SYSTEM_VIEW_PATTERN.matcher(tableName).matches(); + } + public static String tableNameWithType(String tableName, TableType tableType) { requireNonNull(tableName, "tableName is null"); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index b1ae8a19375d..e3bb0f048081 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -541,6 +541,10 @@ private static Stream primitiveFields(NestedField nestedField) .map(field -> NestedField.from(field).withName(nestedField.name() + "." + field.name()).build()); } + if (type.isVariantType()) { + return Stream.empty(); + } + throw new IllegalStateException("Unsupported field type: " + nestedField); } @@ -596,7 +600,7 @@ public static String quotedTableName(SchemaTableName name) return quotedName(name.getSchemaName()) + "." 
+ quotedName(name.getTableName()); } - private static String quotedName(String name) + public static String quotedName(String name) { if (SIMPLE_NAME.matcher(name).matches()) { return name; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java index 41655354826e..087fa61855f8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java @@ -27,6 +27,7 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; @@ -34,15 +35,20 @@ import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.types.Types; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.iceberg.IcebergUtil.primitiveFields; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields; import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType; import static io.trino.spi.type.BigintType.BIGINT; @@ -116,6 +122,7 @@ public final class FilesTable private final Table icebergTable; private final OptionalLong snapshotId; private final Optional partitionColumnType; + private final Optional boundsColumnType; public 
FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, OptionalLong snapshotId) { @@ -125,12 +132,16 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table iceb List partitionFields = getAllPartitionFields(icebergTable); this.partitionColumnType = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema()) .map(IcebergPartitionColumn::rowType); + this.boundsColumnType = buildBoundsType(icebergTable.schema(), typeManager); ImmutableList.Builder columns = ImmutableList.builder(); for (String columnName : COLUMN_NAMES) { if (columnName.equals(PARTITION_COLUMN_NAME)) { partitionColumnType.ifPresent(type -> columns.add(new ColumnMetadata(columnName, type))); } + else if (columnName.equals(LOWER_BOUNDS_COLUMN_NAME) || columnName.equals(UPPER_BOUNDS_COLUMN_NAME)) { + boundsColumnType.ifPresent(type -> columns.add(new ColumnMetadata(columnName, type))); + } else { columns.add(new ColumnMetadata(columnName, getColumnType(columnName, typeManager))); } @@ -161,7 +172,8 @@ public Optional splitSource(ConnectorSession connectorSess icebergTable.specs().entrySet().stream().collect(toImmutableMap( Map.Entry::getKey, partitionSpec -> PartitionSpecParser.toJson(partitionSpec.getValue()))), - partitionColumnType)); + partitionColumnType, + boundsColumnType)); } @Override @@ -193,8 +205,6 @@ public static Type getColumnType(String columnName, TypeManager typeManager) NULL_VALUE_COUNTS_COLUMN_NAME, VALUE_COUNTS_COLUMN_NAME, NAN_VALUE_COUNTS_COLUMN_NAME -> new MapType(INTEGER, BIGINT, typeManager.getTypeOperators()); - case LOWER_BOUNDS_COLUMN_NAME, - UPPER_BOUNDS_COLUMN_NAME -> new MapType(INTEGER, VARCHAR, typeManager.getTypeOperators()); case KEY_METADATA_COLUMN_NAME -> VARBINARY; case SPLIT_OFFSETS_COLUMN_NAME -> new ArrayType(BIGINT); case EQUALITY_IDS_COLUMN_NAME -> new ArrayType(INTEGER); @@ -202,4 +212,20 @@ public static Type getColumnType(String columnName, TypeManager typeManager) default -> throw new 
IllegalArgumentException("Unexpected value: " + columnName); }; } + + private static Optional buildBoundsType(Schema icebergSchema, TypeManager typeManager) + { + List primitiveFields = primitiveFields(icebergSchema); + + if (primitiveFields.isEmpty()) { + return Optional.empty(); + } + + return Optional.of(RowType.from(primitiveFields.stream() + .map(column -> { + Type trinoType = toTrinoType(column.type(), typeManager); + return RowType.field(String.valueOf(column.fieldId()), trinoType); + }) + .collect(toImmutableList()))); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsTable.java deleted file mode 100644 index 419f659f9cb3..000000000000 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsTable.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.iceberg.system; - -import com.google.common.collect.ImmutableList; -import io.trino.plugin.iceberg.IcebergStatistics; -import io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex; -import io.trino.spi.block.SqlRow; -import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.ConnectorTransactionHandle; -import io.trino.spi.connector.InMemoryRecordSet; -import io.trino.spi.connector.RecordCursor; -import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.SystemTable; -import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.type.RowType; -import io.trino.spi.type.TypeManager; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types.NestedField; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.OptionalLong; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.stream.Stream; - -import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; -import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; -import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; -import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper; -import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; 
-import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields; -import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType; -import static io.trino.plugin.iceberg.util.SystemTableUtil.partitionTypes; -import static io.trino.spi.block.RowValueBuilder.buildRowValue; -import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.TypeUtils.writeNativeValue; -import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toSet; - -public class PartitionsTable - implements SystemTable -{ - private final TypeManager typeManager; - private final Table icebergTable; - private final OptionalLong snapshotId; - private final Map idToTypeMapping; - private final List nonPartitionPrimitiveColumns; - private final Optional partitionColumnType; - private final List partitionFields; - private final Optional dataColumnType; - private final List columnMetricTypes; - private final List resultTypes; - private final ConnectorTableMetadata connectorTableMetadata; - private final ExecutorService executor; - - public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, OptionalLong snapshotId, ExecutorService executor) - { - this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); - this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); - this.idToTypeMapping = primitiveFieldTypes(icebergTable.schema()); - - List columns = icebergTable.schema().columns(); - this.partitionFields = getAllPartitionFields(icebergTable); - - ImmutableList.Builder columnMetadataBuilder = ImmutableList.builder(); - - this.partitionColumnType = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema()); - partitionColumnType.ifPresent(icebergPartitionColumn -> - columnMetadataBuilder.add(new ColumnMetadata("partition", icebergPartitionColumn.rowType()))); - - 
Stream.of("record_count", "file_count", "total_size") - .forEach(metric -> columnMetadataBuilder.add(new ColumnMetadata(metric, BIGINT))); - - Set identityPartitionIds = getIdentityPartitions(icebergTable.spec()).keySet().stream() - .map(PartitionField::sourceId) - .collect(toSet()); - - this.nonPartitionPrimitiveColumns = columns.stream() - .filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType()) - .collect(toImmutableList()); - - this.dataColumnType = getMetricsColumnType(typeManager, this.nonPartitionPrimitiveColumns); - if (dataColumnType.isPresent()) { - columnMetadataBuilder.add(new ColumnMetadata("data", dataColumnType.get())); - this.columnMetricTypes = dataColumnType.get().getFields().stream() - .map(RowType.Field::getType) - .map(RowType.class::cast) - .collect(toImmutableList()); - } - else { - this.columnMetricTypes = ImmutableList.of(); - } - - List columnMetadata = columnMetadataBuilder.build(); - this.resultTypes = columnMetadata.stream() - .map(ColumnMetadata::getType) - .collect(toImmutableList()); - this.connectorTableMetadata = new ConnectorTableMetadata(tableName, columnMetadata); - this.executor = requireNonNull(executor, "executor is null"); - } - - @Override - public Distribution getDistribution() - { - return Distribution.SINGLE_COORDINATOR; - } - - @Override - public ConnectorTableMetadata getTableMetadata() - { - return connectorTableMetadata; - } - - private static Optional getMetricsColumnType(TypeManager typeManager, List columns) - { - List metricColumns = columns.stream() - .map(column -> RowType.field( - column.name(), - RowType.from(ImmutableList.of( - new RowType.Field(Optional.of("min"), toTrinoType(column.type(), typeManager)), - new RowType.Field(Optional.of("max"), toTrinoType(column.type(), typeManager)), - new RowType.Field(Optional.of("null_count"), BIGINT), - new RowType.Field(Optional.of("nan_count"), BIGINT))))) - .collect(toImmutableList()); - if (metricColumns.isEmpty()) { - 
return Optional.empty(); - } - return Optional.of(RowType.from(metricColumns)); - } - - @Override - public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) - { - if (snapshotId.isEmpty()) { - return new InMemoryRecordSet(resultTypes, ImmutableList.of()).cursor(); - } - TableScan tableScan = icebergTable.newScan() - .useSnapshot(snapshotId.getAsLong()) - .includeColumnStats() - .planWith(executor); - // TODO make the cursor lazy - return buildRecordCursor(getStatisticsByPartition(tableScan)); - } - - private Map getStatisticsByPartition(TableScan tableScan) - { - try (CloseableIterable fileScanTasks = tableScan.planFiles()) { - Map partitions = new HashMap<>(); - for (FileScanTask fileScanTask : fileScanTasks) { - DataFile dataFile = fileScanTask.file(); - StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(fileScanTask); - - partitions.computeIfAbsent( - structLikeWrapperWithFieldIdToIndex, - _ -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager)) - .acceptDataFile(dataFile, fileScanTask.spec()); - } - - return partitions.entrySet().stream() - .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().build())); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private RecordCursor buildRecordCursor(Map partitionStatistics) - { - List partitionTypes = partitionTypes(partitionFields, idToTypeMapping); - List> partitionColumnClass = partitionTypes.stream() - .map(type -> type.typeId().javaClass()) - .collect(toImmutableList()); - - ImmutableList.Builder> records = ImmutableList.builder(); - - for (Entry partitionEntry : partitionStatistics.entrySet()) { - StructLikeWrapperWithFieldIdToIndex partitionStruct = partitionEntry.getKey(); - IcebergStatistics icebergStatistics = partitionEntry.getValue(); - List row = new ArrayList<>(); - - // add data for partition columns - 
partitionColumnType.ifPresent(partitionColumnType -> { - row.add(buildRowValue(partitionColumnType.rowType(), fields -> { - List partitionColumnTypes = partitionColumnType.rowType().getFields().stream() - .map(RowType.Field::getType) - .collect(toImmutableList()); - for (int i = 0; i < partitionColumnTypes.size(); i++) { - io.trino.spi.type.Type trinoType = partitionColumnType.rowType().getFields().get(i).getType(); - Object value = null; - Integer fieldId = partitionColumnType.fieldIds().get(i); - if (partitionStruct.getFieldIdToIndex().containsKey(fieldId)) { - value = convertIcebergValueToTrino( - partitionTypes.get(i), - partitionStruct.getStructLikeWrapper().get().get(partitionStruct.getFieldIdToIndex().get(fieldId), partitionColumnClass.get(i))); - } - writeNativeValue(trinoType, fields.get(i), value); - } - })); - }); - - // add the top level metrics. - row.add(icebergStatistics.recordCount()); - row.add(icebergStatistics.fileCount()); - row.add(icebergStatistics.size()); - - // add column level metrics - dataColumnType.ifPresent(dataColumnType -> { - row.add(buildRowValue(dataColumnType, fields -> { - for (int i = 0; i < columnMetricTypes.size(); i++) { - Integer fieldId = nonPartitionPrimitiveColumns.get(i).fieldId(); - Object min = icebergStatistics.minValues().get(fieldId); - Object max = icebergStatistics.maxValues().get(fieldId); - Long nullCount = icebergStatistics.nullCounts().get(fieldId); - Long nanCount = icebergStatistics.nanCounts().get(fieldId); - RowType columnMetricType = columnMetricTypes.get(i); - if (min == null && max == null && nullCount == null) { - fields.get(i).appendNull(); - } - else { - columnMetricType.writeObject(fields.get(i), getColumnMetricBlock(columnMetricType, min, max, nullCount, nanCount)); - } - } - })); - }); - - records.add(row); - } - - return new InMemoryRecordSet(resultTypes, records.build()).cursor(); - } - - private static SqlRow getColumnMetricBlock(RowType columnMetricType, Object min, Object max, Long 
nullCount, Long nanCount) - { - return buildRowValue(columnMetricType, fieldBuilders -> { - List fields = columnMetricType.getFields(); - writeNativeValue(fields.get(0).getType(), fieldBuilders.get(0), min); - writeNativeValue(fields.get(1).getType(), fieldBuilders.get(1), max); - writeNativeValue(fields.get(2).getType(), fieldBuilders.get(2), nullCount); - writeNativeValue(fields.get(3).getType(), fieldBuilders.get(3), nanCount); - }); - } -} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsView.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsView.java new file mode 100644 index 000000000000..03affb773124 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsView.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.system; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.IcebergUtil; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.ConnectorViewDefinition.ViewColumn; +import io.trino.spi.type.RowType; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Types.NestedField; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields; +import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType; +import static io.trino.spi.type.BigintType.BIGINT; +import static java.util.Objects.requireNonNull; + +public class PartitionsView + implements Supplier +{ + private static final Joiner COMMA_JOINER = Joiner.on(", ").skipNulls(); + + private final String catalogName; + private final String schemaName; + private final String tableName; + private final boolean hasPartitionColumn; + private final boolean hasDataColumn; + private final String dataAggregationSql; + private final List viewColumns; + + public PartitionsView(TypeManager typeManager, Table icebergTable, String catalogName, String schemaName, String tableName) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + + 
ImmutableList.Builder viewColumns = ImmutableList.builder(); + Optional partitionType = getPartitionColumnType(typeManager, getAllPartitionFields(icebergTable), icebergTable.schema()); + + if (partitionType.isPresent()) { + hasPartitionColumn = true; + viewColumns.add(new ViewColumn("partition", partitionType.get().rowType().getTypeId(), Optional.empty())); + } + else { + hasPartitionColumn = false; + } + + Stream.of("record_count", "file_count", "total_size") + .forEach(column -> viewColumns.add(new ViewColumn(column, BIGINT.getTypeId(), Optional.empty()))); + + Set identityPartitionIds = getIdentityPartitions(icebergTable.spec()).keySet().stream() + .map(PartitionField::sourceId) + .collect(toImmutableSet()); + + List nonPartitionPrimitiveColumns = icebergTable.schema().columns().stream() + .filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType()) + .collect(toImmutableList()); + + Optional dataColumnType = getMetricsColumnType(typeManager, nonPartitionPrimitiveColumns); + + if (dataColumnType.isPresent()) { + hasDataColumn = true; + viewColumns.add(new ViewColumn("data", dataColumnType.get().getTypeId(), Optional.empty())); + dataAggregationSql = buildDataAggregation(typeManager, nonPartitionPrimitiveColumns); + } + else { + hasDataColumn = false; + dataAggregationSql = ""; + } + + this.viewColumns = viewColumns.build(); + } + + @Override + public ConnectorViewDefinition get() + { + String viewSql = + """ + SELECT %s SUM(record_count) AS record_count, COUNT(*) AS file_count, SUM(file_size_in_bytes) AS total_size%s + FROM %s.%s.%s + %s + """.formatted( + hasPartitionColumn ? "partition," : "", + hasDataColumn ? ", " + dataAggregationSql : "", + IcebergUtil.quotedName(catalogName), + IcebergUtil.quotedName(schemaName), + IcebergUtil.quotedName(tableName + "$files"), + hasPartitionColumn ? 
"GROUP BY 1" : ""); + + return new ConnectorViewDefinition( + viewSql, + Optional.of(catalogName), + Optional.of(schemaName), + viewColumns, + Optional.empty(), + Optional.empty(), + true, + ImmutableList.of()); + } + + private static String buildDataAggregation(TypeManager typeManager, List nonPartitionColumns) + { + ImmutableList.Builder rowValues = ImmutableList.builder(); + ImmutableList.Builder rowTypes = ImmutableList.builder(); + + for (NestedField column : nonPartitionColumns) { + String trinoTypeDisplayName = toTrinoType(column.type(), typeManager).getDisplayName(); + rowValues.add(buildColumnAggregation(column.fieldId())); + rowTypes.add(buildColumnRowType(column.name(), trinoTypeDisplayName)); + } + + return "CAST(ROW(%s) AS ROW(%s)) AS data".formatted(COMMA_JOINER.join(rowValues.build()), COMMA_JOINER.join(rowTypes.build())); + } + + private static String buildColumnAggregation(int fieldId) + { + String min = "MIN(lower_bounds.\"%1$d\")".formatted(fieldId); + String max = "MAX(upper_bounds.\"%1$d\")".formatted(fieldId); + String nullCount = "SUM(element_at(null_value_counts, %d))".formatted(fieldId); + String nanCount = "SUM(element_at(nan_value_counts, %d))".formatted(fieldId); + + // we need this case to ensure that it is compatible with the current $partitions implementation + return """ + CASE + WHEN %1$s IS NULL AND %2$s IS NULL AND %3$s IS NULL THEN NULL + ELSE ROW(%1$s, %2$s, %3$s, %4$s) + END + """ + .formatted(min, max, nullCount, nanCount); + } + + private static String buildColumnRowType(String columnName, String trinoTypeDisplayName) + { + return "%s ROW(min %2$s, max %2$s, null_count BIGINT, nan_count BIGINT)" + .formatted("\"" + columnName + "\"", trinoTypeDisplayName); + } + + private static Optional getMetricsColumnType(TypeManager typeManager, List columns) + { + List metricColumns = columns.stream() + .map(column -> { + Type trinoType = toTrinoType(column.type(), typeManager); + return RowType.field( + column.name(), + 
RowType.from(ImmutableList.of( + new RowType.Field(Optional.of("min"), trinoType), + new RowType.Field(Optional.of("max"), trinoType), + new RowType.Field(Optional.of("null_count"), BIGINT), + new RowType.Field(Optional.of("nan_count"), BIGINT)))); + }) + .collect(toImmutableList()); + + if (metricColumns.isEmpty()) { + return Optional.empty(); + } + return Optional.of(RowType.from(metricColumns)); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java index f51160858ab5..5b7e434bc77a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java @@ -19,7 +19,6 @@ import io.trino.plugin.iceberg.IcebergUtil; import io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex; import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory; -import io.trino.plugin.iceberg.system.FilesTable; import io.trino.plugin.iceberg.system.IcebergPartitionColumn; import io.trino.spi.Page; import io.trino.spi.PageBuilder; @@ -27,6 +26,7 @@ import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.MapBlockBuilder; import io.trino.spi.block.RowBlockBuilder; +import io.trino.spi.block.RowEntryBuilder; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.SourcePage; import io.trino.spi.type.RowType; @@ -118,6 +118,7 @@ public final class FilesTablePageSource private final Map idToPartitionSpecMapping; private final List partitionFields; private final Optional partitionColumnType; + private final Optional boundsColumnType; private final List primitiveFields; private final Iterator entryIterator; private final Map columnNameToIndex; @@ -143,6 +144,7 @@ public FilesTablePageSource( entry -> PartitionSpecParser.fromJson(schema, 
entry.getValue()))); this.partitionFields = getAllPartitionFields(schema, idToPartitionSpecMapping); this.partitionColumnType = getPartitionColumnType(typeManager, partitionFields, schema); + this.boundsColumnType = split.boundsColumnType(); this.primitiveFields = IcebergUtil.primitiveFields(schema).stream() .sorted(Comparator.comparing(Types.NestedField::name)) .collect(toImmutableList()); @@ -153,6 +155,9 @@ public FilesTablePageSource( if (column.equals(PARTITION_COLUMN_NAME)) { return split.partitionColumnType().orElseThrow(); } + else if (column.equals(LOWER_BOUNDS_COLUMN_NAME) || column.equals(UPPER_BOUNDS_COLUMN_NAME)) { + return split.boundsColumnType().orElseThrow(); + } return getColumnType(column, typeManager); }).collect(toImmutableList())); this.columnNameToIndex = mapWithIndex( @@ -229,16 +234,8 @@ public SourcePage getNextSourcePage() NAN_VALUE_COUNTS_COLUMN_NAME, contentFile::nanValueCounts, FilesTablePageSource::writeIntegerBigintInMap); - writeValueOrNull( - pageBuilder, - LOWER_BOUNDS_COLUMN_NAME, - contentFile::lowerBounds, - this::writeIntegerVarcharInMap); - writeValueOrNull( - pageBuilder, - UPPER_BOUNDS_COLUMN_NAME, - contentFile::upperBounds, - this::writeIntegerVarcharInMap); + writeTypedBounds(pageBuilder, LOWER_BOUNDS_COLUMN_NAME, contentFile.lowerBounds()); + writeTypedBounds(pageBuilder, UPPER_BOUNDS_COLUMN_NAME, contentFile.upperBounds()); writeValueOrNull( pageBuilder, KEY_METADATA_COLUMN_NAME, @@ -319,7 +316,7 @@ public void close() private void writePartitionColumns(ContentFile contentFile) { - if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(FilesTable.PARTITION_COLUMN_NAME)) { + if (partitionColumnType.isPresent() && columnNameToIndex.containsKey(PARTITION_COLUMN_NAME)) { PartitionSpec partitionSpec = idToPartitionSpecMapping.get(contentFile.specId()); StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper(partitionSpec.partitionType(), contentFile.partition()); List partitionTypes = 
partitionTypes(partitionFields, idToTypeMapping); @@ -330,7 +327,7 @@ private void writePartitionColumns(ContentFile contentFile) .map(RowType.Field::getType) .collect(toImmutableList()); - if (pageBuilder.getBlockBuilder(columnNameToIndex.get(FilesTable.PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) { + if (pageBuilder.getBlockBuilder(columnNameToIndex.get(PARTITION_COLUMN_NAME)) instanceof RowBlockBuilder rowBlockBuilder) { rowBlockBuilder.buildEntry(fields -> { for (int i = 0; i < partitionColumnTypes.size(); i++) { io.trino.spi.type.Type trinoType = partitionColumnType.get().rowType().getFields().get(i).getType(); @@ -348,6 +345,48 @@ private void writePartitionColumns(ContentFile contentFile) } } + private void writeTypedBounds(PageBuilder pageBuilder, String columnName, Map bounds) + { + Integer channel = columnNameToIndex.get(columnName); + if (boundsColumnType.isEmpty() || channel == null) { + return; + } + + BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(channel); + if (!(blockBuilder instanceof RowBlockBuilder boundsBuilder) || !(boundsColumnType.get() instanceof RowType rowType)) { + return; + } + + if (bounds == null) { + boundsBuilder.appendNull(); + return; + } + + List fields = rowType.getFields(); + // start a new lower/upper bounds entry + RowEntryBuilder boundsEntry = boundsBuilder.buildEntry(); + + for (int i = 0; i < fields.size(); i++) { + BlockBuilder fieldIdBlockBuilder = boundsEntry.getFieldBuilder(i); + // get the field id + RowType.Field fieldIdRowField = fields.get(i); + int fieldId = Integer.parseInt(fieldIdRowField.getName().orElseThrow()); + // get the iceberg type + PrimitiveType icebergType = idToTypeMapping.get(fieldId); + // get the corresponding trino type. 
min and max should be the same type + io.trino.spi.type.Type trinoType = fieldIdRowField.getType(); + // write this field's bound value (or null when the type or bound is missing) + if (icebergType == null || !bounds.containsKey(fieldId)) { + fieldIdBlockBuilder.appendNull(); + continue; + } + Object nativeValue = convertIcebergValueToTrino(icebergType, Conversions.fromByteBuffer(icebergType, bounds.get(fieldId))); + writeNativeValue(trinoType, fieldIdBlockBuilder, nativeValue); + } + + boundsEntry.build(); + } + private void writeNull(PageBuilder pageBuilder, String columnName) { Integer channel = columnNameToIndex.get(columnName); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplit.java index e66d14e4714b..3ad28cdd7551 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplit.java @@ -28,7 +28,8 @@ public record FilesTableSplit( String schemaJson, String metadataTableJson, Map partitionSpecsByIdJson, - Optional partitionColumnType) + Optional partitionColumnType, + Optional boundsColumnType) implements ConnectorSplit { private static final int INSTANCE_SIZE = instanceSize(FilesTableSplit.class); @@ -36,7 +37,7 @@ public record FilesTableSplit( @Override public long getRetainedSizeInBytes() { - // partitionColumnType is not accounted for as Type instances are cached (by TypeRegistry) and shared + // partitionColumnType/boundsColumnType is not accounted for as Type instances are cached (by TypeRegistry) and shared return INSTANCE_SIZE + manifestFile.getRetainedSizeInBytes() + estimatedSizeOf(schemaJson) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplitSource.java index 
e03eb366b237..42ec92b4d964 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTableSplitSource.java @@ -41,6 +41,7 @@ public final class FilesTableSplitSource private final String metadataSchemaJson; private final Map partitionSpecsByIdJson; private final Optional partitionColumnType; + private final Optional boundsColumnType; private boolean finished; public FilesTableSplitSource( @@ -49,7 +50,8 @@ public FilesTableSplitSource( String schemaJson, String metadataSchemaJson, Map partitionSpecsByIdJson, - Optional partitionColumnType) + Optional partitionColumnType, + Optional boundsColumnType) { this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); @@ -57,6 +59,7 @@ public FilesTableSplitSource( this.metadataSchemaJson = requireNonNull(metadataSchemaJson, "metadataSchemaJson is null"); this.partitionSpecsByIdJson = ImmutableMap.copyOf(partitionSpecsByIdJson); this.partitionColumnType = requireNonNull(partitionColumnType, "partitionColumnType is null"); + this.boundsColumnType = requireNonNull(boundsColumnType, "boundsColumnType is null"); } @Override @@ -73,7 +76,8 @@ public CompletableFuture getNextBatch(int maxSize) schemaJson, metadataSchemaJson, partitionSpecsByIdJson, - partitionColumnType)); + partitionColumnType, + boundsColumnType)); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index 62467fd6f050..713b13c8a546 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -536,8 +536,8 @@ public void testFilesPartitionTable() "('value_counts', 
'map(integer, bigint)', '', '')," + "('null_value_counts', 'map(integer, bigint)', '', '')," + "('nan_value_counts', 'map(integer, bigint)', '', '')," + - "('lower_bounds', 'map(integer, varchar)', '', '')," + - "('upper_bounds', 'map(integer, varchar)', '', '')," + + "('lower_bounds', 'row(\"1\" bigint, \"2\" date)', '', '')," + + "('upper_bounds', 'row(\"1\" bigint, \"2\" date)', '', '')," + "('key_metadata', 'varbinary', '', '')," + "('split_offsets', 'array(bigint)', '', '')," + "('equality_ids', 'array(integer)', '', '')," + diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java index bbeabeb1e4db..20f83f5d185c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java @@ -31,8 +31,8 @@ public class DataFileRecord private final Map valueCounts; private final Map nullValueCounts; private final Map nanValueCounts; - private final Map lowerBounds; - private final Map upperBounds; + private final MaterializedRow lowerBounds; + private final MaterializedRow upperBounds; @SuppressWarnings("unchecked") public static DataFileRecord toDataFileRecord(MaterializedRow row) @@ -48,8 +48,8 @@ public static DataFileRecord toDataFileRecord(MaterializedRow row) row.getField(7) != null ? ImmutableMap.copyOf((Map) row.getField(7)) : null, row.getField(8) != null ? ImmutableMap.copyOf((Map) row.getField(8)) : null, row.getField(9) != null ? ImmutableMap.copyOf((Map) row.getField(9)) : null, - row.getField(10) != null ? ImmutableMap.copyOf((Map) row.getField(10)) : null, - row.getField(11) != null ? ImmutableMap.copyOf((Map) row.getField(11)) : null); + row.getField(10) != null ? (MaterializedRow) row.getField(10) : null, + row.getField(11) != null ? 
(MaterializedRow) row.getField(11) : null); } private DataFileRecord( @@ -62,8 +62,8 @@ private DataFileRecord( Map valueCounts, Map nullValueCounts, Map nanValueCounts, - Map lowerBounds, - Map upperBounds) + MaterializedRow lowerBounds, + MaterializedRow upperBounds) { this.content = content; this.filePath = filePath; @@ -123,12 +123,12 @@ public Map getNanValueCounts() return nanValueCounts; } - public Map getLowerBounds() + public MaterializedRow getLowerBounds() { return lowerBounds; } - public Map getUpperBounds() + public MaterializedRow getUpperBounds() { return upperBounds; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 3e2e514413bf..b2822d6a05f5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -384,7 +384,7 @@ public void testSelectSystemTable() // select from $partitions assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$partitions\"", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 1) + .addCopies(GET_TABLE, 2) .build()); // select from $files diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java index 0a1b0225fa71..b42d1a94b66f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java @@ -36,8 +36,10 @@ import org.junit.jupiter.api.Test; import java.io.File; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.List; -import java.util.Map; +import 
java.util.Objects; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.trino.SystemSessionProperties.INITIAL_SPLITS_PER_NODE; @@ -152,8 +154,8 @@ public void testMetrics() assertThat(datafile.getRecordCount()).isEqualTo(1); assertThat(datafile.getValueCounts()).hasSize(1); assertThat(datafile.getNullValueCounts()).hasSize(1); - assertThat(datafile.getUpperBounds()).hasSize(1); - assertThat(datafile.getLowerBounds()).hasSize(1); + assertThat(datafile.getUpperBounds().getFields()).hasSize(2); + assertThat(datafile.getLowerBounds().getFields()).hasSize(2); // set c1 metrics mode to count assertUpdate("create table c1_metrics_count (c1 varchar, c2 varchar)"); @@ -194,8 +196,10 @@ public void testMetrics() assertThat(datafile.getRecordCount()).isEqualTo(1); assertThat(datafile.getValueCounts()).hasSize(1); assertThat(datafile.getNullValueCounts()).hasSize(1); - datafile.getUpperBounds().forEach((_, v) -> assertThat(v.length()).isEqualTo(10)); - datafile.getLowerBounds().forEach((_, v) -> assertThat(v.length()).isEqualTo(10)); + datafile.getUpperBounds().getFields().stream().filter(Objects::nonNull) + .forEach(v -> assertThat(v.toString().length()).isEqualTo(10)); + datafile.getLowerBounds().getFields().stream().filter(Objects::nonNull) + .forEach(v -> assertThat(v.toString().length()).isEqualTo(10)); // keep both c1 and c2 metrics assertUpdate("create table c_metrics (c1 varchar, c2 varchar)"); @@ -214,8 +218,8 @@ public void testMetrics() assertThat(datafile.getRecordCount()).isEqualTo(1); assertThat(datafile.getValueCounts()).hasSize(2); assertThat(datafile.getNullValueCounts()).hasSize(2); - assertThat(datafile.getUpperBounds()).hasSize(2); - assertThat(datafile.getLowerBounds()).hasSize(2); + assertThat(datafile.getUpperBounds().getFields()).hasSize(2); + assertThat(datafile.getLowerBounds().getFields()).hasSize(2); // keep all metrics assertUpdate("create table metrics (c1 varchar, c2 varchar)"); @@ -233,8 +237,8 @@ public 
void testMetrics() assertThat(datafile.getRecordCount()).isEqualTo(1); assertThat(datafile.getValueCounts()).hasSize(2); assertThat(datafile.getNullValueCounts()).hasSize(2); - assertThat(datafile.getUpperBounds()).hasSize(2); - assertThat(datafile.getLowerBounds()).hasSize(2); + assertThat(datafile.getUpperBounds().getFields()).hasSize(2); + assertThat(datafile.getLowerBounds().getFields()).hasSize(2); } @Test @@ -265,28 +269,28 @@ public void testBasic() assertThat(datafile.getNanValueCounts()).isNull(); // Check per-column lower bound - Map lowerBounds = datafile.getLowerBounds(); - assertThat(lowerBounds).containsEntry(1, "1"); - assertThat(lowerBounds).containsEntry(2, "1"); - assertThat(lowerBounds).containsEntry(3, "F"); - assertThat(lowerBounds).containsEntry(4, "874.89"); - assertThat(lowerBounds).containsEntry(5, "1992-01-01"); - assertThat(lowerBounds).containsEntry(6, "1-URGENT"); - assertThat(lowerBounds).containsEntry(7, "Clerk#000000001"); - assertThat(lowerBounds).containsEntry(8, "0"); - assertThat(lowerBounds).containsEntry(9, " about the accou"); + MaterializedRow lowerBounds = datafile.getLowerBounds(); + assertThat(lowerBounds.getField(0)).isEqualTo(1L); + assertThat(lowerBounds.getField(1)).isEqualTo(1L); + assertThat(lowerBounds.getField(2)).isEqualTo("F"); + assertThat(lowerBounds.getField(3)).isEqualTo(874.89); + assertThat(lowerBounds.getField(4)).isEqualTo(LocalDate.of(1992, 1, 1)); + assertThat(lowerBounds.getField(5)).isEqualTo("1-URGENT"); + assertThat(lowerBounds.getField(6)).isEqualTo("Clerk#000000001"); + assertThat(lowerBounds.getField(7)).isEqualTo(0); + assertThat(lowerBounds.getField(8)).isEqualTo(" about the accou"); // Check per-column upper bound - Map upperBounds = datafile.getUpperBounds(); - assertThat(upperBounds).containsEntry(1, "60000"); - assertThat(upperBounds).containsEntry(2, "1499"); - assertThat(upperBounds).containsEntry(3, "P"); - assertThat(upperBounds).containsEntry(4, "466001.28"); - 
assertThat(upperBounds).containsEntry(5, "1998-08-02"); - assertThat(upperBounds).containsEntry(6, "5-LOW"); - assertThat(upperBounds).containsEntry(7, "Clerk#000001000"); - assertThat(upperBounds).containsEntry(8, "0"); - assertThat(upperBounds).containsEntry(9, "zzle. carefully!"); + MaterializedRow upperBounds = datafile.getUpperBounds(); + assertThat(upperBounds.getField(0)).isEqualTo(60000L); + assertThat(upperBounds.getField(1)).isEqualTo(1499L); + assertThat(upperBounds.getField(2)).isEqualTo("P"); + assertThat(upperBounds.getField(3)).isEqualTo(466001.28); + assertThat(upperBounds.getField(4)).isEqualTo(LocalDate.of(1998, 8, 2)); + assertThat(upperBounds.getField(5)).isEqualTo("5-LOW"); + assertThat(upperBounds.getField(6)).isEqualTo("Clerk#000001000"); + assertThat(upperBounds.getField(7)).isEqualTo(0); + assertThat(upperBounds.getField(8)).isEqualTo("zzle. carefully!"); assertUpdate("DROP TABLE orders"); } @@ -314,10 +318,10 @@ public void testWithNulls() assertThat(datafile.getNullValueCounts()).containsEntry(4, (Long) 2L); // Check per-column lower bound - assertThat(datafile.getLowerBounds()).containsEntry(1, "3"); - assertThat(datafile.getLowerBounds()).containsEntry(2, "3.4"); - assertThat(datafile.getLowerBounds()).containsEntry(3, "aaa"); - assertThat(datafile.getLowerBounds()).containsEntry(4, "2020-01-01T00:00:00.123"); + assertThat(datafile.getLowerBounds().getField(0)).isEqualTo(3); + assertThat(datafile.getLowerBounds().getField(1)).isEqualTo(3.4f); + assertThat(datafile.getLowerBounds().getField(2)).isEqualTo("aaa"); + assertThat(datafile.getLowerBounds().getField(3)).isEqualTo(LocalDateTime.of(2020, 1, 1, 0, 0, 0, 123000000)); assertUpdate("DROP TABLE test_with_nulls"); @@ -357,10 +361,10 @@ public void testWithNaNs() assertThat(datafile.getNanValueCounts()).containsEntry(2, (Long) 1L); assertThat(datafile.getNanValueCounts()).containsEntry(3, (Long) 1L); - assertThat(datafile.getLowerBounds().get(2)).isNull(); - 
assertThat(datafile.getLowerBounds().get(3)).isNull(); - assertThat(datafile.getUpperBounds().get(2)).isNull(); - assertThat(datafile.getUpperBounds().get(3)).isNull(); + assertThat(datafile.getLowerBounds().getField(1)).isNull(); + assertThat(datafile.getLowerBounds().getField(2)).isNull(); + assertThat(datafile.getUpperBounds().getField(1)).isNull(); + assertThat(datafile.getUpperBounds().getField(2)).isNull(); assertUpdate("DROP TABLE test_with_nans"); } @@ -378,27 +382,27 @@ public void testNestedTypes() assertThat(materializedResult.getRowCount()).isEqualTo(1); DataFileRecord datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0)); - Map lowerBounds = datafile.getLowerBounds(); - Map upperBounds = datafile.getUpperBounds(); + MaterializedRow lowerBounds = datafile.getLowerBounds(); + MaterializedRow upperBounds = datafile.getUpperBounds(); // Only // 1. top-level primitive columns // 2. and nested primitive fields that are not descendants of LISTs or MAPs // should appear in lowerBounds or UpperBounds - assertThat(lowerBounds).hasSize(3); - assertThat(upperBounds).hasSize(3); + assertThat(lowerBounds.getFields()).hasSize(4); + assertThat(upperBounds.getFields()).hasSize(4); // col1 - assertThat(lowerBounds).containsEntry(1, "-9"); - assertThat(upperBounds).containsEntry(1, "8"); + assertThat(lowerBounds.getField(0)).isEqualTo(-9); + assertThat(upperBounds.getField(0)).isEqualTo(8); // col2.f1 (key in lowerBounds/upperBounds is Iceberg ID) - assertThat(lowerBounds).containsEntry(3, "0"); - assertThat(upperBounds).containsEntry(3, "10"); + assertThat(lowerBounds.getField(1)).isEqualTo(0); + assertThat(upperBounds.getField(1)).isEqualTo(10); // col2.f3 (key in lowerBounds/upperBounds is Iceberg ID) - assertThat(lowerBounds).containsEntry(5, "-2.9"); - assertThat(upperBounds).containsEntry(5, "4.9"); + assertThat(lowerBounds.getField(3)).isEqualTo(-2.9); + assertThat(upperBounds.getField(3)).isEqualTo(4.9); assertUpdate("DROP TABLE 
test_nested_types"); } @@ -428,12 +432,12 @@ public void testWithTimestamps() datafile.getNullValueCounts().values().forEach(nullValueCount -> assertThat(nullValueCount).isEqualTo((Long) 0L)); // Check column lower bound. Min timestamp doesn't rely on file-level statistics and will not be truncated to milliseconds. - assertThat(datafile.getLowerBounds()).containsEntry(1, "2021-01-01T00:00:00.111"); - assertQuery("SELECT min(_timestamp) FROM test_timestamp", "VALUES '2021-01-01 00:00:00.111111'"); + assertThat(datafile.getLowerBounds().getField(0)).isEqualTo(LocalDateTime.of(2021, 1, 1, 0, 0, 0, 111000000)); + assertQuery("SELECT min(_timestamp) FROM test_timestamp", "VALUES TIMESTAMP '2021-01-01 00:00:00.111111'"); // Check column upper bound. Max timestamp doesn't rely on file-level statistics and will not be truncated to milliseconds. - assertThat(datafile.getUpperBounds()).containsEntry(1, "2021-01-31T00:00:00.333999"); - assertQuery("SELECT max(_timestamp) FROM test_timestamp", "VALUES '2021-01-31 00:00:00.333333'"); + assertThat(datafile.getUpperBounds().getField(0)).isEqualTo(LocalDateTime.of(2021, 1, 31, 0, 0, 0, 333999000)); + assertQuery("SELECT max(_timestamp) FROM test_timestamp", "VALUES TIMESTAMP '2021-01-31 00:00:00.333333'"); assertUpdate("DROP TABLE test_timestamp"); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java index 140b40b8130d..59ea95a4bac1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergReadVersionedTable.java @@ -98,13 +98,22 @@ public void testEndVersionInTableNameAndForClauseShouldFail() } @Test - public void testSystemTables() + public void testSystemTableVersioning() { // TODO https://github.com/trinodb/trino/issues/12920 - assertQueryFails("SELECT * 
FROM \"test_iceberg_read_versioned_table$partitions\" FOR VERSION AS OF " + v1SnapshotId, + assertQueryFails("SELECT * FROM \"test_iceberg_read_versioned_table$files\" FOR VERSION AS OF " + v1SnapshotId, "This connector does not support versioned tables"); } + @Test + public void testSystemViewVersioning() + { + // TODO https://github.com/trinodb/trino/pull/28997/changes#r3216694193 + // this test is similar to the testSystemTableVersioning but errors differently because it is a view + assertQueryFails("SELECT * FROM \"test_iceberg_read_versioned_table$partitions\" FOR VERSION AS OF " + v1SnapshotId, + "Views do not support currently versioning"); + } + private long getLatestSnapshotId(String tableName) { return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName)); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index dbdc5f62cf09..456256ff6b60 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -1143,8 +1143,8 @@ public void testFilesTable() JSON '{"1":25,"2":25,"3":25,"4":25}', jSON '{"1":0,"2":0,"3":0,"4":0}', jSON '{}', - JSON '{"1":"0","2":"ALGERIA","3":"0","4":" haggle. careful"}', - JSON '{"1":"24","2":"VIETNAM","3":"4","4":"y final packaget"}', + JSON '{"1":0,"2":"ALGERIA","3":0,"4":" haggle. 
careful"}', + JSON '{"1":24,"2":"VIETNAM","3":4,"4":"y final packaget"}', null, ARRAY[4L], null, @@ -1161,8 +1161,8 @@ public void testFilesTable() JSON '{"1":5,"2":3,"3":2}', JSON '{"1":0,"2":2}', JSON '{"4":1}', - JSON '{"1":"0"}', - JSON '{"1":"4"}', + JSON '{"1":0,"2":null,"3":null,"4":null}', + JSON '{"1":4,"2":null,"3":null,"4":null}', X'54 72 69 6e 6f', ARRAY[4L], null, @@ -1179,8 +1179,8 @@ public void testFilesTable() JSON '{"3":1}', JSON '{"3":0}', JSON '{}', - JSON '{"3":"1"}', - JSON '{"3":"1"}', + JSON '{"1":null,"2":null,"3":1,"4":null}', + JSON '{"1":null,"2":null,"3":1,"4":null}', null, ARRAY[4], ARRAY[3], diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index e44b9e2757ed..b2b26a0cdb33 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -31,6 +31,7 @@ import io.trino.plugin.iceberg.TableStatisticsWriter; import io.trino.spi.NodeVersion; import io.trino.spi.TrinoException; +import io.trino.spi.catalog.CatalogName; import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorSession; @@ -141,6 +142,7 @@ public void testNonLowercaseNamespace() // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class ConnectorMetadata icebergMetadata = new IcebergMetadata( + new CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, @@ -182,6 +184,7 @@ public void testSchemaWithInvalidProperties() createNamespaceWithProperties(catalog, namespace, ImmutableMap.of("invalid_property", "test-value")); try { ConnectorMetadata icebergMetadata = new IcebergMetadata( + new 
CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index 3b8c7dc08d95..980ea38744a6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -155,6 +155,7 @@ public void testNonLowercaseGlueDatabase() // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class ConnectorMetadata icebergMetadata = new IcebergMetadata( + new CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java index 61a983811cd5..4bbd91397753 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java @@ -201,6 +201,7 @@ public void testNonLowercaseNamespace() // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class ConnectorMetadata icebergMetadata = new IcebergMetadata( + new CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index 8c84c026596e..b3faa9f07dbb 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -133,6 +133,7 @@ public void testNonLowercaseNamespace() // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class ConnectorMetadata icebergMetadata = new IcebergMetadata( + new CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java index d45de9f92692..6a410d49ecd6 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java @@ -226,6 +226,7 @@ public void testNonLowercaseNamespace() // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class ConnectorMetadata icebergMetadata = new IcebergMetadata( + new CatalogName("iceberg"), PLANNER_CONTEXT.getTypeManager(), jsonCodec(CommitTaskData.class), catalog, diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java index dea2a8b15829..a1c8109aca3f 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -115,6 +115,7 @@ import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable; import static io.trino.plugin.hive.util.HiveUtil.isHudiTable; import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static 
io.trino.plugin.iceberg.IcebergTableName.isIcebergSystemViewName; import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName; import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; import static io.trino.plugin.lakehouse.LakehouseTableProperties.getTableType; @@ -681,12 +682,20 @@ public Map getViews(ConnectorSession s @Override public Optional getView(ConnectorSession session, SchemaTableName viewName) { + if (isIcebergSystemViewName(viewName.getTableName())) { + return icebergMetadata.getView(session, viewName); + } + return hiveMetadata.getView(session, viewName); } @Override public boolean isView(ConnectorSession session, SchemaTableName viewName) { + if (isIcebergSystemViewName(viewName.getTableName())) { + return icebergMetadata.isView(session, viewName); + } + return hiveMetadata.isView(session, viewName); }