Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class BookieImpl implements Bookie {
private final ByteBufAllocator allocator;

private final boolean writeDataToJournal;
private final boolean journalHashBasedSelection;

// Write Callback do nothing
static class NopWriteCallback implements WriteCallback {
Expand Down Expand Up @@ -406,6 +407,7 @@ public BookieImpl(ServerConfiguration conf,
this.ledgerDirsManager = ledgerDirsManager;
this.indexDirsManager = indexDirsManager;
this.writeDataToJournal = conf.getJournalWriteData();
this.journalHashBasedSelection = conf.getJournalHashBasedSelection();
this.allocator = allocator;
this.registrationManager = registrationManager;
stateManager = initializeStateManager();
Expand Down Expand Up @@ -940,8 +942,30 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
return handles.getHandle(ledgerId, masterKey, false);
}

/**
* Constant for Fibonacci hashing. Multiplying by this constant adds randomness, breaking
* patterns in ledger IDs that would otherwise cause uneven journal distribution.
*
* <p>This is the same constant used as {@code GOLDEN_GAMMA} in
* {@link java.util.concurrent.ThreadLocalRandom} and {@link java.util.SplittableRandom}
* for seed mixing.
*/
private static final long FIBONACCI_HASH_CONSTANT = 0x9E3779B97F4A7C15L;

/**
* Returns the journal to use for the given ledger.
*
* <p>When hash-based selection is enabled, uses Fibonacci hashing to distribute ledgers
* across journals. The xor with the right-shifted value folds the high 32 bits into
* the low 32 bits, adding more mixing prior to the modulo.
*/
private Journal getJournal(long ledgerId) {
return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
long index = ledgerId;
if (journalHashBasedSelection) {
index = ledgerId * FIBONACCI_HASH_CONSTANT;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about putting this block into a static method so that we can add unit tests ?

The function gets the index and the number of journals and computes the journal index

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review. I added the tests and factored out the static method.

index ^= index >>> 32;
}
return journals.get(MathUtils.signSafeMod(index, journals.size()));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
protected static final String JOURNAL_CHANNEL_PROVIDER = "journalChannelProvider";
protected static final String JOURNAL_REUSE_FILES = "journalReuseFiles";
protected static final String JOURNAL_HASH_BASED_SELECTION = "journalHashBasedSelection";
// backpressure control
protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
Expand Down Expand Up @@ -2335,6 +2336,40 @@ public ServerConfiguration setJournalWriteData(boolean journalWriteData) {
return this;
}

/**
* Whether to use hash-based journal selection for ledgers.
*
* <p>When enabled, ledger IDs are hashed using Fibonacci hashing before
* selecting a journal. In some deployments (e.g., sharded setups), patterns
* in ledger IDs can cause uneven distribution across journals. This setting
* breaks those patterns.
*
* <p>WARNING: This setting is not backwards compatible. Changing this
* setting on an existing bookie will cause ledgers to be mapped to
* different journals than before, which can cause issues during recovery.
* Only enable this on new clusters or after careful migration planning.
*
* <p>Default is false for backwards compatibility.
*
* @return whether hash-based journal selection is enabled
*/
public boolean getJournalHashBasedSelection() {
return getBoolean(JOURNAL_HASH_BASED_SELECTION, false);
}

/**
* Enable or disable hash-based journal selection for ledgers.
*
* <p>See {@link #getJournalHashBasedSelection()} for details.
*
* @param enabled whether to enable hash-based journal selection
* @return server configuration object
*/
public ServerConfiguration setJournalHashBasedSelection(boolean enabled) {
setProperty(JOURNAL_HASH_BASED_SELECTION, enabled);
return this;
}

/**
* Enable or disable journal syncs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,51 @@ public void testMultipleWritesAndBookieRestart() throws Exception {
}
}

/**
* Test that hash-based selection still allows correct read/write operations.
*
* This integration test verifies that when journalHashBasedSelection is enabled,
* entries can still be written and read back correctly.
*/
@Test
public void testHashBasedSelectionReadWrite() throws Exception {
// Restart bookies with hash-based selection enabled
restartBookies(conf -> {
conf.setJournalHashBasedSelection(true);
return conf;
});

// Create ledgers and write entries
final int numLedgers = 8;
final int numEntries = 10;
List<LedgerHandle> writeHandles = new ArrayList<>();

for (int i = 0; i < numLedgers; i++) {
writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]));
}

for (int i = 0; i < numEntries; i++) {
for (int j = 0; j < numLedgers; j++) {
writeHandles.get(j).addEntry(("entry-" + i).getBytes());
}
}

// Close write handles
for (LedgerHandle lh : writeHandles) {
lh.close();
}

// Read back and verify
for (int i = 0; i < numLedgers; i++) {
LedgerHandle readHandle = bkc.openLedger(writeHandles.get(i).getId(), DigestType.CRC32, new byte[0]);
Enumeration<LedgerEntry> entries = readHandle.readEntries(0, numEntries - 1);

for (int j = 0; j < numEntries; j++) {
LedgerEntry entry = entries.nextElement();
assertEquals("entry-" + j, new String(entry.getEntry()));
}
readHandle.close();
}
}

}
13 changes: 13 additions & 0 deletions conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ journalDirectories=/tmp/bk-txn
# Set the Channel Provider for journal.
# The default value is
# journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider

# Enable hash-based selection of journals for ledgers in multi-journal setups.
# By default, ledgers are assigned to journals using ledgerId % numJournals.
# In some deployments (e.g., sharded setups), patterns in ledger IDs can cause
# uneven distribution across journals. When enabled, Fibonacci hashing is used
# for better distribution.
#
# WARNING: This setting is not backwards compatible. Changing this setting on an
# existing bookie will cause ledgers to be mapped to different journals than before,
# which can cause issues during recovery. Only enable this on new clusters or after
# careful migration planning.
# journalHashBasedSelection=false

#############################################################################
## Ledger storage settings
#############################################################################
Expand Down
5 changes: 3 additions & 2 deletions site3/website/docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ The table below lists parameters that you can set to configure bookies. All conf
| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
| journalAlignmentSize | All the journal writes and commits should be aligned to given size. If not, zeros will be padded to align to given size. | 512 |
| journalBufferedEntriesThreshold | Maximum entries to buffer to impose on a journal write to achieve grouping. | |
| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
| journalQueueSize | Set the size of the journal queue. | 10000 |
| journalFlushWhenQueueEmpty | If we should flush the journal when journal queue is empty. | false |
| journalQueueSize | Set the size of the journal queue. | 10000 |
| journalHashBasedSelection | Enable hash-based selection of journals for ledgers in multi-journal setups. By default, ledgers are assigned to journals using ledgerId % numJournals. In some deployments (e.g., sharded setups), patterns in ledger IDs can cause uneven distribution across journals. When enabled, Fibonacci hashing is used for better distribution.<br /><br />**WARNING**: This setting is not backwards compatible. Changing this setting on an existing bookie will cause ledgers to be mapped to different journals than before, which can cause issues during recovery. Only enable this on new clusters or after careful migration planning. | false |


## Ledger storage settings
Expand Down