Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.spi.v1;

import com.google.api.core.InternalApi;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CacheUpdate;
import com.google.spanner.v1.CommitRequest;
Expand All @@ -34,6 +35,9 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
Expand All @@ -47,11 +51,20 @@
@InternalApi
public final class ChannelFinder {
private static final Predicate<String> NO_EXCLUDED_ENDPOINTS = address -> false;
private static final ExecutorService CACHE_UPDATE_POOL =
Executors.newCachedThreadPool(
r -> {
Thread t = new Thread(r, "spanner-cache-update");
t.setDaemon(true);
return t;
});

private final Object updateLock = new Object();
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
private final Executor cacheUpdateExecutor =
MoreExecutors.newSequentialExecutor(CACHE_UPDATE_POOL);
@Nullable private final EndpointLifecycleManager lifecycleManager;
@Nullable private final String finderKey;

Expand Down Expand Up @@ -112,6 +125,10 @@ public void update(CacheUpdate update) {
}
}

public void updateAsync(CacheUpdate update) {
cacheUpdateExecutor.execute(() -> update(update));
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()), NO_EXCLUDED_ENDPOINTS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ class EndpointLifecycleManager {
*/
private static final int MAX_TRANSIENT_FAILURE_COUNT = 3;

private enum EvictionReason {
TRANSIENT_FAILURE,
SHUTDOWN,
IDLE,
STALE
}

/** Per-endpoint lifecycle state. */
static final class EndpointState {
final String address;
Expand All @@ -95,6 +102,7 @@ static final class EndpointState {

private final ChannelEndpointCache endpointCache;
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();

/**
* Active addresses reported by each ChannelFinder, keyed by database id.
Expand Down Expand Up @@ -235,6 +243,7 @@ void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
transientFailureEvictedAddresses.retainAll(allActive);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is synchronized on activeAddressLock. However, the probe method also adds to this set without holding a lock. This means that this call (and also the add call from probe) are not guaranteed to be atomic, and that the outcome is undefined. It won't cause any exceptions, as it is a ConcurrentHashSet, so if we are fine with this being slightly undeterministic, then this is fine. Otherwise, we should take the same lock in probe as well as here.


// Evict managed endpoints not referenced by any finder.
List<String> stale = new ArrayList<>();
Expand Down Expand Up @@ -276,6 +285,7 @@ void unregisterFinder(String finderKey) {
for (Set<String> addresses : activeAddressesPerFinder.values()) {
allActive.addAll(addresses);
}
transientFailureEvictedAddresses.retainAll(allActive);

List<String> stale = new ArrayList<>();
for (String address : endpoints.keySet()) {
Expand Down Expand Up @@ -412,6 +422,7 @@ private void probe(String address) {
case READY:
state.lastReadyAt = clock.instant();
state.consecutiveTransientFailures = 0;
transientFailureEvictedAddresses.remove(address);
break;

case IDLE:
Expand Down Expand Up @@ -439,13 +450,13 @@ private void probe(String address) {
Level.FINE,
"Evicting endpoint {0}: {1} consecutive TRANSIENT_FAILURE probes",
new Object[] {address, state.consecutiveTransientFailures});
evictEndpoint(address);
evictEndpoint(address, EvictionReason.TRANSIENT_FAILURE);
}
break;

case SHUTDOWN:
logger.log(Level.FINE, "Probe for {0}: channel SHUTDOWN, evicting endpoint", address);
evictEndpoint(address);
evictEndpoint(address, EvictionReason.SHUTDOWN);
break;

default:
Expand Down Expand Up @@ -482,16 +493,26 @@ void checkIdleEviction() {
}

for (String address : toEvict) {
evictEndpoint(address);
evictEndpoint(address, EvictionReason.IDLE);
}
}

/** Evicts an endpoint: stops probing, removes from tracking, shuts down the channel. */
private void evictEndpoint(String address) {
evictEndpoint(address, EvictionReason.STALE);
}

/** Evicts an endpoint and records whether it should still be reported as unhealthy. */
private void evictEndpoint(String address, EvictionReason reason) {
logger.log(Level.FINE, "Evicting endpoint {0}", address);

stopProbing(address);
endpoints.remove(address);
if (reason == EvictionReason.TRANSIENT_FAILURE) {
transientFailureEvictedAddresses.add(address);
} else {
transientFailureEvictedAddresses.remove(address);
}
endpointCache.evict(address);
}

Expand Down Expand Up @@ -526,6 +547,10 @@ boolean isManaged(String address) {
return endpoints.containsKey(address);
}

boolean wasRecentlyEvictedTransientFailure(String address) {
return transientFailureEvictedAddresses.contains(address);
}

/** Returns the endpoint state for testing. */
@VisibleForTesting
EndpointState getEndpointState(String address) {
Expand Down Expand Up @@ -558,6 +583,7 @@ void shutdown() {
}
}
endpoints.clear();
transientFailureEvictedAddresses.clear();

scheduler.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,25 +798,25 @@ public void onMessage(ResponseT message) {
if (message instanceof PartialResultSet) {
PartialResultSet response = (PartialResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof ResultSet) {
ResultSet response = (ResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof Transaction) {
Transaction response = (Transaction) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
transactionId = transactionIdFromTransaction(response);
} else if (message instanceof CommitResponse) {
CommitResponse response = (CommitResponse) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
call.channelFinder.updateAsync(response.getCacheUpdate());
}
}
if (transactionId != null) {
Expand Down
Loading
Loading