Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 88 additions & 65 deletions dwio/nimble/serializer/Projector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ std::shared_ptr<const Type> updateColumnNames(
}

// Tracks which children are selected at each RowType/FlatMapType.
// Key: pointer to Type node, Value: set of selected child indices.
// Matches Projector::SelectedChildrenMap.
using SelectedChildrenMap = folly::F14FastMap<const Type*, std::set<size_t>>;

// Inserts a stream offset and asserts it's unique.
Expand Down Expand Up @@ -361,62 +361,67 @@ void resolveSubfield(
collectTypeStreams(*current, indices);
}

// Forward declaration for recursive call.
std::shared_ptr<const Type> buildProjectedSchema(
const Type* inputType,
uint32_t& nextStreamOffset,
const SelectedChildrenMap& selectedChildren);
using OffsetMap = folly::F14FastMap<uint32_t, uint32_t>;

// Translates an input stream offset into its compact output stream offset.
// Fails loudly if the input offset was never selected for projection.
uint32_t mapOffset(const OffsetMap& offsetMap, uint32_t inputOffset) {
  const auto match = offsetMap.find(inputOffset);
  NIMBLE_CHECK(
      match != offsetMap.end(),
      "Input stream offset {} not found in offset map",
      inputOffset);
  return match->second;
}

// Builds the projected schema with only selected fields.
std::shared_ptr<const Type> buildProjectedSchema(
std::shared_ptr<const Type> buildProjectedType(
const Type* inputType,
uint32_t& nextStreamOffset,
const OffsetMap& offsetMap,
const SelectedChildrenMap& selectedChildren) {
const auto kind = inputType->kind();
switch (kind) {
case Kind::Scalar: {
const auto inputOffset =
inputType->asScalar().scalarDescriptor().offset();
return std::make_shared<ScalarType>(StreamDescriptor{
nextStreamOffset++,
mapOffset(offsetMap, inputOffset),
inputType->asScalar().scalarDescriptor().scalarKind()});
}

case Kind::TimestampMicroNano: {
const auto& ts = inputType->asTimestampMicroNano();
auto microsOffset = nextStreamOffset++;
auto nanosOffset = nextStreamOffset++;
return std::make_shared<TimestampMicroNanoType>(
StreamDescriptor{microsOffset, ts.microsDescriptor().scalarKind()},
StreamDescriptor{nanosOffset, ts.nanosDescriptor().scalarKind()});
StreamDescriptor{
mapOffset(offsetMap, ts.microsDescriptor().offset()),
ts.microsDescriptor().scalarKind()},
StreamDescriptor{
mapOffset(offsetMap, ts.nanosDescriptor().offset()),
ts.nanosDescriptor().scalarKind()});
}

case Kind::Row: {
const auto& row = inputType->asRow();
auto nullsDesc = StreamDescriptor{
nextStreamOffset++, row.nullsDescriptor().scalarKind()};
mapOffset(offsetMap, row.nullsDescriptor().offset()),
row.nullsDescriptor().scalarKind()};

std::vector<std::string> names;
std::vector<std::shared_ptr<const Type>> children;

// Check if we have selected children for this type.
const auto it = selectedChildren.find(inputType);
if (it != selectedChildren.end()) {
// Only include selected children.
names.reserve(it->second.size());
children.reserve(it->second.size());
for (size_t idx : it->second) {
names.emplace_back(row.nameAt(idx));
children.emplace_back(buildProjectedSchema(
row.childAt(idx).get(), nextStreamOffset, selectedChildren));
children.emplace_back(buildProjectedType(
row.childAt(idx).get(), offsetMap, selectedChildren));
}
} else {
// Include all children (this is a nested type within a projected
// field).
names.reserve(row.childrenCount());
children.reserve(row.childrenCount());
for (size_t i = 0; i < row.childrenCount(); ++i) {
names.emplace_back(row.nameAt(i));
children.emplace_back(buildProjectedSchema(
row.childAt(i).get(), nextStreamOffset, selectedChildren));
children.emplace_back(buildProjectedType(
row.childAt(i).get(), offsetMap, selectedChildren));
}
}

Expand All @@ -427,47 +432,53 @@ std::shared_ptr<const Type> buildProjectedSchema(
case Kind::Array: {
const auto& array = inputType->asArray();
auto lengthsDesc = StreamDescriptor{
nextStreamOffset++, array.lengthsDescriptor().scalarKind()};
auto elements = buildProjectedSchema(
array.elements().get(), nextStreamOffset, selectedChildren);
mapOffset(offsetMap, array.lengthsDescriptor().offset()),
array.lengthsDescriptor().scalarKind()};
auto elements = buildProjectedType(
array.elements().get(), offsetMap, selectedChildren);
return std::make_shared<ArrayType>(
std::move(lengthsDesc), std::move(elements));
}

case Kind::ArrayWithOffsets: {
const auto& array = inputType->asArrayWithOffsets();
auto offsetsDesc = StreamDescriptor{
nextStreamOffset++, array.offsetsDescriptor().scalarKind()};
mapOffset(offsetMap, array.offsetsDescriptor().offset()),
array.offsetsDescriptor().scalarKind()};
auto lengthsDesc = StreamDescriptor{
nextStreamOffset++, array.lengthsDescriptor().scalarKind()};
auto elements = buildProjectedSchema(
array.elements().get(), nextStreamOffset, selectedChildren);
mapOffset(offsetMap, array.lengthsDescriptor().offset()),
array.lengthsDescriptor().scalarKind()};
auto elements = buildProjectedType(
array.elements().get(), offsetMap, selectedChildren);
return std::make_shared<ArrayWithOffsetsType>(
std::move(offsetsDesc), std::move(lengthsDesc), std::move(elements));
}

