diff --git a/dwio/nimble/tablet/tests/TabletTest.cpp b/dwio/nimble/tablet/tests/TabletTest.cpp index 49407420..5d2ebcd9 100644 --- a/dwio/nimble/tablet/tests/TabletTest.cpp +++ b/dwio/nimble/tablet/tests/TabletTest.cpp @@ -208,7 +208,8 @@ class TabletTest : public ::testing::TestWithParam { case BufferedInputMode::kCachedBufferedInput: if (!allocator_) { allocator_ = std::make_shared( - 1UL << 30, 0 /* reservationByteLimit */); + velox::memory::MallocAllocator::Options{ + .capacity = 1UL << 30, .reservationByteLimit = 0}); } if (!cache_) { cache_ = velox::cache::AsyncDataCache::create(allocator_.get()); @@ -3891,7 +3892,8 @@ TEST_P(TabletWithIndexTest, cacheWarmPath) { // Initialize cache before the cold reader so we can verify it starts empty. allocator_ = std::make_shared( - 1UL << 30, 0 /* reservationByteLimit */); + velox::memory::MallocAllocator::Options{ + .capacity = 1UL << 30, .reservationByteLimit = 0}); cache_ = velox::cache::AsyncDataCache::create(allocator_.get()); auto coldCacheStats = cache_->refreshStats(); @@ -4144,7 +4146,8 @@ TEST_P(TabletTest, cacheWarmPath) { // Initialize cache before the cold reader so we can verify it starts empty. // Normally lazy-initialized in createBufferedInput. allocator_ = std::make_shared( - 1UL << 30, 0 /* reservationByteLimit */); + velox::memory::MallocAllocator::Options{ + .capacity = 1UL << 30, .reservationByteLimit = 0}); cache_ = velox::cache::AsyncDataCache::create(allocator_.get()); // Cache should be empty before the cold reader. @@ -4260,7 +4263,8 @@ TEST(TabletStressTest, concurrentReadersWithCacheEviction) { // Shared cache infrastructure. auto allocator = std::make_shared( - 1UL << 30, 0 /* reservationByteLimit */); + velox::memory::MallocAllocator::Options{ + .capacity = 1UL << 30, .reservationByteLimit = 0}); auto cache = velox::cache::AsyncDataCache::create(allocator.get()); auto executor = std::make_unique(4); diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 1916685a..ec00733f 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -733,6 +733,22 @@ VeloxWriter::VeloxWriter( *rootWriter_->typeBuilder(), context_->options().encodingLayoutTree.value()); } + + const auto boundaryCount = context_->options().stripeBoundaryColumnCount; + if (boundaryCount > 0) { + NIMBLE_USER_CHECK( + context_->options().indexConfig.has_value(), + "stripeBoundaryColumnCount requires indexConfig to be set"); + const auto& indexColumns = context_->options().indexConfig->columns; + NIMBLE_USER_CHECK_LE( + boundaryCount, + indexColumns.size(), + "stripeBoundaryColumnCount exceeds number of index columns"); + const auto& rowType = velox::asRowType(schema_->type()); + for (uint32_t i = 0; i < boundaryCount; ++i) { + boundaryColumnIndices_.push_back(rowType->getChildIdx(indexColumns[i])); + } + } } VeloxWriter::~VeloxWriter() {} @@ -744,42 +760,10 @@ bool VeloxWriter::write(const velox::VectorPtr& input) { NIMBLE_CHECK_NOT_NULL(file_, "Writer is already closed"); try { - const auto numRows = input->size(); - // Calculate raw size using schema information to correctly handle - // passthrough flatmaps (ROW vectors written as MAP). - RawSizeContext context; - const auto rawSize = nimble::getRawSizeFromVector( - input, - velox::common::Ranges::of(0, numRows), - context, - schema_.get(), - context_->flatMapNodeIds(), - context_->ignoreTopLevelNulls()); - NIMBLE_CHECK_GE(rawSize, 0, "Invalid raw size"); - context_->updateFileRawSize(rawSize); - - if (context_->options().writeExecutor) { - velox::dwio::common::ExecutorBarrier barrier{ - context_->options().writeExecutor}; - rootWriter_->write(input, OrderedRanges::of(0, numRows), &barrier); - addIndexKey(input, &barrier); - barrier.waitAll(); - } else { - rootWriter_->write(input, OrderedRanges::of(0, numRows)); - addIndexKey(input); + if (boundaryColumnIndices_.empty()) { + return writeBatch(input); } - - uint64_t memoryUsed{0}; - for (const auto& [_, stream] : context_->streams()) { - memoryUsed += stream->memoryUsed(); - } - - context_->setMemoryUsed(memoryUsed); - context_->updateRowsInFile(numRows); - context_->updateRowsInStripe(numRows); - context_->setBytesWritten(file_->size()); - - return evaluateFlushPolicy(); + return writeBoundaryAware(input); } catch (const std::exception& e) { lastException_ = std::current_exception(); context_->logger()->logException(LogOperation::Write, e.what()); @@ -793,6 +777,106 @@ bool VeloxWriter::write(const velox::VectorPtr& input) { } } +bool VeloxWriter::writeBatch(const velox::VectorPtr& input) { + const auto numRows = input->size(); + // Calculate raw size using schema information to correctly handle + // passthrough flatmaps (ROW vectors written as MAP). + RawSizeContext context; + const auto rawSize = nimble::getRawSizeFromVector( + input, + velox::common::Ranges::of(0, numRows), + context, + schema_.get(), + context_->flatMapNodeIds(), + context_->ignoreTopLevelNulls()); + NIMBLE_CHECK_GE(rawSize, 0, "Invalid raw size"); + context_->updateFileRawSize(rawSize); + + if (context_->options().writeExecutor) { + velox::dwio::common::ExecutorBarrier barrier{ + context_->options().writeExecutor}; + rootWriter_->write(input, OrderedRanges::of(0, numRows), &barrier); + addIndexKey(input, &barrier); + barrier.waitAll(); + } else { + rootWriter_->write(input, OrderedRanges::of(0, numRows)); + addIndexKey(input); + } + + uint64_t memoryUsed{0}; + for (const auto& [_, stream] : context_->streams()) { + memoryUsed += stream->memoryUsed(); + } + + context_->setMemoryUsed(memoryUsed); + context_->updateRowsInFile(numRows); + context_->updateRowsInStripe(numRows); + context_->setBytesWritten(file_->size()); + + return evaluateFlushPolicy(); +} + +velox::vector_size_t VeloxWriter::findNextBoundaryTransition( + const velox::VectorPtr& input, + velox::vector_size_t numRows) const { + const auto* rowVector = input->asChecked(); + for (velox::vector_size_t row = 1; row < numRows; ++row) { + for (auto colIdx : boundaryColumnIndices_) { + const auto& child = rowVector->childAt(colIdx); + if (!child->equalValueAt(child.get(), 0, row)) { + return row; + } + } + } + return numRows; +} + +bool VeloxWriter::writeBoundaryAware(const velox::VectorPtr& input) { + bool flushed = false; + const auto numRows = input->size(); + + if (numRows == 0) { + return false; + } + + // Cross-batch: flush if first row differs from previous last row. + if (lastBoundaryValues_ != nullptr) { + const auto* lastRow = lastBoundaryValues_->asChecked(); + const auto* curRow = input->asChecked(); + for (auto colIdx : boundaryColumnIndices_) { + if (!lastRow->childAt(colIdx)->equalValueAt( + curRow->childAt(colIdx).get(), 0, 0)) { + writeStripe(); + flushed = true; + break; + } + } + } + + // Intra-batch: scan for transitions, slice and write at each boundary. + velox::vector_size_t start = 0; + while (start < numRows) { + auto slice = (start == 0) ? input : input->slice(start, numRows - start); + auto boundary = findNextBoundaryTransition(slice, slice->size()); + auto batch = + (boundary == slice->size()) ? slice : slice->slice(0, boundary); + + if (start > 0) { + writeStripe(); + flushed = true; + } + + if (writeBatch(batch)) { + flushed = true; + } + start += boundary; + } + + // Save last row for cross-batch detection. + lastBoundaryValues_ = input->slice(numRows - 1, 1); + return flushed; +} + void VeloxWriter::writeMetadata() { if (context_->options().metadata.empty()) { return; diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h index cbc2064e..4c93758c 100644 --- a/dwio/nimble/velox/VeloxWriter.h +++ b/dwio/nimble/velox/VeloxWriter.h @@ -151,6 +151,21 @@ class VeloxWriter { // Returning 'true' if stripe was flushed. bool evaluateFlushPolicy(); + /// Writes a single contiguous batch: computes raw size, invokes field + /// writers, adds index keys, updates memory/row tracking, and evaluates + /// flush policy. Shared by both normal and boundary-aware write paths. + bool writeBatch(const velox::VectorPtr& input); + + /// Write implementation for boundary-aware path. Detects value transitions + /// in boundary columns and flushes stripes at boundaries. + bool writeBoundaryAware(const velox::VectorPtr& input); + + /// Finds the first row where any boundary column differs from row 0. + /// Returns numRows if no transition found. + velox::vector_size_t findNextBoundaryTransition( + const velox::VectorPtr& input, + velox::vector_size_t numRows) const; + // Returning 'true' if stripe was written. bool writeStripe(); @@ -193,6 +208,12 @@ class VeloxWriter { std::unique_ptr encodingBuffer_; std::vector encodedStreams_; std::exception_ptr lastException_; + + /// Column indices for boundary columns, resolved in constructor. + std::vector boundaryColumnIndices_; + + /// Last row of previous write() for cross-batch boundary detection. + velox::VectorPtr lastBoundaryValues_; }; } // namespace facebook::nimble diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 7f6a5deb..947dc83e 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -49,6 +49,16 @@ struct VeloxWriterOptions { /// writing. The index stores the per-chunk min and max key for each stripe. std::optional indexConfig; + /// Number of leading index columns to use as stripe boundaries. When > 0, + /// the writer detects value transitions in the first N index columns during + /// write() and flushes at boundaries. Requires indexConfig to be set, and + /// the value must be <= indexConfig.columns.size(). Input data must be + /// pre-sorted/clustered by these columns. + /// NOTE: The size-based flush policy still applies as a safety valve. If a + /// single boundary group exceeds the flush policy threshold, the stripe may + /// be flushed mid-group to bound memory usage. + uint32_t stripeBoundaryColumnCount{0}; + // Columns that should be encoded as flat maps folly::F14FastSet flatMapColumns; diff --git a/dwio/nimble/velox/tests/VeloxWriterTest.cpp b/dwio/nimble/velox/tests/VeloxWriterTest.cpp index dcf84eba..938a3012 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTest.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTest.cpp @@ -4225,4 +4225,608 @@ INSTANTIATE_TEST_SUITE_P( return info.param.toString(); }); +// Tests for stripe boundary column cutting feature. +class StripeBoundaryTest : public VeloxWriterTest {}; + +TEST_F(StripeBoundaryTest, intraBatchTransitions) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::VARCHAR()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + // [1,1,1,2,2,3] -> should produce 3 stripes with rows [3,2,1] + auto batch = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1, 1, 2, 2, 3}), + vectorMaker.flatVector( + {"a", "b", "c", "d", "e", "f"})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + writer.write(batch); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + EXPECT_EQ(reader.tabletReader().stripeCount(), 3); + + // Verify data integrity. + velox::VectorPtr result; + uint32_t rowOffset = 0; + while (reader.next(100, result)) { + for (velox::vector_size_t i = 0; i < result->size(); ++i) { + ASSERT_TRUE(result->equalValueAt(batch.get(), i, rowOffset + i)); + } + rowOffset += result->size(); + } + EXPECT_EQ(rowOffset, 6); +} + +TEST_F(StripeBoundaryTest, crossBatchTransition) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + auto batch1 = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1}), + vectorMaker.flatVector({10, 20})}); + auto batch2 = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({2, 2}), + vectorMaker.flatVector({30, 40})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + writer.write(batch1); + writer.write(batch2); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + EXPECT_EQ(reader.tabletReader().stripeCount(), 2); +} + +TEST_F(StripeBoundaryTest, noCrossBatchTransition) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + auto batch1 = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1}), + vectorMaker.flatVector({10, 20})}); + auto batch2 = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1}), + vectorMaker.flatVector({30, 40})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + writer.write(batch1); + writer.write(batch2); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + // No transition between batches, so all in one stripe. + EXPECT_EQ(reader.tabletReader().stripeCount(), 1); +} + +TEST_F(StripeBoundaryTest, multipleBoundaryColumns) { + auto type = velox::ROW({ + {"key1", velox::INTEGER()}, + {"key2", velox::VARCHAR()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key1", "key2"}, + .sortOrders = + {nimble::SortOrder{.ascending = true}, + nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + // key1 is same, key2 transitions: [a,a,b,b] -> 2 stripes + auto batch = vectorMaker.rowVector( + {"key1", "key2", "value"}, + {vectorMaker.flatVector({1, 1, 1, 1}), + vectorMaker.flatVector({"a", "a", "b", "b"}), + vectorMaker.flatVector({10, 20, 30, 40})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 2}); + writer.write(batch); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + EXPECT_EQ(reader.tabletReader().stripeCount(), 2); +} + +TEST_F(StripeBoundaryTest, allSameValues) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + auto batch = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({5, 5, 5, 5}), + vectorMaker.flatVector({1, 2, 3, 4})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + writer.write(batch); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + EXPECT_EQ(reader.tabletReader().stripeCount(), 1); +} + +TEST_F(StripeBoundaryTest, everyRowDifferent) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + auto batch = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 2, 3, 4}), + vectorMaker.flatVector({10, 20, 30, 40})}); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + writer.write(batch); + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + // Each row is a different key -> 4 stripes. + EXPECT_EQ(reader.tabletReader().stripeCount(), 4); + + // Verify data integrity. + velox::VectorPtr result; + uint32_t rowOffset = 0; + while (reader.next(100, result)) { + for (velox::vector_size_t i = 0; i < result->size(); ++i) { + ASSERT_TRUE(result->equalValueAt(batch.get(), i, rowOffset + i)); + } + rowOffset += result->size(); + } + EXPECT_EQ(rowOffset, 4); +} + +TEST_F(StripeBoundaryTest, singleRowBatches) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + + for (int32_t i = 1; i <= 3; ++i) { + auto batch = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({i}), + vectorMaker.flatVector({i * 10})}); + writer.write(batch); + } + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + // Each batch has a different key -> 3 stripes. + EXPECT_EQ(reader.tabletReader().stripeCount(), 3); +} + +TEST_F(StripeBoundaryTest, validationCountExceedsIndexColumns) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + EXPECT_THROW( + nimble::VeloxWriter( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 2}), + nimble::NimbleUserError); +} + +TEST_F(StripeBoundaryTest, validationNoIndexConfig) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::INTEGER()}, + }); + + std::string file; + auto writeFile = std::make_unique(&file); + + EXPECT_THROW( + nimble::VeloxWriter( + type, + std::move(writeFile), + *rootPool_, + {.stripeBoundaryColumnCount = 1}), + nimble::NimbleUserError); +} + +TEST_F(StripeBoundaryTest, dataIntegrity) { + auto type = velox::ROW({ + {"key", velox::INTEGER()}, + {"value", velox::VARCHAR()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"key"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + std::string file; + auto writeFile = std::make_unique(&file); + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + // Write multiple batches with transitions. + std::vector batches; + batches.push_back(vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1, 2, 2}), + vectorMaker.flatVector({"a", "b", "c", "d"})})); + batches.push_back(vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({2, 3, 3}), + vectorMaker.flatVector({"e", "f", "g"})})); + batches.push_back(vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({4, 4}), + vectorMaker.flatVector({"h", "i"})})); + + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + + // Concatenate all expected data. + auto expected = vectorMaker.rowVector( + {"key", "value"}, + {vectorMaker.flatVector({1, 1, 2, 2, 2, 3, 3, 4, 4}), + vectorMaker.flatVector( + {"a", "b", "c", "d", "e", "f", "g", "h", "i"})}); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(&readFile, *leafPool_); + // Keys: 1(2), 2(3), 3(2), 4(2) -> 4 stripes + EXPECT_EQ(reader.tabletReader().stripeCount(), 4); + + // Read back all data and verify. + velox::VectorPtr result; + uint32_t rowOffset = 0; + while (reader.next(100, result)) { + for (velox::vector_size_t i = 0; i < result->size(); ++i) { + ASSERT_TRUE(result->equalValueAt(expected.get(), i, rowOffset + i)) + << "Mismatch at row " << rowOffset + i; + } + rowOffset += result->size(); + } + EXPECT_EQ(rowOffset, 9); +} + +TEST_F(StripeBoundaryTest, userSequenceBenchmark) { + // Simulates user sequence storage: each user has a variable number of events. + // With stripeBoundaryColumnCount=1 on user_id, each user should get their + // own stripe(s), reducing read amplification. + auto type = velox::ROW({ + {"user_id", velox::BIGINT()}, + {"event_ts", velox::BIGINT()}, + {"event_type", velox::INTEGER()}, + {"payload", velox::VARCHAR()}, + }); + + nimble::IndexConfig indexConfig{ + .columns = {"user_id"}, + .sortOrders = {nimble::SortOrder{.ascending = true}}, + .enforceKeyOrder = true, + }; + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + // Generate realistic data: 50 users, each with 10-100 events, + // delivered in batches of ~200 rows (spanning multiple users). + constexpr int64_t kNumUsers = 50; + constexpr int64_t kMinEventsPerUser = 10; + constexpr int64_t kMaxEventsPerUser = 100; + constexpr velox::vector_size_t kBatchSize = 200; + + // Build all rows sorted by user_id. + std::vector allUserIds; + std::vector allTimestamps; + std::vector allEventTypes; + std::vector allPayloads; + + folly::Random::DefaultGenerator rng(42); + std::map userEventCounts; + + for (int64_t userId = 1; userId <= kNumUsers; ++userId) { + auto numEvents = kMinEventsPerUser + + folly::Random::rand32(kMaxEventsPerUser - kMinEventsPerUser, rng); + userEventCounts[userId] = numEvents; + int64_t ts = userId * 1000000; + for (int64_t e = 0; e < numEvents; ++e) { + allUserIds.push_back(userId); + allTimestamps.push_back(ts + e * 1000); + allEventTypes.push_back(folly::Random::rand32(10, rng)); + allPayloads.push_back( + "event_" + std::to_string(userId) + "_" + std::to_string(e)); + } + } + + const auto totalRows = allUserIds.size(); + + // Split into batches. + std::vector batches; + for (size_t offset = 0; offset < totalRows; offset += kBatchSize) { + auto count = std::min(kBatchSize, totalRows - offset); + std::vector batchUserIds( + allUserIds.begin() + offset, allUserIds.begin() + offset + count); + std::vector batchTimestamps( + allTimestamps.begin() + offset, + allTimestamps.begin() + offset + count); + std::vector batchEventTypes( + allEventTypes.begin() + offset, + allEventTypes.begin() + offset + count); + std::vector batchPayloads; + for (size_t i = offset; i < offset + count; ++i) { + batchPayloads.emplace_back(allPayloads[i]); + } + + batches.push_back(vectorMaker.rowVector( + {"user_id", "event_ts", "event_type", "payload"}, + {vectorMaker.flatVector(batchUserIds), + vectorMaker.flatVector(batchTimestamps), + vectorMaker.flatVector(batchEventTypes), + vectorMaker.flatVector(batchPayloads)})); + } + + // --- Write WITHOUT boundary columns (baseline) --- + std::string fileBaseline; + { + auto writeFile = + std::make_unique(&fileBaseline); + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig}); + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + } + + // --- Write WITH boundary columns --- + std::string fileBoundary; + { + auto writeFile = + std::make_unique(&fileBoundary); + nimble::VeloxWriter writer( + type, + std::move(writeFile), + *rootPool_, + {.indexConfig = indexConfig, .stripeBoundaryColumnCount = 1}); + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + } + + // --- Verify stripe counts --- + uint32_t baselineStripeCount; + { + velox::InMemoryReadFile readFile(fileBaseline); + nimble::VeloxReader reader(&readFile, *leafPool_); + baselineStripeCount = reader.tabletReader().stripeCount(); + } + + uint32_t boundaryStripeCount; + { + velox::InMemoryReadFile readFile(fileBoundary); + nimble::VeloxReader reader(&readFile, *leafPool_); + boundaryStripeCount = reader.tabletReader().stripeCount(); + } + + // Baseline should have 1 stripe (all data fits in 256MB default). + // Boundary should have exactly kNumUsers stripes. + EXPECT_EQ(baselineStripeCount, 1) + << "Baseline: all data fits in single stripe"; + EXPECT_EQ(boundaryStripeCount, kNumUsers) + << "Boundary: one stripe per user"; + + LOG(INFO) << "=== User Sequence Benchmark Results ==="; + LOG(INFO) << "Total rows: " << totalRows; + LOG(INFO) << "Total users: " << kNumUsers; + LOG(INFO) << "Batches: " << batches.size() << " (batch_size=" << kBatchSize + << ")"; + LOG(INFO) << "Baseline: " << baselineStripeCount + << " stripe(s), file_size=" << fileBaseline.size(); + LOG(INFO) << "Boundary: " << boundaryStripeCount + << " stripe(s), file_size=" << fileBoundary.size(); + + // --- Verify data integrity of boundary file --- + { + velox::InMemoryReadFile readFile(fileBoundary); + nimble::VeloxReader reader(&readFile, *leafPool_); + velox::VectorPtr result; + uint32_t readRowOffset = 0; + while (reader.next(1000, result)) { + const auto* rowResult = result->asChecked(); + for (velox::vector_size_t i = 0; i < result->size(); ++i) { + auto globalIdx = readRowOffset + i; + // Verify user_id column. + auto userId = rowResult->childAt(0) + ->asChecked>() + ->valueAt(i); + ASSERT_EQ(userId, allUserIds[globalIdx]) + << "user_id mismatch at row " << globalIdx; + // Verify event_ts column. + auto ts = rowResult->childAt(1) + ->asChecked>() + ->valueAt(i); + ASSERT_EQ(ts, allTimestamps[globalIdx]) + << "event_ts mismatch at row " << globalIdx; + } + readRowOffset += result->size(); + } + EXPECT_EQ(readRowOffset, totalRows) + << "Total row count mismatch after read-back"; + } + + // --- Verify each stripe contains exactly one user --- + { + velox::InMemoryReadFile readFile(fileBoundary); + nimble::VeloxReader reader(&readFile, *leafPool_); + std::set usersFound; + for (uint32_t stripeIdx = 0; stripeIdx < boundaryStripeCount; + ++stripeIdx) { + // Read this stripe by seeking and reading. + // Since VeloxReader reads sequentially, we just read the next stripe. + velox::VectorPtr result; + ASSERT_TRUE(reader.next(10000, result)); + const auto* rowResult = result->asChecked(); + auto* userIdVec = + rowResult->childAt(0)->asChecked>(); + + // All rows in this stripe should have the same user_id. + auto firstUserId = userIdVec->valueAt(0); + for (velox::vector_size_t i = 1; i < result->size(); ++i) { + ASSERT_EQ(userIdVec->valueAt(i), firstUserId) + << "Stripe " << stripeIdx << " contains mixed users at row " << i + << ": expected " << firstUserId << " got " + << userIdVec->valueAt(i); + } + + ASSERT_EQ(usersFound.count(firstUserId), 0) + << "User " << firstUserId << " found in multiple stripes"; + usersFound.insert(firstUserId); + + // Verify row count matches expected events for this user. + EXPECT_EQ(result->size(), userEventCounts[firstUserId]) + << "Stripe " << stripeIdx << " for user " << firstUserId + << " has wrong row count"; + + LOG(INFO) << "Stripe " << stripeIdx << ": user_id=" << firstUserId + << " rows=" << result->size(); + } + EXPECT_EQ(usersFound.size(), kNumUsers); + } + + LOG(INFO) << "=== All verifications passed ==="; +} + } // namespace facebook