Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions dwio/nimble/tablet/tests/TabletTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ class TabletTest : public ::testing::TestWithParam<BufferedInputMode> {
case BufferedInputMode::kCachedBufferedInput:
if (!allocator_) {
allocator_ = std::make_shared<velox::memory::MallocAllocator>(
1UL << 30, 0 /* reservationByteLimit */);
velox::memory::MallocAllocator::Options{
.capacity = 1UL << 30, .reservationByteLimit = 0});
}
if (!cache_) {
cache_ = velox::cache::AsyncDataCache::create(allocator_.get());
Expand Down Expand Up @@ -3891,7 +3892,8 @@ TEST_P(TabletWithIndexTest, cacheWarmPath) {

// Initialize cache before the cold reader so we can verify it starts empty.
allocator_ = std::make_shared<velox::memory::MallocAllocator>(
1UL << 30, 0 /* reservationByteLimit */);
velox::memory::MallocAllocator::Options{
.capacity = 1UL << 30, .reservationByteLimit = 0});
cache_ = velox::cache::AsyncDataCache::create(allocator_.get());

auto coldCacheStats = cache_->refreshStats();
Expand Down Expand Up @@ -4144,7 +4146,8 @@ TEST_P(TabletTest, cacheWarmPath) {
// Initialize cache before the cold reader so we can verify it starts empty.
// Normally lazy-initialized in createBufferedInput.
allocator_ = std::make_shared<velox::memory::MallocAllocator>(
1UL << 30, 0 /* reservationByteLimit */);
velox::memory::MallocAllocator::Options{
.capacity = 1UL << 30, .reservationByteLimit = 0});
cache_ = velox::cache::AsyncDataCache::create(allocator_.get());

// Cache should be empty before the cold reader.
Expand Down Expand Up @@ -4260,7 +4263,8 @@ TEST(TabletStressTest, concurrentReadersWithCacheEviction) {

// Shared cache infrastructure.
auto allocator = std::make_shared<velox::memory::MallocAllocator>(
1UL << 30, 0 /* reservationByteLimit */);
velox::memory::MallocAllocator::Options{
.capacity = 1UL << 30, .reservationByteLimit = 0});
auto cache = velox::cache::AsyncDataCache::create(allocator.get());
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(4);

Expand Down
154 changes: 119 additions & 35 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,22 @@ VeloxWriter::VeloxWriter(
*rootWriter_->typeBuilder(),
context_->options().encodingLayoutTree.value());
}

const auto boundaryCount = context_->options().stripeBoundaryColumnCount;
if (boundaryCount > 0) {
NIMBLE_USER_CHECK(
context_->options().indexConfig.has_value(),
"stripeBoundaryColumnCount requires indexConfig to be set");
const auto& indexColumns = context_->options().indexConfig->columns;
NIMBLE_USER_CHECK_LE(
boundaryCount,
indexColumns.size(),
"stripeBoundaryColumnCount exceeds number of index columns");
const auto& rowType = velox::asRowType(schema_->type());
for (uint32_t i = 0; i < boundaryCount; ++i) {
boundaryColumnIndices_.push_back(rowType->getChildIdx(indexColumns[i]));
}
}
}

