diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index 57234889dcb..2fa46c19d94 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -252,7 +252,7 @@ private boolean updateContainerState(final DatanodeDetails datanode, // If the state of a container is OPEN and a replica is in different state, finalize the container. if (replica.getState() != State.OPEN) { getLogger().info("FINALIZE (i.e. CLOSING) {}", detailsForLogging); - containerManager.updateContainerState(containerId, LifeCycleEvent.FINALIZE); + updateContainerState(containerId, LifeCycleEvent.FINALIZE); } return false; case CLOSING: @@ -263,7 +263,7 @@ private boolean updateContainerState(final DatanodeDetails datanode, // If the replica is in QUASI_CLOSED state, move the container to QUASI_CLOSED state. 
if (replica.getState() == State.QUASI_CLOSED) { getLogger().info("QUASI_CLOSE {}", detailsForLogging); - containerManager.updateContainerState(containerId, LifeCycleEvent.QUASI_CLOSE); + updateContainerState(containerId, LifeCycleEvent.QUASI_CLOSE); return false; } @@ -288,7 +288,7 @@ private boolean updateContainerState(final DatanodeDetails datanode, return true; } getLogger().info("CLOSE {}", detailsForLogging); - containerManager.updateContainerState(containerId, LifeCycleEvent.CLOSE); + updateContainerState(containerId, LifeCycleEvent.CLOSE); } return false; case QUASI_CLOSED: @@ -301,7 +301,7 @@ private boolean updateContainerState(final DatanodeDetails datanode, return true; } getLogger().info("FORCE_CLOSE for {}", detailsForLogging); - containerManager.updateContainerState(containerId, LifeCycleEvent.FORCE_CLOSE); + updateContainerState(containerId, LifeCycleEvent.FORCE_CLOSE); } return false; case CLOSED: @@ -358,6 +358,15 @@ private boolean updateContainerState(final DatanodeDetails datanode, } } + private void updateContainerState(ContainerID containerID, LifeCycleEvent event) + throws InvalidStateTransitionException, IOException { + if (scmContext.isLeader()) { + containerManager.updateContainerState(containerID, event); + } else { + getLogger().debug("The Current scm is not leader"); + } + } + /** * Helper method to verify that the replica's bcsId matches the container's in SCM. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 9d49ca36b6f..cdba9af61ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -35,7 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; @@ -52,6 +51,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.DivisionInfo; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; @@ -86,7 +86,6 @@ public class SCMStateMachine extends BaseStateMachine { private DBCheckpoint installingDBCheckpoint = null; private List installingSecretKeys = null; - private AtomicLong currentLeaderTerm = new AtomicLong(-1L); private AtomicBoolean isStateMachineReady = new AtomicBoolean(); public SCMStateMachine(final StorageContainerManager scm, @@ -284,15 +283,12 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, return; } - currentLeaderTerm.set(scm.getScmHAManager().getRatisServer().getDivision() - .getInfo().getCurrentTerm()); - - if (isStateMachineReady.compareAndSet(false, true)) { - // refresh and validate safe mode rules if it can exit safe mode - // if being leader, all previous term transactions have been applied - // if other states, just refresh safe mode rules, and transaction keeps 
flushing from leader - // and does not depend on pending transactions. - scm.getScmSafeModeManager().refreshAndValidate(); + if (!isStateMachineReady.get()) { + if (groupMemberId.getPeerId().equals(newLeaderId)) { + tryStartDNServerAndRefreshSafeMode(); + } else { + scheduleDNServerStartCheck(); + } } if (!groupMemberId.getPeerId().equals(newLeaderId)) { @@ -300,10 +296,11 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, return; } - LOG.info("current SCM becomes leader of term {}.", currentLeaderTerm); + long currentTerm = scm.getScmHAManager().getRatisServer().getDivision() + .getInfo().getCurrentTerm(); + LOG.info("current SCM becomes leader of term {}.", currentTerm); - scm.getScmContext().updateLeaderAndTerm(true, - currentLeaderTerm.get()); + scm.getScmContext().updateLeaderAndTerm(true, currentTerm); scm.getSequenceIdGen().invalidateBatch(); try { @@ -369,19 +366,100 @@ public void notifyTermIndexUpdated(long term, long index) { if (transactionBuffer != null) { transactionBuffer.updateLatestTrxInfo(TransactionInfo.valueOf(term, index)); } + } + + public boolean getIsStateMachineReady() { + return isStateMachineReady.get(); + } - if (currentLeaderTerm.get() == term) { - // This means after a restart, all pending transactions have been applied. + /** + * Start the DN protocol server and trigger safe mode re-evaluation. + * + *
<p>
In HA mode the DN server is deliberately not started during + * {@link org.apache.hadoop.hdds.scm.server.StorageContainerManager#start()}. + * Instead it is deferred until this SCM is the leader or its state machine has + * caught up with the leader's commit index, so that DN heartbeats are processed + * against the latest container/pipeline state rather than a stale snapshot. + * + *
<p>
The method is guarded by {@code isStateMachineReady} (CAS) to ensure + * the non-idempotent {@code DatanodeProtocolServer.start()} is invoked + * exactly once. + */ + private void tryStartDNServerAndRefreshSafeMode() { + if (isStateMachineReady.get()) { + return; + } + if (scm.getScmContext().isLeader() || isFollowerCaughtUp()) { if (isStateMachineReady.compareAndSet(false, true)) { - // Refresh Safemode rules state if not already done. + scm.getDatanodeProtocolServer().start(); scm.getScmSafeModeManager().refreshAndValidate(); } - currentLeaderTerm.set(-1L); } } - public boolean getIsStateMachineReady() { - return isStateMachineReady.get(); + /** + * Check whether this follower's state machine has caught up with the + * leader's committed log entries. + * @return true if {@code lastAppliedIndex >= leaderCommitIndex} + */ + private boolean isFollowerCaughtUp() { + try { + RaftServer.Division division = scm.getScmHAManager() + .getRatisServer().getDivision(); + DivisionInfo divisionInfo = division.getInfo(); + long lastAppliedIndex = divisionInfo.getLastAppliedIndex(); + + RaftPeerId leaderId = divisionInfo.getLeaderId(); + if (leaderId != null) { + for (RaftProtos.CommitInfoProto info : division.getCommitInfos()) { + if (info.getServer().getId().equals(leaderId.toByteString())) { + long leaderCommit = info.getCommitIndex(); + boolean caughtUp = lastAppliedIndex >= leaderCommit; + if (caughtUp) { + LOG.info("Followers caught up with the leader: lastAppliedIndex={}, leaderCommit={}", + lastAppliedIndex, leaderCommit); + } else { + LOG.debug("Followers did not catch up with the leader: lastAppliedIndex={}, leaderCommit={}", + lastAppliedIndex, leaderCommit); + } + return caughtUp; + } + } + } + + LOG.warn("Leader commit index not available yet, leaderId={}", leaderId); + return false; + } catch (Exception e) { + LOG.warn("Failed to check follower catch-up status", e); + return false; + } + } + + /** + * Schedule deferred checks for starting the DN protocol server. 
+ */ + private void scheduleDNServerStartCheck() { + CompletableFuture.runAsync(() -> { + long delayMs = 1000; + final long maxTotalMs = 5 * 60 * 1000; + long elapsed = 0; + while (!isStateMachineReady.get() && elapsed < maxTotalMs) { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + elapsed += delayMs; + tryStartDNServerAndRefreshSafeMode(); + delayMs = Math.min(delayMs * 2, 10_000); + } + if (isStateMachineReady.get()) { + LOG.info("DN server started via deferred check after {}ms", elapsed); + } else { + LOG.warn("DN server start check expired after {}ms", elapsed); + } + }); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index dcfc6aa52eb..bf594fb4df0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1573,8 +1573,10 @@ public void start() throws IOException { } getBlockProtocolServer().start(); - // start datanode protocol server - getDatanodeProtocolServer().start(); + // If HA is enabled, start datanode protocol server once leader is ready. 
+ if (!scmStorageConfig.isSCMHAEnabled()) { + getDatanodeProtocolServer().start(); + } if (getSecurityProtocolServer() != null) { getSecurityProtocolServer().start(); persistSCMCertificates(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMFollowerCatchupWithContainerClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMFollowerCatchupWithContainerClose.java new file mode 100644 index 00000000000..11772acfe85 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMFollowerCatchupWithContainerClose.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.function.BooleanSupplier; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verifies that a follower SCM correctly replays container state changes + * (OPEN -> CLOSED) that were committed while it was offline. After the + * follower restarts, catches up, and is promoted to leader, all containers + * must still have the expected replica count and all keys must be readable. + * + *
<p>
This exercises the deferred DN-server start introduced by HDDS-14989: + * the restarted follower must finish Raft log replay before + * accepting datanode heartbeats, so that container reports are processed + * against the up-to-date state. + */ +@Timeout(300) +public class TestSCMFollowerCatchupWithContainerClose { + private static final Logger LOG = + LoggerFactory.getLogger(TestSCMFollowerCatchupWithContainerClose.class); + + private static final String OM_SERVICE_ID = "om-service-test1"; + private static final String SCM_SERVICE_ID = "scm-service-test1"; + private static final int NUM_OF_SCMS = 3; + private static final int NUM_OF_DNS = 3; + private static final int NUM_KEYS = 5; + private static final String VOLUME = "testvol"; + private static final String BUCKET = "testbucket"; + + private MiniOzoneHAClusterImpl cluster; + + @BeforeEach + void init() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newHABuilder(conf) + .setOMServiceId(OM_SERVICE_ID) + .setSCMServiceId(SCM_SERVICE_ID) + .setNumOfOzoneManagers(1) + .setNumOfStorageContainerManagers(NUM_OF_SCMS) + .setNumOfActiveSCMs(NUM_OF_SCMS) + .build(); + cluster.waitForClusterToBeReady(); + } + + @AfterEach + void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private Set createrKeys(byte[] keyData) throws IOException { + Set containerIds = new LinkedHashSet<>(); + try (OzoneClient client = cluster.newClient()) { + ObjectStore store = client.getObjectStore(); + store.createVolume(VOLUME); + OzoneVolume volume = store.getVolume(VOLUME); + volume.createBucket(BUCKET); + OzoneBucket bucket = volume.getBucket(BUCKET); + + for (int i = 0; i < NUM_KEYS; i++) { + String keyName = "key-" + i; + TestDataUtil.createKey(bucket, keyName, + RatisReplicationConfig.getInstance(THREE), keyData); + OzoneKeyDetails keyDetails = bucket.getKey(keyName); + keyDetails.getOzoneKeyLocations() + .forEach(loc -> containerIds.add(loc.getContainerID())); + 
} + } + return containerIds; + } + + @Test + void testFollowerCatchupAfterContainerClose() throws Exception { + // ---- Step 1: create volume / bucket / keys with THREE replicas ---- + byte[] keyData = ("value-of-key").getBytes(UTF_8); + Set containerIds = createrKeys(keyData); + assertFalse(containerIds.isEmpty(), "Should have created containers"); + + // ---- Step 2: stop the follower so it misses container-close txns ---- + StorageContainerManager followerScm = null; + StorageContainerManager leaderScm = null; + for (StorageContainerManager scm : cluster.getStorageContainerManagers()) { + if (scm.checkLeader()) { + leaderScm = scm; + } else if (followerScm == null) { + followerScm = scm; + } + } + cluster.shutdownStorageContainerManager(followerScm); + followerScm.join(); + + // ---- Step 3: close every container while the follower is offline ---- + for (long cid : containerIds) { + cluster.getStorageContainerLocationClient().closeContainer(cid); + } + StorageContainerManager leader = leaderScm; + for (long cid : containerIds) { + ContainerID id = ContainerID.valueOf(cid); + waitForContainerState(leader, id, LifeCycleState.CLOSED); + } + + // ---- Step 5: restart the follower and wait for safe-mode exit ---- + StorageContainerManager newFollower = + cluster.restartStorageContainerManager(followerScm, false); + BooleanSupplier safeModeExited = () -> !newFollower.isInSafeMode(); + GenericTestUtils.waitFor(safeModeExited, 1000, 120_000); + + // ---- Step 6: transfer leadership to the restarted follower ---- + cluster.getStorageContainerLocationClient() + .transferLeadership(newFollower.getScmId()); + GenericTestUtils.waitFor(newFollower::checkLeader, 1000, 60_000); + LOG.info("Leadership transferred to {}", newFollower.getScmId()); + + // ---- Step 7: verify container state and replica count = 3 ---- + for (long cid : containerIds) { + ContainerID id = ContainerID.valueOf(cid); + + assertEquals(LifeCycleState.CLOSED, + 
newFollower.getContainerManager().getContainer(id).getState(), + "Container " + cid + " should be CLOSED on new leader"); + + waitForReplicaCount(newFollower, id, NUM_OF_DNS); + + assertEquals(NUM_OF_DNS, + newFollower.getContainerManager() + .getContainerReplicas(id).size(), + "Container " + cid + " should have " + NUM_OF_DNS + " replicas"); + } + LOG.info("All containers verified CLOSED with {} replicas", NUM_OF_DNS); + + // ---- Step 8: verify every key is still readable ---- + try (OzoneClient client = cluster.newClient()) { + ObjectStore store = client.getObjectStore(); + OzoneBucket bucket = store.getVolume(VOLUME).getBucket(BUCKET); + + for (int i = 0; i < NUM_KEYS; i++) { + String keyName = "key-" + i; + try (OzoneInputStream is = bucket.readKey(keyName)) { + byte[] readData = new byte[keyData.length]; + int bytesRead = is.read(readData); + assertEquals(keyData.length, bytesRead); + assertArrayEquals(keyData, readData); + } + } + } + } + + private static void waitForContainerState( + StorageContainerManager scm, ContainerID id, + LifeCycleState expected) throws Exception { + ContainerManager cm = scm.getContainerManager(); + BooleanSupplier check = () -> { + try { + return cm.getContainer(id).getState() == expected; + } catch (Exception e) { + return false; + } + }; + GenericTestUtils.waitFor(check, 1000, 120_000); + } + + private static void waitForReplicaCount( + StorageContainerManager scm, ContainerID id, + int expectedCount) throws Exception { + ContainerManager cm = scm.getContainerManager(); + BooleanSupplier check = () -> { + try { + return cm.getContainerReplicas(id).size() == expectedCount; + } catch (Exception e) { + return false; + } + }; + GenericTestUtils.waitFor(check, 1000, 120_000); + } +}