Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 3 additions & 186 deletions dwio/nimble/encodings/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,189 +15,6 @@
*/
#pragma once

#include "dwio/nimble/common/EncodingPrimitives.h"
#include "dwio/nimble/common/Types.h"
#include "dwio/nimble/common/Vector.h"
#include "dwio/nimble/encodings/EncodingSelection.h"
#include "folly/io/IOBuf.h"

namespace facebook::nimble {

// Outcome of a compression attempt, as returned by the compress() entry points
// in this header.
struct CompressionResult {
  // Compression type of the result. CompressionEncoder treats
  // CompressionType::Uncompressed as "compression declined" and only reads
  // `buffer` for any other value. Value-initialized so that a
  // default-constructed result never carries an indeterminate enum value.
  CompressionType compressionType{};
  // Bytes produced by the compressor, when present.
  std::optional<Vector<char>> buffer;
};

// Interface implemented by individual compressors. Instances are handed over
// (by unique_ptr) to Compression::registerCompressor().
struct ICompressor {
// Compresses `data` and returns the outcome as a CompressionResult.
// `dataType`, `bitWidth` and `compressionPolicy` are passed through to the
// implementation; how they are used is implementation specific.
virtual CompressionResult compress(
velox::memory::MemoryPool& memoryPool,
std::string_view data,
DataType dataType,
int bitWidth,
const CompressionPolicy& compressionPolicy) = 0;

// Uncompresses `data` and returns the resulting bytes.
virtual Vector<char> uncompress(
velox::memory::MemoryPool& memoryPool,
CompressionType compressionType,
DataType dataType,
std::string_view data) = 0;

// Returns the uncompressed size of `data`, or std::nullopt.
// NOTE(review): presumably nullopt means the size cannot be determined
// without uncompressing -- confirm against implementations.
virtual std::optional<size_t> uncompressedSize(
std::string_view data) const = 0;

// Returns the CompressionType associated with this compressor.
// NOTE(review): unlike uncompressedSize(), this accessor is not const.
virtual CompressionType compressionType() = 0;

virtual ~ICompressor() = default;
};

// Static entry points for compressing/uncompressing data. Takes the same
// arguments as the ICompressor interface, plus an explicit CompressionType
// where the operation needs one.
// NOTE(review): presumably these dispatch to the compressor registered for
// the relevant compression type -- the implementation is not visible here.
class Compression {
public:
// Compresses `data` according to `compressionPolicy`. CompressionEncoder
// treats a result of CompressionType::Uncompressed as "compression declined".
static CompressionResult compress(
velox::memory::MemoryPool& memoryPool,
std::string_view data,
DataType dataType,
int bitWidth,
const CompressionPolicy& compressionPolicy);

// Uncompresses `data`, which was compressed using `compressionType`.
static Vector<char> uncompress(
velox::memory::MemoryPool& memoryPool,
CompressionType compressionType,
DataType dataType,
std::string_view data);

// Returns the uncompressed size of `data`, or std::nullopt.
static std::optional<size_t> uncompressedSize(
CompressionType compressionType,
std::string_view data);

// Registers a compressor implementation; ownership is transferred.
static void registerCompressor(std::unique_ptr<ICompressor>&& compressor);
};

// Encodings using compression repeat the same pattern, which involves trying to
// compress the data, and throwing it away if compression policy decides to
// throw it away. This class tries to extract this common logic into one place.
// Note: There are actually two sub-patterns, therefore, there are two CTors in
// this class. More on this below.
//
// Lifetime: when compression is skipped or declined, the encoder only keeps a
// non-owning view of the uncompressed bytes (the caller's buffer, or the buffer
// returned by the allocation callback). That storage must stay alive until
// write() has been called.
//
// Note: the template parameter T is not referenced inside the class body.
template <typename T>
class CompressionEncoder {
 public:
  // This CTor handles the sub-pattern where the source data is already encoded
  // correctly, so no extra encoding is needed.
  //
  // Compression is skipped when the buffer is empty, when it is smaller than
  // the policy's minCompressionSize, or when the policy selects
  // CompressionType::Uncompressed.
  CompressionEncoder(
      velox::memory::MemoryPool& memoryPool,
      const CompressionPolicy& compressionPolicy,
      DataType dataType,
      std::string_view uncompressedBuffer,
      int bitWidth = 0)
      : dataSize_{uncompressedBuffer.size()},
        compressionType_{CompressionType::Uncompressed} {
    if (dataSize_ == 0 ||
        dataSize_ < compressionPolicy.compression().minCompressionSize ||
        compressionPolicy.compression().compressionType ==
            CompressionType::Uncompressed) {
      // No compression, just use the original buffer.
      data_ = uncompressedBuffer;
      return;
    }

    auto compressionResult = Compression::compress(
        memoryPool, uncompressedBuffer, dataType, bitWidth, compressionPolicy);

    if (compressionResult.compressionType == CompressionType::Uncompressed) {
      // Compression declined. Use the original buffer.
      data_ = uncompressedBuffer;
      return;
    }

    // Compression accepted. Use the compressed buffer.
    compressed_ = std::move(compressionResult.buffer);
    data_ = {compressed_->data(), compressed_->size()};
    dataSize_ = compressed_->size();
    compressionType_ = compressionResult.compressionType;
  }

