Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ velox_add_library(
DirectInputStream.h
ErrorTolerance.h
ExecutorBarrier.h
FileMetadata.h
FileSink.h
FilterNode.h
FlatMapHelper.h
Expand Down
32 changes: 32 additions & 0 deletions velox/dwio/common/FileMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <string>

namespace facebook::velox::dwio::common {

/// File format specific metadata returned when a writer is closed.
/// Caller of Writer::close() can do further processing such as aggregate
/// row group statistics to file level statistics based on the metadata.
class FileMetadata {
public:
virtual ~FileMetadata() = default;
};

} // namespace facebook::velox::dwio::common
4 changes: 2 additions & 2 deletions velox/dwio/common/SortingWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ bool SortingWriter::finish() {
return true;
}

void SortingWriter::close() {
std::unique_ptr<FileMetadata> SortingWriter::close() {
VELOX_CHECK(isFinishing());
setState(State::kClosed);
VELOX_CHECK_NULL(sortBuffer_);
outputWriter_->close();
return outputWriter_->close();
}

void SortingWriter::abort() {
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/common/SortingWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class SortingWriter : public Writer {
/// be flushed.
void flush() override;

void close() override;
/// Closes the writer. Returns file metadata, or null if no metadata is
/// available (e.g. for an empty file).
std::unique_ptr<FileMetadata> close() override;

void abort() override;

Expand Down
7 changes: 5 additions & 2 deletions velox/dwio/common/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>

#include "velox/common/base/Portability.h"
#include "velox/dwio/common/FileMetadata.h"
#include "velox/vector/BaseVector.h"

namespace facebook::velox::dwio::common {
Expand Down Expand Up @@ -69,10 +70,12 @@ class Writer {
/// NOTE: this must be called before close().
virtual bool finish() = 0;

/// Invokes closes the writer. Data can no longer be written.
/// Closes the writer. Data can no longer be written. Returns format-specific
/// file metadata collected during write operations. The returned pointer can
/// be null if no metadata is available, such as for an empty data file.
///
/// NOTE: this must be called after the last finish() which returns true.
virtual void close() = 0;
virtual std::unique_ptr<FileMetadata> close() = 0;

/// Aborts the writing by closing the writer and dropping everything.
/// Data can no longer be written.
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/common/tests/SortingWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class MockWriter : public Writer {

void flush() override {}

void close() override {
std::unique_ptr<FileMetadata> close() override {
setState(State::kClosed);
return nullptr;
}

void abort() override {
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/common/tests/WriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ class MockWriter : public Writer {

void abort() override {}

void close() override {}
std::unique_ptr<FileMetadata> close() override {
return nullptr;
}
};

TEST(WriterTest, stateString) {
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,14 +799,15 @@ void Writer::flush() {
flushInternal(false);
}

void Writer::close() {
std::unique_ptr<dwio::common::FileMetadata> Writer::close() {
checkRunning();
auto exitGuard = folly::makeGuard([this]() {
flushPolicy_->onClose();
setState(State::kClosed);
});
flushInternal(true);
writerBase_->close();
return std::make_unique<DwrfFileMetadata>();
}

void Writer::abort() {
Expand Down
6 changes: 5 additions & 1 deletion velox/dwio/dwrf/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <iterator>
#include <limits>

#include "velox/dwio/common/FileMetadata.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/dwrf/common/Encryption.h"
Expand All @@ -30,6 +31,9 @@

namespace facebook::velox::dwrf {

/// DWRF-specific file metadata wrapper. Currently a placeholder.
class DwrfFileMetadata : public dwio::common::FileMetadata {};

struct WriterOptions : public dwio::common::WriterOptions {
std::shared_ptr<const Config> config = std::make_shared<Config>();
/// Changes the interface to stream list and encoding iter.
Expand Down Expand Up @@ -86,7 +90,7 @@ class Writer : public dwio::common::Writer {
return true;
}

virtual void close() override;
virtual std::unique_ptr<dwio::common::FileMetadata> close() override;

virtual void abort() override;

Expand Down
66 changes: 35 additions & 31 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/dwio/parquet/writer/WriterConfig.h"
#include "velox/exec/Cursor.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand Down Expand Up @@ -235,15 +236,15 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) {
// there will be only one data page contains all data encoded with dictionary
const std::unordered_map<std::string, std::string> normalConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetEnableDictionary),
parquet::WriterConfig::kParquetSessionEnableDictionary),
"true"},
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDictionaryPageSizeLimit),
parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit),
"1B"},
};
const std::unordered_map<std::string, std::string> normalSessionProperties = {
{parquet::WriterOptions::kParquetEnableDictionary, "true"},
{parquet::WriterOptions::kParquetDictionaryPageSizeLimit, "1B"},
{parquet::WriterConfig::kParquetSessionEnableDictionary, "true"},
{parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit, "1B"},
};

// Here we are reading the second data page. If we don't set the dictionary
Expand All @@ -264,12 +265,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) {
const std::unordered_map<std::string, std::string>
incorrectEnableDictionaryConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetEnableDictionary),
parquet::WriterConfig::kParquetSessionEnableDictionary),
invalidEnableDictionaryValue},
};
const std::unordered_map<std::string, std::string>
incorrectEnableDictionarySessionProperties = {
{parquet::WriterOptions::kParquetEnableDictionary,
{parquet::WriterConfig::kParquetSessionEnableDictionary,
invalidEnableDictionaryValue},
};

Expand All @@ -289,12 +290,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) {
const std::unordered_map<std::string, std::string>
incorrectDictionaryPageSizeConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDictionaryPageSizeLimit),
parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit),
invalidDictionaryPageSizeValue},
};
const std::unordered_map<std::string, std::string>
incorrectDictionaryPageSizeSessionProperties = {
{parquet::WriterOptions::kParquetDictionaryPageSizeLimit,
{parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit,
invalidDictionaryPageSizeValue},
};

Expand Down Expand Up @@ -327,12 +328,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingOff) {
const std::unordered_map<std::string, std::string>
withoutPageSizeConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetEnableDictionary),
parquet::WriterConfig::kParquetSessionEnableDictionary),
"false"},
};
const std::unordered_map<std::string, std::string>
withoutPageSizeSessionProperties = {
{parquet::WriterOptions::kParquetEnableDictionary, "false"},
{parquet::WriterConfig::kParquetSessionEnableDictionary, "false"},
};

