Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private boolean updateContainerState(final DatanodeDetails datanode,
// If the state of a container is OPEN and a replica is in different state, finalize the container.
if (replica.getState() != State.OPEN) {
getLogger().info("FINALIZE (i.e. CLOSING) {}", detailsForLogging);
containerManager.updateContainerState(containerId, LifeCycleEvent.FINALIZE);
updateContainerState(containerId, LifeCycleEvent.FINALIZE);
}
return false;
case CLOSING:
Expand All @@ -263,7 +263,7 @@ private boolean updateContainerState(final DatanodeDetails datanode,
// If the replica is in QUASI_CLOSED state, move the container to QUASI_CLOSED state.
if (replica.getState() == State.QUASI_CLOSED) {
getLogger().info("QUASI_CLOSE {}", detailsForLogging);
containerManager.updateContainerState(containerId, LifeCycleEvent.QUASI_CLOSE);
updateContainerState(containerId, LifeCycleEvent.QUASI_CLOSE);
return false;
}

Expand All @@ -288,7 +288,7 @@ private boolean updateContainerState(final DatanodeDetails datanode,
return true;
}
getLogger().info("CLOSE {}", detailsForLogging);
containerManager.updateContainerState(containerId, LifeCycleEvent.CLOSE);
updateContainerState(containerId, LifeCycleEvent.CLOSE);
}
return false;
case QUASI_CLOSED:
Expand All @@ -301,7 +301,7 @@ private boolean updateContainerState(final DatanodeDetails datanode,
return true;
}
getLogger().info("FORCE_CLOSE for {}", detailsForLogging);
containerManager.updateContainerState(containerId, LifeCycleEvent.FORCE_CLOSE);
updateContainerState(containerId, LifeCycleEvent.FORCE_CLOSE);
}
return false;
case CLOSED:
Expand Down Expand Up @@ -358,6 +358,15 @@ private boolean updateContainerState(final DatanodeDetails datanode,
}
}

/**
 * Applies a lifecycle {@code event} to the given container via the
 * {@link ContainerManager}, but only when this SCM is the current leader.
 * On a non-leader SCM the request is intentionally dropped: container state
 * transitions are initiated by the leader and replicated to followers, so a
 * follower must not apply them directly.
 *
 * @param containerID the container whose state should change
 * @param event the lifecycle event to apply
 * @throws InvalidStateTransitionException if {@code event} is not valid for
 *         the container's current state
 * @throws IOException if persisting the state change fails
 */
private void updateContainerState(ContainerID containerID, LifeCycleEvent event)
    throws InvalidStateTransitionException, IOException {
  if (scmContext.isLeader()) {
    containerManager.updateContainerState(containerID, event);
  } else {
    // Include the dropped event and container so the skip is traceable.
    getLogger().debug("Skipping {} for container {}: current SCM is not the leader.",
        event, containerID);
  }
}

/**
* Helper method to verify that the replica's bcsId matches the container's in SCM.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
Expand All @@ -52,6 +51,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
Expand Down Expand Up @@ -86,7 +86,6 @@ public class SCMStateMachine extends BaseStateMachine {
private DBCheckpoint installingDBCheckpoint = null;
private List<ManagedSecretKey> installingSecretKeys = null;

private AtomicLong currentLeaderTerm = new AtomicLong(-1L);
private AtomicBoolean isStateMachineReady = new AtomicBoolean();

public SCMStateMachine(final StorageContainerManager scm,
Expand Down Expand Up @@ -284,26 +283,24 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
return;
}

currentLeaderTerm.set(scm.getScmHAManager().getRatisServer().getDivision()
.getInfo().getCurrentTerm());

if (isStateMachineReady.compareAndSet(false, true)) {
// refresh and validate safe mode rules if it can exit safe mode
// if being leader, all previous term transactions have been applied
// if other states, just refresh safe mode rules, and transaction keeps flushing from leader
// and does not depend on pending transactions.
scm.getScmSafeModeManager().refreshAndValidate();
if (!isStateMachineReady.get()) {
if (groupMemberId.getPeerId().equals(newLeaderId)) {
tryStartDNServerAndRefreshSafeMode();
} else {
scheduleDNServerStartCheck();
}
}

if (!groupMemberId.getPeerId().equals(newLeaderId)) {
LOG.info("leader changed, yet current SCM is still follower.");
return;
}

LOG.info("current SCM becomes leader of term {}.", currentLeaderTerm);
long currentTerm = scm.getScmHAManager().getRatisServer().getDivision()
.getInfo().getCurrentTerm();
LOG.info("current SCM becomes leader of term {}.", currentTerm);

scm.getScmContext().updateLeaderAndTerm(true,
currentLeaderTerm.get());
scm.getScmContext().updateLeaderAndTerm(true, currentTerm);
scm.getSequenceIdGen().invalidateBatch();

try {
Expand Down Expand Up @@ -369,19 +366,100 @@ public void notifyTermIndexUpdated(long term, long index) {
if (transactionBuffer != null) {
transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(term, index));
}
}

/**
 * Returns whether the state machine has been marked ready, i.e. the
 * {@code isStateMachineReady} flag was CAS'd to {@code true} (done once the
 * DN protocol server has been started and safe mode rules refreshed).
 */
public boolean getIsStateMachineReady() {
return isStateMachineReady.get();
}