  // This CTor handles the sub-pattern where the source data requires special
  // encoding before it is compressed/written.
  // Note that in this case, the target buffer for the newly encoded data is
  // different if compression is applied (it is written to a temp buffer), or if
  // compression is skipped (written directly to the stream buffer).
  //
  // `allocateUncompressedBuffer` is only invoked when compression is
  // attempted. `encoder` writes the encoded data at the given position; it is
  // invoked exactly once, either here (compression attempted) or later in
  // write() (compression skipped).
  CompressionEncoder(
      velox::memory::MemoryPool& memoryPool,
      const CompressionPolicy& compressionPolicy,
      DataType dataType,
      int bitWidth,
      size_t uncompressedSize,
      std::function<std::span<char>()> allocateUncompressedBuffer,
      std::function<void(char*&)> encoder)
      : encoder_{std::move(encoder)},
        dataSize_{uncompressedSize},
        compressionType_{CompressionType::Uncompressed} {
    if (uncompressedSize == 0 ||
        uncompressedSize < compressionPolicy.compression().minCompressionSize ||
        compressionPolicy.compression().compressionType ==
            CompressionType::Uncompressed) {
      // No compression. Do not encode the data yet. It will be encoded later
      // (in write()) directly into the output buffer.
      return;
    }

    // Compression is attempted. Encode the data before compressing it, into a
    // temp buffer.
    auto uncompressed = allocateUncompressedBuffer();
    char* pos = uncompressed.data();
    encoder_(pos);
    auto compressionResult = Compression::compress(
        memoryPool,
        {uncompressed.data(), uncompressed.size()},
        dataType,
        bitWidth,
        compressionPolicy);

    if (compressionResult.compressionType == CompressionType::Uncompressed) {
      // Compression declined. Since we already encoded the data, remember the
      // temp buffer for later.
      // Note: data size is still the same uncompressed size.
      data_ = uncompressed;
      return;
    }

    // Compression accepted. Use the compressed buffer.
    compressed_ = std::move(compressionResult.buffer);
    data_ = {compressed_->data(), compressed_->size()};
    dataSize_ = compressed_->size();
    compressionType_ = compressionResult.compressionType;
  }

  // Size in bytes of the payload: the compressed size when compression was
  // accepted, otherwise the uncompressed size given at construction.
  size_t getSize() const {
    return dataSize_;
  }

  // Writes the payload at `pos`. When bytes are copied from a stored view,
  // `pos` is advanced past them; when the encoder callback is used instead,
  // advancing `pos` is left to the callback.
  void write(char*& pos) {
    if (!data_.has_value()) {
      // If we are here, it means we handle uncompressed data that needs to be
      // encoded directly into the target buffer.
      encoder_(pos);
      return;
    }

    // A view with a null data pointer carries nothing to copy.
    if (data_->data() == nullptr) {
      return;
    }

    std::copy(data_->begin(), data_->end(), pos);
    pos += data_->size();
  }

  // Compression type of the payload; CompressionType::Uncompressed when
  // compression was skipped or declined.
  CompressionType compressionType() const {
    return compressionType_;
  }

 private:
  // Encoding callback; only set by the second CTor.
  const std::function<void(char*&)> encoder_;

  size_t dataSize_;
  // Non-owning view of the bytes to copy in write(). Empty optional means the
  // data still has to be produced by encoder_.
  std::optional<std::span<const char>> data_;
  // Owns the compressed bytes when compression was accepted.
  std::optional<Vector<char>> compressed_;
  CompressionType compressionType_;
};

} // namespace facebook::nimble
// This header has moved to dwio/nimble/encodings/compression/Compression.h.
// This forwarding header is kept for backward compatibility.
#include "dwio/nimble/encodings/compression/Compression.h" // @manual
103 changes: 1 addition & 102 deletions dwio/nimble/encodings/EncodingSelection.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "dwio/nimble/encodings/EncodingIdentifier.h"
#include "dwio/nimble/encodings/EncodingLayout.h"
#include "dwio/nimble/encodings/Statistics.h"
#include "dwio/nimble/encodings/compression/CompressionPolicy.h"
#include "folly/json/json.h"

