StatementAnalyzer.java
@@ -2398,6 +2398,9 @@ else if (shouldFailWhenStale(materializedViewDefinition)) {
// This could be a reference to a logical view or a table
Optional<ViewDefinition> optionalView = metadata.getView(session, name);
if (optionalView.isPresent()) {
if (table.getQueryPeriod().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Views do not currently support versioning");
}
analysis.addEmptyColumnReferencesForTable(accessControl, session.getIdentity(), name, getBranchName(table));
return createScopeForView(table, name, scope, optionalView.get());
}
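A hypothetical test sketch of the behavior the new guard enforces (not part of this diff; assumes a test class extending Trino's AbstractTestQueryFramework and an existing view named test_view):

```java
@Test
public void testVersionedViewReferenceIsRejected()
{
    // A query period (FOR VERSION AS OF / FOR TIMESTAMP AS OF) on a view
    // now fails up front with NOT_SUPPORTED instead of falling through.
    assertQueryFails(
            "SELECT * FROM test_view FOR VERSION AS OF 123",
            "Views do not currently support versioning");
}
```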
IcebergMetadata.java
@@ -70,7 +70,7 @@
import io.trino.plugin.iceberg.system.HistoryTable;
import io.trino.plugin.iceberg.system.ManifestsTable;
import io.trino.plugin.iceberg.system.MetadataLogEntriesTable;
import io.trino.plugin.iceberg.system.PartitionsTable;
import io.trino.plugin.iceberg.system.PartitionsView;
import io.trino.plugin.iceberg.system.PropertiesTable;
import io.trino.plugin.iceberg.system.RefsTable;
import io.trino.plugin.iceberg.system.SnapshotsTable;
@@ -79,6 +79,7 @@
import io.trino.spi.RefreshType;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogSchemaTableName;
@@ -496,6 +497,7 @@ public class IcebergMetadata
// Value should be ISO-8601 formatted time instant
private static final String TRINO_QUERY_START_TIME = "trino-query-start-time";

private final CatalogName catalogName;
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalog catalog;
@@ -519,6 +521,7 @@ public class IcebergMetadata
private OptionalLong fromSnapshotForRefresh = OptionalLong.empty();

public IcebergMetadata(
CatalogName catalogName,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalog catalog,
@@ -536,6 +539,7 @@ public IcebergMetadata(
int materializedViewRefreshMaxSnapshotsToExpire,
Duration materializedViewRefreshSnapshotRetentionPeriod)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalog = requireNonNull(catalog, "catalog is null");
@@ -892,17 +896,16 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
private Optional<BaseTable> getRawBaseTable(ConnectorSession session, SchemaTableName tableName)
{
if (!isIcebergTableName(tableName.getTableName()) || isDataTable(tableName.getTableName()) || isMaterializedViewStorage(tableName.getTableName())) {
return Optional.empty();
}

// Proceed to retrieve the base table only when dealing with an actual system table
String name = tableNameFrom(tableName.getTableName());
BaseTable table;
try {
table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name));
return Optional.of(catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name)));
}
catch (TableNotFoundException e) {
return Optional.empty();
@@ -911,21 +914,43 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
// avoid dealing with non-Iceberg tables
return Optional.empty();
}
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
Optional<BaseTable> baseTable = getRawBaseTable(session, tableName);
if (baseTable.isEmpty()) {
return Optional.empty();
}
BaseTable table = baseTable.get();
TableType tableType = IcebergTableName.tableTypeFrom(tableName.getTableName());
return switch (tableType) {
case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above.
case HISTORY -> Optional.of(new HistoryTable(tableName, table));
case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, icebergScanExecutor));
case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, icebergScanExecutor));
case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
case ALL_ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ALL_ENTRIES, icebergScanExecutor));
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor));
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
case REFS -> Optional.of(new RefsTable(tableName, table, icebergScanExecutor));
default -> Optional.empty();
};
}
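For orientation, a standalone sketch (assuming the trino-iceberg module on the classpath) of how a suffixed name is split before the dispatch above:

```java
import io.trino.plugin.iceberg.IcebergTableName;

public class SystemTableNameSketch
{
    public static void main(String[] args)
    {
        // "orders$snapshots" splits into the base table "orders" plus the
        // SNAPSHOTS table type, which the switch above maps to a SnapshotsTable.
        System.out.println(IcebergTableName.tableNameFrom("orders$snapshots")); // orders
        System.out.println(IcebergTableName.tableTypeFrom("orders$snapshots")); // SNAPSHOTS
    }
}
```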

private Optional<ConnectorViewDefinition> getRawSystemView(ConnectorSession session, SchemaTableName tableName)
{
Optional<BaseTable> baseTable = getRawBaseTable(session, tableName);
if (baseTable.isEmpty()) {
return Optional.empty();
}
BaseTable table = baseTable.get();
TableType tableType = IcebergTableName.tableTypeFrom(tableName.getTableName());
return switch (tableType) {
case PARTITIONS -> Optional.ofNullable(new PartitionsView(typeManager, table, catalogName.toString(), tableName.getSchemaName(), tableNameFrom(tableName.getTableName())).get());
default -> Optional.empty();
};
}
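Net effect of the two methods above: the $partitions suffix moves off the SystemTable path. getRawSystemView wraps the freshly loaded base table in a PartitionsView (imported above in place of PartitionsTable) and surfaces its ConnectorViewDefinition, so the engine treats "table$partitions" like any other view.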

@@ -3491,6 +3516,12 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> schemaName)
@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
Optional<ConnectorViewDefinition> systemView = getRawSystemView(session, viewName);

if (systemView.isPresent()) {
return true;
}

try {
return catalog.getView(session, viewName).isPresent();
}
@@ -3505,6 +3536,12 @@ public boolean isView(ConnectorSession session, SchemaTableName viewName)
@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
Optional<ConnectorViewDefinition> systemView = getRawSystemView(session, viewName);

if (systemView.isPresent()) {
return systemView;
}

return catalog.getView(session, viewName);
}

IcebergMetadataFactory.java
@@ -23,6 +23,7 @@
import io.trino.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;

@@ -36,6 +37,7 @@

public class IcebergMetadataFactory
{
private final CatalogName catalogName;
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalogFactory catalogFactory;
@@ -55,6 +57,7 @@ public class IcebergMetadataFactory

@Inject
public IcebergMetadataFactory(
CatalogName catalogName,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalogFactory catalogFactory,
@@ -69,6 +72,7 @@ public IcebergMetadataFactory(
@ForIcebergFileDelete ExecutorService icebergFileDeleteExecutor,
IcebergConfig config)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
@@ -101,6 +105,7 @@ public IcebergMetadataFactory(
public IcebergMetadata create(ConnectorIdentity identity)
{
return new IcebergMetadata(
catalogName,
typeManager,
commitTaskCodec,
catalogFactory.create(identity),
IcebergTableName.java
@@ -30,22 +30,33 @@ public final class IcebergTableName
private IcebergTableName() {}

private static final Pattern TABLE_PATTERN;
private static final Pattern SYSTEM_VIEW_PATTERN;

static {
String referencableTableTypes = Stream.of(TableType.values())
.filter(tableType -> tableType != DATA)
.map(tableType -> tableType.name().toLowerCase(ENGLISH))
.collect(Collectors.joining("|"));

TABLE_PATTERN = Pattern.compile("" +
"(?<table>[^$@]+)" +
"(?:\\$(?<type>(?i:" + referencableTableTypes + ")))?");

SYSTEM_VIEW_PATTERN = Pattern.compile("" +
"(?<table>[^$@]+)" +
"(?:\\$(?<type>(?i:partitions)))?");
}

Member Author: Likely some cleanup remains around this regex and the corresponding TABLE_PATTERN, since both are used in similar places but cause failures when PARTITIONS is filtered out of TABLE_PATTERN.

public static boolean isIcebergTableName(String tableName)
{
return TABLE_PATTERN.matcher(tableName).matches();
}

public static boolean isIcebergSystemViewName(String tableName)
{
return SYSTEM_VIEW_PATTERN.matcher(tableName).matches();
}
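A standalone sketch (plain java.util.regex, no Trino dependencies) of what SYSTEM_VIEW_PATTERN accepts. Note that, like TABLE_PATTERN, it also matches bare table names, which is the overlap the review note above calls out:

```java
import java.util.regex.Pattern;

public class IcebergNamePatternSketch
{
    public static void main(String[] args)
    {
        // Same shape as SYSTEM_VIEW_PATTERN above: only a "$partitions"
        // suffix (case-insensitive) marks a system view name.
        Pattern systemView = Pattern.compile("(?<table>[^$@]+)(?:\\$(?<type>(?i:partitions)))?");

        System.out.println(systemView.matcher("orders$partitions").matches()); // true
        System.out.println(systemView.matcher("orders$files").matches());      // false: not a system view
        System.out.println(systemView.matcher("orders").matches());            // true: bare table names also match
    }
}
```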

public static String tableNameWithType(String tableName, TableType tableType)
{
requireNonNull(tableName, "tableName is null");
IcebergUtil.java
@@ -541,6 +541,10 @@ private static Stream<NestedField> primitiveFields(NestedField nestedField)
.map(field -> NestedField.from(field).withName(nestedField.name() + "." + field.name()).build());
}

if (type.isVariantType()) {
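// Variant columns have no fixed primitive sub-fields to flatten, so they contribute nothing here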
return Stream.empty();
}

throw new IllegalStateException("Unsupported field type: " + nestedField);
}

@@ -596,7 +600,7 @@ public static String quotedTableName(SchemaTableName name)
return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
}

private static String quotedName(String name)
public static String quotedName(String name)
{
if (SIMPLE_NAME.matcher(name).matches()) {
return name;
FilesTable.java
@@ -27,22 +27,28 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFields;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.util.SystemTableUtil.getAllPartitionFields;
import static io.trino.plugin.iceberg.util.SystemTableUtil.getPartitionColumnType;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -116,6 +122,7 @@ public final class FilesTable
private final Table icebergTable;
private final OptionalLong snapshotId;
private final Optional<Type> partitionColumnType;
private final Optional<Type> boundsColumnType;

public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, OptionalLong snapshotId)
{
@@ -125,12 +132,16 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, OptionalLong snapshotId)
List<PartitionField> partitionFields = getAllPartitionFields(icebergTable);
this.partitionColumnType = getPartitionColumnType(typeManager, partitionFields, icebergTable.schema())
.map(IcebergPartitionColumn::rowType);
this.boundsColumnType = buildBoundsType(icebergTable.schema(), typeManager);

ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (String columnName : COLUMN_NAMES) {
if (columnName.equals(PARTITION_COLUMN_NAME)) {
partitionColumnType.ifPresent(type -> columns.add(new ColumnMetadata(columnName, type)));
}
else if (columnName.equals(LOWER_BOUNDS_COLUMN_NAME) || columnName.equals(UPPER_BOUNDS_COLUMN_NAME)) {
boundsColumnType.ifPresent(type -> columns.add(new ColumnMetadata(columnName, type)));
}
else {
columns.add(new ColumnMetadata(columnName, getColumnType(columnName, typeManager)));
}
@@ -161,7 +172,8 @@ public Optional<ConnectorSplitSource> splitSource(ConnectorSession connectorSess
icebergTable.specs().entrySet().stream().collect(toImmutableMap(
Map.Entry::getKey,
partitionSpec -> PartitionSpecParser.toJson(partitionSpec.getValue()))),
partitionColumnType));
partitionColumnType,
boundsColumnType));
}

@Override
@@ -193,13 +205,27 @@ public static Type getColumnType(String columnName, TypeManager typeManager)
NULL_VALUE_COUNTS_COLUMN_NAME,
VALUE_COUNTS_COLUMN_NAME,
NAN_VALUE_COUNTS_COLUMN_NAME -> new MapType(INTEGER, BIGINT, typeManager.getTypeOperators());
case LOWER_BOUNDS_COLUMN_NAME,
UPPER_BOUNDS_COLUMN_NAME -> new MapType(INTEGER, VARCHAR, typeManager.getTypeOperators());
case KEY_METADATA_COLUMN_NAME -> VARBINARY;
case SPLIT_OFFSETS_COLUMN_NAME -> new ArrayType(BIGINT);
case EQUALITY_IDS_COLUMN_NAME -> new ArrayType(INTEGER);
case READABLE_METRICS_COLUMN_NAME -> typeManager.getType(new TypeSignature(JSON));
default -> throw new IllegalArgumentException("Unexpected value: " + columnName);
};
}

private static Optional<Type> buildBoundsType(Schema icebergSchema, TypeManager typeManager)
{
List<Types.NestedField> primitiveFields = primitiveFields(icebergSchema);

if (primitiveFields.isEmpty()) {
return Optional.empty();
}

return Optional.of(RowType.from(primitiveFields.stream()
.map(column -> {
Type trinoType = toTrinoType(column.type(), typeManager);
return RowType.field(String.valueOf(column.fieldId()), trinoType);
})
.collect(toImmutableList())));
}
}
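A minimal sketch (trino-spi only; the two-column schema is hypothetical) of the row type this helper derives: one field per primitive column, named by its Iceberg field id. The lower_bounds/upper_bounds columns then carry per-column values instead of the previous map(integer, varchar) of serialized strings:

```java
import java.util.List;

import io.trino.spi.type.RowType;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.VarcharType.VARCHAR;

public class BoundsTypeSketch
{
    public static void main(String[] args)
    {
        // Mirrors buildBoundsType for a hypothetical schema:
        // id (field id 1, bigint) and name (field id 2, varchar).
        RowType boundsType = RowType.from(List.of(
                RowType.field("1", BIGINT),
                RowType.field("2", VARCHAR)));

        // Prints something like: row("1" bigint, "2" varchar)
        System.out.println(boundsType.getDisplayName());
    }
}
```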