diff --git a/build.gradle.kts b/build.gradle.kts index 322cb783f8..12a0045240 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -53,6 +53,7 @@ if (System.getProperty("idea.sync.active").toBoolean()) { eclipse { project { name = ideName } } tasks.named("rat").configure { + mustRunAfter(":polaris-config-docs-site:copyConfigSectionsToSite") // Gradle excludes.add("**/build/**") excludes.add("gradle/wrapper/gradle-wrapper*") @@ -147,6 +148,9 @@ tasks.named("rat").configure { // Rat can't scan binary images excludes.add("**/*.png") + + // Auto-generated site content (copied from docs by copyConfigSectionsToSite) + excludes.add("site/content/**") } tasks.register("buildPythonClient") { diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DataSourceResolver.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DataSourceResolver.java new file mode 100644 index 0000000000..a64d6649ad --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DataSourceResolver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.relational.jdbc; + +import javax.sql.DataSource; +import org.apache.polaris.core.context.RealmContext; + +/** + * Service to resolve the correct {@link DataSource} for a given realm. Note: Currently this is + * implemented as a foundation for metastore routing. + */ +public interface DataSourceResolver { + + /** The type of store representing the workload pattern. */ + enum StoreType { + METASTORE, + METRICS, + EVENTS + } + + /** + * Resolves the DataSource for a given realm and store type. + * + * @param realmContext the realm context + * @param storeType the type of store + * @return the resolved DataSource + */ + DataSource resolve(RealmContext realmContext, StoreType storeType); +} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java index cc104e1f39..affb3de8ae 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java @@ -149,6 +149,16 @@ public static DatabaseType inferFromConnection( * caller. */ public InputStream openInitScriptResource(int schemaVersion) { + return openInitScriptResource(schemaVersion, null); + } + + /** + * Open an InputStream that contains data from an init script for a specific {@link + * DataSourceResolver.StoreType}. If a specialized script (e.g., schema-v4-metrics.sql) is not + * found, it falls back to the main script (schema-v4.sql). + */ + public InputStream openInitScriptResource( + int schemaVersion, DataSourceResolver.StoreType storeType) { // Validate schema version is within acceptable range for this database type int latestVersion = getLatestSchemaVersion(); if (schemaVersion <= 0 || schemaVersion > latestVersion) { @@ -158,10 +168,21 @@ public InputStream openInitScriptResource(int schemaVersion) { schemaVersion, this, latestVersion)); } + ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); + if (storeType != null) { + String specializedResourceName = + String.format( + "%s/schema-v%d-%s.sql", + this.getDisplayName(), schemaVersion, storeType.name().toLowerCase(Locale.ROOT)); + InputStream specializedStream = classLoader.getResourceAsStream(specializedResourceName); + if (specializedStream != null) { + return specializedStream; + } + } + final String resourceName = String.format("%s/schema-v%d.sql", this.getDisplayName(), schemaVersion); - ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); InputStream stream = classLoader.getResourceAsStream(resourceName); if (stream == null) { throw new IllegalStateException( diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java index 4006ef28af..33bb8c6ab2 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java @@ -94,6 +94,33 @@ DatabaseType getDatabaseType() { return databaseType; } + /** + * Checks if a table exists in the database. + * + * @param tableName The name of the table to check for. + * @return true if the table exists, false otherwise. + * @throws SQLException if a database access error occurs. + */ + public boolean tableExists(String tableName) throws SQLException { + try (Connection connection = borrowConnection()) { + var metaData = connection.getMetaData(); + // Try uppercase first (Standard/H2) + try (ResultSet resultSet = + metaData.getTables( + null, null, tableName.toUpperCase(Locale.ROOT), new String[] {"TABLE"})) { + if (resultSet.next()) { + return true; + } + } + // Try lowercase (Postgres/CockroachDB) + try (ResultSet resultSet = + metaData.getTables( + null, null, tableName.toLowerCase(Locale.ROOT), new String[] {"TABLE"})) { + return resultSet.next(); + } + } + } + /** * Execute SQL script and close the associated input stream * @@ -326,7 +353,8 @@ private boolean isRetryable(SQLException e) { || e.getMessage().toLowerCase(Locale.ROOT).contains("connection reset"); } - // TODO: consider refactoring to use a retry library, inorder to have fair retries + // TODO: consider refactoring to use a retry library, inorder to have fair + // retries // and more knobs for tuning retry pattern. @VisibleForTesting T withRetries(Operation operation) throws SQLException { @@ -347,7 +375,8 @@ T withRetries(Operation operation) throws SQLException { } catch (SQLException | RuntimeException e) { SQLException sqlException; if (e instanceof RuntimeException) { - // Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, ignore RTE from + // Handle Exceptions from ResultSet Iterator consumer, as it throws a RTE, + // ignore RTE from // the transactions. if (e.getCause() instanceof SQLException && !(e instanceof EntityAlreadyExistsException)) { diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DefaultDataSourceResolver.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DefaultDataSourceResolver.java new file mode 100644 index 0000000000..bc970d95d5 --- /dev/null +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DefaultDataSourceResolver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.relational.jdbc; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import javax.sql.DataSource; +import org.apache.polaris.core.context.RealmContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation of {@link DataSourceResolver} that routes all realms to a single default + * {@link DataSource}. + */ +@ApplicationScoped +@Identifier("default") +public class DefaultDataSourceResolver implements DataSourceResolver { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDataSourceResolver.class); + + private final Instance defaultDataSource; + + @Inject + public DefaultDataSourceResolver(@Any Instance defaultDataSource) { + this.defaultDataSource = defaultDataSource; + } + + @Override + public DataSource resolve(RealmContext realmContext, StoreType storeType) { + LOGGER.debug( + "Using default DataSource for realm '{}' and store '{}'", + realmContext.getRealmIdentifier(), + storeType); + return defaultDataSource.get(); + } +} diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index b154a610d5..0d80041b34 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -87,7 +87,9 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBasePersistenceImpl.class); private final PolarisDiagnostics diagnostics; - private final DatasourceOperations datasourceOperations; + private final DatasourceOperations metastoreOps; + private final DatasourceOperations metricsOps; + private final DatasourceOperations eventOps; private final PrincipalSecretsGenerator secretsGenerator; private final PolarisStorageIntegrationProvider storageIntegrationProvider; private final String realmId; @@ -98,13 +100,17 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers public JdbcBasePersistenceImpl( PolarisDiagnostics diagnostics, - DatasourceOperations databaseOperations, + DatasourceOperations metastoreOps, + DatasourceOperations metricsOps, + DatasourceOperations eventOps, PrincipalSecretsGenerator secretsGenerator, PolarisStorageIntegrationProvider storageIntegrationProvider, String realmId, int schemaVersion) { this.diagnostics = diagnostics; - this.datasourceOperations = databaseOperations; + this.metastoreOps = metastoreOps; + this.metricsOps = metricsOps; + this.eventOps = eventOps; this.secretsGenerator = secretsGenerator; this.storageIntegrationProvider = storageIntegrationProvider; this.realmId = realmId; @@ -129,7 +135,7 @@ public void writeEntity( originalEntity, null, (connection, preparedQuery) -> { - return datasourceOperations.executeUpdate(preparedQuery); + return metastoreOps.executeUpdate(preparedQuery); }); } catch (SQLException e) { throw new RuntimeException("Error persisting entity", e); @@ -142,7 +148,7 @@ public void writeEntities( @Nonnull List entities, List originalEntities) { try { - datasourceOperations.runWithinTransaction( + metastoreOps.runWithinTransaction( connection -> { for (int i = 0; i < entities.size(); i++) { PolarisBaseEntity entity = entities.get(i); @@ -159,8 +165,7 @@ public void writeEntities( // already been updated after the creation. continue; } - persistEntity( - callCtx, entity, originalEntity, connection, datasourceOperations::execute); + persistEntity(callCtx, entity, originalEntity, connection, metastoreOps::execute); } return true; }); @@ -183,7 +188,7 @@ private void persistEntity( if (originalEntity == null) { try { List values = - modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(); + modelEntity.toMap(metastoreOps.getDatabaseType()).values().stream().toList(); queryAction.apply( connection, QueryGenerator.generateInsertQuery( @@ -192,7 +197,7 @@ private void persistEntity( values, realmId)); } catch (SQLException e) { - if (datasourceOperations.isConstraintViolation(e)) { + if (metastoreOps.isConstraintViolation(e)) { PolarisBaseEntity existingEntity = lookupEntityByName( callCtx, @@ -225,7 +230,7 @@ private void persistEntity( realmId); try { List values = - modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(); + modelEntity.toMap(metastoreOps.getDatabaseType()).values().stream().toList(); int rowsUpdated = queryAction.apply( connection, @@ -252,8 +257,8 @@ public void writeToGrantRecords( ModelGrantRecord modelGrantRecord = ModelGrantRecord.fromGrantRecord(grantRec); try { List values = - modelGrantRecord.toMap(datasourceOperations.getDatabaseType()).values().stream().toList(); - datasourceOperations.executeUpdate( + modelGrantRecord.toMap(metastoreOps.getDatabaseType()).values().stream().toList(); + metastoreOps.executeUpdate( QueryGenerator.generateInsertQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, values, realmId)); } catch (SQLException e) { @@ -264,6 +269,10 @@ public void writeToGrantRecords( @Override public void writeEvents(@Nonnull List events) { + if (eventOps == null) { + LOGGER.debug("Skipping events write as no JDBC events store is configured"); + return; + } if (events.isEmpty()) { return; // or throw if empty list is invalid } @@ -275,7 +284,7 @@ public void writeEvents(@Nonnull List events) { ModelEvent.ALL_COLUMNS, ModelEvent.TABLE_NAME, ModelEvent.fromEvent(events.getFirst()) - .toMap(datasourceOperations.getDatabaseType()) + .toMap(eventOps.getDatabaseType()) .values() .stream() .toList(), @@ -292,10 +301,7 @@ public void writeEvents(@Nonnull List events) { QueryGenerator.generateInsertQuery( ModelEvent.ALL_COLUMNS, ModelEvent.TABLE_NAME, - ModelEvent.fromEvent(event) - .toMap(datasourceOperations.getDatabaseType()) - .values() - .stream() + ModelEvent.fromEvent(event).toMap(eventOps.getDatabaseType()).values().stream() .toList(), realmId); @@ -307,7 +313,7 @@ public void writeEvents(@Nonnull List events) { } int totalUpdated = - datasourceOperations.executeBatchUpdate( + eventOps.executeBatchUpdate( new QueryGenerator.PreparedBatchQuery(expectedSql, parametersList)); if (totalUpdated == 0) { @@ -331,7 +337,7 @@ public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBa "realm_id", realmId); try { - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQuery( ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params)); } catch (SQLException e) { @@ -345,10 +351,9 @@ public void deleteFromGrantRecords( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) { ModelGrantRecord modelGrantRecord = ModelGrantRecord.fromGrantRecord(grantRec); try { - Map whereClause = - modelGrantRecord.toMap(datasourceOperations.getDatabaseType()); + Map whereClause = modelGrantRecord.toMap(metastoreOps.getDatabaseType()); whereClause.put("realm_id", realmId); - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, whereClause)); } catch (SQLException e) { @@ -364,7 +369,7 @@ public void deleteAllEntityGrantRecords( @Nonnull List grantsOnGrantee, @Nonnull List grantsOnSecurable) { try { - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQueryForEntityGrantRecords(entity, realmId)); } catch (SQLException e) { throw new RuntimeException( @@ -376,23 +381,23 @@ public void deleteAllEntityGrantRecords( public void deleteAll(@Nonnull PolarisCallContext callCtx) { try { Map params = Map.of("realm_id", realmId); - datasourceOperations.runWithinTransaction( + metastoreOps.runWithinTransaction( connection -> { - datasourceOperations.execute( + metastoreOps.execute( connection, QueryGenerator.generateDeleteQuery( ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params)); - datasourceOperations.execute( + metastoreOps.execute( connection, QueryGenerator.generateDeleteQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, params)); - datasourceOperations.execute( + metastoreOps.execute( connection, QueryGenerator.generateDeleteQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, params)); - datasourceOperations.execute( + metastoreOps.execute( connection, QueryGenerator.generateDeleteQuery( ModelPolicyMappingRecord.ALL_COLUMNS, @@ -400,6 +405,38 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) { params)); return true; }); + + // Clear metrics store if isolated + if (metricsOps.tableExists(ModelScanMetricsReport.TABLE_NAME)) { + metricsOps.runWithinTransaction( + connection -> { + metricsOps.execute( + connection, + QueryGenerator.generateDeleteQuery( + ModelScanMetricsReport.ALL_COLUMNS, + ModelScanMetricsReport.TABLE_NAME, + params)); + metricsOps.execute( + connection, + QueryGenerator.generateDeleteQuery( + ModelCommitMetricsReport.ALL_COLUMNS, + ModelCommitMetricsReport.TABLE_NAME, + params)); + return true; + }); + } + + // Clear events store if isolated + if (eventOps.tableExists(ModelEvent.TABLE_NAME)) { + eventOps.runWithinTransaction( + connection -> { + eventOps.execute( + connection, + QueryGenerator.generateDeleteQuery( + ModelEvent.ALL_COLUMNS, ModelEvent.TABLE_NAME, params)); + return true; + }); + } } catch (SQLException e) { throw new RuntimeException( String.format("Failed to delete all due to %s", e.getMessage()), e); @@ -443,7 +480,7 @@ public PolarisBaseEntity lookupEntityByName( @Nullable private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery query) { try { - var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)); + var results = metastoreOps.executeSelect(query, new ModelEntity(schemaVersion)); if (results.isEmpty()) { return null; } else if (results.size() > 1) { @@ -469,7 +506,7 @@ public List lookupEntities( QueryGenerator.generateSelectQueryWithEntityIds(realmId, schemaVersion, entityIds); try { Map idMap = - datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)).stream() + metastoreOps.executeSelect(query, new ModelEntity(schemaVersion)).stream() .collect( Collectors.toMap( e -> new PolarisEntityId(e.getCatalogId(), e.getId()), Function.identity())); @@ -565,7 +602,7 @@ public Page listEntities( pageToken, ModelEntity.ENTITY_LOOKUP_COLUMNS); AtomicReference> results = new AtomicReference<>(); - datasourceOperations.executeSelectOverStream( + metastoreOps.executeSelectOverStream( query, new EntityNameLookupRecordConverter(), stream -> { @@ -600,7 +637,7 @@ public Page listFullEntities( pageToken, ModelEntity.getAllColumnNames(schemaVersion)); AtomicReference> results = new AtomicReference<>(); - datasourceOperations.executeSelectOverStream( + metastoreOps.executeSelectOverStream( query, new ModelEntity(schemaVersion), stream -> { @@ -651,7 +688,7 @@ public PolarisGrantRecord lookupGrantRecord( realmId); try { var results = - datasourceOperations.executeSelect( + metastoreOps.executeSelect( QueryGenerator.generateSelectQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, params), new ModelGrantRecord()); @@ -683,7 +720,7 @@ public List loadAllGrantRecordsOnSecurable( realmId); try { var results = - datasourceOperations.executeSelect( + metastoreOps.executeSelect( QueryGenerator.generateSelectQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, params), new ModelGrantRecord()); @@ -706,7 +743,7 @@ public List loadAllGrantRecordsOnGrantee( "grantee_catalog_id", granteeCatalogId, "grantee_id", granteeId, "realm_id", realmId); try { var results = - datasourceOperations.executeSelect( + metastoreOps.executeSelect( QueryGenerator.generateSelectQuery( ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, params), new ModelGrantRecord()); @@ -735,7 +772,7 @@ public boolean hasChildren( } try { var results = - datasourceOperations.executeSelect( + metastoreOps.executeSelect( QueryGenerator.generateSelectQuery( ModelEntity.getAllColumnNames(schemaVersion), ModelEntity.TABLE_NAME, params), new ModelEntity(schemaVersion)); @@ -748,18 +785,16 @@ public boolean hasChildren( } } - static int loadSchemaVersion( - DatasourceOperations datasourceOperations, boolean fallbackOnDoesNotExist) { + static int loadSchemaVersion(DatasourceOperations metastoreOps, boolean fallbackOnDoesNotExist) { PreparedQuery query = QueryGenerator.generateVersionQuery(); try { - List schemaVersion = - datasourceOperations.executeSelect(query, new SchemaVersion()); + List schemaVersion = metastoreOps.executeSelect(query, new SchemaVersion()); if (schemaVersion == null || schemaVersion.size() != 1) { throw new RuntimeException("Failed to retrieve schema version"); } return schemaVersion.getFirst().getValue(); } catch (SQLException e) { - if (fallbackOnDoesNotExist && datasourceOperations.isRelationDoesNotExist(e)) { + if (fallbackOnDoesNotExist && metastoreOps.isRelationDoesNotExist(e)) { return SchemaVersion.MINIMUM.getValue(); } LOGGER.error("Failed to load schema version due to {}", e.getMessage(), e); @@ -767,14 +802,13 @@ static int loadSchemaVersion( } } - static boolean entityTableExists(DatasourceOperations datasourceOperations) { + static boolean entityTableExists(DatasourceOperations metastoreOps) { PreparedQuery query = QueryGenerator.generateEntityTableExistQuery(); try { - List entities = - datasourceOperations.executeSelect(query, new ModelEntity()); + List entities = metastoreOps.executeSelect(query, new ModelEntity()); return entities != null && !entities.isEmpty(); } catch (SQLException e) { - if (datasourceOperations.isRelationDoesNotExist(e)) { + if (metastoreOps.isRelationDoesNotExist(e)) { return false; } throw new IllegalStateException("Failed to check if Entities table exists", e); @@ -798,7 +832,7 @@ Optional> hasOverlappingSiblings( QueryGenerator.generateOverlapQuery( realmId, schemaVersion, entity.getCatalogId(), entity.getBaseLocation()); try { - var results = datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)); + var results = metastoreOps.executeSelect(query, new ModelEntity(schemaVersion)); if (!results.isEmpty()) { StorageLocation entityLocation = StorageLocation.of(entity.getBaseLocation()); for (PolarisBaseEntity result : results) { @@ -831,7 +865,7 @@ public PolarisPrincipalSecrets loadPrincipalSecrets( Map params = Map.of("principal_client_id", clientId, "realm_id", realmId); try { var results = - datasourceOperations.executeSelect( + metastoreOps.executeSelect( QueryGenerator.generateSelectQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, @@ -872,9 +906,8 @@ public PolarisPrincipalSecrets generateNewPrincipalSecrets( // write new principal secrets try { List values = - lookupPrincipalSecrets.toMap(datasourceOperations.getDatabaseType()).values().stream() - .toList(); - datasourceOperations.executeUpdate( + lookupPrincipalSecrets.toMap(metastoreOps.getDatabaseType()).values().stream().toList(); + metastoreOps.executeUpdate( QueryGenerator.generateInsertQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, @@ -907,12 +940,12 @@ public PolarisPrincipalSecrets storePrincipalSecrets( try { ModelPrincipalAuthenticationData modelPrincipalAuthenticationData = ModelPrincipalAuthenticationData.fromPrincipalAuthenticationData(principalSecrets); - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateInsertQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, modelPrincipalAuthenticationData - .toMap(datasourceOperations.getDatabaseType()) + .toMap(metastoreOps.getDatabaseType()) .values() .stream() .toList(), @@ -968,12 +1001,12 @@ public PolarisPrincipalSecrets rotatePrincipalSecrets( try { ModelPrincipalAuthenticationData modelPrincipalAuthenticationData = ModelPrincipalAuthenticationData.fromPrincipalAuthenticationData(principalSecrets); - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateUpdateQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, modelPrincipalAuthenticationData - .toMap(datasourceOperations.getDatabaseType()) + .toMap(metastoreOps.getDatabaseType()) .values() .stream() .toList(), @@ -998,7 +1031,7 @@ public void deletePrincipalSecrets( Map params = Map.of("principal_client_id", clientId, "principal_id", principalId, "realm_id", realmId); try { - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQuery( ModelPrincipalAuthenticationData.ALL_COLUMNS, ModelPrincipalAuthenticationData.TABLE_NAME, @@ -1018,7 +1051,7 @@ public void deletePrincipalSecrets( public void writeToPolicyMappingRecords( @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord record) { try { - datasourceOperations.runWithinTransaction( + metastoreOps.runWithinTransaction( connection -> { PolicyType policyType = PolicyType.fromCode(record.getPolicyTypeCode()); Preconditions.checkArgument( @@ -1026,10 +1059,7 @@ public void writeToPolicyMappingRecords( ModelPolicyMappingRecord modelPolicyMappingRecord = ModelPolicyMappingRecord.fromPolicyMappingRecord(record); List values = - modelPolicyMappingRecord - .toMap(datasourceOperations.getDatabaseType()) - .values() - .stream() + modelPolicyMappingRecord.toMap(metastoreOps.getDatabaseType()).values().stream() .toList(); PreparedQuery insertPolicyMappingQuery = QueryGenerator.generateInsertQuery( @@ -1040,7 +1070,7 @@ public void writeToPolicyMappingRecords( if (policyType.isInheritable()) { return handleInheritablePolicy(callCtx, record, insertPolicyMappingQuery, connection); } else { - datasourceOperations.execute(connection, insertPolicyMappingQuery); + metastoreOps.execute(connection, insertPolicyMappingQuery); } return true; }); @@ -1090,16 +1120,13 @@ private boolean handleInheritablePolicy( QueryGenerator.generateUpdateQuery( ModelPolicyMappingRecord.ALL_COLUMNS, ModelPolicyMappingRecord.TABLE_NAME, - modelPolicyMappingRecord - .toMap(datasourceOperations.getDatabaseType()) - .values() - .stream() + modelPolicyMappingRecord.toMap(metastoreOps.getDatabaseType()).values().stream() .toList(), updateClause); - datasourceOperations.execute(connection, updateQuery); + metastoreOps.execute(connection, updateQuery); } else { // record doesn't exist do an insert. - datasourceOperations.executeUpdate(insertQuery); + metastoreOps.executeUpdate(insertQuery); } return true; } @@ -1110,9 +1137,9 @@ public void deleteFromPolicyMappingRecords( var modelPolicyMappingRecord = ModelPolicyMappingRecord.fromPolicyMappingRecord(record); try { Map objectMap = - modelPolicyMappingRecord.toMap(datasourceOperations.getDatabaseType()); + modelPolicyMappingRecord.toMap(metastoreOps.getDatabaseType()); objectMap.put("realm_id", realmId); - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQuery( ModelPolicyMappingRecord.ALL_COLUMNS, ModelPolicyMappingRecord.TABLE_NAME, @@ -1141,7 +1168,7 @@ public void deleteAllEntityPolicyMappingRecords( queryParams.put("target_id", entity.getId()); } queryParams.put("realm_id", realmId); - datasourceOperations.executeUpdate( + metastoreOps.executeUpdate( QueryGenerator.generateDeleteQuery( ModelPolicyMappingRecord.ALL_COLUMNS, ModelPolicyMappingRecord.TABLE_NAME, @@ -1241,7 +1268,7 @@ public List loadAllTargetsOnPolicy( private List fetchPolicyMappingRecords( QueryGenerator.PreparedQuery query) { try { - var results = datasourceOperations.executeSelect(query, new ModelPolicyMappingRecord()); + var results = metastoreOps.executeSelect(query, new ModelPolicyMappingRecord()); return results == null ? Collections.emptyList() : results; } catch (SQLException e) { throw new RuntimeException( @@ -1288,7 +1315,7 @@ private interface QueryAction { /** Returns the datasource operations to use for metrics persistence. */ private DatasourceOperations getMetricsDatasource() { - return datasourceOperations; + return metricsOps; } @Override @@ -1306,15 +1333,19 @@ public void writeCommitReport(@Nonnull CommitMetricsRecord record) { // ========== Internal Metrics JDBC methods ========== private void writeScanMetricsReport(@Nonnull ModelScanMetricsReport report) { - DatasourceOperations metricsOps = getMetricsDatasource(); + DatasourceOperations metricsOpsToUse = getMetricsDatasource(); + if (metricsOpsToUse == null) { + LOGGER.debug("Skipping scan metrics report write as no JDBC metrics store is configured"); + return; + } try { PreparedQuery pq = QueryGenerator.generateInsertQuery( ModelScanMetricsReport.ALL_COLUMNS, ModelScanMetricsReport.TABLE_NAME, - report.toMap(metricsOps.getDatabaseType()).values().stream().toList(), + report.toMap(metricsOpsToUse.getDatabaseType()).values().stream().toList(), realmId); - metricsOps.executeUpdate(pq); + metricsOpsToUse.executeUpdate(pq); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to write scan metrics report due to %s", e.getMessage()), e); @@ -1322,15 +1353,19 @@ private void writeScanMetricsReport(@Nonnull ModelScanMetricsReport report) { } private void writeCommitMetricsReport(@Nonnull ModelCommitMetricsReport report) { - DatasourceOperations metricsOps = getMetricsDatasource(); + DatasourceOperations metricsOpsToUse = getMetricsDatasource(); + if (metricsOpsToUse == null) { + LOGGER.debug("Skipping commit metrics report write as no JDBC metrics store is configured"); + return; + } try { PreparedQuery pq = QueryGenerator.generateInsertQuery( ModelCommitMetricsReport.ALL_COLUMNS, ModelCommitMetricsReport.TABLE_NAME, - report.toMap(metricsOps.getDatabaseType()).values().stream().toList(), + report.toMap(metricsOpsToUse.getDatabaseType()).values().stream().toList(), realmId); - metricsOps.executeUpdate(pq); + metricsOpsToUse.executeUpdate(pq); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to write commit metrics report due to %s", e.getMessage()), e); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtils.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtils.java index 3e0391ca79..a1a663370c 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtils.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtils.java @@ -58,7 +58,8 @@ public static int getRealmBootstrapSchemaVersion( return 1; } } else { - // A truly fresh start. Default to latest version for this database type for auto-detection, + // A truly fresh start. Default to latest version for this database type for + // auto-detection, // otherwise use the specified version. return requiredSchemaVersion == -1 ? latestSchemaVersion : requiredSchemaVersion; } @@ -66,7 +67,8 @@ public static int getRealmBootstrapSchemaVersion( // Handle auto-detection on an existing installation (current version > 0). if (requiredSchemaVersion == -1) { - // Use the current version if realms already exist; otherwise, use latest version for the new + // Use the current version if realms already exist; otherwise, use latest + // version for the new // realm. return hasAlreadyBootstrappedRealms ? currentSchemaVersion : latestSchemaVersion; } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index 032934bae1..52467d9f84 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -23,7 +23,6 @@ import io.smallrye.common.annotation.Identifier; import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import java.sql.SQLException; import java.time.Clock; @@ -72,10 +71,15 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { final Map> sessionSupplierMap = new HashMap<>(); @Inject Clock clock; + @Inject PolarisDiagnostics diagnostics; + @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; - @Inject Instance dataSource; + + @Inject DataSourceResolver dataSourceResolver; + @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; + @Inject RealmConfig realmConfig; protected JdbcMetaStoreManagerFactory() {} @@ -94,7 +98,7 @@ protected PolarisMetaStoreManager createNewMetaStoreManager() { } private void initializeForRealm( - DatasourceOperations datasourceOperations, + DatasourceOperations metastoreOps, RealmContext realmContext, RootCredentialsSet rootCredentialsSet) { // Materialize realmId so that background tasks that don't have an active @@ -103,15 +107,22 @@ private void initializeForRealm( // determine schemaVersion once per realm final int schemaVersion = JdbcBasePersistenceImpl.loadSchemaVersion( - datasourceOperations, + metastoreOps, realmConfig.getConfig(BehaviorChangeConfiguration.SCHEMA_VERSION_FALL_BACK_ON_DNE)); + DatasourceOperations metricsOps = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.METRICS); + DatasourceOperations eventOps = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.EVENTS); + sessionSupplierMap.put( realmId, () -> new JdbcBasePersistenceImpl( diagnostics, - datasourceOperations, + metastoreOps, + metricsOps, + eventOps, secretsGenerator(realmId, rootCredentialsSet), storageIntegrationProvider, realmId, @@ -121,14 +132,17 @@ private void initializeForRealm( metaStoreManagerMap.put(realmId, metaStoreManager); } - public DatasourceOperations getDatasourceOperations() { - DatasourceOperations databaseOperations; + public DatasourceOperations getDatasourceOperations( + RealmContext realmContext, DataSourceResolver.StoreType storeType) { try { - databaseOperations = new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); + DataSource resolvedDs = dataSourceResolver.resolve(realmContext, storeType); + if (resolvedDs == null) { + return null; + } + return new DatasourceOperations(resolvedDs, relationalJdbcConfiguration); } catch (SQLException sqlException) { throw new RuntimeException(sqlException); } - return databaseOperations; } @Override @@ -154,32 +168,48 @@ public synchronized Map bootstrapRealms( for (String realm : bootstrapOptions.realms()) { RealmContext realmContext = () -> realm; if (!metaStoreManagerMap.containsKey(realm)) { - DatasourceOperations datasourceOperations = getDatasourceOperations(); - int currentSchemaVersion = - JdbcBasePersistenceImpl.loadSchemaVersion(datasourceOperations, true); + DatasourceOperations metastoreOps = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.METASTORE); + DatasourceOperations metricsOps = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.METRICS); + DatasourceOperations eventOps = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.EVENTS); + + int currentSchemaVersion = JdbcBasePersistenceImpl.loadSchemaVersion(metastoreOps, true); int requestedSchemaVersion = JdbcBootstrapUtils.getRequestedSchemaVersion(bootstrapOptions); int effectiveSchemaVersion = JdbcBootstrapUtils.getRealmBootstrapSchemaVersion( - datasourceOperations.getDatabaseType(), + metastoreOps.getDatabaseType(), currentSchemaVersion, requestedSchemaVersion, - JdbcBasePersistenceImpl.entityTableExists(datasourceOperations)); + JdbcBasePersistenceImpl.entityTableExists(metastoreOps)); LOGGER.info( "Effective schema version: {} for bootstrapping realm: {}", effectiveSchemaVersion, realm); + try { - // Run the set-up script to create the tables. - datasourceOperations.executeScript( - datasourceOperations + // Run the set-up script to create the tables on all data sources. + metastoreOps.executeScript( + metastoreOps + .getDatabaseType() + .openInitScriptResource( + effectiveSchemaVersion, DataSourceResolver.StoreType.METASTORE)); + metricsOps.executeScript( + metricsOps + .getDatabaseType() + .openInitScriptResource( + effectiveSchemaVersion, DataSourceResolver.StoreType.METRICS)); + eventOps.executeScript( + eventOps .getDatabaseType() - .openInitScriptResource(effectiveSchemaVersion)); + .openInitScriptResource( + effectiveSchemaVersion, DataSourceResolver.StoreType.EVENTS)); } catch (SQLException e) { throw new RuntimeException( String.format("Error executing sql script: %s", e.getMessage()), e); } - initializeForRealm( - datasourceOperations, realmContext, bootstrapOptions.rootCredentialsSet()); + initializeForRealm(metastoreOps, realmContext, bootstrapOptions.rootCredentialsSet()); PolarisMetaStoreManager metaStoreManager = metaStoreManagerMap.get(realmContext.getRealmIdentifier()); @@ -219,7 +249,8 @@ public Map purgeRealms(Iterable realms) { public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( RealmContext realmContext) { if (!metaStoreManagerMap.containsKey(realmContext.getRealmIdentifier())) { - DatasourceOperations datasourceOperations = getDatasourceOperations(); + DatasourceOperations datasourceOperations = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.METASTORE); initializeForRealm(datasourceOperations, realmContext, null); checkPolarisServiceBootstrappedForRealm(realmContext); } @@ -229,7 +260,8 @@ public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( @Override public synchronized BasePersistence getOrCreateSession(RealmContext realmContext) { if (!sessionSupplierMap.containsKey(realmContext.getRealmIdentifier())) { - DatasourceOperations datasourceOperations = getDatasourceOperations(); + DatasourceOperations datasourceOperations = + getDatasourceOperations(realmContext, DataSourceResolver.StoreType.METASTORE); initializeForRealm(datasourceOperations, realmContext, null); } checkPolarisServiceBootstrappedForRealm(realmContext); diff --git a/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-events.sql b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-events.sql new file mode 100644 index 0000000000..2a244098d1 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-events.sql @@ -0,0 +1,45 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Events schema for v4 (CockroachDB) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INT4 NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); diff --git a/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metastore.sql b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metastore.sql new file mode 100644 index 0000000000..cc33b0b3c3 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metastore.sql @@ -0,0 +1,112 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metastore schema for v4 (CockroachDB) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INT4 NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT4 NOT NULL, + type_code INT4 NOT NULL, + sub_type_code INT4 NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties JSONB not null default '{}'::JSONB, + internal_properties JSONB not null default '{}'::JSONB, + grant_records_version INT4 NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations + ON entities USING btree (realm_id, parent_id, location_without_scheme) + WHERE location_without_scheme IS NOT NULL; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INT4, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INT4 NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + http_status INT4, + error_subtype TEXT, + response_summary TEXT, + response_headers TEXT, + finalized_at TIMESTAMP, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, + executor_id TEXT, + expires_at TIMESTAMP, + PRIMARY KEY (realm_id, idempotency_key) +); + +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); diff --git a/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metrics.sql b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metrics.sql new file mode 100644 index 0000000000..da7d7bee02 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v4-metrics.sql @@ -0,0 +1,109 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metrics schema for v4 (CockroachDB) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INT4 NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT, + schema_id INT4, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + metadata JSONB DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + added_data_files BIGINT DEFAULT 0, + removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_duration_ms BIGINT DEFAULT 0, + attempts INT4 DEFAULT 1, + metadata JSONB DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); diff --git a/persistence/relational-jdbc/src/main/resources/h2/schema-v4-events.sql b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-events.sql new file mode 100644 index 0000000000..fe1c351adf --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-events.sql @@ -0,0 +1,44 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Events schema for v4 (H2) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key VARCHAR PRIMARY KEY, + version_value INTEGER NOT NULL +); +MERGE INTO version (version_key, version_value) + KEY (version_key) + VALUES ('version', 4); +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties TEXT NOT NULL, + PRIMARY KEY (event_id) +); diff --git a/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metastore.sql b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metastore.sql new file mode 100644 index 0000000000..8a145fce0f --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metastore.sql @@ -0,0 +1,109 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metastore schema for v4 (H2) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key VARCHAR PRIMARY KEY, + version_value INTEGER NOT NULL +); +MERGE INTO version (version_key, version_value) + KEY (version_key) + VALUES ('version', 4); +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties TEXT NOT NULL DEFAULT '{}', + internal_properties TEXT NOT NULL DEFAULT '{}', + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations ON entities(realm_id, catalog_id, location_without_scheme); + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + http_status INTEGER, + error_subtype TEXT, + response_summary TEXT, + response_headers TEXT, + finalized_at TIMESTAMP, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, + executor_id TEXT, + expires_at TIMESTAMP, + PRIMARY KEY (realm_id, idempotency_key) +); + +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); diff --git a/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metrics.sql b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metrics.sql new file mode 100644 index 0000000000..ab4fefe97e --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/h2/schema-v4-metrics.sql @@ -0,0 +1,108 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metrics schema for v4 (H2) +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key VARCHAR PRIMARY KEY, + version_value INTEGER NOT NULL +); +MERGE INTO version (version_key, version_value) + KEY (version_key) + VALUES ('version', 4); +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT, + schema_id INTEGER, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + metadata TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + added_data_files BIGINT DEFAULT 0, + removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_duration_ms BIGINT DEFAULT 0, + attempts INTEGER DEFAULT 1, + metadata TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-events.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-events.sql new file mode 100644 index 0000000000..af80fcb80a --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-events.sql @@ -0,0 +1,45 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Events schema for v4 +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metastore.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metastore.sql new file mode 100644 index 0000000000..e43f72a924 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metastore.sql @@ -0,0 +1,118 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metastore schema for v4 +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties JSONB not null default '{}'::JSONB, + internal_properties JSONB not null default '{}'::JSONB, + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations + ON entities USING btree (realm_id, parent_id, location_without_scheme) + WHERE location_without_scheme IS NOT NULL; + +COMMENT ON TABLE entities IS 'all the entities'; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + http_status INTEGER, + error_subtype TEXT, + response_summary TEXT, + response_headers TEXT, + finalized_at TIMESTAMP, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, + executor_id TEXT, + expires_at TIMESTAMP, + PRIMARY KEY (realm_id, idempotency_key) +); + +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metrics.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metrics.sql new file mode 100644 index 0000000000..e3541787b5 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v4-metrics.sql @@ -0,0 +1,109 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Metrics schema for v4 +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT, + schema_id INTEGER, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + metadata JSONB DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + added_data_files BIGINT DEFAULT 0, + removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_duration_ms BIGINT DEFAULT 0, + attempts INTEGER DEFAULT 1, + metadata JSONB DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, report_id) +); + +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index ec1bc59691..12127cb118 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -31,6 +31,7 @@ import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.h2.jdbcx.JdbcConnectionPool; import org.mockito.Mockito; @@ -69,8 +70,10 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { new JdbcBasePersistenceImpl( diagServices, datasourceOperations, + datasourceOperations, + datasourceOperations, RANDOM_SECRETS, - Mockito.mock(), + Mockito.mock(PolarisStorageIntegrationProvider.class), realmContext.getRealmIdentifier(), schemaVersion()); AtomicOperationMetaStoreManager metaStoreManager = diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/MetricsReportPersistenceTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/MetricsReportPersistenceTest.java index f2ae29972f..d6eafc39b0 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/MetricsReportPersistenceTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/MetricsReportPersistenceTest.java @@ -74,6 +74,8 @@ PolarisStorageIntegration getStorageIntegrationForConfig( new JdbcBasePersistenceImpl( diagnostics, datasourceOperations, + datasourceOperations, + datasourceOperations, PrincipalSecretsGenerator.RANDOM_SECRETS, storageProvider, "TEST_REALM", diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index ee95671dcd..61613b52ba 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -80,7 +80,7 @@ public Optional initialDelayInMs() { @Override public Optional databaseType() { - return Optional.empty(); + return Optional.of("postgresql"); } }; DatasourceOperations ops = new DatasourceOperations(dataSource, cfg); diff --git a/runtime/common/build.gradle.kts b/runtime/common/build.gradle.kts index e930a3e13c..aa118c89d6 100644 --- a/runtime/common/build.gradle.kts +++ b/runtime/common/build.gradle.kts @@ -24,6 +24,7 @@ plugins { dependencies { compileOnly(libs.smallrye.config.core) + implementation(project(":polaris-core")) implementation(project(":polaris-relational-jdbc")) implementation(platform(libs.quarkus.amazon.services.bom)) implementation("io.quarkiverse.amazonservices:quarkus-amazon-rds") diff --git a/runtime/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java b/runtime/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java index 7eba6eaad7..fbd7cff76b 100644 --- a/runtime/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java +++ b/runtime/common/src/main/java/org/apache/polaris/quarkus/common/config/jdbc/QuarkusRelationalJdbcConfiguration.java @@ -19,7 +19,13 @@ package org.apache.polaris.quarkus.common.config.jdbc; import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; @ConfigMapping(prefix = "polaris.persistence.relational.jdbc") -public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration {} +public interface QuarkusRelationalJdbcConfiguration extends RelationalJdbcConfiguration { + @WithName("datasource-resolver.type") + @WithDefault("default") + String dataSourceResolverType(); +} diff --git a/runtime/common/src/main/java/org/apache/polaris/quarkus/common/persistence/jdbc/JdbcCdiProducers.java b/runtime/common/src/main/java/org/apache/polaris/quarkus/common/persistence/jdbc/JdbcCdiProducers.java new file mode 100644 index 0000000000..5829c50e74 --- /dev/null +++ b/runtime/common/src/main/java/org/apache/polaris/quarkus/common/persistence/jdbc/JdbcCdiProducers.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.quarkus.common.persistence.jdbc; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import org.apache.polaris.persistence.relational.jdbc.DataSourceResolver; +import org.apache.polaris.quarkus.common.config.jdbc.QuarkusRelationalJdbcConfiguration; + +/** + * CDI producers for JDBC-specific beans. Moved to runtime-common to keep the persistence layer + * implementation-agnostic regarding configuration sources. + */ +public class JdbcCdiProducers { + + /** + * Produces the active {@link DataSourceResolver} by selecting the bean identified by {@link + * QuarkusRelationalJdbcConfiguration#dataSourceResolverType()}. + * + *

The result is {@link ApplicationScoped} because the datasource-resolver type cannot change + * at runtime. + */ + @Produces + @ApplicationScoped + public DataSourceResolver dataSourceResolver( + QuarkusRelationalJdbcConfiguration jdbcConfig, + @Any Instance dataSourceResolvers) { + String type = jdbcConfig.dataSourceResolverType(); + return dataSourceResolvers.select(Identifier.Literal.of(type)).get(); + } +} diff --git a/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_persistence_relational_jdbc.md b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_persistence_relational_jdbc.md index e50f686542..90b77f36e1 100644 --- a/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_persistence_relational_jdbc.md +++ b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_persistence_relational_jdbc.md @@ -25,6 +25,7 @@ build: | Property | Default Value | Type | Description | |----------|---------------|------|-------------| +| `polaris.persistence.relational.jdbc.datasource-resolver.type` | `default` | `string` | | | `polaris.persistence.relational.jdbc.max-retries` | | `int` | | | `polaris.persistence.relational.jdbc.max-duration-in-ms` | | `long` | | | `polaris.persistence.relational.jdbc.initial-delay-in-ms` | | `long` | | diff --git a/site/content/in-dev/unreleased/proposals/jdbc-multi-datasource.md b/site/content/in-dev/unreleased/proposals/jdbc-multi-datasource.md new file mode 100644 index 0000000000..2ca16a372f --- /dev/null +++ b/site/content/in-dev/unreleased/proposals/jdbc-multi-datasource.md @@ -0,0 +1,90 @@ + +# Design: Multi-DataSource support in JDBC persistence + +## Goal +The goal of this design is to decouple `DataSource` selection from the core JDBC persistence logic. This allows for: +1. **Workload Isolation**: Separating the Metastore, Metrics reporting, and Event logging into different physical databases or connection pools. +2. **Scalable Multi-Tenancy**: Enabling per-realm (tenant) routing to support large-scale deployments. + +## Core Interface: `DataSourceResolver` +A service interface designed to resolve the correct `DataSource` based on the workload's metadata. + +```java +public interface DataSourceResolver { + enum StoreType { + METASTORE, + METRICS, + EVENTS + } + + DataSource resolve(RealmContext realmContext, StoreType storeType); +} +``` + +### Key Components +- **`DataSourceResolver`**: SPI for resolution logic. +- **`DefaultDataSourceResolver`**: Backward-compatible implementation that returns the single primary `DataSource` for all requests. +- **`JdbcMetaStoreManagerFactory`**: Overwrites the resolution logic to use the `DataSourceResolver`. +- **`JdbcBasePersistenceImpl`**: Manages three separate `DatasourceOperations` objects. + +## Schema Management and Evolution + +### Functional SQL Script Splitting +To support hosting different workloads on different databases, the monolithic `schema-vX.sql` scripts are split into functional components: +- `schema-vX-metastore.sql`: Definitions for `polaris_entities`, `polaris_grant_records`, and `polaris_schema_version`. +- `schema-vX-metrics.sql`: Definitions for `polaris_metrics_scan` and `polaris_metrics_commit`. +- `schema-vX-events.sql`: Definitions for `polaris_events`. + +### Version Authority +The **Metastore** remains the single source of truth for the realm's logical schema version. The `polaris_schema_version` table is exclusively maintained within the `METASTORE` data source. All associated stores (Metrics, Events) are expected to be physically compatible with this detected version. + +## Operational Behaviors + +### Bootstrap +When a realm is bootstrapped via `JdbcMetaStoreManagerFactory.bootstrapRealms()`: +1. **Resolution**: The `DataSourceResolver` is called for each `StoreType` (METASTORE, METRICS, EVENTS). +2. **Initialization**: Each resolved data source is initialized with its specific functional SQL script. +3. **Idempotency**: If all three `StoreType` mappings point to the same physical database, the scripts are applied sequentially. The DDL is designed to be idempotent to prevent errors during this "merged" initialization. + +### Purge (Cleanup) +When a realm is purged: +1. The `purge()` operation is initiated. +2. The `JdbcBasePersistenceImpl.deleteAll()` method is triggered. +3. **Multi-Store Cleanup**: `deleteAll()` executes `DELETE` commands targeting the specific `realm_id` across all three `DatasourceOperations`: + - `metastoreOps`: Clears entities, grants, secrets, and policy mappings. + - `metricsOps`: Clears `scan_metrics_report` and `commit_metrics_report`. + - `eventOps`: Clears the `events` table. +4. This ensures that all data across all three potential data sources is cleaned up for the specified `realm_id`. + +## Schema Upgrades +Upgrading a Polaris deployment from version N to N+1 involves: +1. **Detection**: The service detects that the `metastore`'s `polaris_schema_version` is below the requested level. +2. **Execution**: The migration logic resolves each `StoreType` and applies the relevant "vN-to-vN+1" upgrade script. +3. **Consistency**: Because the process is unified within the `JdbcMetaStoreManagerFactory`, it provides a single point of failure and ensures that all data sources are either upgraded or rolled back (where possible). + +## Scalable Multi-Tenancy +The SPI allows for sophisticated implementations that can: +- Route "Enterprise" realms to dedicated high-performance clusters. +- Move realms between physical databases by updating the resolver's internal mapping metadata (e.g., during a maintenance window). + +## Benefits +- **Zero Impact on Existing Users**: The default configuration maintains a single-datasource model. +- **Future-Proof**: Provides the necessary hooks for table-level connection pool separation and physical database sharding. +- **Maintainable**: The functional script split clearly defines the table categories and their associated workloads.