Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 75 additions & 34 deletions dwio/nimble/index/ClusterIndexGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,50 @@ uint32_t getFirstStripe(
return stripeGroupIndex == 0 ? 0 : stripeCounts->Get(stripeGroupIndex - 1);
}

/// Parsed chunk index for a stripe's chunk range.
struct StripeChunkRange {
uint32_t startChunkIndex;
uint32_t endChunkIndex;
const flatbuffers::Vector<uint32_t>* chunkRows;
const flatbuffers::Vector<uint32_t>* chunkOffsets;
const serialization::KeyStreamChunkIndex* chunkIndex;

ChunkLocation locationAt(uint32_t chunkOffset) const {
const uint32_t rowOffset =
chunkOffset == startChunkIndex ? 0 : chunkRows->Get(chunkOffset - 1);
return ChunkLocation{chunkOffsets->Get(chunkOffset), rowOffset};
}
};

/// Returns the chunk range for a stripe, or nullopt if the stripe has no
/// chunk index (single chunk per stripe).
std::optional<StripeChunkRange> getStripeChunkRange(
const serialization::StripeClusterIndex* root,
uint32_t stripeOffset,
uint32_t stripeCount) {
const auto* chunkIndex = root->chunk_index();
if (chunkIndex == nullptr) {
return std::nullopt;
}

const auto* chunkCounts = chunkIndex->chunk_counts();
NIMBLE_CHECK_NOT_NULL(chunkCounts);
NIMBLE_CHECK_EQ(stripeCount, chunkCounts->size());

const uint32_t startChunkIndex =
stripeOffset == 0 ? 0 : chunkCounts->Get(stripeOffset - 1);
const uint32_t endChunkIndex = chunkCounts->Get(stripeOffset);

const auto* chunkRows = chunkIndex->chunk_rows();
NIMBLE_CHECK_NOT_NULL(chunkRows);

const auto* chunkOffsets = chunkIndex->chunk_offsets();
NIMBLE_CHECK_NOT_NULL(chunkOffsets);

return StripeChunkRange{
startChunkIndex, endChunkIndex, chunkRows, chunkOffsets, chunkIndex};
}

} // namespace