const auto withoutPageSizeHeader =
Expand All @@ -354,16 +355,16 @@ TEST_F(ParquetWriterTest, dictionaryEncodingOff) {
const std::unordered_map<std::string, std::string>
withPageSizeConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetEnableDictionary),
parquet::WriterConfig::kParquetSessionEnableDictionary),
"false"},
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDictionaryPageSizeLimit),
parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit),
"1B"},
};
const std::unordered_map<std::string, std::string>
withPageSizeSessionProperties = {
{parquet::WriterOptions::kParquetEnableDictionary, "false"},
{parquet::WriterOptions::kParquetDictionaryPageSizeLimit, "1B"},
{parquet::WriterConfig::kParquetSessionEnableDictionary, "false"},
{parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit, "1B"},
};

const auto withPageSizeHeader =
Expand Down Expand Up @@ -470,15 +471,15 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) {
// applied (default is 1024)
const std::unordered_map<std::string, std::string> normalConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetWritePageSize),
parquet::WriterConfig::kParquetSessionWritePageSize),
"2KB"},
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetWriteBatchSize),
parquet::WriterConfig::kParquetSessionWriteBatchSize),
"97"},
};
const std::unordered_map<std::string, std::string> normalSessionProperties = {
{parquet::WriterOptions::kParquetWritePageSize, "2KB"},
{parquet::WriterOptions::kParquetWriteBatchSize, "97"},
{parquet::WriterConfig::kParquetSessionWritePageSize, "2KB"},
{parquet::WriterConfig::kParquetSessionWriteBatchSize, "97"},
};
const auto normalHeader = testPageSizeAndBatchSizeToGetPageHeader(
normalConfigFromFile, normalSessionProperties);
Expand All @@ -498,12 +499,12 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) {
const std::unordered_map<std::string, std::string>
incorrectPageSizeConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetWritePageSize),
parquet::WriterConfig::kParquetSessionWritePageSize),
invalidPageSizeAndBatchSizeValue},
};
const std::unordered_map<std::string, std::string>
incorrectPageSizeSessionPropertiesFromFile = {
{parquet::WriterOptions::kParquetWritePageSize,
{parquet::WriterConfig::kParquetSessionWritePageSize,
invalidPageSizeAndBatchSizeValue},
};

Expand All @@ -520,12 +521,12 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) {
const std::unordered_map<std::string, std::string>
incorrectBatchSizeConfigFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetWriteBatchSize),
parquet::WriterConfig::kParquetSessionWriteBatchSize),
invalidPageSizeAndBatchSizeValue},
};
const std::unordered_map<std::string, std::string>
incorrectBatchSizeSessionPropertiesFromFile = {
{parquet::WriterOptions::kParquetWriteBatchSize,
{parquet::WriterConfig::kParquetSessionWriteBatchSize,
invalidPageSizeAndBatchSizeValue},
};

Expand Down Expand Up @@ -561,7 +562,7 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) {
// Simulate setting DataPage version to V2 via Hive config from file.
std::unordered_map<std::string, std::string> configFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDataPageVersion),
parquet::WriterConfig::kParquetSessionDataPageVersion),
"V2"}};