VeloxWriter::~VeloxWriter() {}
Expand All @@ -744,42 +760,10 @@ bool VeloxWriter::write(const velox::VectorPtr& input) {

NIMBLE_CHECK_NOT_NULL(file_, "Writer is already closed");
try {
const auto numRows = input->size();
// Calculate raw size using schema information to correctly handle
// passthrough flatmaps (ROW vectors written as MAP).
RawSizeContext context;
const auto rawSize = nimble::getRawSizeFromVector(
input,
velox::common::Ranges::of(0, numRows),
context,
schema_.get(),
context_->flatMapNodeIds(),
context_->ignoreTopLevelNulls());
NIMBLE_CHECK_GE(rawSize, 0, "Invalid raw size");
context_->updateFileRawSize(rawSize);

if (context_->options().writeExecutor) {
velox::dwio::common::ExecutorBarrier barrier{
context_->options().writeExecutor};
rootWriter_->write(input, OrderedRanges::of(0, numRows), &barrier);
addIndexKey(input, &barrier);
barrier.waitAll();
} else {
rootWriter_->write(input, OrderedRanges::of(0, numRows));
addIndexKey(input);
if (boundaryColumnIndices_.empty()) {
return writeBatch(input);
}

uint64_t memoryUsed{0};
for (const auto& [_, stream] : context_->streams()) {
memoryUsed += stream->memoryUsed();
}

context_->setMemoryUsed(memoryUsed);
context_->updateRowsInFile(numRows);
context_->updateRowsInStripe(numRows);
context_->setBytesWritten(file_->size());

return evaluateFlushPolicy();
return writeBoundaryAware(input);
} catch (const std::exception& e) {
lastException_ = std::current_exception();
context_->logger()->logException(LogOperation::Write, e.what());
Expand All @@ -793,6 +777,106 @@ bool VeloxWriter::write(const velox::VectorPtr& input) {
}
}

bool VeloxWriter::writeBatch(const velox::VectorPtr& input) {
  const auto rowCount = input->size();

  // Raw size is computed against the schema so that passthrough flatmaps
  // (ROW vectors persisted as MAP) are measured correctly.
  RawSizeContext rawSizeContext;
  const auto rawSize = nimble::getRawSizeFromVector(
      input,
      velox::common::Ranges::of(0, rowCount),
      rawSizeContext,
      schema_.get(),
      context_->flatMapNodeIds(),
      context_->ignoreTopLevelNulls());
  NIMBLE_CHECK_GE(rawSize, 0, "Invalid raw size");
  context_->updateFileRawSize(rawSize);

  if (context_->options().writeExecutor) {
    // Fan the field writers and index-key extraction out on the executor;
    // the barrier joins all scheduled work before we account for memory.
    velox::dwio::common::ExecutorBarrier barrier{
        context_->options().writeExecutor};
    rootWriter_->write(input, OrderedRanges::of(0, rowCount), &barrier);
    addIndexKey(input, &barrier);
    barrier.waitAll();
  } else {
    // Synchronous path: write fields and index keys inline.
    rootWriter_->write(input, OrderedRanges::of(0, rowCount));
    addIndexKey(input);
  }

  // Tally buffered stream memory; this feeds the flush-policy decision.
  uint64_t totalStreamMemory{0};
  for (const auto& entry : context_->streams()) {
    totalStreamMemory += entry.second->memoryUsed();
  }

  context_->setMemoryUsed(totalStreamMemory);
  context_->updateRowsInFile(rowCount);
  context_->updateRowsInStripe(rowCount);
  context_->setBytesWritten(file_->size());

  // True when the flush policy decided to cut a stripe.
  return evaluateFlushPolicy();
}

velox::vector_size_t VeloxWriter::findNextBoundaryTransition(
    const velox::VectorPtr& input,
    velox::vector_size_t numRows) const {
  // Finds the first row where any boundary column's value differs from that
  // column's value at row 0. Returns numRows when no transition is found.
  //
  // Iterate column-outer rather than row-outer: this hoists the loop-invariant
  // childAt(colIdx) lookup out of the per-row scan, and lets each subsequent
  // column scan stop at the earliest transition found so far — the result
  // (the minimum transition row over all boundary columns) is unchanged.
  const auto* rowVector = input->asChecked<velox::RowVector>();
  velox::vector_size_t firstTransition = numRows;
  for (auto colIdx : boundaryColumnIndices_) {
    const auto& child = rowVector->childAt(colIdx);
    for (velox::vector_size_t row = 1; row < firstTransition; ++row) {
      // Compare this row's value against row 0 within the same child vector.
      if (!child->equalValueAt(child.get(), 0, row)) {
        firstTransition = row;
        break;
      }
    }
  }
  return firstTransition;
}

bool VeloxWriter::writeBoundaryAware(const velox::VectorPtr& input) {
  // Boundary-aware write path: flushes a stripe whenever the boundary
  // columns' values transition, both across write() calls and within a
  // single input batch. Returns true if any stripe was flushed.
  bool flushed = false;
  const auto numRows = input->size();

  if (numRows == 0) {
    return false;
  }

  // Cross-batch: flush if the first row of this batch differs from the last
  // row of the previous batch in any boundary column.
  if (lastBoundaryValues_ != nullptr) {
    const auto* lastRow = lastBoundaryValues_->asChecked<velox::RowVector>();
    const auto* curRow = input->asChecked<velox::RowVector>();
    for (auto colIdx : boundaryColumnIndices_) {
      if (!lastRow->childAt(colIdx)->equalValueAt(
              curRow->childAt(colIdx).get(), 0, 0)) {
        // Honor writeStripe()'s return value: the stripe may already be
        // empty (e.g. the previous write() flushed via the flush policy),
        // in which case nothing is written and we must not report a flush.
        flushed = writeStripe() || flushed;
        break;
      }
    }
  }

  // Intra-batch: scan for transitions, slice and write at each boundary.
  velox::vector_size_t start = 0;
  while (start < numRows) {
    auto slice = (start == 0) ? input : input->slice(start, numRows - start);
    const auto boundary = findNextBoundaryTransition(slice, slice->size());
    auto batch =
        (boundary == slice->size()) ? slice : slice->slice(0, boundary);

    if (start > 0) {
      // A transition was detected at 'start'; cut the stripe before writing
      // the next group. Honor the return value as above.
      flushed = writeStripe() || flushed;
    }

    if (writeBatch(batch)) {
      flushed = true;
    }
    // findNextBoundaryTransition scans from row 1, so boundary >= 1 and the
    // loop always makes progress.
    start += boundary;
  }

  // Save last row for cross-batch detection.
  // NOTE(review): slice() shares buffers with 'input', keeping the entire
  // batch alive until the next write(); consider copying the single row out
  // if input batches are large — confirm against BaseVector::slice semantics.
  lastBoundaryValues_ = input->slice(numRows - 1, 1);
  return flushed;
}

void VeloxWriter::writeMetadata() {
if (context_->options().metadata.empty()) {
return;
Expand Down
21 changes: 21 additions & 0 deletions dwio/nimble/velox/VeloxWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ class VeloxWriter {
// Returning 'true' if stripe was flushed.
bool evaluateFlushPolicy();

/// Writes a single contiguous batch: computes raw size, invokes field
/// writers, adds index keys, updates memory/row tracking, and evaluates
/// flush policy. Shared by both normal and boundary-aware write paths.
bool writeBatch(const velox::VectorPtr& input);

/// Write implementation for boundary-aware path. Detects value transitions
/// in boundary columns and flushes stripes at boundaries.
bool writeBoundaryAware(const velox::VectorPtr& input);

/// Finds the first row where any boundary column differs from row 0.
/// Returns numRows if no transition found.
velox::vector_size_t findNextBoundaryTransition(
const velox::VectorPtr& input,
velox::vector_size_t numRows) const;

// Returning 'true' if stripe was written.
bool writeStripe();

Expand Down Expand Up @@ -193,6 +208,12 @@ class VeloxWriter {
std::unique_ptr<Buffer> encodingBuffer_;
std::vector<Stream> encodedStreams_;
std::exception_ptr lastException_;

/// Column indices for boundary columns, resolved in constructor.
std::vector<velox::column_index_t> boundaryColumnIndices_;

/// Last row of previous write() for cross-batch boundary detection.
velox::VectorPtr lastBoundaryValues_;
};

} // namespace facebook::nimble
10 changes: 10 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ struct VeloxWriterOptions {
/// writing. The index stores the per-chunk min and max key for each stripe.
std::optional<IndexConfig> indexConfig;

/// Number of leading index columns to use as stripe boundaries. When > 0,
/// the writer detects value transitions in the first N index columns during
/// write() and flushes at boundaries. Requires indexConfig to be set, and
/// the value must be <= indexConfig.columns.size(). Input data must be
/// pre-sorted/clustered by these columns.
/// NOTE: The size-based flush policy still applies as a safety valve. If a
/// single boundary group exceeds the flush policy threshold, the stripe may
/// be flushed mid-group to bound memory usage.
uint32_t stripeBoundaryColumnCount{0};

// Columns that should be encoded as flat maps
folly::F14FastSet<std::string> flatMapColumns;

Expand Down
Loading
Loading