std::shared_ptr<ClusterIndexGroup> ClusterIndexGroup::create(
Expand All @@ -79,60 +123,57 @@ ClusterIndexGroup::ClusterIndexGroup(
std::optional<ChunkLocation> ClusterIndexGroup::lookupChunk(
uint32_t stripe,
std::string_view encodedKey) const {
const uint32_t stripeOffset = this->stripeOffset(stripe);
const auto* root = asFlatBuffersRoot<serialization::StripeClusterIndex>(
metadata_->content());

// No chunk index sub-table means single chunk per stripe — return default
// location at offset 0.
const auto* chunkIndex = root->chunk_index();
if (chunkIndex == nullptr) {
auto range = getStripeChunkRange(root, stripeOffset(stripe), stripeCount_);
if (!range.has_value()) {
return ChunkLocation{0, 0};
}

// Get chunk counts to determine the search range for this stripe.
const auto* chunkCounts = chunkIndex->chunk_counts();
NIMBLE_CHECK_NOT_NULL(chunkCounts);
NIMBLE_CHECK_EQ(stripeCount_, chunkCounts->size());

// Determine the chunk range for this stripe
const uint32_t startChunkIndex =
stripeOffset == 0 ? 0 : chunkCounts->Get(stripeOffset - 1);
const uint32_t endChunkIndex = chunkCounts->Get(stripeOffset);

const auto* chunkKeys = chunkIndex->chunk_keys();
const auto* chunkKeys = range->chunkIndex->chunk_keys();
NIMBLE_CHECK_NOT_NULL(chunkKeys);
NIMBLE_CHECK_LE(endChunkIndex, chunkKeys->size());
NIMBLE_CHECK_LE(range->endChunkIndex, chunkKeys->size());

// Binary search for the chunk key within this stripe's range.
// Chunk keys are the last key in each chunk. lower_bound finds the first
// chunk whose last key >= encodedKey, meaning the chunk that may contain
// the target key.
auto it = std::lower_bound(
chunkKeys->begin() + startChunkIndex,
chunkKeys->begin() + endChunkIndex,
chunkKeys->begin() + range->startChunkIndex,
chunkKeys->begin() + range->endChunkIndex,
encodedKey,
[](const flatbuffers::String* a, std::string_view b) {
return a->string_view() < b;
});
if (it == chunkKeys->begin() + endChunkIndex) {
if (it == chunkKeys->begin() + range->endChunkIndex) {
return std::nullopt;
}
return range->locationAt(it - chunkKeys->begin());
}

const uint32_t chunkOffset = it - chunkKeys->begin();

const auto* chunkRows = chunkIndex->chunk_rows();
NIMBLE_CHECK_NOT_NULL(chunkRows);
NIMBLE_CHECK_EQ(chunkRows->size(), chunkKeys->size());

const auto* chunkOffsets = chunkIndex->chunk_offsets();
NIMBLE_CHECK_NOT_NULL(chunkOffsets);
NIMBLE_CHECK_EQ(chunkOffsets->size(), chunkKeys->size());
std::optional<ChunkLocation> ClusterIndexGroup::lookupChunk(
uint32_t stripe,
uint32_t row) const {
const auto* root = asFlatBuffersRoot<serialization::StripeClusterIndex>(
metadata_->content());
auto range = getStripeChunkRange(root, stripeOffset(stripe), stripeCount_);
if (!range.has_value()) {
return ChunkLocation{0, 0};
}

// Return the found chunk location.
const uint32_t rowOffset =
chunkOffset == startChunkIndex ? 0 : chunkRows->Get(chunkOffset - 1);
return ChunkLocation{chunkOffsets->Get(chunkOffset), rowOffset};
// Binary search on cumulative chunk_rows to find the chunk containing the
// target row.
auto it = std::lower_bound(
range->chunkRows->begin() + range->startChunkIndex,
range->chunkRows->begin() + range->endChunkIndex,
row + 1,
[](uint32_t chunkRowCount, uint32_t target) {
return chunkRowCount < target;
});
if (it == range->chunkRows->begin() + range->endChunkIndex) {
return std::nullopt;
}
return range->locationAt(it - range->chunkRows->begin());
}

velox::common::Region ClusterIndexGroup::keyStreamRegion(
Expand Down
6 changes: 6 additions & 0 deletions dwio/nimble/index/ClusterIndexGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ class ClusterIndexGroup {
uint32_t stripe,
std::string_view encodedKey) const;

/// Lookup chunk by stripe and row position.
/// Returns the chunk's stream offset and row offset.
/// When there is no chunk index (single chunk per stripe), returns default
/// location {0, 0}.
std::optional<ChunkLocation> lookupChunk(uint32_t stripe, uint32_t row) const;

/// Returns the region for the key stream of the specified stripe.
/// The key stream is stored in the ClusterIndexGroup metadata.
velox::common::Region keyStreamRegion(
Expand Down
22 changes: 22 additions & 0 deletions dwio/nimble/index/ClusterIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ std::optional<uint32_t> ClusterIndexReader::seekAtOrAfterInChunk(
return encoding_->seekAtOrAfter(&encodedKey);
}

std::string ClusterIndexReader::keyAtRow(uint32_t row) {
const auto chunkLocation = indexGroup_->lookupChunk(stripeIndex_, row);
NIMBLE_CHECK(
chunkLocation.has_value(),
"Row {} is out of range for stripe {}",
row,
stripeIndex_);

seekToChunk(chunkLocation->streamOffset);
NIMBLE_CHECK_NOT_NULL(encoding_);

encoding_->reset();
const uint32_t skipCount = row - chunkLocation->rowOffset;
if (skipCount > 0) {
encoding_->skip(skipCount);
}

std::string_view result;
encoding_->materialize(1, &result);
return std::string(result);
}

bool ClusterIndexReader::ensureInput(int size) {
while (inputSize_ < size) {
if (inputSize_ > 0) {
Expand Down
4 changes: 4 additions & 0 deletions dwio/nimble/index/ClusterIndexReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class ClusterIndexReader {
/// the range (i.e., greater than all stripe keys).
std::optional<uint32_t> seekAtOrAfter(std::string_view encodedKey);

/// Returns the encoded key at the given row position within the stripe.
/// Used to compute resume keys when lookup results are truncated.
std::string keyAtRow(uint32_t row);

private:
ClusterIndexReader(
std::unique_ptr<velox::dwio::common::SeekableInputStream> input,
Expand Down
72 changes: 72 additions & 0 deletions dwio/nimble/index/tests/ClusterIndexGroupTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,76 @@ TEST_F(ClusterIndexGroupTest, lookupChunkWithEncodedKey) {
}
}

TEST_F(ClusterIndexGroupTest, lookupChunk) {
std::vector<std::string> indexColumns = {"col1"};
std::string minKey = "aaa";
std::vector<Stripe> stripes = {
{.streams = {{.numChunks = 1, .chunkRows = {100}, .chunkOffsets = {0}}},
.keyStream =
{.streamOffset = 0,
.streamSize = 100,
.stream =
{.numChunks = 3,
.chunkRows = {100, 200, 300},
.chunkOffsets = {0, 50, 120}},
.chunkKeys = {"ccc", "fff", "iii"}}},
{.streams = {{.numChunks = 1, .chunkRows = {200}, .chunkOffsets = {0}}},
.keyStream = {
.streamOffset = 100,
.streamSize = 150,
.stream =
{.numChunks = 2,
.chunkRows = {150, 300},
.chunkOffsets = {0, 80}},
.chunkKeys = {"mmm", "ppp"}}}};
std::vector<int> stripeGroups = {2};

auto indexBuffers =
createTestClusterIndex(indexColumns, minKey, stripes, stripeGroups);
auto clusterIndexGroup = createClusterIndexGroup(indexBuffers, 0);
ASSERT_NE(clusterIndexGroup, nullptr);

struct {
uint32_t stripeIndex;
uint32_t row;
std::optional<uint32_t> expectedStreamOffset;
std::optional<uint32_t> expectedRowOffset;
} testCases[] = {
// Stripe 0: per-chunk rows = {100, 200, 300}, cumulative = {100, 300,
// 600}. Chunk 0: rows [0, 100), Chunk 1: rows [100, 300), Chunk 2: rows
// [300, 600). chunkOffsets = {0, 50, 120}
{0, 0, 0, 0}, // First row of chunk 0
{0, 99, 0, 0}, // Last row of chunk 0
{0, 100, 50, 100}, // First row of chunk 1
{0, 299, 50, 100}, // Last row of chunk 1
{0, 300, 120, 300}, // First row of chunk 2
{0, 599, 120, 300}, // Last row of chunk 2
{0, 600, std::nullopt, std::nullopt}, // Out of range

// Stripe 1: per-chunk rows = {150, 300}, cumulative = {150, 450}
// Chunk 0: rows [0, 150), Chunk 1: rows [150, 450)
// chunkOffsets = {0, 80}
{1, 0, 0, 0}, // First row of chunk 0
{1, 149, 0, 0}, // Last row of chunk 0
{1, 150, 80, 150}, // First row of chunk 1
{1, 449, 80, 150}, // Last row of chunk 1
{1, 450, std::nullopt, std::nullopt}, // Out of range
};

for (const auto& testCase : testCases) {
SCOPED_TRACE(
fmt::format(
"stripeIndex {} row {}", testCase.stripeIndex, testCase.row));
auto result =
clusterIndexGroup->lookupChunk(testCase.stripeIndex, testCase.row);
if (testCase.expectedStreamOffset.has_value()) {
ASSERT_TRUE(result.has_value());
EXPECT_EQ(result->streamOffset, testCase.expectedStreamOffset.value());
EXPECT_EQ(result->rowOffset, testCase.expectedRowOffset.value());
} else {
EXPECT_FALSE(result.has_value());
}
}
}

} // namespace facebook::nimble::index::test
108 changes: 108 additions & 0 deletions dwio/nimble/index/tests/ClusterIndexReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,4 +622,112 @@ TEST_F(ClusterIndexReaderTest, binaryKeys) {
}
}

TEST_F(ClusterIndexReaderTest, keyAtRowSingleChunk) {
std::vector<std::string> keys = {"key_a", "key_b", "key_c", "key_d", "key_e"};
auto encodedStream = encodeKeyStream({keys});

std::vector<std::string> indexColumns = {"col1"};
std::string minKey = "key_0";
std::vector<Stripe> stripes = {createStripe(
{.streamOffset = 0,
.streamSize = static_cast<uint32_t>(encodedStream.data.size()),
.stream = {.numChunks = 1, .chunkRows = {5}, .chunkOffsets = {0}},
.chunkKeys = {"key_e"}})};
std::vector<int> stripeGroups = {1};

auto indexBuffers =
createTestClusterIndex(indexColumns, minKey, stripes, stripeGroups);
auto clusterIndexGroup =
createClusterIndexGroup(indexBuffers, /*stripeGroupIndex=*/0);

auto reader = ClusterIndexReader::create(
createInputStream(encodedStream.data), 0, clusterIndexGroup, pool_.get());

// Read each key by row position.
for (size_t i = 0; i < keys.size(); ++i) {
SCOPED_TRACE(fmt::format("row: {}", i));
EXPECT_EQ(reader->keyAtRow(i), keys[i]);
}
}

TEST_F(ClusterIndexReaderTest, keyAtRowMultipleChunks) {
std::vector<std::vector<std::string>> keyChunks = {
{"key_a", "key_b", "key_c"}, // Chunk 0: rows 0-2
{"key_d", "key_e", "key_f"}, // Chunk 1: rows 3-5
{"key_g", "key_h"}, // Chunk 2: rows 6-7
};
auto encodedStream = encodeKeyStream(keyChunks);

std::vector<std::string> indexColumns = {"col1"};
std::string minKey = "key_0";
std::vector<Stripe> stripes = {createStripe(
{.streamOffset = 0,
.streamSize = static_cast<uint32_t>(encodedStream.data.size()),
.stream =
{.numChunks = 3,
.chunkRows = {3, 3, 2},
.chunkOffsets = encodedStream.chunkOffsets},
.chunkKeys = {"key_c", "key_f", "key_h"}})};
std::vector<int> stripeGroups = {1};

auto indexBuffers =
createTestClusterIndex(indexColumns, minKey, stripes, stripeGroups);
auto clusterIndexGroup =
createClusterIndexGroup(indexBuffers, /*stripeGroupIndex=*/0);

auto reader = ClusterIndexReader::create(
createInputStream(encodedStream.data), 0, clusterIndexGroup, pool_.get());

// All keys in order across chunks.
std::vector<std::string> allKeys = {
"key_a", "key_b", "key_c", "key_d", "key_e", "key_f", "key_g", "key_h"};
for (size_t i = 0; i < allKeys.size(); ++i) {
SCOPED_TRACE(fmt::format("row: {}", i));
EXPECT_EQ(reader->keyAtRow(i), allKeys[i]);
}

// Read in non-sequential order to test chunk switching.
EXPECT_EQ(reader->keyAtRow(7), "key_h"); // Last row
EXPECT_EQ(reader->keyAtRow(0), "key_a"); // First row
EXPECT_EQ(reader->keyAtRow(3), "key_d"); // Chunk boundary
EXPECT_EQ(reader->keyAtRow(5), "key_f"); // Last in chunk 1
EXPECT_EQ(reader->keyAtRow(6), "key_g"); // First in chunk 2
}

TEST_F(ClusterIndexReaderTest, keyAtRowChunkBoundary) {
std::vector<std::vector<std::string>> keyChunks = {
{"key_a", "key_b"}, // Chunk 0: rows 0-1
{"key_c", "key_d"}, // Chunk 1: rows 2-3
};
auto encodedStream = encodeKeyStream(keyChunks);

std::vector<std::string> indexColumns = {"col1"};
std::string minKey = "key_0";
std::vector<Stripe> stripes = {createStripe(
{.streamOffset = 0,
.streamSize = static_cast<uint32_t>(encodedStream.data.size()),
.stream =
{.numChunks = 2,
.chunkRows = {2, 2},
.chunkOffsets = encodedStream.chunkOffsets},
.chunkKeys = {"key_b", "key_d"}})};
std::vector<int> stripeGroups = {1};

auto indexBuffers =
createTestClusterIndex(indexColumns, minKey, stripes, stripeGroups);
auto clusterIndexGroup =
createClusterIndexGroup(indexBuffers, /*stripeGroupIndex=*/0);

auto reader = ClusterIndexReader::create(
createInputStream(encodedStream.data), 0, clusterIndexGroup, pool_.get());

// First row of each chunk.
EXPECT_EQ(reader->keyAtRow(0), "key_a");
EXPECT_EQ(reader->keyAtRow(2), "key_c");

// Last row of each chunk.
EXPECT_EQ(reader->keyAtRow(1), "key_b");
EXPECT_EQ(reader->keyAtRow(3), "key_d");
}

} // namespace facebook::nimble::index::test
Loading
Loading