diff --git a/dwio/nimble/serializer/Projector.cpp b/dwio/nimble/serializer/Projector.cpp index 3c2a4e5c..b2c2c77d 100644 --- a/dwio/nimble/serializer/Projector.cpp +++ b/dwio/nimble/serializer/Projector.cpp @@ -198,7 +198,7 @@ std::shared_ptr updateColumnNames( } // Tracks which children are selected at each RowType/FlatMapType. -// Key: pointer to Type node, Value: set of selected child indices. +// Matches Projector::SelectedChildrenMap. using SelectedChildrenMap = folly::F14FastMap>; // Inserts a stream offset and asserts it's unique. @@ -361,62 +361,67 @@ void resolveSubfield( collectTypeStreams(*current, indices); } -// Forward declaration for recursive call. -std::shared_ptr buildProjectedSchema( - const Type* inputType, - uint32_t& nextStreamOffset, - const SelectedChildrenMap& selectedChildren); +using OffsetMap = folly::F14FastMap; + +uint32_t mapOffset(const OffsetMap& offsetMap, uint32_t inputOffset) { + const auto it = offsetMap.find(inputOffset); + NIMBLE_CHECK( + it != offsetMap.end(), + "Input stream offset {} not found in offset map", + inputOffset); + return it->second; +} -// Builds the projected schema with only selected fields. -std::shared_ptr buildProjectedSchema( +std::shared_ptr buildProjectedType( const Type* inputType, - uint32_t& nextStreamOffset, + const OffsetMap& offsetMap, const SelectedChildrenMap& selectedChildren) { const auto kind = inputType->kind(); switch (kind) { case Kind::Scalar: { + const auto inputOffset = + inputType->asScalar().scalarDescriptor().offset(); return std::make_shared(StreamDescriptor{ - nextStreamOffset++, + mapOffset(offsetMap, inputOffset), inputType->asScalar().scalarDescriptor().scalarKind()}); } case Kind::TimestampMicroNano: { const auto& ts = inputType->asTimestampMicroNano(); - auto microsOffset = nextStreamOffset++; - auto nanosOffset = nextStreamOffset++; return std::make_shared( - StreamDescriptor{microsOffset, ts.microsDescriptor().scalarKind()}, - StreamDescriptor{nanosOffset, ts.nanosDescriptor().scalarKind()}); + StreamDescriptor{ + mapOffset(offsetMap, ts.microsDescriptor().offset()), + ts.microsDescriptor().scalarKind()}, + StreamDescriptor{ + mapOffset(offsetMap, ts.nanosDescriptor().offset()), + ts.nanosDescriptor().scalarKind()}); } case Kind::Row: { const auto& row = inputType->asRow(); auto nullsDesc = StreamDescriptor{ - nextStreamOffset++, row.nullsDescriptor().scalarKind()}; + mapOffset(offsetMap, row.nullsDescriptor().offset()), + row.nullsDescriptor().scalarKind()}; std::vector names; std::vector> children; - // Check if we have selected children for this type. const auto it = selectedChildren.find(inputType); if (it != selectedChildren.end()) { - // Only include selected children. names.reserve(it->second.size()); children.reserve(it->second.size()); for (size_t idx : it->second) { names.emplace_back(row.nameAt(idx)); - children.emplace_back(buildProjectedSchema( - row.childAt(idx).get(), nextStreamOffset, selectedChildren)); + children.emplace_back(buildProjectedType( + row.childAt(idx).get(), offsetMap, selectedChildren)); } } else { - // Include all children (this is a nested type within a projected - // field). names.reserve(row.childrenCount()); children.reserve(row.childrenCount()); for (size_t i = 0; i < row.childrenCount(); ++i) { names.emplace_back(row.nameAt(i)); - children.emplace_back(buildProjectedSchema( - row.childAt(i).get(), nextStreamOffset, selectedChildren)); + children.emplace_back(buildProjectedType( + row.childAt(i).get(), offsetMap, selectedChildren)); } } @@ -427,9 +432,10 @@ std::shared_ptr buildProjectedSchema( case Kind::Array: { const auto& array = inputType->asArray(); auto lengthsDesc = StreamDescriptor{ - nextStreamOffset++, array.lengthsDescriptor().scalarKind()}; - auto elements = buildProjectedSchema( - array.elements().get(), nextStreamOffset, selectedChildren); + mapOffset(offsetMap, array.lengthsDescriptor().offset()), + array.lengthsDescriptor().scalarKind()}; + auto elements = buildProjectedType( + array.elements().get(), offsetMap, selectedChildren); return std::make_shared( std::move(lengthsDesc), std::move(elements)); } @@ -437,11 +443,13 @@ std::shared_ptr buildProjectedSchema( case Kind::ArrayWithOffsets: { const auto& array = inputType->asArrayWithOffsets(); auto offsetsDesc = StreamDescriptor{ - nextStreamOffset++, array.offsetsDescriptor().scalarKind()}; + mapOffset(offsetMap, array.offsetsDescriptor().offset()), + array.offsetsDescriptor().scalarKind()}; auto lengthsDesc = StreamDescriptor{ - nextStreamOffset++, array.lengthsDescriptor().scalarKind()}; - auto elements = buildProjectedSchema( - array.elements().get(), nextStreamOffset, selectedChildren); + mapOffset(offsetMap, array.lengthsDescriptor().offset()), + array.lengthsDescriptor().scalarKind()}; + auto elements = buildProjectedType( + array.elements().get(), offsetMap, selectedChildren); return std::make_shared( std::move(offsetsDesc), std::move(lengthsDesc), std::move(elements)); } @@ -449,11 +457,12 @@ std::shared_ptr buildProjectedSchema( case Kind::Map: { const auto& map = inputType->asMap(); auto lengthsDesc = StreamDescriptor{ - nextStreamOffset++, map.lengthsDescriptor().scalarKind()}; - auto keys = buildProjectedSchema( - map.keys().get(), nextStreamOffset, selectedChildren); - auto values = buildProjectedSchema( - map.values().get(), nextStreamOffset, selectedChildren); + mapOffset(offsetMap, map.lengthsDescriptor().offset()), + map.lengthsDescriptor().scalarKind()}; + auto keys = + buildProjectedType(map.keys().get(), offsetMap, selectedChildren); + auto values = + buildProjectedType(map.values().get(), offsetMap, selectedChildren); return std::make_shared( std::move(lengthsDesc), std::move(keys), std::move(values)); } @@ -461,13 +470,15 @@ std::shared_ptr buildProjectedSchema( case Kind::SlidingWindowMap: { const auto& map = inputType->asSlidingWindowMap(); auto offsetsDesc = StreamDescriptor{ - nextStreamOffset++, map.offsetsDescriptor().scalarKind()}; + mapOffset(offsetMap, map.offsetsDescriptor().offset()), + map.offsetsDescriptor().scalarKind()}; auto lengthsDesc = StreamDescriptor{ - nextStreamOffset++, map.lengthsDescriptor().scalarKind()}; - auto keys = buildProjectedSchema( - map.keys().get(), nextStreamOffset, selectedChildren); - auto values = buildProjectedSchema( - map.values().get(), nextStreamOffset, selectedChildren); + mapOffset(offsetMap, map.lengthsDescriptor().offset()), + map.lengthsDescriptor().scalarKind()}; + auto keys = + buildProjectedType(map.keys().get(), offsetMap, selectedChildren); + auto values = + buildProjectedType(map.values().get(), offsetMap, selectedChildren); return std::make_shared( std::move(offsetsDesc), std::move(lengthsDesc), @@ -478,7 +489,8 @@ std::shared_ptr buildProjectedSchema( case Kind::FlatMap: { const auto& flatMap = inputType->asFlatMap(); auto nullsDesc = StreamDescriptor{ - nextStreamOffset++, flatMap.nullsDescriptor().scalarKind()}; + mapOffset(offsetMap, flatMap.nullsDescriptor().offset()), + flatMap.nullsDescriptor().scalarKind()}; std::vector names; std::vector> inMapDescriptors; @@ -486,34 +498,29 @@ std::shared_ptr buildProjectedSchema( auto it = selectedChildren.find(inputType); if (it != selectedChildren.end()) { - // Only include selected keys. names.reserve(it->second.size()); inMapDescriptors.reserve(it->second.size()); children.reserve(it->second.size()); for (size_t idx : it->second) { names.emplace_back(flatMap.nameAt(idx)); - // Child value streams are allocated BEFORE inMap in the Serializer. - // Match this order when assigning output stream offsets. - children.emplace_back(buildProjectedSchema( - flatMap.childAt(idx).get(), nextStreamOffset, selectedChildren)); + children.emplace_back(buildProjectedType( + flatMap.childAt(idx).get(), offsetMap, selectedChildren)); inMapDescriptors.emplace_back( std::make_unique( - nextStreamOffset++, + mapOffset(offsetMap, flatMap.inMapDescriptorAt(idx).offset()), flatMap.inMapDescriptorAt(idx).scalarKind())); } } else { - // Include all children. names.reserve(flatMap.childrenCount()); inMapDescriptors.reserve(flatMap.childrenCount()); children.reserve(flatMap.childrenCount()); for (size_t i = 0; i < flatMap.childrenCount(); ++i) { names.emplace_back(flatMap.nameAt(i)); - // Child value streams are allocated BEFORE inMap in the Serializer. - children.emplace_back(buildProjectedSchema( - flatMap.childAt(i).get(), nextStreamOffset, selectedChildren)); + children.emplace_back(buildProjectedType( + flatMap.childAt(i).get(), offsetMap, selectedChildren)); inMapDescriptors.emplace_back( std::make_unique( - nextStreamOffset++, + mapOffset(offsetMap, flatMap.inMapDescriptorAt(i).offset()), flatMap.inMapDescriptorAt(i).scalarKind())); } } @@ -533,6 +540,28 @@ std::shared_ptr buildProjectedSchema( } // namespace +void Projector::buildProjectedSchema( + const SelectedChildrenMap& selectedChildren) { + NIMBLE_CHECK_NULL(projectedSchema_, "Projected schema already built"); + + folly::F14FastMap offsetMap; + offsetMap.reserve(inputStreamIndices_.size()); + for (uint32_t i = 0; i < inputStreamIndices_.size(); ++i) { + offsetMap[inputStreamIndices_[i]] = i; + } + projectedSchema_ = + buildProjectedType(inputSchema_.get(), offsetMap, selectedChildren); + + // Sanity check: verify all offsets were used by validating projected schema + // stream count matches selected input streams. + std::set projectedIndices; + collectTypeStreams(*projectedSchema_, projectedIndices); + NIMBLE_CHECK_EQ( + projectedIndices.size(), + inputStreamIndices_.size(), + "Stream count mismatch"); +} + Projector::Projector( std::shared_ptr inputSchema, const std::vector& projectSubfields, @@ -563,14 +592,7 @@ Projector::Projector( // Assign sorted indices for sequential access during projection. inputStreamIndices_.assign(uniqueIndices.begin(), uniqueIndices.end()); - // Build output schema with compact stream offsets. - uint32_t nextStreamOffset{0}; - projectedSchema_ = buildProjectedSchema( - inputSchema_.get(), nextStreamOffset, selectedChildren); - - // Sanity check: output stream count must match selected input streams. - NIMBLE_CHECK_EQ( - nextStreamOffset, inputStreamIndices_.size(), "Stream count mismatch"); + buildProjectedSchema(selectedChildren); // Check if all streams are selected (enables pass-through optimization). passThrough_ = @@ -646,10 +668,6 @@ std::string Projector::project(std::string_view input) const { if (inputIdx < inputStreams.size()) { data = inputStreams[inputIdx]; } - // Skip empty streams for sparse output. - if (outputSparse && data.empty()) { - continue; - } } else { // Dense input: all streams must be present. NIMBLE_CHECK_LT( @@ -661,6 +679,11 @@ std::string Projector::project(std::string_view input) const { data = inputStreams[inputIdx]; } + // Skip empty streams for sparse output. + if (outputSparse && data.empty()) { + continue; + } + streamsToWrite.emplace_back(outputIdx, data); } diff --git a/dwio/nimble/serializer/Projector.h b/dwio/nimble/serializer/Projector.h index 62986754..50886104 100644 --- a/dwio/nimble/serializer/Projector.h +++ b/dwio/nimble/serializer/Projector.h @@ -24,6 +24,7 @@ #include "dwio/nimble/serializer/Options.h" #include "dwio/nimble/velox/SchemaReader.h" +#include "folly/container/F14Map.h" #include "velox/type/Subfield.h" #include "velox/type/Type.h" @@ -125,7 +126,13 @@ class Projector { } private: - // Const members (set in init list). + using SelectedChildrenMap = folly::F14FastMap>; + + // Builds projectedSchema_ from inputSchema_. Maps input stream offsets + // to output indices based on inputStreamIndices_ so that schema offsets + // match the data layout produced by project(). + void buildProjectedSchema(const SelectedChildrenMap& selectedChildren); + const Options options_; std::shared_ptr inputSchema_; diff --git a/dwio/nimble/serializer/tests/ProjectorTest.cpp b/dwio/nimble/serializer/tests/ProjectorTest.cpp index 45960ee7..e1d06963 100644 --- a/dwio/nimble/serializer/tests/ProjectorTest.cpp +++ b/dwio/nimble/serializer/tests/ProjectorTest.cpp @@ -1772,4 +1772,159 @@ TEST_F(ProjectorTest, projectNestedFieldUnderFlatMapValue) { EXPECT_EQ(nestedProjectedRow.nameAt(0), "new_value"); } +// Test projecting keys from multiple FlatMap columns at the same Row level. +// This validates that projected schema offsets match the data layout when +// input stream offsets from different FlatMap subtrees interleave numerically +// (e.g., map_b nulls offset falls between map_a nulls and map_a's first child). +TEST_F(ProjectorTest, projectMultipleFlatMapColumns) { + // Row with two FlatMap columns. + auto type = ROW({ + {"map_a", MAP(VARCHAR(), INTEGER())}, + {"map_b", MAP(VARCHAR(), BIGINT())}, + }); + + const vector_size_t numRows = 3; + + // Build map_a: keys "x", "y" with int32 values. + const int aEntriesPerRow = 2; + const int aTotalEntries = numRows * aEntriesPerRow; + + auto aOffsets = allocateOffsets(numRows, pool_.get()); + auto aSizes = allocateSizes(numRows, pool_.get()); + auto* aRawOffsets = aOffsets->asMutable(); + auto* aRawSizes = aSizes->asMutable(); + for (vector_size_t i = 0; i < numRows; ++i) { + aRawOffsets[i] = i * aEntriesPerRow; + aRawSizes[i] = aEntriesPerRow; + } + auto aKeys = BaseVector::create>( + VARCHAR(), aTotalEntries, pool_.get()); + auto aValues = BaseVector::create>( + INTEGER(), aTotalEntries, pool_.get()); + std::vector aKeyNames = {"x", "y"}; + for (int i = 0; i < aTotalEntries; ++i) { + aKeys->set(i, StringView(aKeyNames[i % aEntriesPerRow])); + aValues->set(i, (i + 1) * 10); + } + auto mapA = std::make_shared( + pool_.get(), + MAP(VARCHAR(), INTEGER()), + nullptr, + numRows, + aOffsets, + aSizes, + aKeys, + aValues); + + // Build map_b: keys "p", "q" with int64 values. + const int bEntriesPerRow = 2; + const int bTotalEntries = numRows * bEntriesPerRow; + + auto bOffsets = allocateOffsets(numRows, pool_.get()); + auto bSizes = allocateSizes(numRows, pool_.get()); + auto* bRawOffsets = bOffsets->asMutable(); + auto* bRawSizes = bSizes->asMutable(); + for (vector_size_t i = 0; i < numRows; ++i) { + bRawOffsets[i] = i * bEntriesPerRow; + bRawSizes[i] = bEntriesPerRow; + } + auto bKeys = BaseVector::create>( + VARCHAR(), bTotalEntries, pool_.get()); + auto bValues = BaseVector::create>( + BIGINT(), bTotalEntries, pool_.get()); + std::vector bKeyNames = {"p", "q"}; + for (int i = 0; i < bTotalEntries; ++i) { + bKeys->set(i, StringView(bKeyNames[i % bEntriesPerRow])); + bValues->set(i, (i + 1) * 100L); + } + auto mapB = std::make_shared( + pool_.get(), + MAP(VARCHAR(), BIGINT()), + nullptr, + numRows, + bOffsets, + bSizes, + bKeys, + bValues); + + auto vec = std::make_shared( + pool_.get(), type, nullptr, numRows, std::vector{mapA, mapB}); + + // Serialize both maps as FlatMaps. + SerializerOptions serOpts{ + .version = SerializationVersion::kSparseEncoded, + .flatMapColumns = {"map_a", "map_b"}, + }; + auto [serialized, inputSchema] = serializeWithSchema(vec, type, serOpts); + + // Verify stream offset interleaving: map_b nulls offset should fall between + // map_a nulls and map_a's first child stream. + const auto& inputRow = inputSchema->asRow(); + const auto& mapASchema = inputRow.childAt(0)->asFlatMap(); + const auto& mapBSchema = inputRow.childAt(1)->asFlatMap(); + ASSERT_LT( + mapASchema.nullsDescriptor().offset(), + mapASchema.childAt(0)->asScalar().scalarDescriptor().offset()) + << "map_a nulls should be before map_a children"; + ASSERT_LT( + mapBSchema.nullsDescriptor().offset(), + mapASchema.childAt(0)->asScalar().scalarDescriptor().offset()) + << "map_b nulls should interleave with map_a streams"; + + // Project one key from each FlatMap. + auto subfields = makeSubfields({"map_a[\"x\"]", "map_b[\"q\"]"}); + Projector projector( + inputSchema, + subfields, + {.inputHasVersionHeader = true, + .projectVersion = SerializationVersion::kSparseEncoded}); + + auto outputSchema = projector.projectedSchema(); + + // Verify output schema structure. + const auto& outRow = outputSchema->asRow(); + ASSERT_EQ(outRow.childrenCount(), 2); + const auto& outMapA = outRow.childAt(0)->asFlatMap(); + ASSERT_EQ(outMapA.childrenCount(), 1); + EXPECT_EQ(outMapA.nameAt(0), "x"); + const auto& outMapB = outRow.childAt(1)->asFlatMap(); + ASSERT_EQ(outMapB.childrenCount(), 1); + EXPECT_EQ(outMapB.nameAt(0), "q"); + + // Project and deserialize — this was crashing before the fix due to + // misaligned stream offsets between schema and data. + auto projected = projector.project(serialized); + auto result = deserialize( + projected, + outputSchema, + {.version = SerializationVersion::kSparseEncoded}); + + ASSERT_EQ(result->size(), numRows); + auto resultRow = result->as(); + + // Verify map_a projected values (key "x"). + auto resultMapA = resultRow->childAt(0)->as(); + for (vector_size_t i = 0; i < numRows; ++i) { + ASSERT_EQ(resultMapA->sizeAt(i), 1); + auto keyIdx = resultMapA->offsetAt(i); + auto keyVec = resultMapA->mapKeys()->as>(); + EXPECT_EQ(keyVec->valueAt(keyIdx).str(), "x"); + auto valVec = resultMapA->mapValues()->as>(); + // key "x" is at even positions (0, 2, 4) in the input. + EXPECT_EQ(valVec->valueAt(keyIdx), (i * aEntriesPerRow + 1) * 10); + } + + // Verify map_b projected values (key "q"). + auto resultMapB = resultRow->childAt(1)->as(); + for (vector_size_t i = 0; i < numRows; ++i) { + ASSERT_EQ(resultMapB->sizeAt(i), 1); + auto keyIdx = resultMapB->offsetAt(i); + auto keyVec = resultMapB->mapKeys()->as>(); + EXPECT_EQ(keyVec->valueAt(keyIdx).str(), "q"); + auto valVec = resultMapB->mapValues()->as>(); + // key "q" is at odd positions (1, 3, 5) in the input. + EXPECT_EQ(valVec->valueAt(keyIdx), (i * bEntriesPerRow + 2) * 100L); + } +} + } // namespace facebook::nimble::serde