Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ velox_add_library(
HiveDataSink.cpp
HiveDataSource.cpp
HiveIndexSource.cpp
HivePartitionName.cpp
HiveSplitReader.cpp
HivePartitionUtil.cpp
PartitionIdGenerator.cpp
TableHandle.cpp
HEADERS
Expand All @@ -67,7 +67,6 @@ velox_add_library(
HiveDataSource.h
HiveIndexSource.h
HivePartitionFunction.h
HivePartitionName.h
HiveSplitReader.h
IndexReader.h
PartitionIdGenerator.h
Expand All @@ -83,6 +82,7 @@ velox_link_libraries(
velox_exec
velox_hive_partition_function
velox_key_encoder
PUBLIC velox_hive_iceberg_splitreader
)

velox_add_library(velox_hive_partition_function HivePartitionFunction.cpp)
Expand Down
10 changes: 9 additions & 1 deletion velox/connectors/hive/FileDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ class FileDataSource : public DataSource {
// post-read using the extraction chains.
folly::F14FastMap<column_index_t, const FileColumnHandle*> extractionColumns_;

/// Returns the expression evaluator pointer held by this data source. The
/// pointer is returned as-is; no ownership is transferred.
core::ExpressionEvaluator* expressionEvaluator() const {
return expressionEvaluator_;
}

/// Returns a mutable reference to the atomic counter that accumulates the
/// remaining-filter time, so that callers can update it in place.
std::atomic_uint64_t& totalRemainingFilterTime() {
return totalRemainingFilterTime_;
}

dwio::common::RuntimeStatistics runtimeStats_;

private:
Expand Down Expand Up @@ -209,7 +217,7 @@ class FileDataSource : public DataSource {
/// transform column values. Indexed by output column position.
std::vector<std::function<void(VectorPtr&)>> columnPostProcessors_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
std::atomic_uint64_t totalRemainingFilterTime_{0};
std::atomic_uint64_t totalRemainingFilterCpuTime_{0};
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,9 @@ uint64_t HiveConfig::maxTargetFileSizeBytes(
config::CapacityUnit::BYTE);
}

// Reports whether the fanout writer mode is enabled. Looks up the session
// property kFanoutEnabledSession, passing the connector-level kFanoutEnabled
// value (itself requested with a default of true) as the fallback.
bool HiveConfig::fanoutEnabled(const config::ConfigBase* session) const {
  // Read the connector-level setting first; it serves as the session fallback.
  const bool connectorLevelValue = config_->get<bool>(kFanoutEnabled, true);
  return session->get<bool>(kFanoutEnabledSession, connectorLevelValue);
}

} // namespace facebook::velox::connector::hive
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ class HiveConfig : public FileConfig {
static constexpr const char* kWriteFileCreateConfig =
"write-file-create-config";

/// Controls the writer mode: whether the fanout writer is enabled.
/// Defaults to true; setting it to false selects clustered mode.
/// Currently applies only to the Iceberg writer.
static constexpr const char* kFanoutEnabled = "fanout-enabled";
static constexpr const char* kFanoutEnabledSession = "fanout_enabled";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const config::ConfigBase* session) const;

Expand Down Expand Up @@ -229,6 +235,9 @@ class HiveConfig : public FileConfig {

explicit HiveConfig(std::shared_ptr<const config::ConfigBase> config)
: FileConfig(std::move(config)) {}

/// Returns true if the fanout writer mode is enabled.
bool fanoutEnabled(const config::ConfigBase* session) const;
};

} // namespace facebook::velox::connector::hive
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const config::ConfigProvider* HiveConnector::configProvider() const {
std::unique_ptr<DataSource> HiveConnector::createDataSource(
const RowTypePtr& outputType,
const ConnectorTableHandlePtr& tableHandle,
const ColumnHandleMap& columnHandles,
const std::unordered_map<std::string, ColumnHandlePtr>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
return std::make_unique<HiveDataSource>(
outputType,
Expand Down
9 changes: 8 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FieldReference.h"

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

namespace facebook::velox::connector::hive {
namespace {

Expand Down Expand Up @@ -703,7 +707,6 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
}

namespace {

core::CallTypedExprPtr replaceInputs(
const core::CallTypedExpr* call,
std::vector<core::TypedExprPtr>&& inputs) {
Expand Down Expand Up @@ -876,6 +879,10 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
}
} // namespace

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

core::TypedExprPtr extractFiltersFromRemainingFilter(
const core::TypedExprPtr& expr,
core::ExpressionEvaluator* evaluator,
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,6 @@ std::unique_ptr<common::Filter> createRangeFilter(
const variant& lower,
const variant& upper);

std::string makeUuid();

} // namespace facebook::velox::connector::hive
Loading
Loading