case Kind::Map: {
const auto& map = inputType->asMap();
auto lengthsDesc = StreamDescriptor{
nextStreamOffset++, map.lengthsDescriptor().scalarKind()};
auto keys = buildProjectedSchema(
map.keys().get(), nextStreamOffset, selectedChildren);
auto values = buildProjectedSchema(
map.values().get(), nextStreamOffset, selectedChildren);
mapOffset(offsetMap, map.lengthsDescriptor().offset()),
map.lengthsDescriptor().scalarKind()};
auto keys =
buildProjectedType(map.keys().get(), offsetMap, selectedChildren);
auto values =
buildProjectedType(map.values().get(), offsetMap, selectedChildren);
return std::make_shared<MapType>(
std::move(lengthsDesc), std::move(keys), std::move(values));
}

case Kind::SlidingWindowMap: {
const auto& map = inputType->asSlidingWindowMap();
auto offsetsDesc = StreamDescriptor{
nextStreamOffset++, map.offsetsDescriptor().scalarKind()};
mapOffset(offsetMap, map.offsetsDescriptor().offset()),
map.offsetsDescriptor().scalarKind()};
auto lengthsDesc = StreamDescriptor{
nextStreamOffset++, map.lengthsDescriptor().scalarKind()};
auto keys = buildProjectedSchema(
map.keys().get(), nextStreamOffset, selectedChildren);
auto values = buildProjectedSchema(
map.values().get(), nextStreamOffset, selectedChildren);
mapOffset(offsetMap, map.lengthsDescriptor().offset()),
map.lengthsDescriptor().scalarKind()};
auto keys =
buildProjectedType(map.keys().get(), offsetMap, selectedChildren);
auto values =
buildProjectedType(map.values().get(), offsetMap, selectedChildren);
return std::make_shared<SlidingWindowMapType>(
std::move(offsetsDesc),
std::move(lengthsDesc),
Expand All @@ -478,42 +489,38 @@ std::shared_ptr<const Type> buildProjectedSchema(
case Kind::FlatMap: {
const auto& flatMap = inputType->asFlatMap();
auto nullsDesc = StreamDescriptor{
nextStreamOffset++, flatMap.nullsDescriptor().scalarKind()};
mapOffset(offsetMap, flatMap.nullsDescriptor().offset()),
flatMap.nullsDescriptor().scalarKind()};

std::vector<std::string> names;
std::vector<std::unique_ptr<StreamDescriptor>> inMapDescriptors;
std::vector<std::shared_ptr<const Type>> children;

auto it = selectedChildren.find(inputType);
if (it != selectedChildren.end()) {
// Only include selected keys.
names.reserve(it->second.size());
inMapDescriptors.reserve(it->second.size());
children.reserve(it->second.size());
for (size_t idx : it->second) {
names.emplace_back(flatMap.nameAt(idx));
// Child value streams are allocated BEFORE inMap in the Serializer.
// Match this order when assigning output stream offsets.
children.emplace_back(buildProjectedSchema(
flatMap.childAt(idx).get(), nextStreamOffset, selectedChildren));
children.emplace_back(buildProjectedType(
flatMap.childAt(idx).get(), offsetMap, selectedChildren));
inMapDescriptors.emplace_back(
std::make_unique<StreamDescriptor>(
nextStreamOffset++,
mapOffset(offsetMap, flatMap.inMapDescriptorAt(idx).offset()),
flatMap.inMapDescriptorAt(idx).scalarKind()));
}
} else {
// Include all children.
names.reserve(flatMap.childrenCount());
inMapDescriptors.reserve(flatMap.childrenCount());
children.reserve(flatMap.childrenCount());
for (size_t i = 0; i < flatMap.childrenCount(); ++i) {
names.emplace_back(flatMap.nameAt(i));
// Child value streams are allocated BEFORE inMap in the Serializer.
children.emplace_back(buildProjectedSchema(
flatMap.childAt(i).get(), nextStreamOffset, selectedChildren));
children.emplace_back(buildProjectedType(
flatMap.childAt(i).get(), offsetMap, selectedChildren));
inMapDescriptors.emplace_back(
std::make_unique<StreamDescriptor>(
nextStreamOffset++,
mapOffset(offsetMap, flatMap.inMapDescriptorAt(i).offset()),
flatMap.inMapDescriptorAt(i).scalarKind()));
}
}
Expand All @@ -533,6 +540,28 @@ std::shared_ptr<const Type> buildProjectedSchema(

} // namespace

void Projector::buildProjectedSchema(
    const SelectedChildrenMap& selectedChildren) {
  NIMBLE_CHECK_NULL(projectedSchema_, "Projected schema already built");

  // Assign each selected input stream offset a compact output index,
  // following the sorted order of inputStreamIndices_ so that schema
  // offsets line up with the data layout produced by project().
  folly::F14FastMap<uint32_t, uint32_t> offsetMap;
  offsetMap.reserve(inputStreamIndices_.size());
  uint32_t outputIndex = 0;
  for (const auto inputOffset : inputStreamIndices_) {
    offsetMap[inputOffset] = outputIndex++;
  }

  projectedSchema_ =
      buildProjectedType(inputSchema_.get(), offsetMap, selectedChildren);

  // Sanity check: the projected schema must reference exactly as many
  // distinct streams as were selected from the input schema.
  std::set<uint32_t> projectedIndices;
  collectTypeStreams(*projectedSchema_, projectedIndices);
  NIMBLE_CHECK_EQ(
      projectedIndices.size(),
      inputStreamIndices_.size(),
      "Stream count mismatch");
}

Projector::Projector(
std::shared_ptr<const Type> inputSchema,
const std::vector<Subfield>& projectSubfields,
Expand Down Expand Up @@ -563,14 +592,7 @@ Projector::Projector(
// Assign sorted indices for sequential access during projection.
inputStreamIndices_.assign(uniqueIndices.begin(), uniqueIndices.end());

// Build output schema with compact stream offsets.
uint32_t nextStreamOffset{0};
projectedSchema_ = buildProjectedSchema(
inputSchema_.get(), nextStreamOffset, selectedChildren);

// Sanity check: output stream count must match selected input streams.
NIMBLE_CHECK_EQ(
nextStreamOffset, inputStreamIndices_.size(), "Stream count mismatch");
buildProjectedSchema(selectedChildren);

// Check if all streams are selected (enables pass-through optimization).
passThrough_ =
Expand Down Expand Up @@ -646,10 +668,6 @@ std::string Projector::project(std::string_view input) const {
if (inputIdx < inputStreams.size()) {
data = inputStreams[inputIdx];
}
// Skip empty streams for sparse output.
if (outputSparse && data.empty()) {
continue;
}
} else {
// Dense input: all streams must be present.
NIMBLE_CHECK_LT(
Expand All @@ -661,6 +679,11 @@ std::string Projector::project(std::string_view input) const {
data = inputStreams[inputIdx];
}

// Skip empty streams for sparse output.
if (outputSparse && data.empty()) {
continue;
}

streamsToWrite.emplace_back(outputIdx, data);
}

Expand Down
9 changes: 8 additions & 1 deletion dwio/nimble/serializer/Projector.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "dwio/nimble/serializer/Options.h"
#include "dwio/nimble/velox/SchemaReader.h"
#include "folly/container/F14Map.h"
#include "velox/type/Subfield.h"
#include "velox/type/Type.h"

Expand Down Expand Up @@ -125,7 +126,13 @@ class Projector {
}

private:
// Const members (set in init list).
using SelectedChildrenMap = folly::F14FastMap<const Type*, std::set<size_t>>;

// Builds projectedSchema_ from inputSchema_. Maps input stream offsets
// to output indices based on inputStreamIndices_ so that schema offsets
// match the data layout produced by project().
void buildProjectedSchema(const SelectedChildrenMap& selectedChildren);

const Options options_;

std::shared_ptr<const Type> inputSchema_;
Expand Down
Loading
Loading