if (currentLeaderTerm.get() == term) {
// This means after a restart, all pending transactions have been applied.
/**
* Start the DN protocol server and trigger safe mode re-evaluation.
*
* <p>In HA mode the DN server is deliberately not started during
* {@link org.apache.hadoop.hdds.scm.server.StorageContainerManager#start()}.
* Instead it is deferred until the SCM state machine has caught up with
* the leader's committed log entries, so that DN heartbeats are processed
* against the latest container/pipeline state rather than a stale snapshot.
*
* <p>The method is guarded by {@code isStateMachineReady} (CAS) to ensure
* the non-idempotent {@code DatanodeProtocolServer.start()} is invoked
* exactly once.
*/
private void tryStartDNServerAndRefreshSafeMode() {
// Fast path: the DN server was already started by an earlier call.
if (isStateMachineReady.get()) {
return;
}
// Leaders are always ready; followers must first replay up to the
// leader's commit index (see isFollowerCaughtUp).
if (scm.getScmContext().isLeader() || isFollowerCaughtUp()) {
// CAS guarantees the non-idempotent DatanodeProtocolServer.start()
// runs exactly once even under concurrent invocations.
if (isStateMachineReady.compareAndSet(false, true)) {
// Refresh Safemode rules state if not already done.
scm.getDatanodeProtocolServer().start();
scm.getScmSafeModeManager().refreshAndValidate();
}
// NOTE(review): currentLeaderTerm appears to be removed elsewhere in
// this change (field and AtomicLong import deleted) — verify this
// reset is still needed and compiles.
currentLeaderTerm.set(-1L);
}
}

public boolean getIsStateMachineReady() {
return isStateMachineReady.get();
/**
* Check whether this follower's state machine has caught up with the
* leader's committed log entries.
* @return true if {@code lastAppliedIndex >= leaderCommitIndex}
*/
private boolean isFollowerCaughtUp() {
  try {
    RaftServer.Division division = scm.getScmHAManager()
        .getRatisServer().getDivision();
    DivisionInfo divisionInfo = division.getInfo();
    long lastAppliedIndex = divisionInfo.getLastAppliedIndex();

    RaftPeerId leaderId = divisionInfo.getLeaderId();
    if (leaderId != null) {
      // Locate the leader's entry among the commit infos of all peers;
      // its commit index is the target this follower must reach.
      for (RaftProtos.CommitInfoProto info : division.getCommitInfos()) {
        if (info.getServer().getId().equals(leaderId.toByteString())) {
          long leaderCommit = info.getCommitIndex();
          boolean caughtUp = lastAppliedIndex >= leaderCommit;
          if (caughtUp) {
            LOG.info("Follower caught up with the leader: lastAppliedIndex={}, leaderCommit={}",
                lastAppliedIndex, leaderCommit);
          } else {
            LOG.debug("Follower has not caught up with the leader: lastAppliedIndex={}, leaderCommit={}",
                lastAppliedIndex, leaderCommit);
          }
          return caughtUp;
        }
      }
    }

    // No known leader yet, or the leader's commit info is not published;
    // treat as "not caught up" and let the caller retry later.
    LOG.warn("Leader commit index not available yet, leaderId={}", leaderId);
    return false;
  } catch (Exception e) {
    // Best-effort check at a polling boundary: never propagate, just retry.
    LOG.warn("Failed to check follower catch-up status", e);
    return false;
  }
}

/**
* Schedule deferred checks for starting the DN protocol server.
*/
private void scheduleDNServerStartCheck() {
  // The polling loop blocks in Thread.sleep for up to five minutes; running
  // it with the default runAsync executor would tie up a common
  // ForkJoinPool worker for that whole time. Use a dedicated daemon thread
  // instead so the shared pool is never blocked.
  CompletableFuture.runAsync(this::runDNServerStartCheck, runnable -> {
    Thread thread = new Thread(runnable, "SCMDNServerStartCheck");
    thread.setDaemon(true);
    thread.start();
  });
}

/**
 * Polls {@code tryStartDNServerAndRefreshSafeMode} with exponential backoff
 * (1s doubling up to 10s) until the state machine is ready or a five-minute
 * budget expires. Exits promptly on interruption, restoring the interrupt
 * flag.
 */
private void runDNServerStartCheck() {
  long delayMs = 1000;
  final long maxTotalMs = 5 * 60 * 1000;
  long elapsed = 0;
  while (!isStateMachineReady.get() && elapsed < maxTotalMs) {
    try {
      Thread.sleep(delayMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    elapsed += delayMs;
    tryStartDNServerAndRefreshSafeMode();
    delayMs = Math.min(delayMs * 2, 10_000);
  }
  if (isStateMachineReady.get()) {
    LOG.info("DN server started via deferred check after {}ms", elapsed);
  } else {
    LOG.warn("DN server start check expired after {}ms", elapsed);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,8 +1573,10 @@ public void start() throws IOException {
}
getBlockProtocolServer().start();

// start datanode protocol server
getDatanodeProtocolServer().start();
// If HA is enabled, start datanode protocol server once leader is ready.
if (!scmStorageConfig.isSCMHAEnabled()) {
getDatanodeProtocolServer().start();
}
if (getSecurityProtocolServer() != null) {
getSecurityProtocolServer().start();
persistSCMCertificates();
Expand Down
Loading