-
Notifications
You must be signed in to change notification settings - Fork 448
Introduce DataSourceResolver for multi-datasource support in JDBC per… #3960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
136e381
207aa4c
e024bea
96e7ff0
9a07e6e
75e4096
fccb254
9c22635
1e00328
493c386
6311fbe
a36e745
118e046
dc8ffda
7eb2304
7ceaa69
9df42be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.persistence.relational.jdbc; | ||
|
|
||
| import java.util.Set; | ||
| import javax.sql.DataSource; | ||
|
|
||
| /** | ||
| * Service to resolve the correct {@link DataSource} for a given realm and store | ||
| * type. | ||
| * This enables isolating different workloads (e.g., entity metadata vs metrics | ||
| * vs events) | ||
| * into different physical databases or connection pools. | ||
| */ | ||
| public interface DataSourceResolver { | ||
|
|
||
| String STORE_TYPE_MAIN = "main"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have refactored the store types into the StoreType enum as suggested. This makes the resolution logic much cleaner and more type-safe. |
||
| String STORE_TYPE_METRICS = "metrics"; | ||
| String STORE_TYPE_EVENTS = "events"; | ||
|
|
||
| /** | ||
| * Resolves the DataSource for a given realm and store type. | ||
| * | ||
| * @param realmId the realm identifier | ||
| * @param storeType the type of store (e.g., main, metrics, events) | ||
| * @return the resolved DataSource | ||
| */ | ||
| DataSource resolve(String realmId, String storeType); | ||
|
|
||
| /** | ||
| * Returns all unique DataSources managed by this resolver. | ||
| * This is useful for global operations like schema initialization against all | ||
| * data sources. | ||
| * | ||
| * @return a set of all DataSources | ||
| */ | ||
| Set<DataSource> getAllUniqueDataSources(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine to have the "list all" method, but I tend to think it needs a separate interface because it's a different use case. Specifically, providing a DataSource per realm does not have to be co-location with the code to list all data sources. The former is a Server runtime concerns, the later is more of an Admin tool concern. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.persistence.relational.jdbc; | ||
|
|
||
| import jakarta.enterprise.context.ApplicationScoped; | ||
| import jakarta.enterprise.inject.Instance; | ||
| import jakarta.inject.Inject; | ||
| import java.util.HashSet; | ||
| import java.util.Set; | ||
| import javax.sql.DataSource; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Default implementation of {@link DataSourceResolver} that routes all realms | ||
| * and store types to a | ||
| * single default {@link DataSource}. This serves as both the production default | ||
| * and the base for | ||
| * multi-datasource extensions. | ||
| * | ||
| * <p> | ||
| * To enable per-realm or per-store datasource routing, this class can be | ||
| * extended or replaced | ||
| * with a custom implementation that resolves named datasources based on | ||
| * configuration. | ||
| */ | ||
| @ApplicationScoped | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, the current extension story is still a bit fragile here. This default resolver is just an unqualified
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whoever introduces an alternative implementation for This is a common pattern in Polaris in a few other extension points.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, let' use |
||
| public class DefaultDataSourceResolver implements DataSourceResolver { | ||
|
dimas-b marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand why this class was placed in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually the question of where to place implementations of this interface is a tricky one. When designing a true multi-datasource implementation, it will need the Quarkus Agroal extension, and therefore, would likely have to live in It would look like this: @ApplicationScoped
@Identifier("per-realm")
public class PerRealmDataSourceResolver implements DataSourceResolver {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDataSourceResolver.class);
@Override
public DataSource resolve(RealmContext realmContext, StoreType storeType) {
String dataSourceName = findDataSourceName(realmContext, storeType);
LOGGER.debug(
"Using DataSource '{}' for realm '{}' and store '{}'",
dataSourceName,
realmContext.getRealmIdentifier(),
storeType);
return AgroalDataSourceUtil.dataSourceIfActive(dataSourceName)
.orElseThrow(
() ->
new IllegalStateException(
"DataSource '" + dataSourceName + "' is not active or does not exist"));
}
private String findDataSourceName(RealmContext realmContext, StoreType storeType) {
...
}
}But the problem is that I think this needs some more thinking.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we add a new module like The idea is for
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we use Downstream alternatives will have the the option of reusing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have a compelling reason not to turn
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only "general principles" kind of reason 🙂 I tend to think it valuable to isolate runtime / CDI concerns from the basic code / logic of a particular component like JDBC persistence. In that regard, ideally, I believe the NoSQL Persistence impl and the OPA Authorizer follow similar principles (perhaps varying in degree, but similar in essence). I also proposed a similar refactoring for the Admin tool in #3947. I do not really know whether this matters for downstream Polaris users right now 🤔 🤷 In my personal experience, I find that it is much easier to include another module into a build than struggle with excluding intruding dependencies 😅 I know some concerns were raised about module proliferation in Polaris, but from my POV, Gradle is able to deal with a large number of modules very well, so I personally do not see it as an issue.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at the evolution of this PR (and this class specifically), I think it's probably fine to move CDI/quarkus-related code into the |
||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDataSourceResolver.class); | ||
|
|
||
| private final Instance<DataSource> defaultDataSource; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this have to be an |
||
|
|
||
| @Inject | ||
| public DefaultDataSourceResolver(Instance<DataSource> defaultDataSource) { | ||
| this.defaultDataSource = defaultDataSource; | ||
| } | ||
|
|
||
| @Override | ||
| public DataSource resolve(String realmId, String storeType) { | ||
| LOGGER.debug("Using default DataSource for realm '{}' and store '{}'", realmId, storeType); | ||
| return defaultDataSource.get(); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<DataSource> getAllUniqueDataSources() { | ||
| Set<DataSource> dataSources = new HashSet<>(); | ||
| dataSources.add(defaultDataSource.get()); | ||
| return dataSources; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,7 +57,8 @@ | |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * The implementation of Configuration interface for configuring the {@link PolarisMetaStoreManager} | ||
| * The implementation of Configuration interface for configuring the | ||
| * {@link PolarisMetaStoreManager} | ||
| * using a JDBC backed by SQL metastore. TODO: refactor - <a | ||
| * href="https://github.com/apache/polaris/pull/1287/files#r2047487588">...</a> | ||
| */ | ||
|
|
@@ -71,14 +72,21 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { | |
| final Map<String, EntityCache> entityCacheMap = new HashMap<>(); | ||
| final Map<String, Supplier<BasePersistence>> sessionSupplierMap = new HashMap<>(); | ||
|
|
||
| @Inject Clock clock; | ||
| @Inject PolarisDiagnostics diagnostics; | ||
| @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; | ||
| @Inject Instance<DataSource> dataSource; | ||
| @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; | ||
| @Inject RealmConfig realmConfig; | ||
|
|
||
| protected JdbcMetaStoreManagerFactory() {} | ||
| @Inject | ||
| Clock clock; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I wonder if the Spotless is going to complain about this 😅 |
||
| @Inject | ||
| PolarisDiagnostics diagnostics; | ||
| @Inject | ||
| PolarisStorageIntegrationProvider storageIntegrationProvider; | ||
| @Inject | ||
| Instance<DataSourceResolver> dataSourceResolver; | ||
| @Inject | ||
| RelationalJdbcConfiguration relationalJdbcConfiguration; | ||
| @Inject | ||
| RealmConfig realmConfig; | ||
|
|
||
| protected JdbcMetaStoreManagerFactory() { | ||
| } | ||
|
|
||
| protected PrincipalSecretsGenerator secretsGenerator( | ||
| String realmId, @Nullable RootCredentialsSet rootCredentialsSet) { | ||
|
|
@@ -101,30 +109,29 @@ private void initializeForRealm( | |
| // RealmContext (request-scoped bean) can still create a JdbcBasePersistenceImpl | ||
| String realmId = realmContext.getRealmIdentifier(); | ||
| // determine schemaVersion once per realm | ||
| final int schemaVersion = | ||
| JdbcBasePersistenceImpl.loadSchemaVersion( | ||
| datasourceOperations, | ||
| realmConfig.getConfig(BehaviorChangeConfiguration.SCHEMA_VERSION_FALL_BACK_ON_DNE)); | ||
| final int schemaVersion = JdbcBasePersistenceImpl.loadSchemaVersion( | ||
| datasourceOperations, | ||
| realmConfig.getConfig(BehaviorChangeConfiguration.SCHEMA_VERSION_FALL_BACK_ON_DNE)); | ||
|
|
||
| sessionSupplierMap.put( | ||
| realmId, | ||
| () -> | ||
| new JdbcBasePersistenceImpl( | ||
| diagnostics, | ||
| datasourceOperations, | ||
| secretsGenerator(realmId, rootCredentialsSet), | ||
| storageIntegrationProvider, | ||
| realmId, | ||
| schemaVersion)); | ||
| () -> new JdbcBasePersistenceImpl( | ||
| diagnostics, | ||
| datasourceOperations, | ||
| secretsGenerator(realmId, rootCredentialsSet), | ||
| storageIntegrationProvider, | ||
| realmId, | ||
| schemaVersion)); | ||
|
|
||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||
| metaStoreManagerMap.put(realmId, metaStoreManager); | ||
| } | ||
|
|
||
| public DatasourceOperations getDatasourceOperations() { | ||
| public DatasourceOperations getDatasourceOperations(String realmId, String storeType) { | ||
| DatasourceOperations databaseOperations; | ||
| try { | ||
| databaseOperations = new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); | ||
| DataSource resolvedDs = dataSourceResolver.get().resolve(realmId, storeType); | ||
| databaseOperations = new DatasourceOperations(resolvedDs, relationalJdbcConfiguration); | ||
| } catch (SQLException sqlException) { | ||
| throw new RuntimeException(sqlException); | ||
| } | ||
|
|
@@ -136,12 +143,11 @@ public synchronized Map<String, PrincipalSecretsResult> bootstrapRealms( | |
| Iterable<String> realms, RootCredentialsSet rootCredentialsSet) { | ||
| SchemaOptions schemaOptions = ImmutableSchemaOptions.builder().build(); | ||
|
|
||
| BootstrapOptions bootstrapOptions = | ||
| ImmutableBootstrapOptions.builder() | ||
| .realms(realms) | ||
| .rootCredentialsSet(rootCredentialsSet) | ||
| .schemaOptions(schemaOptions) | ||
| .build(); | ||
| BootstrapOptions bootstrapOptions = ImmutableBootstrapOptions.builder() | ||
| .realms(realms) | ||
| .rootCredentialsSet(rootCredentialsSet) | ||
| .schemaOptions(schemaOptions) | ||
| .build(); | ||
|
|
||
| return bootstrapRealms(bootstrapOptions); | ||
| } | ||
|
|
@@ -154,16 +160,14 @@ public synchronized Map<String, PrincipalSecretsResult> bootstrapRealms( | |
| for (String realm : bootstrapOptions.realms()) { | ||
| RealmContext realmContext = () -> realm; | ||
| if (!metaStoreManagerMap.containsKey(realm)) { | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||
| int currentSchemaVersion = | ||
| JdbcBasePersistenceImpl.loadSchemaVersion(datasourceOperations, true); | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations(realm, DataSourceResolver.STORE_TYPE_MAIN); | ||
| int currentSchemaVersion = JdbcBasePersistenceImpl.loadSchemaVersion(datasourceOperations, true); | ||
| int requestedSchemaVersion = JdbcBootstrapUtils.getRequestedSchemaVersion(bootstrapOptions); | ||
| int effectiveSchemaVersion = | ||
| JdbcBootstrapUtils.getRealmBootstrapSchemaVersion( | ||
| datasourceOperations.getDatabaseType(), | ||
| currentSchemaVersion, | ||
| requestedSchemaVersion, | ||
| JdbcBasePersistenceImpl.entityTableExists(datasourceOperations)); | ||
| int effectiveSchemaVersion = JdbcBootstrapUtils.getRealmBootstrapSchemaVersion( | ||
| datasourceOperations.getDatabaseType(), | ||
| currentSchemaVersion, | ||
| requestedSchemaVersion, | ||
| JdbcBasePersistenceImpl.entityTableExists(datasourceOperations)); | ||
| LOGGER.info( | ||
| "Effective schema version: {} for bootstrapping realm: {}", | ||
| effectiveSchemaVersion, | ||
|
|
@@ -181,13 +185,11 @@ public synchronized Map<String, PrincipalSecretsResult> bootstrapRealms( | |
| initializeForRealm( | ||
| datasourceOperations, realmContext, bootstrapOptions.rootCredentialsSet()); | ||
|
|
||
| PolarisMetaStoreManager metaStoreManager = | ||
| metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||
| PolarisMetaStoreManager metaStoreManager = metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||
| BasePersistence metaStore = sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); | ||
| PolarisCallContext polarisContext = new PolarisCallContext(realmContext, metaStore); | ||
|
|
||
| PrincipalSecretsResult secretsResult = | ||
| createPolarisPrincipalForRealm(metaStoreManager, polarisContext); | ||
| PrincipalSecretsResult secretsResult = createPolarisPrincipalForRealm(metaStoreManager, polarisContext); | ||
| results.put(realm, secretsResult); | ||
| } | ||
| } | ||
|
|
@@ -219,7 +221,8 @@ public Map<String, BaseResult> purgeRealms(Iterable<String> realms) { | |
| public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( | ||
| RealmContext realmContext) { | ||
| if (!metaStoreManagerMap.containsKey(realmContext.getRealmIdentifier())) { | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations( | ||
| realmContext.getRealmIdentifier(), DataSourceResolver.STORE_TYPE_MAIN); | ||
| initializeForRealm(datasourceOperations, realmContext, null); | ||
| checkPolarisServiceBootstrappedForRealm(realmContext); | ||
| } | ||
|
|
@@ -229,7 +232,8 @@ public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( | |
| @Override | ||
| public synchronized BasePersistence getOrCreateSession(RealmContext realmContext) { | ||
| if (!sessionSupplierMap.containsKey(realmContext.getRealmIdentifier())) { | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||
| DatasourceOperations datasourceOperations = getDatasourceOperations( | ||
| realmContext.getRealmIdentifier(), DataSourceResolver.STORE_TYPE_MAIN); | ||
| initializeForRealm(datasourceOperations, realmContext, null); | ||
| } | ||
| checkPolarisServiceBootstrappedForRealm(realmContext); | ||
|
|
@@ -250,15 +254,18 @@ public synchronized EntityCache getOrCreateEntityCache( | |
| } | ||
|
|
||
| /** | ||
| * In this method we check if Service was bootstrapped for a given realm, i.e. that all the | ||
| * entities were created (root principal, root principal role, etc) If service was not | ||
| * bootstrapped we are throwing IllegalStateException exception That will cause service to crash | ||
| * and force user to run Bootstrap command and initialize MetaStore and create all the required | ||
| * In this method we check if Service was bootstrapped for a given realm, i.e. | ||
| * that all the | ||
| * entities were created (root principal, root principal role, etc) If service | ||
| * was not | ||
| * bootstrapped we are throwing IllegalStateException exception That will cause | ||
| * service to crash | ||
| * and force user to run Bootstrap command and initialize MetaStore and create | ||
| * all the required | ||
| * entities | ||
| */ | ||
| private void checkPolarisServiceBootstrappedForRealm(RealmContext realmContext) { | ||
| PolarisMetaStoreManager metaStoreManager = | ||
| metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||
| PolarisMetaStoreManager metaStoreManager = metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||
| BasePersistence metaStore = sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); | ||
| PolarisCallContext polarisContext = new PolarisCallContext(realmContext, metaStore); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this contract is getting a bit ahead of the implementation. This PR only wires
METASTORE, but the SPI already publishesMETRICSandEVENTSas if those routing modes were part of the current supported surface. Would it make sense to keep the first step narrower and add newStoreTypevalues only once the corresponding paths are actually routed through this resolver?