-
Notifications
You must be signed in to change notification settings - Fork 448
JDBC: Replace coarse-grained synchronized methods with per-realm locking #4054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9c1ae29
b206da4
cad7f36
9585a72
7e86f3e
fe5b6b5
3282850
e7f4eb4
2aa2059
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -24,18 +24,22 @@ | |||
| import jakarta.annotation.Nullable; | ||||
| import jakarta.enterprise.context.ApplicationScoped; | ||||
| import jakarta.enterprise.inject.Instance; | ||||
| import jakarta.enterprise.inject.Produces; | ||||
| import jakarta.inject.Inject; | ||||
| import java.sql.SQLException; | ||||
| import java.time.Clock; | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
| import java.util.Optional; | ||||
| import java.util.function.Supplier; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
| import javax.sql.DataSource; | ||||
| import org.apache.polaris.core.PolarisCallContext; | ||||
| import org.apache.polaris.core.PolarisDiagnostics; | ||||
| import org.apache.polaris.core.config.BehaviorChangeConfiguration; | ||||
| import org.apache.polaris.core.config.RealmConfig; | ||||
| import org.apache.polaris.core.config.RealmConfigImpl; | ||||
| import org.apache.polaris.core.config.RealmConfigurationSource; | ||||
| import org.apache.polaris.core.context.RealmContext; | ||||
| import org.apache.polaris.core.entity.PrincipalEntity; | ||||
| import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; | ||||
|
|
@@ -67,19 +71,31 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory { | |||
|
|
||||
| private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMetaStoreManagerFactory.class); | ||||
|
|
||||
| final Map<String, PolarisMetaStoreManager> metaStoreManagerMap = new HashMap<>(); | ||||
| final Map<String, EntityCache> entityCacheMap = new HashMap<>(); | ||||
| final Map<String, Supplier<BasePersistence>> sessionSupplierMap = new HashMap<>(); | ||||
| // Stateful per-realm cache — InMemoryEntityCache accumulates entries across requests | ||||
| private final Map<String, EntityCache> entityCacheMap = new ConcurrentHashMap<>(); | ||||
|
|
||||
| // Cached per-realm schema version — loaded from DB once, stable at runtime | ||||
| private final ConcurrentHashMap<String, Integer> schemaVersionCache = new ConcurrentHashMap<>(); | ||||
|
|
||||
| // Tracks realms that have already passed the bootstrap verification check | ||||
| // (checkPolarisServiceBootstrappedForRealm), avoiding redundant DB hits on subsequent calls. | ||||
| private final Set<String> verifiedRealms = ConcurrentHashMap.newKeySet(); | ||||
|
|
||||
| @Inject Clock clock; | ||||
| @Inject PolarisDiagnostics diagnostics; | ||||
| @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; | ||||
| @Inject Instance<DataSource> dataSource; | ||||
| @Inject RelationalJdbcConfiguration relationalJdbcConfiguration; | ||||
| @Inject RealmConfig realmConfig; | ||||
| @Inject DatasourceOperations datasourceOperations; | ||||
| @Inject RealmConfigurationSource realmConfigurationSource; | ||||
|
|
||||
| protected JdbcMetaStoreManagerFactory() {} | ||||
|
|
||||
| @Produces | ||||
| @ApplicationScoped | ||||
| static DatasourceOperations produceDatasourceOperations( | ||||
| Instance<DataSource> dataSource, RelationalJdbcConfiguration relationalJdbcConfiguration) { | ||||
| return new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); | ||||
| } | ||||
|
|
||||
| protected PrincipalSecretsGenerator secretsGenerator( | ||||
| String realmId, @Nullable RootCredentialsSet rootCredentialsSet) { | ||||
| if (rootCredentialsSet != null) { | ||||
|
|
@@ -93,42 +109,24 @@ protected PolarisMetaStoreManager createNewMetaStoreManager() { | |||
| return new AtomicOperationMetaStoreManager(clock, diagnostics); | ||||
| } | ||||
|
|
||||
| private void initializeForRealm( | ||||
| DatasourceOperations datasourceOperations, | ||||
| RealmContext realmContext, | ||||
| RootCredentialsSet rootCredentialsSet) { | ||||
| // Materialize realmId so that background tasks that don't have an active | ||||
| // RealmContext (request-scoped bean) can still create a JdbcBasePersistenceImpl | ||||
| String realmId = realmContext.getRealmIdentifier(); | ||||
| // determine schemaVersion once per realm | ||||
| final int schemaVersion = | ||||
| JdbcBasePersistenceImpl.loadSchemaVersion( | ||||
| datasourceOperations, | ||||
| realmConfig.getConfig(BehaviorChangeConfiguration.SCHEMA_VERSION_FALL_BACK_ON_DNE)); | ||||
|
|
||||
| sessionSupplierMap.put( | ||||
| /** Loads and caches the schema version for the given realm (DB hit only on first call). */ | ||||
| private int getOrLoadSchemaVersion(String realmId, boolean fallbackOnDne) { | ||||
| return schemaVersionCache.computeIfAbsent( | ||||
| realmId, | ||||
| () -> | ||||
| new JdbcBasePersistenceImpl( | ||||
| diagnostics, | ||||
| datasourceOperations, | ||||
| secretsGenerator(realmId, rootCredentialsSet), | ||||
| storageIntegrationProvider, | ||||
| realmId, | ||||
| schemaVersion)); | ||||
|
|
||||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||||
| metaStoreManagerMap.put(realmId, metaStoreManager); | ||||
| k -> JdbcBasePersistenceImpl.loadSchemaVersion(datasourceOperations, fallbackOnDne)); | ||||
| } | ||||
|
|
||||
| public DatasourceOperations getDatasourceOperations() { | ||||
| DatasourceOperations databaseOperations; | ||||
| try { | ||||
| databaseOperations = new DatasourceOperations(dataSource.get(), relationalJdbcConfiguration); | ||||
| } catch (SQLException sqlException) { | ||||
| throw new RuntimeException(sqlException); | ||||
| } | ||||
| return databaseOperations; | ||||
| /** Creates a new stateless {@link JdbcBasePersistenceImpl} for the given realm. */ | ||||
| private BasePersistence createSession( | ||||
| String realmId, @Nullable RootCredentialsSet rootCredentialsSet, boolean fallbackOnDne) { | ||||
| int schemaVersion = getOrLoadSchemaVersion(realmId, fallbackOnDne); | ||||
| return new JdbcBasePersistenceImpl( | ||||
| diagnostics, | ||||
| datasourceOperations, | ||||
| secretsGenerator(realmId, rootCredentialsSet), | ||||
| storageIntegrationProvider, | ||||
| realmId, | ||||
| schemaVersion); | ||||
| } | ||||
|
|
||||
| @Override | ||||
|
|
@@ -153,8 +151,7 @@ public synchronized Map<String, PrincipalSecretsResult> bootstrapRealms( | |||
|
|
||||
| for (String realm : bootstrapOptions.realms()) { | ||||
| RealmContext realmContext = () -> realm; | ||||
| if (!metaStoreManagerMap.containsKey(realm)) { | ||||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||||
| if (!verifiedRealms.contains(realm)) { | ||||
| int currentSchemaVersion = | ||||
| JdbcBasePersistenceImpl.loadSchemaVersion(datasourceOperations, true); | ||||
| int requestedSchemaVersion = JdbcBootstrapUtils.getRequestedSchemaVersion(bootstrapOptions); | ||||
|
|
@@ -178,75 +175,91 @@ public synchronized Map<String, PrincipalSecretsResult> bootstrapRealms( | |||
| throw new RuntimeException( | ||||
| String.format("Error executing sql script: %s", e.getMessage()), e); | ||||
| } | ||||
| initializeForRealm( | ||||
| datasourceOperations, realmContext, bootstrapOptions.rootCredentialsSet()); | ||||
| // Cache the effective schema version for this realm | ||||
| schemaVersionCache.put(realm, effectiveSchemaVersion); | ||||
|
singhpk234 marked this conversation as resolved.
|
||||
|
|
||||
| PolarisMetaStoreManager metaStoreManager = | ||||
| metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||||
| BasePersistence metaStore = sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); | ||||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||||
| BasePersistence metaStore = | ||||
| createSession(realm, bootstrapOptions.rootCredentialsSet(), true); | ||||
| PolarisCallContext polarisContext = new PolarisCallContext(realmContext, metaStore); | ||||
|
|
||||
| PrincipalSecretsResult secretsResult = | ||||
| createPolarisPrincipalForRealm(metaStoreManager, polarisContext); | ||||
| results.put(realm, secretsResult); | ||||
| verifiedRealms.add(realm); | ||||
| } | ||||
| } | ||||
|
|
||||
| return Map.copyOf(results); | ||||
| } | ||||
|
|
||||
| @Override | ||||
| public Map<String, BaseResult> purgeRealms(Iterable<String> realms) { | ||||
| public synchronized Map<String, BaseResult> purgeRealms(Iterable<String> realms) { | ||||
| Map<String, BaseResult> results = new HashMap<>(); | ||||
|
|
||||
| for (String realm : realms) { | ||||
| RealmContext realmContext = () -> realm; | ||||
| PolarisMetaStoreManager metaStoreManager = getOrCreateMetaStoreManager(realmContext); | ||||
| BasePersistence session = getOrCreateSession(realmContext); | ||||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||||
| BasePersistence session = createSession(realm, null, true); | ||||
|
|
||||
| PolarisCallContext callContext = new PolarisCallContext(realmContext, session); | ||||
|
|
||||
| // Verify the realm is bootstrapped before purging — a non-bootstrapped realm | ||||
| // has no root principal, so purging it is a no-op that should be reported as failure. | ||||
| Optional<PrincipalEntity> rootPrincipal = metaStoreManager.findRootPrincipal(callContext); | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, I think this is the right thing to do. |
||||
| if (rootPrincipal.isEmpty()) { | ||||
| results.put( | ||||
| realm, new BaseResult(BaseResult.ReturnStatus.ENTITY_NOT_FOUND, "Not bootstrapped")); | ||||
| continue; | ||||
| } | ||||
|
|
||||
| BaseResult result = metaStoreManager.purge(callContext); | ||||
| results.put(realm, result); | ||||
|
|
||||
| sessionSupplierMap.remove(realm); | ||||
| metaStoreManagerMap.remove(realm); | ||||
| // Evict all cached state for this realm | ||||
| entityCacheMap.remove(realm); | ||||
| schemaVersionCache.remove(realm); | ||||
| verifiedRealms.remove(realm); | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||||
| } | ||||
|
|
||||
| return Map.copyOf(results); | ||||
| } | ||||
|
|
||||
| @Override | ||||
| public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( | ||||
| RealmContext realmContext) { | ||||
| if (!metaStoreManagerMap.containsKey(realmContext.getRealmIdentifier())) { | ||||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||||
| initializeForRealm(datasourceOperations, realmContext, null); | ||||
| checkPolarisServiceBootstrappedForRealm(realmContext); | ||||
| } | ||||
| return metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||||
| public PolarisMetaStoreManager getOrCreateMetaStoreManager(RealmContext realmContext) { | ||||
| // Stateless — create a fresh instance on every call, no caching needed | ||||
| return createNewMetaStoreManager(); | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: why not use the one in the entity cache? The caller is request scoped, polaris/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java Line 236 in 8cbfb26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we put it in the entity cache then?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question 😉 ... but I think it'll go way outside the scope of this PR. |
||||
| } | ||||
|
|
||||
| @Override | ||||
| public synchronized BasePersistence getOrCreateSession(RealmContext realmContext) { | ||||
| if (!sessionSupplierMap.containsKey(realmContext.getRealmIdentifier())) { | ||||
| DatasourceOperations datasourceOperations = getDatasourceOperations(); | ||||
| initializeForRealm(datasourceOperations, realmContext, null); | ||||
| public BasePersistence getOrCreateSession(RealmContext realmContext) { | ||||
| String realmId = realmContext.getRealmIdentifier(); | ||||
| RealmConfig realmConfig = new RealmConfigImpl(realmConfigurationSource, realmContext); | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose this is related to the following comment from the main thread:
This means the IMHO, this is not a problem in this factory, but in the caller. We need to review the caller and handle context propagation / activation properly. Cf. #4061 @adutra : WDYT? |
||||
| boolean fallbackOnDne = | ||||
| realmConfig.getConfig(BehaviorChangeConfiguration.SCHEMA_VERSION_FALL_BACK_ON_DNE); | ||||
|
|
||||
| // Verify bootstrap once per realm lifetime; skip on subsequent calls. | ||||
| // On cold start, multiple threads may verify concurrently — this is benign | ||||
| // (idempotent DB query), trading a few redundant queries for simpler code. | ||||
| if (!verifiedRealms.contains(realmId)) { | ||||
| checkPolarisServiceBootstrappedForRealm(realmContext, fallbackOnDne); | ||||
| } | ||||
| checkPolarisServiceBootstrappedForRealm(realmContext); | ||||
| return sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); | ||||
|
|
||||
| // Stateless — create a fresh instance on every call; schemaVersion is cached per realm | ||||
| return createSession(realmId, null, fallbackOnDne); | ||||
| } | ||||
|
|
||||
| @Override | ||||
| public synchronized EntityCache getOrCreateEntityCache( | ||||
| RealmContext realmContext, RealmConfig realmConfig) { | ||||
| if (!entityCacheMap.containsKey(realmContext.getRealmIdentifier())) { | ||||
| PolarisMetaStoreManager metaStoreManager = getOrCreateMetaStoreManager(realmContext); | ||||
| entityCacheMap.put( | ||||
| realmContext.getRealmIdentifier(), | ||||
| new InMemoryEntityCache(diagnostics, realmConfig, metaStoreManager)); | ||||
| } | ||||
|
|
||||
| return entityCacheMap.get(realmContext.getRealmIdentifier()); | ||||
| public EntityCache getOrCreateEntityCache(RealmContext realmContext, RealmConfig realmConfig) { | ||||
| String realmId = realmContext.getRealmIdentifier(); | ||||
| // EntityCache is stateful (Caffeine + ConcurrentHashMap) — must be shared across requests. | ||||
| // ConcurrentHashMap.computeIfAbsent is already atomic — no external lock needed. | ||||
| return entityCacheMap.computeIfAbsent( | ||||
| realmId, | ||||
| k -> { | ||||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||||
| return new InMemoryEntityCache(diagnostics, realmConfig, metaStoreManager); | ||||
| }); | ||||
| } | ||||
|
|
||||
| /** | ||||
|
|
@@ -256,19 +269,21 @@ public synchronized EntityCache getOrCreateEntityCache( | |||
| * and force user to run Bootstrap command and initialize MetaStore and create all the required | ||||
| * entities | ||||
| */ | ||||
| private void checkPolarisServiceBootstrappedForRealm(RealmContext realmContext) { | ||||
| PolarisMetaStoreManager metaStoreManager = | ||||
| metaStoreManagerMap.get(realmContext.getRealmIdentifier()); | ||||
| BasePersistence metaStore = sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); | ||||
| private void checkPolarisServiceBootstrappedForRealm( | ||||
| RealmContext realmContext, boolean fallbackOnDne) { | ||||
| String realmId = realmContext.getRealmIdentifier(); | ||||
| PolarisMetaStoreManager metaStoreManager = createNewMetaStoreManager(); | ||||
| BasePersistence metaStore = createSession(realmId, null, fallbackOnDne); | ||||
| PolarisCallContext polarisContext = new PolarisCallContext(realmContext, metaStore); | ||||
|
|
||||
| Optional<PrincipalEntity> rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext); | ||||
| if (rootPrincipal.isEmpty()) { | ||||
| LOGGER.error( | ||||
| "\n\n Realm {} is not bootstrapped, could not load root principal. Please run Bootstrap command. \n\n", | ||||
| realmContext.getRealmIdentifier()); | ||||
| realmId); | ||||
| throw new IllegalStateException( | ||||
| "Realm is not bootstrapped, please run server in bootstrap mode."); | ||||
| } | ||||
| verifiedRealms.add(realmId); | ||||
| } | ||||
| } | ||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check doesn’t make much sense here. The bootstrap is invoked by a CLI tool, so relying on in-memory state isn’t meaningful. Instead, we should make the process idempotent and avoid re-bootstrapping an already initialized realm. That logic should be based on the actual DB state, for example whether the root principal already exists, rather than an in-memory cache that only lives for the duration of the process.
Since the CLI tool is short-lived anyway, I’d suggest reusing
checkPolarisServiceBootstrappedForRealm()here. We could decide whether split it to two methods to strip outverifiedRealms.add(realmId);but that's a minor point.