diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 488ce41a761..532f2d3bcf6 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -88,6 +88,7 @@ velox_add_library( DirectInputStream.h ErrorTolerance.h ExecutorBarrier.h + FileMetadata.h FileSink.h FilterNode.h FlatMapHelper.h diff --git a/velox/dwio/common/FileMetadata.h b/velox/dwio/common/FileMetadata.h new file mode 100644 index 00000000000..f66cd56e46a --- /dev/null +++ b/velox/dwio/common/FileMetadata.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace facebook::velox::dwio::common { + +/// File format specific metadata returned when a writer is closed. +/// Caller of Writer::close() can do further processing such as aggregate +/// row group statistics to file level statistics based on the metadata. +class FileMetadata { + public: + virtual ~FileMetadata() = default; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index ec27d1a7bbb..9e973106739 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -86,11 +86,11 @@ bool SortingWriter::finish() { return true; } -void SortingWriter::close() { +std::unique_ptr SortingWriter::close() { VELOX_CHECK(isFinishing()); setState(State::kClosed); VELOX_CHECK_NULL(sortBuffer_); - outputWriter_->close(); + return outputWriter_->close(); } void SortingWriter::abort() { diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a136cff1238..c7a191a30a5 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -42,7 +42,9 @@ class SortingWriter : public Writer { /// be flushed. void flush() override; - void close() override; + /// Closes the writer. Returns file metadata, or null if no metadata is + /// available (e.g. for an empty file). + std::unique_ptr close() override; void abort() override; diff --git a/velox/dwio/common/Writer.h b/velox/dwio/common/Writer.h index 2c5c5ab7868..1950677a0bf 100644 --- a/velox/dwio/common/Writer.h +++ b/velox/dwio/common/Writer.h @@ -20,6 +20,7 @@ #include #include "velox/common/base/Portability.h" +#include "velox/dwio/common/FileMetadata.h" #include "velox/vector/BaseVector.h" namespace facebook::velox::dwio::common { @@ -69,10 +70,12 @@ class Writer { /// NOTE: this must be called before close(). virtual bool finish() = 0; - /// Invokes closes the writer. Data can no longer be written. + /// Closes the writer. Data can no longer be written. Returns format-specific + /// file metadata collected during write operations. The returned pointer can + /// be null if no metadata is available, such as for an empty data file. /// /// NOTE: this must be called after the last finish() which returns true. - virtual void close() = 0; + virtual std::unique_ptr close() = 0; /// Aborts the writing by closing the writer and dropping everything. /// Data can no longer be written. diff --git a/velox/dwio/common/tests/SortingWriterTest.cpp b/velox/dwio/common/tests/SortingWriterTest.cpp index 4beadc61e1a..d13837641c8 100644 --- a/velox/dwio/common/tests/SortingWriterTest.cpp +++ b/velox/dwio/common/tests/SortingWriterTest.cpp @@ -43,8 +43,9 @@ class MockWriter : public Writer { void flush() override {} - void close() override { + std::unique_ptr close() override { setState(State::kClosed); + return nullptr; } void abort() override { diff --git a/velox/dwio/common/tests/WriterTest.cpp b/velox/dwio/common/tests/WriterTest.cpp index 2aa5d992535..f1118927e30 100644 --- a/velox/dwio/common/tests/WriterTest.cpp +++ b/velox/dwio/common/tests/WriterTest.cpp @@ -51,7 +51,9 @@ class MockWriter : public Writer { void abort() override {} - void close() override {} + std::unique_ptr close() override { + return nullptr; + } }; TEST(WriterTest, stateString) { diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 32567d86e16..0133522c7ac 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -799,7 +799,7 @@ void Writer::flush() { flushInternal(false); } -void Writer::close() { +std::unique_ptr Writer::close() { checkRunning(); auto exitGuard = folly::makeGuard([this]() { flushPolicy_->onClose(); @@ -807,6 +807,7 @@ void Writer::close() { }); flushInternal(true); writerBase_->close(); + return std::make_unique(); } void Writer::abort() { diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 4a7d1ab540b..847c0ac3045 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -19,6 +19,7 @@ #include #include +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/dwrf/common/Encryption.h" @@ -30,6 +31,9 @@ namespace facebook::velox::dwrf { +/// DWRF-specific file metadata wrapper. Currently a placeholder. +class DwrfFileMetadata : public dwio::common::FileMetadata {}; + struct WriterOptions : public dwio::common::WriterOptions { std::shared_ptr config = std::make_shared(); /// Changes the interface to stream list and encoding iter. @@ -86,7 +90,7 @@ class Writer : public dwio::common::Writer { return true; } - virtual void close() override; + virtual std::unique_ptr close() override; virtual void abort() override; diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index 225f6d9816e..e3633224dc9 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -28,6 +28,7 @@ #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual #include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/dwio/parquet/writer/WriterConfig.h" #include "velox/exec/Cursor.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -235,15 +236,15 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) { // there will be only one data page contains all data encoded with dictionary const std::unordered_map normalConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetEnableDictionary), + parquet::WriterConfig::kParquetSessionEnableDictionary), "true"}, {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDictionaryPageSizeLimit), + parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit), "1B"}, }; const std::unordered_map normalSessionProperties = { - {parquet::WriterOptions::kParquetEnableDictionary, "true"}, - {parquet::WriterOptions::kParquetDictionaryPageSizeLimit, "1B"}, + {parquet::WriterConfig::kParquetSessionEnableDictionary, "true"}, + {parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit, "1B"}, }; // Here we are reading the second data page. If we don't set the dictionary @@ -264,12 +265,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) { const std::unordered_map incorrectEnableDictionaryConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetEnableDictionary), + parquet::WriterConfig::kParquetSessionEnableDictionary), invalidEnableDictionaryValue}, }; const std::unordered_map incorrectEnableDictionarySessionProperties = { - {parquet::WriterOptions::kParquetEnableDictionary, + {parquet::WriterConfig::kParquetSessionEnableDictionary, invalidEnableDictionaryValue}, }; @@ -289,12 +290,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingWithDictionaryPageSize) { const std::unordered_map incorrectDictionaryPageSizeConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDictionaryPageSizeLimit), + parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit), invalidDictionaryPageSizeValue}, }; const std::unordered_map incorrectDictionaryPageSizeSessionProperties = { - {parquet::WriterOptions::kParquetDictionaryPageSizeLimit, + {parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit, invalidDictionaryPageSizeValue}, }; @@ -327,12 +328,12 @@ TEST_F(ParquetWriterTest, dictionaryEncodingOff) { const std::unordered_map withoutPageSizeConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetEnableDictionary), + parquet::WriterConfig::kParquetSessionEnableDictionary), "false"}, }; const std::unordered_map withoutPageSizeSessionProperties = { - {parquet::WriterOptions::kParquetEnableDictionary, "false"}, + {parquet::WriterConfig::kParquetSessionEnableDictionary, "false"}, }; const auto withoutPageSizeHeader = @@ -354,16 +355,16 @@ TEST_F(ParquetWriterTest, dictionaryEncodingOff) { const std::unordered_map withPageSizeConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetEnableDictionary), + parquet::WriterConfig::kParquetSessionEnableDictionary), "false"}, {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDictionaryPageSizeLimit), + parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit), "1B"}, }; const std::unordered_map withPageSizeSessionProperties = { - {parquet::WriterOptions::kParquetEnableDictionary, "false"}, - {parquet::WriterOptions::kParquetDictionaryPageSizeLimit, "1B"}, + {parquet::WriterConfig::kParquetSessionEnableDictionary, "false"}, + {parquet::WriterConfig::kParquetSessionDictionaryPageSizeLimit, "1B"}, }; const auto withPageSizeHeader = @@ -470,15 +471,15 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) { // applied (default is 1024) const std::unordered_map normalConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetWritePageSize), + parquet::WriterConfig::kParquetSessionWritePageSize), "2KB"}, {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetWriteBatchSize), + parquet::WriterConfig::kParquetSessionWriteBatchSize), "97"}, }; const std::unordered_map normalSessionProperties = { - {parquet::WriterOptions::kParquetWritePageSize, "2KB"}, - {parquet::WriterOptions::kParquetWriteBatchSize, "97"}, + {parquet::WriterConfig::kParquetSessionWritePageSize, "2KB"}, + {parquet::WriterConfig::kParquetSessionWriteBatchSize, "97"}, }; const auto normalHeader = testPageSizeAndBatchSizeToGetPageHeader( normalConfigFromFile, normalSessionProperties); @@ -498,12 +499,12 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) { const std::unordered_map incorrectPageSizeConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetWritePageSize), + parquet::WriterConfig::kParquetSessionWritePageSize), invalidPageSizeAndBatchSizeValue}, }; const std::unordered_map incorrectPageSizeSessionPropertiesFromFile = { - {parquet::WriterOptions::kParquetWritePageSize, + {parquet::WriterConfig::kParquetSessionWritePageSize, invalidPageSizeAndBatchSizeValue}, }; @@ -520,12 +521,12 @@ TEST_F(ParquetWriterTest, testPageSizeAndBatchSizeConfiguration) { const std::unordered_map incorrectBatchSizeConfigFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetWriteBatchSize), + parquet::WriterConfig::kParquetSessionWriteBatchSize), invalidPageSizeAndBatchSizeValue}, }; const std::unordered_map incorrectBatchSizeSessionPropertiesFromFile = { - {parquet::WriterOptions::kParquetWriteBatchSize, + {parquet::WriterConfig::kParquetSessionWriteBatchSize, invalidPageSizeAndBatchSizeValue}, }; @@ -561,7 +562,7 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) { // Simulate setting DataPage version to V2 via Hive config from file. std::unordered_map configFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDataPageVersion), + parquet::WriterConfig::kParquetSessionDataPageVersion), "V2"}}; ASSERT_EQ( @@ -571,7 +572,7 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) { // Simulate setting DataPage version to V1 via Hive config from file. configFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDataPageVersion), + parquet::WriterConfig::kParquetSessionDataPageVersion), "V1"}}; ASSERT_EQ( @@ -580,14 +581,15 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) { // Simulate setting DataPage version to V2 via connector session property. std::unordered_map sessionProperties = { - {parquet::WriterOptions::kParquetDataPageVersion, "V2"}}; + {parquet::WriterConfig::kParquetSessionDataPageVersion, "V2"}}; ASSERT_EQ( testDataPageVersion({}, sessionProperties), thrift::PageType::type::DATA_PAGE_V2); // Simulate setting DataPage version to V1 via connector session property. - sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V1"}}; + sessionProperties = { + {parquet::WriterConfig::kParquetSessionDataPageVersion, "V1"}}; ASSERT_EQ( testDataPageVersion({}, sessionProperties), @@ -596,10 +598,11 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) { // Simulate setting DataPage version to V1 via connector session property, // and to V2 via Hive config from file. Session property should take // precedence. - sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V1"}}; + sessionProperties = { + {parquet::WriterConfig::kParquetSessionDataPageVersion, "V1"}}; configFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDataPageVersion), + parquet::WriterConfig::kParquetSessionDataPageVersion), "V2"}}; ASSERT_EQ( @@ -609,10 +612,11 @@ TEST_F(ParquetWriterTest, toggleDataPageVersion) { // Simulate setting DataPage version to V2 via connector session property, // and to V1 via Hive config from file. Session property should take // precedence. - sessionProperties = {{parquet::WriterOptions::kParquetDataPageVersion, "V2"}}; + sessionProperties = { + {parquet::WriterConfig::kParquetSessionDataPageVersion, "V2"}}; configFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetDataPageVersion), + parquet::WriterConfig::kParquetSessionDataPageVersion), "V1"}}; ASSERT_EQ( @@ -673,7 +677,7 @@ TEST_F(ParquetWriterTest, parquetWriteWithArrowMemoryPool) { TEST_F(ParquetWriterTest, updateWriterOptionsFromHiveConfig) { std::unordered_map configFromFile = { {config::ConfigBase::toConfigKey( - parquet::WriterOptions::kParquetWriteTimestampUnit), + parquet::WriterConfig::kParquetSessionWriteTimestampUnit), "3"}}; const config::ConfigBase connectorConfig(std::move(configFromFile)); const config::ConfigBase connectorSessionProperties({}); diff --git a/velox/dwio/parquet/writer/CMakeLists.txt b/velox/dwio/parquet/writer/CMakeLists.txt index aed801710a1..c131f59ea36 100644 --- a/velox/dwio/parquet/writer/CMakeLists.txt +++ b/velox/dwio/parquet/writer/CMakeLists.txt @@ -27,4 +27,4 @@ velox_link_libraries( fmt::fmt ) -velox_add_library(velox_writer_config INTERFACE) +velox_add_library(velox_writer_config INTERFACE HEADERS WriterConfig.h) diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index c70958d1566..8cf29529238 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -526,16 +526,22 @@ void Writer::newRowGroup(int32_t numRows) { PARQUET_THROW_NOT_OK(arrowContext_->writer->newRowGroup(numRows)); } -void Writer::close() { +std::unique_ptr Writer::close() { flush(); + std::unique_ptr parquetFileMetadata; if (arrowContext_->writer) { PARQUET_THROW_NOT_OK(arrowContext_->writer->close()); + parquetFileMetadata = std::make_unique( + arrowContext_->writer->metadata()); arrowContext_->writer.reset(); } + PARQUET_THROW_NOT_OK(stream_->Close()); arrowContext_->stagingChunks.clear(); + + return parquetFileMetadata; } void Writer::abort() { @@ -601,14 +607,15 @@ void WriterOptions::processConfigs( parquetWriterOptions, "Expected a Parquet WriterOptions object."); // Check serdeParameters for timestamp settings first (highest priority). - auto serdeTimestampUnitIt = serdeParameters.find(kParquetSerdeTimestampUnit); + auto serdeTimestampUnitIt = + serdeParameters.find(WriterConfig::kParquetSerdeTimestampUnit); if (serdeTimestampUnitIt != serdeParameters.end()) { parquetWriteTimestampUnit = stringToTimestampPrecision(serdeTimestampUnitIt->second); } auto serdeTimestampTimezoneIt = - serdeParameters.find(kParquetSerdeTimestampTimezone); + serdeParameters.find(WriterConfig::kParquetSerdeTimestampTimezone); if (serdeTimestampTimezoneIt != serdeParameters.end()) { // Empty string means no timezone conversion (nullopt). if (serdeTimestampTimezoneIt->second.empty()) { @@ -621,7 +628,7 @@ void WriterOptions::processConfigs( if (!parquetWriteTimestampUnit) { parquetWriteTimestampUnit = toTimestampPrecision(session.getWithFallback( - kParquetWriteTimestampUnit, connectorConfig)); + WriterConfig::kParquetSessionWriteTimestampUnit, connectorConfig)); } if (!parquetWriteTimestampTimeZone) { parquetWriteTimestampTimeZone = parquetWriterOptions->sessionTimezoneName; @@ -630,33 +637,34 @@ void WriterOptions::processConfigs( if (!enableDictionary) { enableDictionary = toParquetEnableDictionary(session.getWithFallback( - kParquetEnableDictionary, connectorConfig)); + WriterConfig::kParquetSessionEnableDictionary, connectorConfig)); } if (!dictionaryPageSizeLimit) { dictionaryPageSizeLimit = toParquetPageSize(session.getWithFallback( - kParquetDictionaryPageSizeLimit, connectorConfig)); + WriterConfig::kParquetSessionDictionaryPageSizeLimit, + connectorConfig)); } if (!useParquetDataPageV2) { useParquetDataPageV2 = isParquetV2(session.getWithFallback( - kParquetDataPageVersion, connectorConfig)); + WriterConfig::kParquetSessionDataPageVersion, connectorConfig)); } if (!dataPageSize) { dataPageSize = toParquetPageSize(session.getWithFallback( - kParquetWritePageSize, connectorConfig)); + WriterConfig::kParquetSessionWritePageSize, connectorConfig)); } if (!batchSize) { batchSize = toParquetBatchSize(session.getWithFallback( - kParquetWriteBatchSize, connectorConfig)); + WriterConfig::kParquetSessionWriteBatchSize, connectorConfig)); } if (!createdBy) { createdBy = session.getWithFallback( - kParquetCreatedBy, connectorConfig); + WriterConfig::kParquetHiveConnectorCreatedBy, connectorConfig); } // Parquet only updates ioStats_->rawBytesWritten() when a row group is @@ -667,7 +675,7 @@ void WriterOptions::processConfigs( // during writes. auto maxTargetFileSize = toParquetPageSize(session.getWithFallback( - kParquetMaxTargetFileSize, connectorConfig)); + WriterConfig::kParquetSessionMaxTargetFileSize, connectorConfig)); if (maxTargetFileSize.has_value()) { if (!flushPolicyFactory) { auto bytesInRowGroup = std::min( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 0238cdfb8c0..3046424a6f9 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -20,12 +20,15 @@ #include "velox/common/compression/Compression.h" #include "velox/common/config/Config.h" #include "velox/dwio/common/DataBuffer.h" +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/FlushPolicy.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/parquet/ParquetFieldId.h" +#include "velox/dwio/parquet/writer/WriterConfig.h" +#include "velox/dwio/parquet/writer/arrow/Metadata.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -39,6 +42,21 @@ class ArrowDataBufferSink; struct ArrowContext; +/// Parquet-specific file metadata wrapper. Provides access to the underlying +/// arrow::FileMetaData. +class ParquetFileMetadata : public dwio::common::FileMetadata { + public: + explicit ParquetFileMetadata(std::shared_ptr metadata) + : metadata_(std::move(metadata)) {} + + std::shared_ptr arrowMetadata() const { + return metadata_; + } + + private: + std::shared_ptr metadata_; +}; + class DefaultFlushPolicy : public dwio::common::FlushPolicy { public: DefaultFlushPolicy() @@ -127,35 +145,6 @@ struct WriterOptions : public dwio::common::WriterOptions { /// The structure should match the schema hierarchy with nested children. std::vector parquetFieldIds; - // Parsing session and hive configs. - - // Session and connector config names differ by '_' vs '-' separators. - // Connector keys are inferred from session keys by replacing '_' with '-'. - static constexpr const char* kParquetWriteTimestampUnit = - "hive.parquet.writer.timestamp_unit"; - static constexpr const char* kParquetEnableDictionary = - "hive.parquet.writer.enable_dictionary"; - static constexpr const char* kParquetDictionaryPageSizeLimit = - "hive.parquet.writer.dictionary_page_size_limit"; - static constexpr const char* kParquetDataPageVersion = - "hive.parquet.writer.datapage_version"; - static constexpr const char* kParquetWritePageSize = - "hive.parquet.writer.page_size"; - static constexpr const char* kParquetWriteBatchSize = - "hive.parquet.writer.batch_size"; - static constexpr const char* kParquetCreatedBy = - "hive.parquet.writer.created_by"; - static constexpr const char* kParquetMaxTargetFileSize = - "max_target_file_size"; - // Serde parameter keys for timestamp settings. These can be set via - // serdeParameters map to override the default timestamp behavior. - // The timezone key accepts a timezone string or empty string to disable - // timezone conversion. - static constexpr const char* kParquetSerdeTimestampUnit = - "parquet.writer.timestamp.unit"; - static constexpr const char* kParquetSerdeTimestampTimezone = - "parquet.writer.timestamp.timezone"; - // Process hive connector and session configs. void processConfigs( const config::ConfigBase& connectorConfig, @@ -197,10 +186,11 @@ class Writer : public dwio::common::Writer { return true; } - // Closes 'this', After close, data can no longer be added and the completed + // Closes 'this'. After close, data can no longer be added and the completed // Parquet file is flushed into 'sink' provided at construction. 'sink' stays - // live until destruction of 'this' - void close() override; + // live until destruction of 'this'. Returns file metadata, or null if no + // metadata is available (e.g. for an empty file). + std::unique_ptr close() override; void abort() override; diff --git a/velox/dwio/parquet/writer/WriterConfig.h b/velox/dwio/parquet/writer/WriterConfig.h new file mode 100644 index 00000000000..ee8788f74b9 --- /dev/null +++ b/velox/dwio/parquet/writer/WriterConfig.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::parquet { + +/// Config constants for the Parquet writer. +/// +/// IMPORTANT: These constants are kept in a separate header rather than in +/// Writer.h because Gluten's WholeStageResultIterator.cc needs access to these +/// configuration constants but cannot include Writer.h due to Arrow header +/// conflicts. This separation allows external code to reference these constants +/// without pulling in Arrow dependencies. +struct WriterConfig { + // Parsing session and hive configs. + + // This isn't a typo; session and hive connector config names are different + // ('_' vs '-'). + static constexpr const char* kParquetSessionWriteTimestampUnit = + "hive.parquet.writer.timestamp_unit"; + static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = + "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetSessionEnableDictionary = + "hive.parquet.writer.enable_dictionary"; + static constexpr const char* kParquetHiveConnectorEnableDictionary = + "hive.parquet.writer.enable-dictionary"; + static constexpr const char* kParquetSessionDictionaryPageSizeLimit = + "hive.parquet.writer.dictionary_page_size_limit"; + static constexpr const char* kParquetHiveConnectorDictionaryPageSizeLimit = + "hive.parquet.writer.dictionary-page-size-limit"; + static constexpr const char* kParquetSessionDataPageVersion = + "hive.parquet.writer.datapage_version"; + static constexpr const char* kParquetHiveConnectorDataPageVersion = + "hive.parquet.writer.datapage-version"; + static constexpr const char* kParquetSessionWritePageSize = + "hive.parquet.writer.page_size"; + static constexpr const char* kParquetHiveConnectorWritePageSize = + "hive.parquet.writer.page-size"; + static constexpr const char* kParquetSessionWriteBatchSize = + "hive.parquet.writer.batch_size"; + static constexpr const char* kParquetHiveConnectorWriteBatchSize = + "hive.parquet.writer.batch-size"; + static constexpr const char* kParquetHiveConnectorCreatedBy = + "hive.parquet.writer.created-by"; + + // Use the same property name from HiveConfig::kMaxTargetFileSize. + static constexpr const char* kParquetConnectorMaxTargetFileSize = + "max-target-file-size"; + static constexpr const char* kParquetSessionMaxTargetFileSize = + "max_target_file_size"; + // Serde parameter keys for timestamp settings. These can be set via + // serdeParameters map to override the default timestamp behavior. + // The timezone key accepts a timezone string or empty string to disable + // timezone conversion. + static constexpr const char* kParquetSerdeTimestampUnit = + "parquet.writer.timestamp.unit"; + static constexpr const char* kParquetSerdeTimestampTimezone = + "parquet.writer.timestamp.timezone"; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/text/writer/TextWriter.cpp b/velox/dwio/text/writer/TextWriter.cpp index 0293fa4ee83..be837b12522 100644 --- a/velox/dwio/text/writer/TextWriter.cpp +++ b/velox/dwio/text/writer/TextWriter.cpp @@ -175,8 +175,9 @@ void TextWriter::flush() { bufferedWriterSink_->flush(); } -void TextWriter::close() { +std::unique_ptr TextWriter::close() { bufferedWriterSink_->close(); + return std::make_unique(); } void TextWriter::abort() { diff --git a/velox/dwio/text/writer/TextWriter.h b/velox/dwio/text/writer/TextWriter.h index 2ec11b82685..045f7b7f5cb 100644 --- a/velox/dwio/text/writer/TextWriter.h +++ b/velox/dwio/text/writer/TextWriter.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" @@ -26,6 +27,9 @@ namespace facebook::velox::text { using dwio::common::SerDeOptions; +/// Text-specific file metadata wrapper. Currently a placeholder. +class TextFileMetadata : public dwio::common::FileMetadata {}; + struct WriterOptions : public dwio::common::WriterOptions { int64_t defaultFlushCount = 10 << 10; uint8_t headerLineCount = @@ -58,7 +62,7 @@ class TextWriter : public dwio::common::Writer { return true; } - void close() override; + std::unique_ptr close() override; void abort() override;