Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,13 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour

private final LastLogMark lastLogMark = new LastLogMark(0, 0);

// Guards checkpointComplete to ensure lastMark only advances forward.
// Without this, concurrent checkpointComplete calls from SyncThread and
// SingleDirectoryDbLedgerStorage.flush() can overwrite lastMark backwards,
// causing journal files to be deleted while still referenced by the older mark.
//
// NOTE(review): checkpointComplete releases this lock before it calls rollLog, so two
// callers that both pass the monotonic check could still write lastMark out of order.
// Confirm whether rollLog (and the journal cleanup that follows when compact is true)
// should also run while the lock is held.
private final Object checkpointLock = new Object();
// Highest mark accepted so far by checkpointComplete; starts at (0, 0), so a checkpoint
// whose mark compares <= (0, 0) is skipped. It is updated before rollLog runs, i.e. it
// records the accepted mark rather than a confirmed on-disk write.
private final LogMark lastPersistedMark = new LogMark(0, 0);

private static final String LAST_MARK_DEFAULT_NAME = "lastMark";

private final String lastMarkFileName;
Expand Down Expand Up @@ -766,6 +773,11 @@ public Checkpoint newCheckpoint() {
/**
* Telling journal a checkpoint is finished.
*
* <p>Multiple threads (SyncThread and DbLedgerStorage flush) may call this concurrently.
* A monotonic check ensures the lastMark file only advances forward — a later call with
* an older mark is safely skipped. This prevents the race where a slower flush overwrites
* lastMark backwards, causing referenced journal files to be garbage collected prematurely.
*
* @throws IOException
*/
@Override
Expand All @@ -776,6 +788,15 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;

// Monotonic check: only advance lastMark forward, never backwards.
synchronized (checkpointLock) {
if (mark.getCurMark().compare(lastPersistedMark) <= 0) {
return;
}
lastPersistedMark.setLogMark(
mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset());
}

mark.rollLog(mark);
Comment thread
void-ptr974 marked this conversation as resolved.
if (compact) {
// list the journals that have been marked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
Expand Down Expand Up @@ -881,4 +882,121 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger
bookie.getLedgerStorage().flush();
Assert.assertEquals(pendingDeletedLedgers.size(), 0);
}

/**
* Simulates the full journal-missing scenario caused by concurrent checkpointComplete
* calls in single-dir mode and verifies the monotonic fix prevents it.
*
* <p>Timeline of the original bug:
* <pre>
* 1. SDLS.flush() starts: newCheckpoint → mark(5, 0), begins flushing data
* 2. SyncThread starts: newCheckpoint → mark(7, 0), waits for flushMutex
* 3. SDLS.flush() completes flushing, releases flushMutex
* 4. SyncThread acquires flushMutex, finds writeCache empty, returns
* 5. SyncThread calls checkpointComplete(mark=7, compact=true) FIRST
* → rollLog(7), GC deletes journal files 3,4,5,6 (all with id < 7)
* 6. SDLS.flush() calls checkpointComplete(mark=5, compact=true) SECOND
* → rollLog(5) — OVERWRITES lastMark backwards from 7 to 5!
* → journal file 5 was already deleted in step 5
* 7. Bookie restarts: reads lastMark=5, looks for journal file 5 → MISSING!
* → throws "Recovery log 5 is missing"
* </pre>
*
* <p>With the monotonic fix, step 6 is skipped (mark 5 <= lastPersistedMark 7),
* so lastMark stays at 7 and the restart succeeds.
*/
@Test
public void testConcurrentCheckpointCompleteJournalMissing() throws Exception {
File baseDir = new File(tmpDir, "journalMissingTest");
File ledgerDir = new File(baseDir, "ledger");
File journalBaseDir = new File(baseDir, "journal");
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(1000);
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
conf.setJournalDirName(journalBaseDir.getCanonicalPath());
// Set maxBackupJournals to 0 so GC aggressively deletes all old journals
conf.setMaxBackupJournals(0);

BookieImpl bookie = new TestBookieImpl(conf);
try {
Journal journal = bookie.getJournals().get(0);
File journalDir = journal.getJournalDirectory();
File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");

// Create fake journal files: 3.txn, 4.txn, 5.txn, 6.txn, 7.txn, 8.txn
for (long id = 3; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Failed to create journal file " + id, journalFile.createNewFile());
}

// Verify all journal files exist
for (long id = 3; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Journal file " + id + " should exist", journalFile.exists());
}

CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());

// === Simulate the race ===

// Step 1: SDLS.flush() captures checkpoint at mark(5, 100)
journal.getLastLogMark().getCurMark().setLogMark(5, 100);
CheckpointSource.Checkpoint cpFlush = checkpointSource.newCheckpoint();

// Step 2: SyncThread captures checkpoint at mark(7, 200) — newer position
journal.getLastLogMark().getCurMark().setLogMark(7, 200);
CheckpointSource.Checkpoint cpSync = checkpointSource.newCheckpoint();

// Step 5: SyncThread completes FIRST — checkpointComplete(mark=7, compact=true)
// This should: rollLog to 7, GC journals with id < 7 (deletes 3,4,5,6)
checkpointSource.checkpointComplete(cpSync, true);

LogMark markAfterSync = readLogMark(ledgerDirMark);
assertEquals("lastMark should be at 7 after SyncThread", 7, markAfterSync.getLogFileId());
assertEquals(200, markAfterSync.getLogFileOffset());

// Verify journals 3,4,5,6 were GC'd, 7,8 still exist
for (long id = 3; id <= 6; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertFalse("Journal " + id + " should have been GC'd", journalFile.exists());
}
for (long id = 7; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Journal " + id + " should still exist", journalFile.exists());
}

// Step 6: SDLS.flush() completes SECOND — checkpointComplete(mark=5, compact=true)
// WITHOUT FIX: rollLog would overwrite lastMark to 5, but journal 5 is already deleted!
// WITH FIX: mark 5 <= lastPersistedMark 7, so this is skipped entirely.
checkpointSource.checkpointComplete(cpFlush, true);

Comment thread
void-ptr974 marked this conversation as resolved.
// Verify: lastMark must NOT regress to 5. Should stay at 7.
LogMark markAfterFlush = readLogMark(ledgerDirMark);
assertEquals("lastMark must not regress after older checkpoint completes",
7, markAfterFlush.getLogFileId());
assertEquals(200, markAfterFlush.getLogFileOffset());

// Step 7: Simulate bookie restart — read lastMark and check journal exists
journal.getLastLogMark().readLog();
LogMark restartMark = journal.getLastLogMark().getCurMark();
assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId());
Comment thread
void-ptr974 marked this conversation as resolved.
Outdated

// The journal file pointed to by lastMark must exist
File markedJournal = new File(journalDir,
Long.toHexString(restartMark.getLogFileId()) + ".txn");
assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery",
markedJournal.exists());

// Verify that listJournalIds finds the expected journal for replay
List<Long> replayLogs = Journal.listJournalIds(journalDir,
journalId -> journalId >= restartMark.getLogFileId());
assertTrue("Replay journal list must contain the marked journal",
replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId());
} finally {
bookie.getLedgerStorage().shutdown();
Comment thread
void-ptr974 marked this conversation as resolved.
}
}
}
Loading