diff --git a/dwio/nimble/common/Types.cpp b/dwio/nimble/common/Types.cpp index c31b93f1..4446aa6c 100644 --- a/dwio/nimble/common/Types.cpp +++ b/dwio/nimble/common/Types.cpp @@ -49,6 +49,14 @@ std::string toString(EncodingType encodingType) { return "Sentinel"; case EncodingType::Prefix: return "Prefix"; + case EncodingType::VarintV2: + return "VarintV2"; + case EncodingType::RLEV2: + return "RLEV2"; + case EncodingType::FixedBitWidthV2: + return "FixedBitWidthV2"; + case EncodingType::MainlyConstantV2: + return "MainlyConstantV2"; } return fmt::format( "Unknown encoding type: {}", static_cast(encodingType)); diff --git a/dwio/nimble/common/Types.h b/dwio/nimble/common/Types.h index b627cad0..97843b6d 100644 --- a/dwio/nimble/common/Types.h +++ b/dwio/nimble/common/Types.h @@ -104,6 +104,11 @@ enum class EncodingType { // shared across consecutive entries to reduce storage. Supports seek // operations for efficient random access. Prefix = 11, + // V2 variants of existing encodings for benchmarking and experimentation. + VarintV2 = 12, + RLEV2 = 13, + FixedBitWidthV2 = 14, + MainlyConstantV2 = 15, }; std::string toString(EncodingType encodingType); std::ostream& operator<<(std::ostream& out, EncodingType encodingType); diff --git a/dwio/nimble/common/Varint.cpp b/dwio/nimble/common/Varint.cpp index 6c374e02..4956933f 100644 --- a/dwio/nimble/common/Varint.cpp +++ b/dwio/nimble/common/Varint.cpp @@ -21,6 +21,10 @@ #include "common/aarch64/compat.h" #endif //__aarch64__ +#include + +#include + #include "dwio/nimble/common/Exceptions.h" #include "dwio/nimble/common/Varint.h" #include "folly/CpuId.h" @@ -838,3 +842,139 @@ const char* bulkVarintDecodeBmi2(uint64_t n, const char* pos, T* output) { } } // namespace facebook::nimble::varint + +// V2 variants for independent experimentation in VarintEncodingV2. +namespace facebook::nimble::varint_v2 { + +using varint::bulkVarintDecodeBmi2; +using varint::readVarint32; +using varint::readVarint64; + +// Zero-extend 8 consecutive bytes into T-sized output elements using xsimd +// batch construction and store. +template +inline void expandByteWord(const uint8_t* bytes, T* output) { + using batch_type = xsimd::batch; + constexpr auto kBatchSize = batch_type::size; + + if constexpr (kBatchSize >= 8) { + batch_type( + static_cast(bytes[0]), + static_cast(bytes[1]), + static_cast(bytes[2]), + static_cast(bytes[3]), + static_cast(bytes[4]), + static_cast(bytes[5]), + static_cast(bytes[6]), + static_cast(bytes[7])) + .store_unaligned(output); + } else if constexpr (kBatchSize == 4) { + batch_type( + static_cast(bytes[0]), + static_cast(bytes[1]), + static_cast(bytes[2]), + static_cast(bytes[3])) + .store_unaligned(output); + batch_type( + static_cast(bytes[4]), + static_cast(bytes[5]), + static_cast(bytes[6]), + static_cast(bytes[7])) + .store_unaligned(output + 4); + } else if constexpr (kBatchSize == 2) { + batch_type(static_cast(bytes[0]), static_cast(bytes[1])) + .store_unaligned(output); + batch_type(static_cast(bytes[2]), static_cast(bytes[3])) + .store_unaligned(output + 2); + batch_type(static_cast(bytes[4]), static_cast(bytes[5])) + .store_unaligned(output + 4); + batch_type(static_cast(bytes[6]), static_cast(bytes[7])) + .store_unaligned(output + 6); + } +} + +// Process runs of single-byte varints using xsimd for both the high-bit +// check and byte-to-element widening. Works with uint8_t* throughout, +// avoiding reinterpret_cast to uint64_t* (alignment/strict-aliasing issues). +// Returns the number of elements remaining after processing. +template +inline uint64_t +bulkDecodeSingleByteRun(uint64_t n, const char*& pos, T*& output) { + using u8_batch = xsimd::batch; + constexpr auto kU8Size = u8_batch::size; + constexpr uint64_t wordSize = 8; + + const auto* src = reinterpret_cast(pos); + + // Process kU8BatchSize bytes at a time (32 on AVX2, 16 on SSE/NEON). + // Single wide load + vptest replaces 4 separate uint64_t loads + OR chain. + while (n >= kU8Size) { + auto bytes = u8_batch::load_unaligned(src); + if (xsimd::any((bytes & u8_batch(0x80)) != u8_batch(0))) { + break; + } + for (size_t i = 0; i < kU8Size; i += wordSize) { + expandByteWord(src + i, output + i); + } + src += kU8Size; + output += kU8Size; + n -= kU8Size; + } + + // Process 8 bytes at a time. Use memcpy for the high-bit check to avoid + // reinterpret_cast strict-aliasing/alignment issues. + while (n >= wordSize) { + uint64_t word; + std::memcpy(&word, src, sizeof(word)); + + if (word & 0x8080808080808080ULL) { + break; + } + expandByteWord(src, output); + src += wordSize; + output += wordSize; + n -= wordSize; + } + + // Handle trailing single-byte varints one at a time. + while (n > 0 && !(src[0] & 0x80)) { + *output++ = static_cast(src[0]); + ++src; + --n; + } + + pos = reinterpret_cast(src); + return n; +} + +const char* bulkVarintDecode32(uint64_t n, const char* pos, uint32_t* output) { + static bool hasBmi2 = folly::CpuId().bmi2(); + n = bulkDecodeSingleByteRun(n, pos, output); + if (n == 0) { + return pos; + } + if (hasBmi2) { + return bulkVarintDecodeBmi2(n, pos, output); + } + for (uint64_t i = 0; i < n; ++i) { + *output++ = readVarint32(&pos); + } + return pos; +} + +const char* bulkVarintDecode64(uint64_t n, const char* pos, uint64_t* output) { + static bool hasBmi2 = folly::CpuId().bmi2(); + n = bulkDecodeSingleByteRun(n, pos, output); + if (n == 0) { + return pos; + } + if (hasBmi2) { + return bulkVarintDecodeBmi2(n, pos, output); + } + for (uint64_t i = 0; i < n; ++i) { + *output++ = readVarint64(&pos); + } + return pos; +} + +} // namespace facebook::nimble::varint_v2 diff --git a/dwio/nimble/common/Varint.h b/dwio/nimble/common/Varint.h index 70792bfe..80cf5c9e 100644 --- a/dwio/nimble/common/Varint.h +++ b/dwio/nimble/common/Varint.h @@ -136,3 +136,11 @@ inline uint64_t readVarint64(const char** pos) noexcept { } } // namespace facebook::nimble::varint + +// V2 variants of bulk varint decode for independent experimentation. +namespace facebook::nimble::varint_v2 { + +const char* bulkVarintDecode32(uint64_t n, const char* pos, uint32_t* output); +const char* bulkVarintDecode64(uint64_t n, const char* pos, uint64_t* output); + +} // namespace facebook::nimble::varint_v2 diff --git a/dwio/nimble/common/benchmarks/VarintBenchmark.cpp b/dwio/nimble/common/benchmarks/VarintBenchmark.cpp index 6d095fde..b2c6d61a 100644 --- a/dwio/nimble/common/benchmarks/VarintBenchmark.cpp +++ b/dwio/nimble/common/benchmarks/VarintBenchmark.cpp @@ -117,57 +117,66 @@ std::vector MakeSkewedData(int num_elements = kNumElements) { return data; } -BENCHMARK(Encode, iters) { - std::vector data; - std::unique_ptr buf; - BENCHMARK_SUSPEND { - data = MakeUniformData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); +// Makes data where all values fit in exactly `numBytes` varint bytes. +std::vector MakeFixedWidthData32( + int numBytes, + int num_elements = kNumElements) { + std::vector data(num_elements); + uint32_t lo = (numBytes == 1) ? 0 : (1u << (7 * (numBytes - 1))); + uint32_t hi = (1u << (7 * numBytes)) - 1; + if (numBytes == 5) { + hi = UINT32_MAX; } - while (iters--) { - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } - CHECK_GE(pos - buf.get(), kNumElements); + for (int i = 0; i < num_elements; ++i) { + data[i] = lo + folly::Random::secureRand32() % (hi - lo + 1); } + return data; } -BENCHMARK(FollyEncode, iters) { - std::vector data; - std::unique_ptr buf; - BENCHMARK_SUSPEND { - data = MakeUniformData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); +// Makes 64-bit data where all values fit in exactly `numBytes` varint bytes. +std::vector MakeFixedWidthData64( + int numBytes, + int num_elements = kNumElements) { + std::vector data(num_elements); + uint64_t lo = (numBytes == 1) ? 0 : (1ull << (7 * (numBytes - 1))); + uint64_t hi = (numBytes >= 10) ? UINT64_MAX : ((1ull << (7 * numBytes)) - 1); + for (int i = 0; i < num_elements; ++i) { + data[i] = lo + folly::Random::secureRand64() % (hi - lo + 1); } - while (iters--) { - uint8_t* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - pos += folly::encodeVarint(data[i], pos); - } - CHECK_GE(pos - buf.get(), kNumElements); + return data; +} + +// Encode data into a varint buffer, returns total encoded size. +template +std::unique_ptr EncodeData( + const std::vector& data, + uint64_t& encodedSize) { + auto buf = std::make_unique(data.size() * folly::kMaxVarintLength64); + char* pos = buf.get(); + for (auto val : data) { + nimble::varint::writeVarint(val, &pos); } + encodedSize = pos - buf.get(); + return buf; } -BENCHMARK(NimbleDecodeUniform, iters) { +// ============================================================================ +// Original benchmarks (uniform + skewed, 32-bit) +// ============================================================================ + +BENCHMARK(Encode, iters) { std::vector data; std::unique_ptr buf; - std::vector recovered; BENCHMARK_SUSPEND { - recovered.resize(kNumElements); data = MakeUniformData(); buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } } while (iters--) { - const char* cpos = buf.get(); + char* pos = buf.get(); for (int i = 0; i < kNumElements; ++i) { - recovered[i] = nimble::varint::readVarint32(&cpos); + nimble::varint::writeVarint(data[i], &pos); } - CHECK_EQ(recovered.back(), data.back()); + CHECK_GE(pos - buf.get(), kNumElements); } } @@ -191,90 +200,40 @@ BENCHMARK(NimbleBulkDecodeUniform, iters) { } } -BENCHMARK(FollyDecodeUniform, iters) { - std::vector data; - std::unique_ptr buf; - uint8_t* pos; - std::vector recovered; - BENCHMARK_SUSPEND { - recovered.resize(kNumElements); - data = MakeUniformData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - pos += folly::encodeVarint(data[i], pos); - } - } - while (iters--) { - const uint8_t* fstart = buf.get(); - const uint8_t* fend = buf.get() + (pos - buf.get()); - folly::Range frange(fstart, fend); - for (int i = 0; i < kNumElements; ++i) { - recovered[i] = folly::decodeVarint(frange); - } - CHECK_EQ(recovered.back(), data.back()); - } -} +// ============================================================================ +// Fixed byte-width benchmarks (32-bit): isolate per-width performance +// ============================================================================ -BENCHMARK(DwrfDecodeUniform, iters) { - std::vector data; - std::unique_ptr buf; - std::vector recovered; - uint64_t varint_bytes; - BENCHMARK_SUSPEND { - recovered.resize(kNumElements); - data = MakeUniformData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } - varint_bytes = pos - buf.get(); - } - while (iters--) { - const char* cpos = buf.get(); - const char* end = cpos + varint_bytes; - for (int i = 0; i < kNumElements; ++i) { - recovered[i] = DwrfRead(&cpos, end); - } - CHECK_EQ(recovered.back(), data.back()); - } -} +BENCHMARK_DRAW_LINE(); -BENCHMARK(NimbleDecodeSkewed, iters) { +BENCHMARK(BulkDecode_1byte, iters) { std::vector data; std::unique_ptr buf; std::vector recovered; BENCHMARK_SUSPEND { recovered.resize(kNumElements); - data = MakeSkewedData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } + data = MakeFixedWidthData32(1); + uint64_t sz; + buf = EncodeData(data, sz); } while (iters--) { const char* cpos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - recovered[i] = nimble::varint::readVarint32(&cpos); - } + nimble::varint::bulkVarintDecode32(kNumElements, cpos, recovered.data()); CHECK_EQ(recovered.back(), data.back()); } } -BENCHMARK(NimbleBulkDecodeSkewed, iters) { +BENCHMARK_DRAW_LINE(); + +BENCHMARK(BulkDecode_2byte, iters) { std::vector data; std::unique_ptr buf; std::vector recovered; BENCHMARK_SUSPEND { recovered.resize(kNumElements); - data = MakeSkewedData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } + data = MakeFixedWidthData32(2); + uint64_t sz; + buf = EncodeData(data, sz); } while (iters--) { const char* cpos = buf.get(); @@ -283,55 +242,39 @@ BENCHMARK(NimbleBulkDecodeSkewed, iters) { } } -BENCHMARK(FollyDecodeSkewed, iters) { - std::vector data; - std::unique_ptr buf; - uint8_t* pos; - std::vector recovered; - BENCHMARK_SUSPEND { - recovered.resize(kNumElements); - data = MakeSkewedData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - pos += folly::encodeVarint(data[i], pos); - } - } - while (iters--) { - const uint8_t* fstart = buf.get(); - const uint8_t* fend = buf.get() + (pos - buf.get()); - folly::Range frange(fstart, fend); - for (int i = 0; i < kNumElements; ++i) { - recovered[i] = folly::decodeVarint(frange); - } - CHECK_EQ(recovered.back(), data.back()); - } -} +BENCHMARK_DRAW_LINE(); -BENCHMARK(DwrfDecodeSkewed, iters) { - std::vector data; - std::unique_ptr buf; - std::vector recovered; - uint64_t varint_bytes; - BENCHMARK_SUSPEND { - recovered.resize(kNumElements); - data = MakeSkewedData(); - buf = std::make_unique(kNumElements * folly::kMaxVarintLength32); - char* pos = buf.get(); - for (int i = 0; i < kNumElements; ++i) { - nimble::varint::writeVarint(data[i], &pos); - } - varint_bytes = pos - buf.get(); - } - while (iters--) { - const char* cpos = buf.get(); - const char* end = cpos + varint_bytes; - for (int i = 0; i < kNumElements; ++i) { - recovered[i] = DwrfRead(&cpos, end); - } - CHECK_EQ(recovered.back(), data.back()); +// ============================================================================ +// Batch size benchmarks: how does bulk decode scale with n? +// ============================================================================ + +BENCHMARK_DRAW_LINE(); + +#define BATCH_SIZE_BENCH(N) \ + BENCHMARK(BulkDecode_batch##N, iters) { \ + std::vector data; \ + std::unique_ptr buf; \ + std::vector recovered; \ + BENCHMARK_SUSPEND { \ + recovered.resize(N); \ + data = MakeUniformData(N); \ + uint64_t sz; \ + buf = EncodeData(data, sz); \ + } \ + while (iters--) { \ + const char* cpos = buf.get(); \ + nimble::varint::bulkVarintDecode32(N, cpos, recovered.data()); \ + folly::doNotOptimizeAway(recovered.back()); \ + } \ } -} + +BATCH_SIZE_BENCH(4) +BATCH_SIZE_BENCH(8) +BATCH_SIZE_BENCH(16) +BATCH_SIZE_BENCH(64) +BATCH_SIZE_BENCH(256) +BATCH_SIZE_BENCH(1024) +BATCH_SIZE_BENCH(4096) int main() { folly::runBenchmarks(); diff --git a/dwio/nimble/common/tests/VarintTests.cpp b/dwio/nimble/common/tests/VarintTests.cpp index 95fea935..9aee78f0 100644 --- a/dwio/nimble/common/tests/VarintTests.cpp +++ b/dwio/nimble/common/tests/VarintTests.cpp @@ -16,6 +16,8 @@ #include #include +#include + #include "dwio/nimble/common/Varint.h" #include "folly/Random.h" #include "folly/Range.h" @@ -25,8 +27,56 @@ using namespace ::facebook; namespace { const int kNumElements = 10000; + +// Encode a vector of values into a varint buffer, returning the buffer and its +// size. +template +std::pair, size_t> encodeValues( + const std::vector& values) { + auto buf = + std::make_unique(values.size() * folly::kMaxVarintLength64); + char* pos = buf.get(); + for (auto val : values) { + nimble::varint::writeVarint(val, &pos); + } + return {std::move(buf), static_cast(pos - buf.get())}; +} + +// Bulk-decode and verify the result matches the expected values. +template +void verifyBulkDecode(const std::vector& expected, const char* encoded) { + std::vector decoded(expected.size()); + if constexpr (sizeof(T) == 4) { + nimble::varint::bulkVarintDecode32( + expected.size(), encoded, decoded.data()); + } else { + nimble::varint::bulkVarintDecode64( + expected.size(), encoded, decoded.data()); + } + for (size_t i = 0; i < expected.size(); ++i) { + ASSERT_EQ(expected[i], decoded[i]) + << "mismatch at index " << i << " of " << expected.size(); + } } +// Bulk-decode using V2 and verify the result matches the expected values. +template +void verifyBulkDecodeV2(const std::vector& expected, const char* encoded) { + std::vector decoded(expected.size()); + if constexpr (sizeof(T) == 4) { + nimble::varint_v2::bulkVarintDecode32( + expected.size(), encoded, decoded.data()); + } else { + nimble::varint_v2::bulkVarintDecode64( + expected.size(), encoded, decoded.data()); + } + for (size_t i = 0; i < expected.size(); ++i) { + ASSERT_EQ(expected[i], decoded[i]) + << "V2 mismatch at index " << i << " of " << expected.size(); + } +} +} // namespace + TEST(VarintTests, varintSize32) { // Boundary values for varint encoding. EXPECT_EQ(nimble::varint::varintSize(uint32_t{0}), 1); @@ -181,3 +231,227 @@ TEST(VarintTests, WriteRead64) { ASSERT_EQ(data[i], bulk[i]); } } + +// ============================================================================ +// Single-byte varint tests: exercise the V2 SIMD decodeSingleByteRun path. +// The function has three loops: +// 1. Wide loop: processes kU8BatchSize bytes (32 on AVX2, 16 on SSE/NEON) +// 2. 8-byte loop: processes 8 bytes at a time +// 3. Tail loop: processes 1 byte at a time +// These tests cover boundary conditions for all three loops. +// ============================================================================ + +// All 128 single-byte values (0-127) decode correctly for uint32_t. +TEST(VarintTests, SingleByte32_AllValues) { + std::vector data(128); + std::iota(data.begin(), data.end(), 0); + auto [buf, size] = encodeValues(data); + ASSERT_EQ(size, 128u); + verifyBulkDecodeV2(data, buf.get()); +} + +// All 128 single-byte values (0-127) decode correctly for uint64_t. +TEST(VarintTests, SingleByte64_AllValues) { + std::vector data(128); + std::iota(data.begin(), data.end(), 0); + auto [buf, size] = encodeValues(data); + ASSERT_EQ(size, 128u); + verifyBulkDecodeV2(data, buf.get()); +} + +// Test every count from 0 to 100 with all-zero values. +// Exercises exact boundary transitions between wide/8-byte/tail loops. +TEST(VarintTests, SingleByte32_AllCountsZero) { + for (int count = 0; count <= 100; ++count) { + std::vector data(count, 0); + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } +} + +// Test every count from 0 to 100 with value 127 (max single-byte varint). +TEST(VarintTests, SingleByte32_AllCountsMax) { + for (int count = 0; count <= 100; ++count) { + std::vector data(count, 127); + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } +} + +// Test counts at specific SIMD boundaries with uint64_t. +TEST(VarintTests, SingleByte64_SimdBoundaries) { + for (int count : {0, 1, 2, 7, 8, 9, 15, 16, 17, 31, + 32, 33, 63, 64, 65, 96, 127, 128, 256, 1000}) { + std::vector data(count); + for (int i = 0; i < count; ++i) { + data[i] = i % 128; + } + auto [buf, size] = encodeValues(data); + ASSERT_EQ(size, static_cast(count)); + verifyBulkDecodeV2(data, buf.get()); + } +} + +// A multi-byte varint (>=128) interrupts the single-byte run at each position +// within a 64-element window. Verifies the SIMD path correctly bails out and +// the remaining elements are decoded by the fallback path. +TEST(VarintTests, SingleByte32_MultiByteInterrupt) { + for (int interruptPos = 0; interruptPos < 64; ++interruptPos) { + const int count = 64; + std::vector data(count); + for (int i = 0; i < count; ++i) { + data[i] = (i == interruptPos) ? 200 : (i % 128); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } +} + +TEST(VarintTests, SingleByte64_MultiByteInterrupt) { + for (int interruptPos = 0; interruptPos < 64; ++interruptPos) { + const int count = 64; + std::vector data(count); + for (int i = 0; i < count; ++i) { + data[i] = (i == interruptPos) ? 200 : (i % 128); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } +} + +// Single-byte values followed by progressively longer multi-byte varints. +// Tests the transition from decodeSingleByteRun into the BMI2/scalar path. +TEST(VarintTests, SingleByte32_TransitionToMultiByte) { + for (int singleCount : {0, 1, 7, 8, 15, 16, 31, 32, 33, 64}) { + for (int multiCount : {0, 1, 5, 10}) { + std::vector data; + data.reserve(singleCount + multiCount); + for (int i = 0; i < singleCount; ++i) { + data.push_back(i % 128); + } + for (int i = 0; i < multiCount; ++i) { + data.push_back(128 + i * 1000); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } + } +} + +TEST(VarintTests, SingleByte64_TransitionToMultiByte) { + for (int singleCount : {0, 1, 7, 8, 15, 16, 31, 32, 33, 64}) { + for (int multiCount : {0, 1, 5, 10}) { + std::vector data; + data.reserve(singleCount + multiCount); + for (int i = 0; i < singleCount; ++i) { + data.push_back(i % 128); + } + for (int i = 0; i < multiCount; ++i) { + data.push_back(128 + static_cast(i) * 1000); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } + } +} + +// Alternating single-byte and multi-byte varints. The SIMD path must +// correctly handle frequent bail-outs and re-entries. +TEST(VarintTests, SingleByte32_AlternatingSingleMulti) { + std::vector data; + data.reserve(200); + for (int i = 0; i < 200; ++i) { + data.push_back(i % 2 == 0 ? (i % 128) : (128 + i)); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); +} + +// Large run of single-byte varints to stress the wide SIMD loop. +TEST(VarintTests, SingleByte32_LargeRun) { + const int count = 100000; + std::vector data(count); + for (int i = 0; i < count; ++i) { + data[i] = i % 128; + } + auto [buf, size] = encodeValues(data); + ASSERT_EQ(size, static_cast(count)); + verifyBulkDecodeV2(data, buf.get()); +} + +TEST(VarintTests, SingleByte64_LargeRun) { + const int count = 100000; + std::vector data(count); + for (int i = 0; i < count; ++i) { + data[i] = i % 128; + } + auto [buf, size] = encodeValues(data); + ASSERT_EQ(size, static_cast(count)); + verifyBulkDecodeV2(data, buf.get()); +} + +// Random mix: ~80% single-byte, ~20% multi-byte, with a random seed. +TEST(VarintTests, SingleByte32_RandomMix) { + auto seed = folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + std::mt19937 rng(seed); + + std::vector data(kNumElements); + for (int i = 0; i < kNumElements; ++i) { + if (folly::Random::rand32(rng) % 5 != 0) { + data[i] = folly::Random::rand32(rng) % 128; + } else { + data[i] = 128 + folly::Random::rand32(rng) % 10000; + } + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); +} + +TEST(VarintTests, SingleByte64_RandomMix) { + auto seed = folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + std::mt19937 rng(seed); + + std::vector data(kNumElements); + for (int i = 0; i < kNumElements; ++i) { + if (folly::Random::rand32(rng) % 5 != 0) { + data[i] = folly::Random::rand32(rng) % 128; + } else { + data[i] = 128 + folly::Random::rand64(rng) % 1000000; + } + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); +} + +// Constant value runs for each single-byte value. +TEST(VarintTests, SingleByte32_ConstantRuns) { + for (uint32_t val = 0; val < 128; ++val) { + std::vector data(37, val); + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); + } +} + +// Verify that a single multi-byte varint at the very start works. +TEST(VarintTests, SingleByte32_MultiByteFirst) { + std::vector data = {300}; + for (int i = 0; i < 50; ++i) { + data.push_back(i % 128); + } + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); +} + +// Verify that a single multi-byte varint at the very end works. +TEST(VarintTests, SingleByte32_MultiByteLast) { + std::vector data; + data.reserve(51); + for (int i = 0; i < 50; ++i) { + data.push_back(i % 128); + } + data.push_back(300); + auto [buf, size] = encodeValues(data); + verifyBulkDecodeV2(data, buf.get()); +} diff --git a/dwio/nimble/encodings/EncodingFactory.cpp b/dwio/nimble/encodings/EncodingFactory.cpp index d33dca9e..86fcbc2c 100644 --- a/dwio/nimble/encodings/EncodingFactory.cpp +++ b/dwio/nimble/encodings/EncodingFactory.cpp @@ -18,13 +18,17 @@ #include "dwio/nimble/encodings/DictionaryEncoding.h" #include "dwio/nimble/encodings/EncodingSelection.h" #include "dwio/nimble/encodings/FixedBitWidthEncoding.h" +#include "dwio/nimble/encodings/FixedBitWidthEncodingV2.h" #include "dwio/nimble/encodings/MainlyConstantEncoding.h" +#include "dwio/nimble/encodings/MainlyConstantEncodingV2.h" #include "dwio/nimble/encodings/NullableEncoding.h" #include "dwio/nimble/encodings/PrefixEncoding.h" #include "dwio/nimble/encodings/RleEncoding.h" +#include "dwio/nimble/encodings/RleEncodingV2.h" #include "dwio/nimble/encodings/SparseBoolEncoding.h" #include "dwio/nimble/encodings/TrivialEncoding.h" #include "dwio/nimble/encodings/VarintEncoding.h" +#include "dwio/nimble/encodings/VarintEncodingV2.h" namespace facebook::nimble { @@ -238,6 +242,18 @@ std::unique_ptr EncodingFactory::decode( "Trying to deserialize a PrefixEncoding with a non-string data type."); return std::make_unique(memoryPool, data, options); } + case EncodingType::VarintV2: { + RETURN_ENCODING_BY_VARINT_TYPE(VarintEncodingV2, dataType); + } + case EncodingType::RLEV2: { + RETURN_ENCODING_BY_LEAF_TYPE(RLEV2Encoding, dataType); + } + case EncodingType::FixedBitWidthV2: { + RETURN_ENCODING_BY_NUMERIC_TYPE(FixedBitWidthEncodingV2, dataType); + } + case EncodingType::MainlyConstantV2: { + RETURN_ENCODING_BY_NON_BOOL_TYPE(MainlyConstantEncodingV2, dataType); + } default: { NIMBLE_UNREACHABLE( "Trying to deserialize invalid EncodingType:{} -- garbage input?", @@ -361,6 +377,38 @@ std::string_view EncodingFactory::encode( return PrefixEncoding::encode(selection, castedValues, buffer, options); } } + case EncodingType::VarintV2: { + if constexpr ( + isNumericType() && + (sizeof(physicalType) == 4 || sizeof(T) == 8)) { + return VarintEncodingV2::encode( + selection, castedValues, buffer, options); + } else { + NIMBLE_INCOMPATIBLE_ENCODING( + "VarintV2 encoding can only be selected for large numeric data types."); + } + } + case EncodingType::RLEV2: { + return RLEV2Encoding::encode(selection, castedValues, buffer, options); + } + case EncodingType::FixedBitWidthV2: { + if constexpr (isNumericType()) { + return FixedBitWidthEncodingV2::encode( + selection, castedValues, buffer, options); + } else { + NIMBLE_INCOMPATIBLE_ENCODING( + "FixedBitWidthV2 encoding should not be selected for non-numeric data types."); + } + } + case EncodingType::MainlyConstantV2: { + if constexpr (std::is_same::value) { + NIMBLE_INCOMPATIBLE_ENCODING( + "MainlyConstantV2 encoding should not be selected for bool data types."); + } else { + return MainlyConstantEncodingV2::encode( + selection, castedValues, buffer, options); + } + } default: { NIMBLE_UNSUPPORTED( "Encoding {} is not supported.", toString(selection.encodingType())); diff --git a/dwio/nimble/encodings/EncodingLayout.cpp b/dwio/nimble/encodings/EncodingLayout.cpp index a8dc8c3a..1b6ccae9 100644 --- a/dwio/nimble/encodings/EncodingLayout.cpp +++ b/dwio/nimble/encodings/EncodingLayout.cpp @@ -161,6 +161,7 @@ EncodingLayout EncodingLayoutCapture::capture(std::string_view encoding) { CompressionType compressionType = CompressionType::Uncompressed; if (encodingType == EncodingType::FixedBitWidth || + encodingType == EncodingType::FixedBitWidthV2 || encodingType == EncodingType::Trivial) { compressionType = encoding::peek( encoding.data() + kEncodingPrefixSize); @@ -169,7 +170,9 @@ EncodingLayout EncodingLayoutCapture::capture(std::string_view encoding) { std::vector> children; switch (encodingType) { case EncodingType::FixedBitWidth: + case EncodingType::FixedBitWidthV2: case EncodingType::Varint: + case EncodingType::VarintV2: case EncodingType::Constant: case EncodingType::Prefix: { // Non nested encodings have zero children @@ -195,7 +198,8 @@ EncodingLayout EncodingLayoutCapture::capture(std::string_view encoding) { encoding.substr(kEncodingPrefixSize + 1))); break; } - case EncodingType::MainlyConstant: { + case EncodingType::MainlyConstant: + case EncodingType::MainlyConstantV2: { children.reserve(2); const char* pos = encoding.data() + kEncodingPrefixSize; @@ -226,7 +230,8 @@ EncodingLayout EncodingLayoutCapture::capture(std::string_view encoding) { {pos, encoding.size() - (pos - encoding.data())})); break; } - case EncodingType::RLE: { + case EncodingType::RLE: + case EncodingType::RLEV2: { const auto dataType = encoding::peek(encoding.data() + 1); diff --git a/dwio/nimble/encodings/EncodingSelectionPolicy.h b/dwio/nimble/encodings/EncodingSelectionPolicy.h index 045cd461..24eb627d 100644 --- a/dwio/nimble/encodings/EncodingSelectionPolicy.h +++ b/dwio/nimble/encodings/EncodingSelectionPolicy.h @@ -368,6 +368,10 @@ class ManualEncodingSelectionPolicyFactory { EncodingType::Dictionary, EncodingType::RLE, EncodingType::Varint, + EncodingType::VarintV2, + EncodingType::RLEV2, + EncodingType::FixedBitWidthV2, + EncodingType::MainlyConstantV2, }; } diff --git a/dwio/nimble/encodings/EncodingSizeEstimation.h b/dwio/nimble/encodings/EncodingSizeEstimation.h index 5a569d5b..ada13c1d 100644 --- a/dwio/nimble/encodings/EncodingSizeEstimation.h +++ b/dwio/nimble/encodings/EncodingSizeEstimation.h @@ -45,7 +45,8 @@ struct EncodingSizeEstimation { physicalType>()} : std::nullopt; } - case EncodingType::MainlyConstant: { + case EncodingType::MainlyConstant: + case EncodingType::MainlyConstantV2: { // Assumptions: // We store one entry for the common value. // Number of uncommon values is total item count minus the max unique @@ -85,7 +86,8 @@ struct EncodingSizeEstimation { return getEncodingOverhead() + (entryCount * sizeof(physicalType)); } - case EncodingType::FixedBitWidth: { + case EncodingType::FixedBitWidth: + case EncodingType::FixedBitWidthV2: { return getEncodingOverhead< EncodingType::FixedBitWidth, physicalType>() + @@ -108,7 +110,8 @@ struct EncodingSizeEstimation { getEncodingOverhead(); return overhead + alphabetSize + indicesSize; } - case EncodingType::RLE: { + case EncodingType::RLE: + case EncodingType::RLEV2: { // Assumptions: // Run values are stored bit-packed (with bit width needed to store max // value). Run lengths are stored using bit-packing (with bit width @@ -130,7 +133,8 @@ struct EncodingSizeEstimation { getEncodingOverhead(); return overhead + runValuesSize + runLengthsSize; } - case EncodingType::Varint: { + case EncodingType::Varint: + case EncodingType::VarintV2: { // Note: the condition below actually support floating point numbers as // well, as we use physicalType, which, for floating point numbers, is // an integer. @@ -188,7 +192,8 @@ struct EncodingSizeEstimation { return getEncodingOverhead() + FixedBitArray::bufferSize(entryCount, 1); } - case EncodingType::RLE: { + case EncodingType::RLE: + case EncodingType::RLEV2: { // Assumptions: // Run lengths are stored using bit-packing (with bit width // needed to store max repetition count). @@ -222,7 +227,8 @@ struct EncodingSizeEstimation { physicalType>(maxStringSize)} : std::nullopt; } - case EncodingType::MainlyConstant: { + case EncodingType::MainlyConstant: + case EncodingType::MainlyConstantV2: { // Assumptions: // We store one entry for the common value. // For each uncommon value we store the its value. @@ -315,7 +321,8 @@ struct EncodingSizeEstimation { getEncodingOverhead(); return overhead + alphabetSize + indicesSize; } - case EncodingType::RLE: { + case EncodingType::RLE: + case EncodingType::RLEV2: { // Assumptions: // Run values are stored using dictionary (and inside, trivial + // bit-packing). Run lengths are stored using bit-packing (with bit diff --git a/dwio/nimble/encodings/EncodingUtils.h b/dwio/nimble/encodings/EncodingUtils.h index 7c9b51c6..d5fc4c8c 100644 --- a/dwio/nimble/encodings/EncodingUtils.h +++ b/dwio/nimble/encodings/EncodingUtils.h @@ -18,13 +18,17 @@ #include "dwio/nimble/encodings/ConstantEncoding.h" #include "dwio/nimble/encodings/DictionaryEncoding.h" #include "dwio/nimble/encodings/FixedBitWidthEncoding.h" +#include "dwio/nimble/encodings/FixedBitWidthEncodingV2.h" #include "dwio/nimble/encodings/MainlyConstantEncoding.h" +#include "dwio/nimble/encodings/MainlyConstantEncodingV2.h" #include "dwio/nimble/encodings/NullableEncoding.h" #include "dwio/nimble/encodings/PrefixEncoding.h" #include "dwio/nimble/encodings/RleEncoding.h" +#include "dwio/nimble/encodings/RleEncodingV2.h" #include "dwio/nimble/encodings/SparseBoolEncoding.h" #include "dwio/nimble/encodings/TrivialEncoding.h" #include "dwio/nimble/encodings/VarintEncoding.h" +#include "dwio/nimble/encodings/VarintEncodingV2.h" namespace facebook::nimble { @@ -79,6 +83,11 @@ auto encodingTypeDispatchString(Encoding& encoding, F f) { case EncodingType::MainlyConstant: return f( static_cast&>(encoding)); + case EncodingType::MainlyConstantV2: + return f( + static_cast&>(encoding)); + case EncodingType::RLEV2: + return f(static_cast&>(encoding)); case EncodingType::Prefix: return f(static_cast(encoding)); default: @@ -127,6 +136,25 @@ auto encodingTypeDispatchNonString(Encoding& encoding, F&& f) { return f(static_cast&>(encoding)); case EncodingType::MainlyConstant: return f(static_cast&>(encoding)); + case EncodingType::VarintV2: + if constexpr (folly::IsOneOf< + T, + int32_t, + uint32_t, + int64_t, + uint64_t, + float, + double>::value) { + return f(static_cast&>(encoding)); + } else { + NIMBLE_UNREACHABLE(toString(encoding.dataType())); + } + case EncodingType::RLEV2: + return f(static_cast&>(encoding)); + case EncodingType::FixedBitWidthV2: + return f(static_cast&>(encoding)); + case EncodingType::MainlyConstantV2: + return f(static_cast&>(encoding)); default: NIMBLE_UNSUPPORTED(toString(encoding.encodingType())); } diff --git a/dwio/nimble/encodings/FixedBitWidthEncodingV2.h b/dwio/nimble/encodings/FixedBitWidthEncodingV2.h new file mode 100644 index 00000000..0a4fd959 --- /dev/null +++ b/dwio/nimble/encodings/FixedBitWidthEncodingV2.h @@ -0,0 +1,313 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "dwio/nimble/common/Buffer.h" +#include "dwio/nimble/common/EncodingPrimitives.h" +#include "dwio/nimble/common/EncodingType.h" +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/common/FixedBitArray.h" +#include "dwio/nimble/common/Vector.h" +#include "dwio/nimble/encodings/Compression.h" +#include "dwio/nimble/encodings/Encoding.h" + +// V2 variant of FixedBitWidthEncoding for benchmarking and experimentation. + +namespace facebook::nimble { + +template +class FixedBitWidthEncodingV2 final + : public TypedEncoding::physicalType> { + public: + using cppDataType = T; + using physicalType = typename TypeTraits::physicalType; + + static const int kPrefixSize = 2 + sizeof(T); + + FixedBitWidthEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + void reset() final; + void skip(uint32_t rowCount) final; + void materialize(uint32_t rowCount, void* buffer) final; + + template + void readWithVisitor(DecoderVisitor& visitor, ReadWithVisitorParams& params); + + template + void bulkScan( + Visitor& visitor, + uint32_t nonNullsSoFar, + const int32_t* rows, + int32_t numRows, + const int32_t* scatterRows); + + static std::string_view encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options = {}); + + std::string debugString(int offset) const final; + + private: + int bitWidth_; + physicalType baseline_; + FixedBitArray fixedBitArray_; + uint32_t row_ = 0; + Vector uncompressedData_; + Vector buffer_; +}; + +// +// End of public API. Implementations follow. +// + +template +FixedBitWidthEncodingV2::FixedBitWidthEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function /* stringBufferFactory */, + const Encoding::Options& options) + : TypedEncoding{memoryPool, data, options}, + uncompressedData_{&memoryPool}, + buffer_{&memoryPool} { + auto pos = data.data() + this->dataOffset(); + auto compressionType = static_cast(encoding::readChar(pos)); + baseline_ = encoding::read(pos); + bitWidth_ = static_cast(encoding::readChar(pos)); + if (compressionType != CompressionType::Uncompressed) { + uncompressedData_ = Compression::uncompress( + memoryPool, + compressionType, + DataType::Undefined, + {pos, static_cast(data.end() - pos)}); + fixedBitArray_ = FixedBitArray{ + {uncompressedData_.data(), uncompressedData_.size()}, bitWidth_}; + } else { + fixedBitArray_ = + FixedBitArray{{pos, static_cast(data.end() - pos)}, bitWidth_}; + } +} + +template +void FixedBitWidthEncodingV2::reset() { + row_ = 0; +} + +template +void FixedBitWidthEncodingV2::skip(uint32_t rowCount) { + row_ += rowCount; +} + +template +void FixedBitWidthEncodingV2::materialize(uint32_t rowCount, void* buffer) { + if constexpr (isFourByteIntegralType()) { + fixedBitArray_.bulkGetWithBaseline32( + row_, rowCount, static_cast(buffer), baseline_); + } else { + if (sizeof(physicalType) == 8 && bitWidth_ <= 32) { + fixedBitArray_.bulkGetWithBaseline32Into64( + row_, rowCount, static_cast(buffer), baseline_); + } else { + const uint32_t start = row_; + const uint32_t end = start + rowCount; + physicalType* output = static_cast(buffer); + for (uint32_t i = start; i < end; ++i) { + *output++ = fixedBitArray_.get(i) + baseline_; + } + } + } + row_ += rowCount; +} + +template +template +void FixedBitWidthEncodingV2::readWithVisitor( + V& visitor, + ReadWithVisitorParams& params) { + using OutputType = detail::ValueType; + constexpr bool kExtractToReader = + std::is_same_v; + constexpr bool kSameType = std::is_same_v; + constexpr bool kIsWidening = sizeof(OutputType) > sizeof(physicalType) && + std::is_integral_v && std::is_integral_v; + constexpr bool kCanUseFastPath = isFourByteIntegralType() && + !V::kHasFilter && !V::kHasHook && kExtractToReader && + (kSameType || kIsWidening); + if constexpr (kCanUseFastPath) { + auto* nulls = visitor.reader().rawNullsInReadRange(); + detail::readWithVisitorFast(*this, visitor, params, nulls); + return; + } + detail::readWithVisitorSlow( + visitor, + params, + [&](auto toSkip) { skip(toSkip); }, + [&] { + physicalType value = fixedBitArray_.get(row_++) + baseline_; + return value; + }); +} + +template +template +void FixedBitWidthEncodingV2::bulkScan( + V& visitor, + uint32_t currentRow, + const int32_t* nonNullRows, + int32_t numNonNulls, + const int32_t* scatterRows) { + using DataType = typename V::DataType; + using OutputType = detail::ValueType; + static_assert( + isFourByteIntegralType(), + "bulkScan only supports 4-byte integral types"); + + if (numNonNulls == 0) { + return; + } + + const auto numRows = visitor.numRows() - visitor.rowIndex(); + + const auto offset = + static_cast(row_) - static_cast(currentRow); + + auto* values = detail::mutableValues(visitor, numRows); + + constexpr bool kSameSize = sizeof(physicalType) == sizeof(OutputType); + constexpr bool kIsUpcast = sizeof(OutputType) > sizeof(physicalType) && + std::is_integral_v && std::is_integral_v; + + if constexpr (V::dense) { + buffer_.resize(numNonNulls); + fixedBitArray_.bulkGetWithBaseline32( + nonNullRows[0] + offset, + numNonNulls, + reinterpret_cast(buffer_.data()), + baseline_); + + if constexpr (kSameSize) { + std::memcpy(values, buffer_.data(), numNonNulls * sizeof(physicalType)); + } else if constexpr (kIsUpcast) { + for (int32_t i = 0; i < numNonNulls; ++i) { + values[i] = static_cast(buffer_[i]); + } + } + } else { + for (int32_t i = 0; i < numNonNulls; ++i) { + values[i] = fixedBitArray_.get(nonNullRows[i] + offset) + baseline_; + } + } + + row_ += nonNullRows[numNonNulls - 1] - currentRow + 1; + + visitor.addNumValues(V::dense ? numRows : numNonNulls); + visitor.setRowIndex(visitor.numRows()); +} + +template +std::string_view FixedBitWidthEncodingV2::encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options) { + const bool useVarint = options.useVarintRowCount; + static_assert( + std::is_same_v< + typename std::make_unsigned::type, + physicalType>, + "Physical type must be unsigned."); + const uint32_t rowCount = values.size(); + const int bitsRequired = + (velox::bits::bitsRequired( + selection.statistics().max() - selection.statistics().min()) + + 7) & + ~7; + + const uint32_t fixedBitArraySize = + FixedBitArray::bufferSize(values.size(), bitsRequired); + + Vector vector{&buffer.getMemoryPool()}; + + auto dataCompressionPolicy = selection.compressionPolicy(); + CompressionEncoder compressionEncoder{ + buffer.getMemoryPool(), + *dataCompressionPolicy, + DataType::Undefined, + bitsRequired, + fixedBitArraySize, + [&]() { + vector.resize(fixedBitArraySize); + return std::span{vector}; + }, + [&, baseline = selection.statistics().min()](char*& pos) { + memset(pos, 0, fixedBitArraySize); + FixedBitArray fba(pos, bitsRequired); + if constexpr (sizeof(physicalType) == 4) { + fba.bulkSet32WithBaseline( + 0, + rowCount, + reinterpret_cast(values.data()), + baseline); + } else { + for (uint32_t i = 0; i < values.size(); ++i) { + fba.set(i, values[i] - baseline); + } + } + pos += fixedBitArraySize; + return pos; + }}; + + const uint32_t encodingSize = + Encoding::serializePrefixSize(rowCount, useVarint) + + FixedBitWidthEncodingV2::kPrefixSize + compressionEncoder.getSize(); + char* reserved = buffer.reserve(encodingSize); + char* pos = reserved; + Encoding::serializePrefix( + EncodingType::FixedBitWidthV2, + TypeTraits::dataType, + rowCount, + useVarint, + pos); + encoding::writeChar( + static_cast(compressionEncoder.compressionType()), pos); + encoding::write(selection.statistics().min(), pos); + encoding::writeChar(bitsRequired, pos); + compressionEncoder.write(pos); + + NIMBLE_DCHECK_EQ(encodingSize, pos - reserved, "Encoding size mismatch."); + return {reserved, encodingSize}; +} + +template +std::string FixedBitWidthEncodingV2::debugString(int offset) const { + return fmt::format( + "{}{}<{}> rowCount={} bit_width={}", + std::string(offset, ' '), + toString(Encoding::encodingType()), + toString(Encoding::dataType()), + Encoding::rowCount(), + bitWidth_); +} + +} // namespace facebook::nimble diff --git a/dwio/nimble/encodings/MainlyConstantEncodingV2.cpp b/dwio/nimble/encodings/MainlyConstantEncodingV2.cpp new file mode 100644 index 00000000..9c588bc7 --- /dev/null +++ b/dwio/nimble/encodings/MainlyConstantEncodingV2.cpp @@ -0,0 +1,109 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/encodings/MainlyConstantEncodingV2.h" + +namespace facebook::nimble { + +MainlyConstantEncodingV2::MainlyConstantEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options) + : MainlyConstantEncodingBase(memoryPool, data, options) { + const char* pos = data.data() + this->dataOffset(); + const uint32_t isCommonBytes = encoding::readUint32(pos); + isCommon_ = EncodingFactory::decode( + *this->pool_, {pos, isCommonBytes}, stringBufferFactory, options); + pos += isCommonBytes; + const uint32_t otherValuesBytes = encoding::readUint32(pos); + otherValues_ = EncodingFactory::decode( + *this->pool_, {pos, otherValuesBytes}, stringBufferFactory, options); + pos += otherValuesBytes; + commonValue_ = encoding::read(pos); + NIMBLE_CHECK(pos == data.end(), "Unexpected mainly constant encoding end"); + auto stringBuffer = + static_cast(stringBufferFactory(commonValue_.size())); + std::memcpy(stringBuffer, commonValue_.data(), commonValue_.size()); + commonValue_ = std::string_view(stringBuffer, commonValue_.size()); +} + +std::string_view MainlyConstantEncodingV2::encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options) { + const bool useVarint = options.useVarintRowCount; + if (values.empty()) { + NIMBLE_INCOMPATIBLE_ENCODING("MainlyConstantEncodingV2 cannot be empty."); + } + + const auto commonElement = std::max_element( + selection.statistics().uniqueCounts()->cbegin(), + selection.statistics().uniqueCounts()->cend(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + const uint32_t entryCount = values.size(); + const uint32_t uncommonCount = entryCount - commonElement->second; + + Vector isCommon{&buffer.getMemoryPool(), values.size(), true}; + Vector otherValues(&buffer.getMemoryPool()); + otherValues.reserve(uncommonCount); + + physicalType commonValue = commonElement->first; + for (auto i = 0; i < values.size(); ++i) { + physicalType currentValue = values[i]; + if (currentValue != commonValue) { + isCommon[i] = false; + otherValues.push_back(std::move(currentValue)); + } + } + + Buffer tempBuffer{buffer.getMemoryPool()}; + std::string_view serializedIsCommon = selection.template encodeNested( + EncodingIdentifiers::MainlyConstant::IsCommon, + isCommon, + tempBuffer, + options); + std::string_view serializedOtherValues = + selection.template encodeNested( + EncodingIdentifiers::MainlyConstant::OtherValues, + otherValues, + tempBuffer, + options); + + uint32_t encodingSize = Encoding::serializePrefixSize(entryCount, useVarint) + + 8 + serializedIsCommon.size() + serializedOtherValues.size(); + if constexpr (isNumericType()) { + encodingSize += sizeof(physicalType); + } else { + encodingSize += 4 + commonValue.size(); + } + char* reserved = buffer.reserve(encodingSize); + char* pos = reserved; + Encoding::serializePrefix( + EncodingType::MainlyConstantV2, + TypeTraits::dataType, + entryCount, + useVarint, + pos); + encoding::writeString(serializedIsCommon, pos); + encoding::writeString(serializedOtherValues, pos); + encoding::write(commonValue, pos); + NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); + return {reserved, encodingSize}; +} +} // namespace facebook::nimble diff --git a/dwio/nimble/encodings/MainlyConstantEncodingV2.h b/dwio/nimble/encodings/MainlyConstantEncodingV2.h new file mode 100644 index 00000000..09995e4d --- /dev/null +++ b/dwio/nimble/encodings/MainlyConstantEncodingV2.h @@ -0,0 +1,167 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "dwio/nimble/common/Buffer.h" +#include "dwio/nimble/common/EncodingPrimitives.h" +#include "dwio/nimble/common/EncodingType.h" +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/common/Types.h" +#include "dwio/nimble/common/Vector.h" +#include "dwio/nimble/encodings/Encoding.h" +#include "dwio/nimble/encodings/EncodingFactory.h" +#include "dwio/nimble/encodings/EncodingIdentifier.h" +#include "dwio/nimble/encodings/EncodingSelection.h" +#include "velox/common/memory/Memory.h" + +// V2 variant of MainlyConstantEncoding for benchmarking and experimentation. +// Reuses MainlyConstantEncodingBase from MainlyConstantEncoding.h. + +#include "dwio/nimble/encodings/MainlyConstantEncoding.h" + +namespace facebook::nimble { + +template +class MainlyConstantEncodingV2 final : public MainlyConstantEncodingBase { + public: + using cppDataType = T; + using physicalType = typename TypeTraits::physicalType; + + MainlyConstantEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + static std::string_view encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options = {}); +}; + +// +// End of public API. Implementation follows. +// + +template +MainlyConstantEncodingV2::MainlyConstantEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options) + : MainlyConstantEncodingBase(memoryPool, data, options) { + const char* pos = data.data() + this->dataOffset(); + const uint32_t isCommonBytes = encoding::readUint32(pos); + this->isCommon_ = EncodingFactory::decode( + *this->pool_, {pos, isCommonBytes}, stringBufferFactory, options); + pos += isCommonBytes; + const uint32_t otherValuesBytes = encoding::readUint32(pos); + this->otherValues_ = EncodingFactory::decode( + *this->pool_, {pos, otherValuesBytes}, stringBufferFactory, options); + pos += otherValuesBytes; + this->commonValue_ = encoding::read(pos); + NIMBLE_CHECK(pos == data.end(), "Unexpected mainly constant encoding end"); +} + +template +std::string_view MainlyConstantEncodingV2::encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options) { + const bool useVarint = options.useVarintRowCount; + if (values.empty()) { + NIMBLE_INCOMPATIBLE_ENCODING("MainlyConstantEncodingV2 cannot be empty."); + } + + const auto commonElement = std::max_element( + selection.statistics().uniqueCounts().value().cbegin(), + selection.statistics().uniqueCounts().value().cend(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + const uint32_t entryCount = values.size(); + const uint32_t uncommonCount = entryCount - commonElement->second; + + Vector isCommon{&buffer.getMemoryPool(), values.size(), true}; + Vector otherValues(&buffer.getMemoryPool()); + otherValues.reserve(uncommonCount); + + physicalType commonValue = commonElement->first; + for (auto i = 0; i < values.size(); ++i) { + physicalType currentValue = values[i]; + if (currentValue != commonValue) { + isCommon[i] = false; + otherValues.push_back(std::move(currentValue)); + } + } + + Buffer tempBuffer{buffer.getMemoryPool()}; + std::string_view serializedIsCommon = selection.template encodeNested( + EncodingIdentifiers::MainlyConstant::IsCommon, + isCommon, + tempBuffer, + options); + std::string_view serializedOtherValues = + selection.template encodeNested( + EncodingIdentifiers::MainlyConstant::OtherValues, + otherValues, + tempBuffer, + options); + + uint32_t encodingSize = Encoding::serializePrefixSize(entryCount, useVarint) + + 8 + serializedIsCommon.size() + serializedOtherValues.size(); + if constexpr (isNumericType()) { + encodingSize += sizeof(physicalType); + } else { + encodingSize += 4 + commonValue.size(); + } + char* reserved = buffer.reserve(encodingSize); + char* pos = reserved; + Encoding::serializePrefix( + EncodingType::MainlyConstantV2, + TypeTraits::dataType, + entryCount, + useVarint, + pos); + encoding::writeString(serializedIsCommon, pos); + encoding::writeString(serializedOtherValues, pos); + encoding::write(commonValue, pos); + NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); + return {reserved, encodingSize}; +} + +template <> +class MainlyConstantEncodingV2 final + : public MainlyConstantEncodingBase { + public: + using cppDataType = std::string_view; + using physicalType = std::string_view; + + MainlyConstantEncodingV2( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + static std::string_view encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options = {}); +}; +} // namespace facebook::nimble diff --git a/dwio/nimble/encodings/RleEncodingV2.cpp b/dwio/nimble/encodings/RleEncodingV2.cpp new file mode 100644 index 00000000..d244e193 --- /dev/null +++ b/dwio/nimble/encodings/RleEncodingV2.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dwio/nimble/encodings/RleEncodingV2.h" + +namespace facebook::nimble { + +RLEV2Encoding::RLEV2Encoding( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options) + : internal_v2::RLEV2EncodingBase>( + memoryPool, + data, + stringBufferFactory, + options) { + initialValue_ = *reinterpret_cast( + internal_v2::RLEV2EncodingBase>:: + getValuesStart()); + NIMBLE_CHECK( + (internal_v2::RLEV2EncodingBase>:: + getValuesStart() + + 1) == data.end(), + "Unexpected run length encoding end"); + internal_v2::RLEV2EncodingBase>::reset(); +} + +bool RLEV2Encoding::nextValue() { + value_ = !value_; + return !value_; +} + +void RLEV2Encoding::resetValues() { + value_ = initialValue_; +} + +void RLEV2Encoding::materializeBoolsAsBits( + uint32_t rowCount, + uint64_t* buffer, + int begin) { + auto rowsLeft = rowCount; + while (rowsLeft) { + if (rowsLeft < copiesRemaining_) { + velox::bits::fillBits(buffer, begin, begin + rowsLeft, currentValue_); + copiesRemaining_ -= rowsLeft; + return; + } else { + velox::bits::fillBits( + buffer, begin, begin + copiesRemaining_, currentValue_); + begin += copiesRemaining_; + rowsLeft -= copiesRemaining_; + copiesRemaining_ = materializedRunLengths_.nextValue(); + currentValue_ = nextValue(); + } + } +} + +} // namespace facebook::nimble diff --git a/dwio/nimble/encodings/RleEncodingV2.h b/dwio/nimble/encodings/RleEncodingV2.h new file mode 100644 index 00000000..9556de18 --- /dev/null +++ b/dwio/nimble/encodings/RleEncodingV2.h @@ -0,0 +1,412 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include "dwio/nimble/common/Buffer.h" +#include "dwio/nimble/common/EncodingPrimitives.h" +#include "dwio/nimble/common/EncodingType.h" +#include "dwio/nimble/common/FixedBitArray.h" +#include "dwio/nimble/common/Rle.h" +#include "dwio/nimble/common/Vector.h" +#include "dwio/nimble/encodings/Encoding.h" +#include "dwio/nimble/encodings/EncodingFactory.h" +#include "dwio/nimble/encodings/EncodingIdentifier.h" +#include "dwio/nimble/encodings/EncodingSelection.h" + +// V2 variant of RLEEncoding for benchmarking and experimentation. + +namespace facebook::nimble { + +namespace internal_v2 { + +template +class RLEV2EncodingBase + : public TypedEncoding::physicalType> { + public: + using cppDataType = T; + using physicalType = typename TypeTraits::physicalType; + + RLEV2EncodingBase( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}) + : TypedEncoding(memoryPool, data, options), + materializedRunLengths_{EncodingFactory::decode( + memoryPool, + {data.data() + this->dataOffset() + 4, + *reinterpret_cast( + data.data() + this->dataOffset())}, + stringBufferFactory, + options)} {} + + void reset() { + materializedRunLengths_.reset(); + derived().resetValues(); + copiesRemaining_ = materializedRunLengths_.nextValue(); + currentValue_ = nextValue(); + } + + void skip(uint32_t rowCount) final { + uint32_t rowsLeft = rowCount; + while (rowsLeft) { + if (rowsLeft < copiesRemaining_) { + copiesRemaining_ -= rowsLeft; + return; + } else { + rowsLeft -= copiesRemaining_; + copiesRemaining_ = materializedRunLengths_.nextValue(); + currentValue_ = nextValue(); + } + } + } + + void materialize(uint32_t rowCount, void* buffer) final { + uint32_t rowsLeft = rowCount; + physicalType* output = static_cast(buffer); + while (rowsLeft) { + if (rowsLeft < copiesRemaining_) { + std::fill(output, output + rowsLeft, currentValue_); + copiesRemaining_ -= rowsLeft; + return; + } else { + std::fill(output, output + copiesRemaining_, currentValue_); + output += copiesRemaining_; + rowsLeft -= copiesRemaining_; + copiesRemaining_ = materializedRunLengths_.nextValue(); + currentValue_ = nextValue(); + } + } + } + + template + void readWithVisitor(DecoderVisitor& visitor, ReadWithVisitorParams& params) { + detail::readWithVisitorSlow( + visitor, + params, + [&](auto toSkip) { skip(toSkip); }, + [&] { + if (copiesRemaining_ == 0) { + copiesRemaining_ = materializedRunLengths_.nextValue(); + currentValue_ = nextValue(); + } + --copiesRemaining_; + return currentValue_; + }); + } + + static std::string_view encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options = {}) { + const bool useVarint = options.useVarintRowCount; + const uint32_t valueCount = values.size(); + Vector runLengths(&buffer.getMemoryPool()); + Vector runValues(&buffer.getMemoryPool()); + rle::computeRuns(values, &runLengths, &runValues); + + Buffer tempBuffer{buffer.getMemoryPool()}; + std::string_view serializedRunLengths = + selection.template encodeNested( + EncodingIdentifiers::RunLength::RunLengths, + runLengths, + tempBuffer, + options); + + std::string_view serializedRunValues = + getSerializedRunValues(selection, runValues, tempBuffer, options); + + const uint32_t encodingSize = + Encoding::serializePrefixSize(valueCount, useVarint) + 4 + + serializedRunLengths.size() + serializedRunValues.size(); + char* reserved = buffer.reserve(encodingSize); + char* pos = reserved; + Encoding::serializePrefix( + EncodingType::RLEV2, + TypeTraits::dataType, + valueCount, + useVarint, + pos); + encoding::writeString(serializedRunLengths, pos); + encoding::writeBytes(serializedRunValues, pos); + NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); + return {reserved, encodingSize}; + } + + const char* getValuesStart() const { + return this->data_.data() + this->dataOffset() + 4 + + *reinterpret_cast( + this->data_.data() + this->dataOffset()); + } + + RLEV2Encoding& derived() { + return *static_cast(this); + } + physicalType nextValue() { + return derived().nextValue(); + } + static std::string_view getSerializedRunValues( + EncodingSelection& selection, + const Vector& runValues, + Buffer& buffer, + const Encoding::Options& options = {}) { + return RLEV2Encoding::getSerializedRunValues( + selection, runValues, buffer, options); + } + + uint32_t copiesRemaining_ = 0; + physicalType currentValue_; + detail::BufferedEncoding materializedRunLengths_; +}; + +} // namespace internal_v2 + +template +class RLEV2Encoding final + : public internal_v2::RLEV2EncodingBase> { + using physicalType = typename TypeTraits::physicalType; + + public: + explicit RLEV2Encoding( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + physicalType nextValue(); + void resetValues(); + static std::string_view getSerializedRunValues( + EncodingSelection& selection, + const Vector& runValues, + Buffer& buffer, + const Encoding::Options& options = {}) { + return selection.template encodeNested( + EncodingIdentifiers::RunLength::RunValues, runValues, buffer, options); + } + + template + void readWithVisitor(DecoderVisitor& visitor, ReadWithVisitorParams& params); + + template + void bulkScan( + Visitor& visitor, + vector_size_t currentRow, + const vector_size_t* nonNullRows, + vector_size_t numNonNulls, + const vector_size_t* scatterRows); + + private: + template + vector_size_t findNumInRun( + const vector_size_t* rows, + vector_size_t rowIndex, + vector_size_t numRows, + vector_size_t currentRow) const; + + detail::BufferedEncoding values_; +}; + +template <> +class RLEV2Encoding final + : public internal_v2::RLEV2EncodingBase> { + public: + RLEV2Encoding( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + bool nextValue(); + void resetValues(); + static std::string_view getSerializedRunValues( + EncodingSelection& /* selection */, + const Vector& runValues, + Buffer& buffer, + const Encoding::Options& /* options */ = {}) { + char* reserved = buffer.reserve(sizeof(char)); + *reserved = runValues[0]; + return {reserved, 1}; + } + + void materializeBoolsAsBits(uint32_t rowCount, uint64_t* buffer, int begin) + final; + + private: + bool initialValue_; + bool value_; +}; + +// +// End of public API. Implementations follow. +// + +template +RLEV2Encoding::RLEV2Encoding( + velox::memory::MemoryPool& memoryPool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options) + : internal_v2::RLEV2EncodingBase>( + memoryPool, + data, + stringBufferFactory, + options), + values_{EncodingFactory::decode( + memoryPool, + {internal_v2::RLEV2EncodingBase>:: + getValuesStart(), + static_cast( + data.end() - + internal_v2::RLEV2EncodingBase>:: + getValuesStart())}, + stringBufferFactory, + options)} { + internal_v2::RLEV2EncodingBase>::reset(); +} + +template +typename RLEV2Encoding::physicalType RLEV2Encoding::nextValue() { + return values_.nextValue(); +} + +template +void RLEV2Encoding::resetValues() { + values_.reset(); +} + +template +template +vector_size_t RLEV2Encoding::findNumInRun( + const vector_size_t* rows, + vector_size_t rowIndex, + vector_size_t numRows, + vector_size_t currentRow) const { + if constexpr (kDense) { + return std::min(this->copiesRemaining_, numRows - rowIndex); + } + if (rows[rowIndex] - currentRow >= this->copiesRemaining_) { + return 0; + } + if (rows[numRows - 1] - currentRow < this->copiesRemaining_) { + return numRows - rowIndex; + } + auto* begin = rows + rowIndex; + auto* end = begin + + std::min(this->copiesRemaining_, numRows - rowIndex); + auto endOfRun = currentRow + this->copiesRemaining_; + auto* it = std::lower_bound(begin, end, endOfRun); + NIMBLE_DCHECK(it > begin); + return it - begin; +} + +template +template +void RLEV2Encoding::bulkScan( + V& visitor, + vector_size_t currentRow, + const vector_size_t* nonNullRows, + vector_size_t numNonNulls, + const vector_size_t* scatterRows) { + using DataType = typename V::DataType; + using ValueType = detail::ValueType; + constexpr bool kScatterValues = kScatter && !V::kHasFilter && !V::kHasHook; + auto* values = detail::mutableValues( + visitor, visitor.numRows() - visitor.rowIndex()); + auto* filterHits = V::kHasFilter ? visitor.outputRows(numNonNulls) : nullptr; + auto* rows = kScatter ? scatterRows : nonNullRows; + vector_size_t numValues = 0; + vector_size_t numHits = 0; + vector_size_t nonNullRowIndex = 0; + for (;;) { + if (this->copiesRemaining_ == 0) { + this->copiesRemaining_ = this->materializedRunLengths_.nextValue(); + this->currentValue_ = nextValue(); + } + const auto numInRun = findNumInRun( + nonNullRows, nonNullRowIndex, numNonNulls, currentRow); + if (numInRun > 0) { + auto value = detail::castFromPhysicalType(this->currentValue_); + bool pass = true; + if constexpr (V::kHasFilter) { + pass = velox::common::applyFilter(visitor.filter(), value); + if (pass) { + auto* begin = rows + nonNullRowIndex; + std::copy(begin, begin + numInRun, filterHits + numHits); + numHits += numInRun; + } + } + if (!V::kFilterOnly && pass) { + vector_size_t numRows; + if constexpr (kScatterValues) { + auto end = nonNullRowIndex + numInRun; + if (FOLLY_UNLIKELY(end == numNonNulls)) { + numRows = visitor.numRows() - visitor.rowIndex(); + } else { + numRows = scatterRows[end] - visitor.rowIndex(); + } + visitor.addRowIndex(numRows); + } else { + numRows = numInRun; + } + auto* begin = values + numValues; + std::fill(begin, begin + numRows, detail::dataToValue(visitor, value)); + numValues += numRows; + } + auto endRow = nonNullRows[nonNullRowIndex + numInRun - 1]; + auto consumed = endRow - currentRow + 1; + consumed = std::min(consumed, this->copiesRemaining_); + this->copiesRemaining_ -= consumed; + currentRow += consumed; + nonNullRowIndex += numInRun; + } + if (FOLLY_UNLIKELY(nonNullRowIndex == numNonNulls)) { + break; + } + currentRow += this->copiesRemaining_; + this->copiesRemaining_ = 0; + } + if constexpr (kScatterValues) { + NIMBLE_DCHECK_EQ(visitor.rowIndex(), visitor.numRows(), ""); + } else { + visitor.setRowIndex(visitor.numRows()); + } + if constexpr (V::kHasHook) { + NIMBLE_DCHECK_EQ(numValues, numNonNulls, ""); + visitor.hook().addValues(scatterRows, values, numNonNulls); + } else { + visitor.addNumValues(V::kFilterOnly ? numHits : numValues); + } +} + +template +template +void RLEV2Encoding::readWithVisitor( + V& visitor, + ReadWithVisitorParams& params) { + auto* nulls = visitor.reader().rawNullsInReadRange(); + if (velox::dwio::common::useFastPath(visitor, nulls)) { + detail::readWithVisitorFast(*this, visitor, params, nulls); + } else { + internal_v2::RLEV2EncodingBase>::readWithVisitor( + visitor, params); + } +} + +} // namespace facebook::nimble diff --git a/dwio/nimble/encodings/VarintEncodingV2.h b/dwio/nimble/encodings/VarintEncodingV2.h new file mode 100644 index 00000000..60dbd52c --- /dev/null +++ b/dwio/nimble/encodings/VarintEncodingV2.h @@ -0,0 +1,186 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "dwio/nimble/common/Buffer.h" +#include "dwio/nimble/common/EncodingPrimitives.h" +#include "dwio/nimble/common/EncodingType.h" +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/common/Types.h" +#include "dwio/nimble/common/Varint.h" +#include "dwio/nimble/common/Vector.h" +#include "dwio/nimble/encodings/Encoding.h" +#include "dwio/nimble/encodings/EncodingSelection.h" +#include "dwio/nimble/encodings/Statistics.h" + +// V2 variant of VarintEncoding for benchmarking and experimentation. + +namespace facebook::nimble { + +template +class VarintEncodingV2 final + : public TypedEncoding::physicalType> { + public: + using cppDataType = T; + using physicalType = typename TypeTraits::physicalType; + + static constexpr int kPrefixSize = sizeof(T); + + VarintEncodingV2( + velox::memory::MemoryPool& pool, + std::string_view data, + std::function stringBufferFactory, + const Encoding::Options& options = {}); + + void reset() final; + void skip(uint32_t rowCount) final; + void materialize(uint32_t rowCount, void* buffer) final; + + template + void readWithVisitor(DecoderVisitor& visitor, ReadWithVisitorParams& params); + + static std::string_view encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options = {}); + + private: + const physicalType baseValue_; + uint32_t row_{0}; + const char* pos_; + Vector buf_; +}; + +// +// End of public API. Implementations follow. +// + +template +VarintEncodingV2::VarintEncodingV2( + velox::memory::MemoryPool& pool, + std::string_view data, + std::function /* stringBufferFactory */, + const Encoding::Options& options) + : TypedEncoding(pool, data, options), + baseValue_{*reinterpret_cast( + data.data() + this->dataOffset())}, + buf_{this->pool_} { + reset(); +} + +template +void VarintEncodingV2::reset() { + row_ = 0; + pos_ = Encoding::data_.data() + this->dataOffset() + + VarintEncodingV2::kPrefixSize; +} + +template +void VarintEncodingV2::skip(uint32_t rowCount) { + row_ += rowCount; + pos_ = varint::bulkVarintSkip(rowCount, pos_); +} + +template +void VarintEncodingV2::materialize(uint32_t rowCount, void* buffer) { + static_assert( + sizeof(T) == 4 || sizeof(T) == 8, + "Varint encoding require 4 or 8 bytes data types."); + if constexpr (isFourByteIntegralType()) { + pos_ = varint_v2::bulkVarintDecode32( + rowCount, pos_, static_cast(buffer)); + } else { + pos_ = varint_v2::bulkVarintDecode64( + rowCount, pos_, static_cast(buffer)); + } + + if (baseValue_ != 0) { + auto* output = reinterpret_cast(buffer); + for (auto i = 0; i < rowCount; ++i) { + output[i] += baseValue_; + } + } + row_ += rowCount; +} + +template +template +void VarintEncodingV2::readWithVisitor( + V& visitor, + ReadWithVisitorParams& params) { + detail::readWithVisitorSlow( + visitor, + params, + [&](auto toSkip) { skip(toSkip); }, + [&] { + physicalType value; + if constexpr (isFourByteIntegralType()) { + value = varint::readVarint32(&pos_); + } else { + static_assert(sizeof(T) == 8); + value = varint::readVarint64(&pos_); + } + return baseValue_ + value; + }); +} + +template +std::string_view VarintEncodingV2::encode( + EncodingSelection& selection, + std::span values, + Buffer& buffer, + const Encoding::Options& options) { + const bool useVarint = options.useVarintRowCount; + static_assert( + std::is_same_v< + typename std::make_unsigned::type, + physicalType>, + "Physical type must be unsigned."); + static_assert( + sizeof(T) == 4 || sizeof(T) == 8, + "Varint encoding require 4 or 8 bytes data types."); + const uint32_t valueCount = values.size(); + uint8_t index{0}; + const uint32_t dataSize = std::accumulate( + selection.statistics().bucketCounts().cbegin(), + selection.statistics().bucketCounts().cend(), + 0, + [&index](uint32_t sum, uint64_t bucketSize) { + return sum + (bucketSize * (++index)); + }); + const uint32_t encodingSize = dataSize + + Encoding::serializePrefixSize(valueCount, useVarint) + + VarintEncodingV2::kPrefixSize; + + char* reserved = buffer.reserve(encodingSize); + char* pos = reserved; + Encoding::serializePrefix( + EncodingType::VarintV2, + TypeTraits::dataType, + valueCount, + useVarint, + pos); + encoding::write(selection.statistics().min(), pos); + for (auto value : values) { + varint::writeVarint(value - selection.statistics().min(), &pos); + } + NIMBLE_DCHECK_EQ(pos - reserved, encodingSize, "Encoding size mismatch."); + return {reserved, encodingSize}; +} +} // namespace facebook::nimble diff --git a/dwio/nimble/tools/EncodingUtilities.cpp b/dwio/nimble/tools/EncodingUtilities.cpp index c82c4a91..dfcb7dff 100644 --- a/dwio/nimble/tools/EncodingUtilities.cpp +++ b/dwio/nimble/tools/EncodingUtilities.cpp @@ -29,7 +29,8 @@ void extractCompressionType( // Compression type is the byte right after the encoding header for both // encodings. case EncodingType::Trivial: - case EncodingType::FixedBitWidth: { + case EncodingType::FixedBitWidth: + case EncodingType::FixedBitWidthV2: { auto pos = stream.data() + kEncodingPrefixSize; properties.insert( {EncodingPropertyType::Compression, @@ -40,14 +41,17 @@ void extractCompressionType( break; } case EncodingType::RLE: + case EncodingType::RLEV2: case EncodingType::Dictionary: case EncodingType::Sentinel: case EncodingType::Nullable: case EncodingType::SparseBool: case EncodingType::Varint: + case EncodingType::VarintV2: case EncodingType::Delta: case EncodingType::Constant: case EncodingType::MainlyConstant: + case EncodingType::MainlyConstantV2: case EncodingType::Prefix: break; } @@ -99,7 +103,9 @@ void traverseEncodings( switch (encodingType) { case EncodingType::FixedBitWidth: + case EncodingType::FixedBitWidthV2: case EncodingType::Varint: + case EncodingType::VarintV2: case EncodingType::Constant: case EncodingType::Prefix: { // don't have any nested encoding @@ -124,7 +130,8 @@ void traverseEncodings( visitor); break; } - case EncodingType::MainlyConstant: { + case EncodingType::MainlyConstant: + case EncodingType::MainlyConstantV2: { const char* pos = stream.data() + kEncodingPrefixSize; const uint32_t isCommonBytes = encoding::readUint32(pos); traverseEncodings( @@ -149,7 +156,8 @@ void traverseEncodings( visitor); break; } - case EncodingType::RLE: { + case EncodingType::RLE: + case EncodingType::RLEV2: { const char* pos = stream.data() + kEncodingPrefixSize; const uint32_t runLengthBytes = encoding::readUint32(pos); traverseEncodings(