-
Notifications
You must be signed in to change notification settings - Fork 1.1k
chore(spanner): optimize lock contention and skipped tablet reporting #12719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
e9de863
eea8b85
da7a1ba
286dc79
5a0bad1
f0086ec
c99d2af
61e2f16
101c085
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,13 @@ class EndpointLifecycleManager { | |
| */ | ||
| private static final int MAX_TRANSIENT_FAILURE_COUNT = 3; | ||
|
|
||
| private enum EvictionReason { | ||
| TRANSIENT_FAILURE, | ||
| SHUTDOWN, | ||
| IDLE, | ||
| STALE | ||
| } | ||
|
|
||
| /** Per-endpoint lifecycle state. */ | ||
| static final class EndpointState { | ||
| final String address; | ||
|
|
@@ -95,6 +102,7 @@ static final class EndpointState { | |
|
|
||
| private final ChannelEndpointCache endpointCache; | ||
| private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>(); | ||
| private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| /** | ||
| * Active addresses reported by each ChannelFinder, keyed by database id. | ||
|
|
@@ -235,6 +243,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) { | |
| for (Set<String> addresses : activeAddressesPerFinder.values()) { | ||
| allActive.addAll(addresses); | ||
| } | ||
| transientFailureEvictedAddresses.retainAll(allActive); | ||
|
||
|
|
||
| // Evict managed endpoints not referenced by any finder. | ||
| List<String> stale = new ArrayList<>(); | ||
|
|
@@ -276,6 +285,7 @@ void unregisterFinder(String finderKey) { | |
| for (Set<String> addresses : activeAddressesPerFinder.values()) { | ||
| allActive.addAll(addresses); | ||
| } | ||
| transientFailureEvictedAddresses.retainAll(allActive); | ||
|
|
||
| List<String> stale = new ArrayList<>(); | ||
| for (String address : endpoints.keySet()) { | ||
|
|
@@ -412,6 +422,7 @@ private void probe(String address) { | |
| case READY: | ||
| state.lastReadyAt = clock.instant(); | ||
| state.consecutiveTransientFailures = 0; | ||
| transientFailureEvictedAddresses.remove(address); | ||
| break; | ||
|
|
||
| case IDLE: | ||
|
|
@@ -439,13 +450,13 @@ private void probe(String address) { | |
| Level.FINE, | ||
| "Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes", | ||
| new Object[] {address, state.consecutiveTransientFailures}); | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE); | ||
| } | ||
| break; | ||
|
|
||
| case SHUTDOWN: | ||
| logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address); | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.SHUTDOWN); | ||
| break; | ||
|
|
||
| default: | ||
|
|
@@ -482,16 +493,26 @@ void checkIdleEviction() { | |
| } | ||
|
|
||
| for (String address : toEvict) { | ||
| evictEndpoint(address); | ||
| evictEndpoint(address, EvictionReason.IDLE); | ||
| } | ||
| } | ||
|
|
||
| /** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */ | ||
| private void evictEndpoint(String address) { | ||
| evictEndpoint(address, EvictionReason.STALE); | ||
| } | ||
|
|
||
| /** Evicts an endpoint and records whether it should still be reported as unhealthy. */ | ||
| private void evictEndpoint(String address, EvictionReason reason) { | ||
| logger.log(Level.FINE, "Evicting endpoint {0}", address); | ||
|
|
||
| stopProbing(address); | ||
| endpoints.remove(address); | ||
| if (reason == EvictionReason.TRANSIENT_FAILURE) { | ||
| transientFailureEvictedAddresses.add(address); | ||
| } else { | ||
| transientFailureEvictedAddresses.remove(address); | ||
| } | ||
| endpointCache.evict(address); | ||
| } | ||
|
|
||
|
|
@@ -526,6 +547,10 @@ boolean isManaged(String address) { | |
| return endpoints.containsKey(address); | ||
| } | ||
|
|
||
| boolean wasRecentlyEvictedTransientFailure(String address) { | ||
| return transientFailureEvictedAddresses.contains(address); | ||
| } | ||
|
|
||
| /** Returns the endpoint state for testing. */ | ||
| @VisibleForTesting | ||
| EndpointState getEndpointState(String address) { | ||
|
|
@@ -558,6 +583,7 @@ void shutdown() { | |
| } | ||
| } | ||
| endpoints.clear(); | ||
| transientFailureEvictedAddresses.clear(); | ||
|
|
||
| scheduler.shutdown(); | ||
| try { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this method to src/test/java? Or otherwise add a comment that it should only be called from test code? (The
@VisibleForTestingannotation is intended to indicate that a method has higher visibility than otherwise needed to be able to test, it does not indicate that it should only be invoked from test code.)