namespace facebook::nimble {
Expand Down Expand Up @@ -65,108 +66,6 @@ namespace facebook::nimble {

class EncodingSelectionPolicyBase;

///
/// Compression policy type definitions:
/// A compression policy defines which compression algorithm to apply on the
/// data (if any) and what parameters to use for this compression algorithm. In
/// addition, once compression is applied to the data, the compression policy
/// can decide if the compressed result is satisfactory or if it should be
/// discarded.
struct ZstdCompressionParameters {
/// Compression level; defaults to 3.
int16_t compressionLevel = 3;
};

/// An identifier for the meta internal compression policy.
class MetaInternalCompressionKey {
public:
MetaInternalCompressionKey() = default;

MetaInternalCompressionKey(
std::string ns,
std::string tableName,
std::string columnName)
: ns_{std::move(ns)},
tableName_{std::move(tableName)},
columnName_{std::move(columnName)} {}

const std::string& ns() const {
return ns_;
}

const std::string& tableName() const {
return tableName_;
}

const std::string& columnName() const {
return columnName_;
}

std::string toString() const {
folly::dynamic json = folly::dynamic::object("ns", ns_)(
"tableName", tableName_)("columnName", columnName_);
return folly::toJson(json);
}

static MetaInternalCompressionKey fromString(const std::string& str) {
// will throw upon failure to parse or missing fields
auto json = folly::parseJson(str);
return MetaInternalCompressionKey{
json["ns"].asString(),
json["tableName"].asString(),
json["columnName"].asString()};
}

private:
std::string ns_;
std::string tableName_;
std::string columnName_;
};

/// Parameters for the meta internal compression algorithm.
struct MetaInternalCompressionParameters {
/// Compression level; defaults to 0.
int16_t compressionLevel = 0;
/// Decompression level; defaults to 0.
int16_t decompressionLevel = 0;
/// Enabled by default.
/// NOTE(review): presumably selects a bit-width-aware compressor variant --
/// confirm against the compressor implementation.
bool useVariableBitWidthCompressor = true;
/// Key (namespace / table / column) identifying the compression policy.
MetaInternalCompressionKey compressionKey;
};

/// Bundle of per-algorithm compression parameters. Both members are
/// value-initialized, so each starts with its own defaults.
struct CompressionParameters {
/// Parameters for Zstd compression.
ZstdCompressionParameters zstd{};
/// Parameters for meta internal compression.
MetaInternalCompressionParameters metaInternal{};
};

/// Describes the compression a policy wants applied. Returned by
/// CompressionPolicy::compression().
struct CompressionInformation {
/// Compression algorithm to apply; value-initialized by default.
CompressionType compressionType{};
/// Per-algorithm parameters.
CompressionParameters parameters{};
/// Minimum data size for attempting compression; defaults to 0.
/// CompressionEncoder skips compression for data smaller than this value.
uint64_t minCompressionSize = 0;
};

/// Interface for compression policies: which compression to apply, and
/// whether a compressed result should be kept or discarded.
class CompressionPolicy {
public:
/// Returns the compression type, its parameters and the minimum size for
/// attempting compression.
virtual CompressionInformation compression() const = 0;
/// Decides whether a compressed result is accepted, given the compression
/// type and the uncompressed and compressed sizes. Returning false means the
/// compressed result should be discarded.
virtual bool shouldAccept(
CompressionType /* compressionType */,
uint64_t /* uncompressedSize */,
uint64_t /* compressedSize */) const = 0;

virtual ~CompressionPolicy() = default;
};

/// Default compression policy. Default behavior (if not compression policy is
/// provided) is to not compress.
class NoCompressionPolicy : public CompressionPolicy {
public:
CompressionInformation compression() const override {
return {.compressionType = CompressionType::Uncompressed};
}

virtual bool shouldAccept(
CompressionType /* compressionType */,
uint64_t /* uncompressedSize */,
uint64_t /* compressedSize */) const override {
return false;
}
};

/// Type representing a selected encoding.
/// This is the result type returned from the select() method of an encoding
/// selection policy. Also provides access to the compression policies for
Expand Down
2 changes: 2 additions & 0 deletions dwio/nimble/encodings/ZstdCompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <zstd_errors.h>
#include <optional>

#include "dwio/nimble/common/EncodingPrimitives.h"

namespace facebook::nimble {

CompressionResult ZstdCompressor::compress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dwio/nimble/encodings/Compression.h"
#include "dwio/nimble/encodings/compression/Compression.h"
#include "dwio/nimble/common/EncodingPrimitives.h"
#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/encodings/ZstdCompressor.h"
Expand Down
Loading
Loading