-
Notifications
You must be signed in to change notification settings - Fork 1.1k
chore(spanner): optimize lock contention and skipped tablet reporting #12719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
e9de863
eea8b85
da7a1ba
286dc79
5a0bad1
f0086ec
c99d2af
61e2f16
101c085
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| package com.google.cloud.spanner.spi.v1; | ||
|
|
||
| import com.google.api.core.InternalApi; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.spanner.v1.BeginTransactionRequest; | ||
| import com.google.spanner.v1.CacheUpdate; | ||
| import com.google.spanner.v1.CommitRequest; | ||
|
|
@@ -34,8 +35,11 @@ | |
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.Predicate; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -47,11 +51,20 @@ | |
| @InternalApi | ||
| public final class ChannelFinder { | ||
| private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false; | ||
| private static final ExecutorService CACHE_UPDATE_POOL = | ||
| Executors.newCachedThreadPool( | ||
| r -> { | ||
| Thread t = new Thread(r, "spanner-cache-update"); | ||
| t.setDaemon(true); | ||
| return t; | ||
| }); | ||
|
|
||
| private final Object updateLock = new Object(); | ||
| private final AtomicLong databaseId = new AtomicLong(); | ||
| private final KeyRecipeCache recipeCache = new KeyRecipeCache(); | ||
| private final KeyRangeCache rangeCache; | ||
| private final AtomicReference<CacheUpdate> pendingUpdate = new AtomicReference<>(); | ||
| private volatile java.util.concurrent.CountDownLatch drainingLatch; | ||
| @Nullable private final EndpointLifecycleManager lifecycleManager; | ||
| @Nullable private final String finderKey; | ||
|
|
||
|
|
@@ -112,6 +125,45 @@ public void update(CacheUpdate update) { | |
| } | ||
| } | ||
|
|
||
| public void updateAsync(CacheUpdate update) { | ||
| // Replace any pending update atomically. Each CacheUpdate contains the full current state, | ||
| // so intermediate updates can be safely dropped to prevent unbounded queue growth. | ||
| if (pendingUpdate.getAndSet(update) == null) { | ||
| // No previous pending update means no drain task is scheduled yet — submit one. | ||
| java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); | ||
| drainingLatch = latch; | ||
| CACHE_UPDATE_POOL.execute( | ||
| () -> { | ||
| try { | ||
| drainPendingUpdate(); | ||
| } finally { | ||
| latch.countDown(); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| private void drainPendingUpdate() { | ||
| CacheUpdate toApply; | ||
| while ((toApply = pendingUpdate.getAndSet(null)) != null) { | ||
| update(toApply); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| void awaitPendingUpdates() throws InterruptedException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we move this method to src/test/java? Or otherwise add a comment that it should only be called from test code? (The |
||
| // Spin until no pending update remains. | ||
| long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.SECONDS.toNanos(5); | ||
| while (pendingUpdate.get() != null && System.nanoTime() < deadline) { | ||
| Thread.sleep(1); | ||
| } | ||
| // Wait for the drain task to fully complete (including the update() call). | ||
| java.util.concurrent.CountDownLatch latch = drainingLatch; | ||
| if (latch != null) { | ||
| latch.await(5, java.util.concurrent.TimeUnit.SECONDS); | ||
| } | ||
| } | ||
|
|
||
| public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) { | ||
| return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,13 @@ class EndpointLifecycleManager { | |
| */ | ||
| private static final int MAX_TRANSIENT_FAILURE_COUNT = 3; | ||
|
|
||
| private enum EvictionReason { | ||
| TRANSIENT_FAILURE, | ||
| SHUTDOWN, | ||
| IDLE, | ||
| STALE | ||
| } | ||
|
|
||
| /** Per-endpoint lifecycle state. */ | ||
| static final class EndpointState { | ||
| final String address; | ||
|
|
@@ -95,6 +102,7 @@ static final class EndpointState { | |
|
|
||
| private final ChannelEndpointCache endpointCache; | ||
| private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>(); | ||
| private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| /** | ||
| * Active addresses reported by each ChannelFinder, keyed by database id. | ||
|
|
@@ -235,6 +243,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) { | |
| for (Set<String> addresses : activeAddressesPerFinder.values()) { | ||
| allActive.addAll(addresses); | ||
| } | ||
| transientFailureEvictedAddresses.retainAll(allActive); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This call is synchronized on |
||
|
|
||
| // Evict managed endpoints not referenced by any finder. | ||
| List<String> stale = new ArrayList<>(); | ||
|
|
@@ -276,6 +285,7 @@ void unregisterFinder(String finderKey) { | |
| for (Set<String> addresses : activeAddressesPerFinder.values()) { | ||
| allActive.addAll(addresses); | ||
| } | ||
| transientFailureEvictedAddresses.retainAll(allActive); | ||
|
|
||
| List<String> stale = new ArrayList<>(); | ||
| for (String address : endpoints.keySet()) { | ||
|
|
@@ -412,6 +422,7 @@ private void probe(String address) { | |
| case READY: | ||
| state.lastReadyAt = clock.instant(); | ||
| state.consecutiveTransientFailures = 0; | ||
| transientFailureEvictedAddresses.remove(address); | ||
| break; | ||
|
|
||
| case IDLE: | ||
|
|
@@ -439,13 +450,13 @@ private void probe(String address) { | |
| Level.FINE, | ||
| "Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes", | ||
| new Object[] {address, state.consecutiveTransientFailures}); | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE); | ||
| } | ||
| break; | ||
|
|
||
| case SHUTDOWN: | ||
| logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address); | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.SHUTDOWN); | ||
| break; | ||
|
|
||
| default: | ||
|
|
@@ -482,16 +493,26 @@ void checkIdleEviction() { | |
| } | ||
|
|
||
| for (String address : toEvict) { | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.IDLE); | ||
| } | ||
| } | ||
|
|
||
| /** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */ | ||
| private void evictEndpoint(String address) { | ||
| evictEndpoint(address, EvictionReason.STALE); | ||
| } | ||
|
|
||
| /** Evicts an endpoint and records whether it should still be reported as unhealthy. */ | ||
| private void evictEndpoint(String address, EvictionReason reason) { | ||
| logger.log(Level.FINE, "Evicting endpoint {0}", address); | ||
|
|
||
| stopProbing(address); | ||
| endpoints.remove(address); | ||
| if (reason == EvictionReason.TRANSIENT_FAILURE) { | ||
| transientFailureEvictedAddresses.add(address); | ||
| } else { | ||
| transientFailureEvictedAddresses.remove(address); | ||
| } | ||
| endpointCache.evict(address); | ||
| } | ||
|
|
||
|
|
@@ -526,6 +547,10 @@ boolean isManaged(String address) { | |
| return endpoints.containsKey(address); | ||
| } | ||
|
|
||
| boolean wasRecentlyEvictedTransientFailure(String address) { | ||
| return transientFailureEvictedAddresses.contains(address); | ||
| } | ||
|
|
||
| /** Returns the endpoint state for testing. */ | ||
| @VisibleForTesting | ||
| EndpointState getEndpointState(String address) { | ||
|
|
@@ -558,6 +583,7 @@ void shutdown() { | |
| } | ||
| } | ||
| endpoints.clear(); | ||
| transientFailureEvictedAddresses.clear(); | ||
|
|
||
| scheduler.shutdown(); | ||
| try { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.