ASSERT_EQ(
Expand All @@ -571,7 +572,7 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) {
// Simulate setting DataPage version to V1 via Hive config from file.
configFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDataPageVersion),
parquet::WriterConfig::kParquetSessionDataPageVersion),
"V1"}};

ASSERT_EQ(
Expand All @@ -580,14 +581,15 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) {

// Simulate setting DataPage version to V2 via connector session property.
std::unordered_map<std::string, std::string> sessionProperties = {
{parquet::WriterOptions::kParquetDataPageVersion, "V2"}};
{parquet::WriterConfig::kParquetSessionDataPageVersion, "V2"}};

ASSERT_EQ(
testDataPageVersion({}, sessionProperties),
thrift::PageType::type::DATA_PAGE_V2);

// Simulate setting DataPage version to V1 via connector session property.
sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V1"}};
sessionProperties = {
{parquet::WriterConfig::kParquetSessionDataPageVersion, "V1"}};

ASSERT_EQ(
testDataPageVersion({}, sessionProperties),
Expand All @@ -596,10 +598,11 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) {
// Simulate setting DataPage version to V1 via connector session property,
// and to V2 via Hive config from file. Session property should take
// precedence.
sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V1"}};
sessionProperties = {
{parquet::WriterConfig::kParquetSessionDataPageVersion, "V1"}};
configFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDataPageVersion),
parquet::WriterConfig::kParquetSessionDataPageVersion),
"V2"}};

ASSERT_EQ(
Expand All @@ -609,10 +612,11 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) {
// Simulate setting DataPage version to V2 via connector session property,
// and to V1 via Hive config from file. Session property should take
// precedence.
sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V2"}};
sessionProperties = {
{parquet::WriterConfig::kParquetSessionDataPageVersion, "V2"}};
configFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetDataPageVersion),
parquet::WriterConfig::kParquetSessionDataPageVersion),
"V1"}};

ASSERT_EQ(
Expand Down Expand Up @@ -673,7 +677,7 @@ TEST_F(ParquetWriterTest, parquetWriteWithArrowMemoryPool) {
TEST_F(ParquetWriterTest, updateWriterOptionsFromHiveConfig) {
std::unordered_map<std::string, std::string> configFromFile = {
{config::ConfigBase::toConfigKey(
parquet::WriterOptions::kParquetWriteTimestampUnit),
parquet::WriterConfig::kParquetSessionWriteTimestampUnit),
"3"}};
const config::ConfigBase connectorConfig(std::move(configFromFile));
const config::ConfigBase connectorSessionProperties({});
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ velox_link_libraries(
fmt::fmt
)

velox_add_library(velox_writer_config INTERFACE)
velox_add_library(velox_writer_config INTERFACE HEADERS WriterConfig.h)
Loading
Loading