From fc0ff3239a243de4310f3be03bdb66c97c0a18b8 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 24 Mar 2026 22:06:04 +0200 Subject: [PATCH 1/5] HIVE-25948: Iceberg: Enable cost-based selection between Fanout and Clustered writers using column stats NDV --- .../positive/dynamic_partition_writes.q | 31 +- .../positive/dynamic_partition_writes.q.out | 429 ++++++++++++++++++ .../SortedDynPartitionOptimizer.java | 115 +++-- 3 files changed, 540 insertions(+), 35 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q index 54e46a2a2609..1b9fb8c688da 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q @@ -25,12 +25,14 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, ' insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL); create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc; +-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter) explain insert overwrite table tbl_target_identity select a, b from tbl_src; insert overwrite table tbl_target_identity select a, b from tbl_src; select * from tbl_target_identity order by a, ccy; --bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc; +-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter) explain insert into table tbl_target_bucket select a, b from tbl_src; insert into table tbl_target_bucket select a, b from tbl_src; select * from tbl_target_bucket order by a, ccy; @@ -151,4 +153,31 @@ tblproperties ('parquet.compression'='snappy','format-version'='2'); explain insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); -select * from tbl_hour_timestamp order by id, date_time_timestamp; \ No newline at end of file +select * from tbl_hour_timestamp order by id, date_time_timestamp; + +-- threshold = -1: never sort -> FanoutWriter +set hive.optimize.sort.dynamic.partition.threshold=-1; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 1: always sort -> ClusteredWriter +set hive.optimize.sort.dynamic.partition.threshold=1; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 2: NDV of b (~5) > 2 -> sort (ClusteredWriter) +set hive.optimize.sort.dynamic.partition.threshold=2; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 100: NDV of b (~5) <= 100 -> no sort (FanoutWriter) +set hive.optimize.sort.dynamic.partition.threshold=100; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- write.fanout.enabled=false: SerDe forces threshold=1 -> always ClusteredWriter +set hive.optimize.sort.dynamic.partition.threshold=0; +drop table if exists tbl_target_nofanout; +create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc + tblproperties ('write.fanout.enabled'='false'); +explain insert into tbl_target_nofanout select a, b from tbl_src; diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out index 6d8325af0158..67e3b120a26a 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out @@ -1653,3 +1653,432 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 40568 2018-02-12 12:45:56 2018 40568 2018-07-03 06:07:56 2018 88669 2018-05-27 11:12:00 2018 +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=5 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=5 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=5 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_17] + Output:["_col0","_col1","_col1"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:_col1 + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=5 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=5 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=5 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_17] + Output:["_col0","_col1","_col1"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:_col1 + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=5 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=5 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=5 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=5 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=5 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=5 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: drop table if exists tbl_target_nofanout +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists tbl_target_nofanout +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc + tblproperties ('write.fanout.enabled'='false') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_target_nofanout +POSTHOOK: query: create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc + tblproperties ('write.fanout.enabled'='false') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_target_nofanout +PREHOOK: query: explain insert into tbl_target_nofanout select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_nofanout +POSTHOOK: query: explain insert into tbl_target_nofanout select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_nofanout +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_nofanout"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_nofanout"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=5 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=5 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=5 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index a057f4137e37..439dea6f680d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -210,8 +210,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LinkedList customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder()); LinkedList customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder()); - // If custom expressions (partition or sort) are present, there is an explicit requirement to do sorting - if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) { + // If custom sort expressions are present, there is an explicit requirement to do sorting. + // Custom partition expressions are evaluated inside shouldDo based on column stats. + if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, customPartitionExprs, fsParent, allRSCols)) { return null; } // if RS is inserted by enforce bucketing or sorting, we need to remove it @@ -853,22 +854,25 @@ private ArrayList getPositionsToExprNodes(List pos, // The way max number of writers allowed are computed based on // (executor/container memory) * (percentage of memory taken by orc) // and dividing that by max memory (stripe size) taken by a single writer. - private boolean shouldDo(List partitionPos, Operator fsParent) { + private boolean shouldDo(List partitionPos, + List, ExprNodeDesc>> customPartitionExprs, + Operator fsParent, + ArrayList allRSCols) { int threshold = HiveConf.getIntVar(this.parseCtx.getConf(), HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD); long MAX_WRITERS = -1; switch (threshold) { - case -1: - return false; - case 0: - break; - case 1: - return true; - default: - MAX_WRITERS = threshold; - break; + case -1: + return false; + case 0: + break; + case 1: + return true; + default: + MAX_WRITERS = threshold; + break; } Statistics tStats = fsParent.getStatistics(); @@ -880,34 +884,77 @@ private boolean shouldDo(List partitionPos, Operator sort + return true; } if (MAX_WRITERS < 0) { - double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(), - (Double) OrcConf.MEMORY_POOL.getDefaultValue()); - long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), - (Long) OrcConf.STRIPE_SIZE.getDefaultValue()); - MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf()); - LOG.debug("Memory info during SDPO opt: {}", memoryInfo); - long executorMem = memoryInfo.getMaxExecutorMemory(); - MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize; + MAX_WRITERS = computeMaxWriters(); + } + return partCardinality > MAX_WRITERS; + } + private long computeMaxWriters() { + double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(), + (Double) OrcConf.MEMORY_POOL.getDefaultValue()); + long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), + (Long) OrcConf.STRIPE_SIZE.getDefaultValue()); + MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf()); + LOG.debug("Memory info during SDPO opt: {}", memoryInfo); + long executorMem = memoryInfo.getMaxExecutorMemory(); + return (long) (executorMem * orcMemPool) / orcStripSize; + } + + /** + * Computes the partition cardinality based on column NDV statistics. + * @return positive value = estimated cardinality, 0 = no partition columns, -1 = stats unavailable + */ + private long computePartCardinality(List partitionPos, + List, ExprNodeDesc>> customPartitionExprs, + Statistics tStats, Operator fsParent, + ArrayList allRSCols) { + + if (!partitionPos.isEmpty()) { + long cardinality = 1; + for (Integer idx : partitionPos) { + ColumnInfo ci = fsParent.getSchema().getSignature().get(idx); + ColStatistics partStats = tStats.getColumnStatisticsFromColName(ci.getInternalName()); + if (partStats == null) { + return -1; + } + cardinality *= partStats.getCountDistint(); + } + return cardinality; } - if (partCardinality <= MAX_WRITERS) { - return false; + + if (!customPartitionExprs.isEmpty()) { + // extract source column names from custom expressions (same approach as allStaticPartitions) + Set partColNames = new HashSet<>(); + for (Function, ExprNodeDesc> expr : customPartitionExprs) { + ExprNodeDesc resolved = expr.apply(allRSCols); + for (ExprNodeColumnDesc colDesc : ExprNodeDescUtils.findAllColumnDescs(resolved)) { + partColNames.add(colDesc.getColumn()); + } + } + long cardinality = 1; + for (String colName : partColNames) { + ColStatistics partStats = tStats.getColumnStatisticsFromColName(colName); + if (partStats == null) { + return -1; + } + cardinality *= partStats.getCountDistint(); + } + return cardinality; } - return true; + + return 0; } } } From 596246df9b75c6147fd1f50a8d529ba648a50c8a Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Thu, 2 Apr 2026 19:08:56 +0300 Subject: [PATCH 2/5] fix --- .../hive/ql/optimizer/SortedDynPartitionOptimizer.java | 6 ++++++ .../org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 439dea6f680d..602cb1a0f120 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -215,6 +215,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, customPartitionExprs, fsParent, allRSCols)) { return null; } + + // Mark that sorting will be applied with custom partition expressions, so the writer layer + // (e.g. Iceberg) knows the input is ordered and can use a clustered writer. + if (!customPartitionExprs.isEmpty()) { + dpCtx.setHasCustomPartitionOrSortExpression(true); + } // if RS is inserted by enforce bucketing or sorting, we need to remove it // since ReduceSinkDeDuplication will not merge them to single RS. // RS inserted by enforce bucketing/sorting will have bucketing column in diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java index 61c519aa62f6..2c8bb6399bf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java @@ -252,7 +252,6 @@ public List, ExprNodeDesc>> getCustomPartitionExpres public void addCustomPartitionExpressions( List, ExprNodeDesc>> customPartitionExpressions) { if (!org.apache.commons.collections.CollectionUtils.isEmpty(customPartitionExpressions)) { - this.hasCustomPartitionOrSortExpr = true; this.customPartitionExpressions.addAll(customPartitionExpressions); } } @@ -290,4 +289,8 @@ public void setCustomSortNullOrder(List customSortNullOrder) { public boolean hasCustomPartitionOrSortExpression() { return hasCustomPartitionOrSortExpr; } + + public void setHasCustomPartitionOrSortExpression(boolean hasCustomPartitionOrSortExpr) { + this.hasCustomPartitionOrSortExpr = hasCustomPartitionOrSortExpr; + } } From c056a4eda847d08992ad9323b1747d781fc64cfd Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Thu, 2 Apr 2026 23:07:38 +0300 Subject: [PATCH 3/5] sort dynamic partitions by default in some qtests to avoid q.out changes --- .../src/test/queries/positive/iceberg_insert_into_partition.q | 2 ++ .../positive/iceberg_insert_into_partition_transforms.q | 2 ++ .../queries/positive/iceberg_insert_overwrite_partition.q | 2 ++ .../src/test/queries/positive/merge_iceberg_partitioned_orc.q | 1 + .../positive/update_iceberg_copy_on_write_partitioned.q | 1 + .../results/positive/update_iceberg_partitioned_avro.q.out | 4 ++-- 6 files changed, 10 insertions(+), 2 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q index 5b76277c9453..9792d09a3f0e 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_int( strcol string, intcol integer diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q index e5be1af2632c..4684079c0080 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_date_transform_year( bigintcol bigint, intcol integer, diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q index 613d77964631..4fedbf00f183 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_int( strcol string, intcol integer diff --git a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q index f59761621383..379e336f608b 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q @@ -1,4 +1,5 @@ -- SORT_QUERY_RESULTS +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; create external table target_ice(a int, b string, c int) partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2'); diff --git a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q index 4e9632944426..5bf5b3eb27f5 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q @@ -1,3 +1,4 @@ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; drop table if exists tbl_ice; diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out index c9938d7a2236..1f837c4b586e 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out @@ -53,8 +53,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@tbl_ice -Warning: Shuffle Join MERGEJOIN[66][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product -Warning: Shuffle Join MERGEJOIN[68][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product +Warning: Shuffle Join MERGEJOIN[64][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[66][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800) PREHOOK: type: QUERY PREHOOK: Input: default@tbl_ice From e4d287834939b5d667fb395470564dee026c2743 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Mon, 6 Apr 2026 20:06:29 +0300 Subject: [PATCH 4/5] review comments --- .../mr/hive/udf/GenericUDFIcebergBucket.java | 35 ++++- ...tGenericUDFIcebergBucketStatEstimator.java | 73 ++++++++++ .../positive/dynamic_partition_writes.q | 22 +-- .../positive/dynamic_partition_writes.q.out | 130 ++++++++---------- ...erg_insert_into_partition_transforms.q.out | 24 ++-- .../SortedDynPartitionOptimizer.java | 38 +++-- .../estimator/StatEstimatorProvider.java | 2 +- 7 files changed, 207 insertions(+), 117 deletions(-) create mode 100644 iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java index f23bfdfe0aea..7ba3413820c7 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java @@ -19,11 +19,16 @@ package org.apache.iceberg.mr.hive.udf; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; import java.util.function.Function; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -52,7 +57,7 @@ value = "_FUNC_(value, bucketCount) - " + "Returns the bucket value calculated by Iceberg bucket transform function ", extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4") -public class GenericUDFIcebergBucket extends GenericUDF { +public class GenericUDFIcebergBucket extends GenericUDF implements StatEstimatorProvider { private final IntWritable result = new IntWritable(); private int numBuckets = -1; private transient PrimitiveObjectInspector argumentOI; @@ -209,4 +214,32 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException { public String getDisplayString(String[] children) { return getStandardDisplayString("iceberg_bucket", children); } + + @Override + public StatEstimator getStatEstimator() { + return new BucketStatEstimator(); + } + + private static class BucketStatEstimator implements StatEstimator { + @Override + public Optional estimate(List argStats) { + if (argStats.size() != 2) { + return Optional.empty(); + } + ColStatistics inputStats = argStats.get(0); + ColStatistics bucketCountStats = argStats.get(1); + ColStatistics.Range bucketRange = bucketCountStats.getRange(); + if (bucketRange == null || bucketRange.minValue == null) { + return Optional.empty(); + } + long numBuckets = bucketRange.minValue.longValue(); + if (numBuckets <= 0) { + return Optional.empty(); + } + ColStatistics result = inputStats.clone(); + result.setCountDistint(Math.min(inputStats.getCountDistint(), numBuckets)); + result.setRange(0, numBuckets - 1); + return Optional.of(result); + } + } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java new file mode 100644 index 000000000000..8b8f47e5af02 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.udf; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the BucketStatEstimator in GenericUDFIcebergBucket. + * Verifies that the StatEstimator correctly narrows NDV based on bucket count. + */ +public class TestGenericUDFIcebergBucketStatEstimator { + + @Test + public void testNdvNarrowedByBucketCount() { + // source NDV (100) > numBuckets (8) -> output NDV should be 8 + Optional result = estimateBucket(100, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(8, result.get().getCountDistint()); + } + + @Test + public void testNdvBelowBucketCount() { + // source NDV (3) < numBuckets (8) -> output NDV should be 3 + Optional result = estimateBucket(3, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(3, result.get().getCountDistint()); + } + + @Test + public void testNdvEqualsBucketCount() { + // source NDV (8) == numBuckets (8) -> output NDV should be 8 + Optional result = estimateBucket(8, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(8, result.get().getCountDistint()); + } + + @Test + public void testZeroBucketsReturnsEmpty() { + Optional result = estimateBucket(100, 0); + Assert.assertFalse(result.isPresent()); + } + + private Optional estimateBucket(long sourceNdv, long numBuckets) { + ColStatistics sourceStats = new ColStatistics("col", "int"); + sourceStats.setCountDistint(sourceNdv); + ColStatistics numBucketsStats = new ColStatistics("numBuckets", "int"); + numBucketsStats.setRange(numBuckets, numBuckets); + + StatEstimator estimator = new GenericUDFIcebergBucket().getStatEstimator(); + return estimator.estimate(Arrays.asList(sourceStats, numBucketsStats)); + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q index 1b9fb8c688da..f32529f13716 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q @@ -25,19 +25,19 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, ' insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL); create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc; --- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter) +-- threshold = 0 (default, cost-based): NDV(b) = 5 > MAX_WRITERS (~3) -> sort (ClusteredWriter) explain insert overwrite table tbl_target_identity select a, b from tbl_src; insert overwrite table tbl_target_identity select a, b from tbl_src; select * from tbl_target_identity order by a, ccy; ---bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting -create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc; --- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter) +--bucketed case - should invoke GenericUDFIcebergBucket to estimate bucket NDV for sort decision +create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc; +-- threshold = 0 (default, cost-based): bucket NDV = min(NDV(b) = 5, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter) explain insert into table tbl_target_bucket select a, b from tbl_src; insert into table tbl_target_bucket select a, b from tbl_src; select * from tbl_target_bucket order by a, ccy; ---mixed case - 1 identity + 1 bucket cols +--mixed case - 1 identity + 1 bucket cols: NDV(b) * min(NDV(c) = 8, 3) = 5 * 3 = 15 > MAX_WRITERS (~3) -> sort (ClusteredWriter) create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc; explain insert into table tbl_target_mixed select * from tbl_src; insert into table tbl_target_mixed select * from tbl_src; @@ -45,11 +45,11 @@ select * from tbl_target_mixed order by a, ccy; select `partition` from default.tbl_target_mixed.partitions order by `partition`; select * from default.tbl_target_mixed.files; ---1 of 2 partition cols is folded with constant - should still sort +--b = 'EUR' folds ccy to constant: bucket NDV = min(NDV(c) = 8, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter) explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'; insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'; ---all partitions cols folded - should not sort as it's not needed +--all partition cols folded (b = 'USD', c = 100) -> no sort (FanoutWriter) explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100; insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100; @@ -126,7 +126,7 @@ explain insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 20 insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); select * from tbl_month_timestamp order by id, date_time_timestamp; ---day case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting +--day case - should invoke GenericUDFIcebergDay to convert the date/timestamp value to day and use for clustering and sorting create external table tbl_day_date (id string, date_time_date date, year_partition int) partitioned by spec (year_partition, day(date_time_date)) stored by iceberg stored as parquet @@ -145,7 +145,7 @@ explain insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018 insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); select * from tbl_day_timestamp order by id, date_time_timestamp; ---hour case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting +--hour case - should invoke GenericUDFIcebergHour to convert the timestamp value to hour and use for clustering and sorting create external table tbl_hour_timestamp (id string, date_time_timestamp timestamp, year_partition int) partitioned by spec (year_partition, hour(date_time_timestamp)) stored by iceberg stored as parquet @@ -165,12 +165,12 @@ set hive.optimize.sort.dynamic.partition.threshold=1; explain insert into tbl_target_identity select a, b from tbl_src; explain insert into tbl_target_bucket select a, b from tbl_src; --- threshold = 2: NDV of b (~5) > 2 -> sort (ClusteredWriter) +-- threshold = 2: bucket NDV = min(NDV(b) = 5, 3) = 3 > 2 -> sort (ClusteredWriter) set hive.optimize.sort.dynamic.partition.threshold=2; explain insert into tbl_target_identity select a, b from tbl_src; explain insert into tbl_target_bucket select a, b from tbl_src; --- threshold = 100: NDV of b (~5) <= 100 -> no sort (FanoutWriter) +-- threshold = 100: bucket NDV = min(NDV(b) = 5, 3) = 3 <= 100 -> no sort (FanoutWriter) set hive.optimize.sort.dynamic.partition.threshold=100; explain insert into tbl_target_identity select a, b from tbl_src; explain insert into tbl_target_bucket select a, b from tbl_src; diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out index 67e3b120a26a..01917600220a 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out @@ -207,11 +207,11 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 90 PLN 100 CZK 110 NULL -PREHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc +PREHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@tbl_target_bucket -POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc +POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@tbl_target_bucket @@ -227,7 +227,6 @@ Plan optimized by CBO. Vertex dependency in root stage Reducer 2 <- Map 1 (SIMPLE_EDGE) -Reducer 3 <- Map 1 (SIMPLE_EDGE) Stage-3 Stats Work{} @@ -238,31 +237,25 @@ Stage-3 Dependency Collection{} Stage-1 Reducer 2 vectorized - File Output Operator [FS_18] - table:{"name:":"default.tbl_target_bucket"} - Select Operator [SEL_17] - Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] - <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_13] - PartitionCols:iceberg_bucket(_col1, 2) - Select Operator [SEL_12] (rows=22 width=87) - Output:["_col0","_col1"] - TableScan [TS_0] (rows=22 width=87) - default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] - Reducer 3 vectorized - File Output Operator [FS_21] - Select Operator [SEL_20] (rows=5 width=574) + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=5 width=336) + Group By Operator [GBY_15] (rows=3 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_16] + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) - Select Operator [SEL_14] (rows=22 width=87) + Group By Operator [GBY_13] (rows=4 width=402) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] - Please refer to the previous Select Operator [SEL_12] + Please refer to the previous Select Operator [SEL_10] PREHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src PREHOOK: type: QUERY @@ -348,12 +341,12 @@ Stage-3 File Output Operator [FS_21] Select Operator [SEL_20] (rows=11 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_19] (rows=11 width=591) + Group By Operator [GBY_19] (rows=11 width=595) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0, _col1 - Group By Operator [GBY_15] (rows=11 width=659) + Group By Operator [GBY_15] (rows=11 width=663) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) Select Operator [SEL_14] (rows=22 width=94) Output:["a","ccy","c"] @@ -458,7 +451,6 @@ Plan optimized by CBO. Vertex dependency in root stage Reducer 2 <- Map 1 (SIMPLE_EDGE) -Reducer 3 <- Map 1 (SIMPLE_EDGE) Stage-3 Stats Work{} @@ -469,33 +461,27 @@ Stage-3 Dependency Collection{} Stage-1 Reducer 2 vectorized - File Output Operator [FS_21] - table:{"name:":"default.tbl_target_mixed"} - Select Operator [SEL_20] - Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"] - <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_16] - PartitionCols:_col1, iceberg_bucket(_col2, 3) - Select Operator [SEL_15] (rows=4 width=99) - Output:["_col0","_col1","_col2"] - Filter Operator [FIL_14] (rows=4 width=99) - predicate:(b = 'EUR') - TableScan [TS_0] (rows=22 width=94) - default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] - Reducer 3 vectorized - File Output Operator [FS_24] - Select Operator [SEL_23] (rows=4 width=1030) + File Output Operator [FS_20] + Select Operator [SEL_19] (rows=3 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_22] (rows=4 width=591) + Group By Operator [GBY_18] (rows=3 width=595) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_19] + File Output Operator [FS_14] + table:{"name:":"default.tbl_target_mixed"} + Select Operator [SEL_13] (rows=4 width=99) + Output:["_col0","_col1","_col2"] + Filter Operator [FIL_12] (rows=4 width=99) + predicate:(b = 'EUR') + TableScan [TS_0] (rows=22 width=94) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] + SHUFFLE [RS_17] PartitionCols:_col0, _col1 - Group By Operator [GBY_18] (rows=4 width=659) + Group By Operator [GBY_16] (rows=3 width=663) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) - Select Operator [SEL_17] (rows=4 width=99) + Select Operator [SEL_15] (rows=4 width=99) Output:["a","ccy","c"] - Please refer to the previous Select Operator [SEL_15] + Please refer to the previous Select Operator [SEL_13] PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'EUR' PREHOOK: type: QUERY @@ -530,7 +516,7 @@ Stage-3 File Output Operator [FS_20] Select Operator [SEL_19] (rows=1 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_18] (rows=1 width=591) + Group By Operator [GBY_18] (rows=1 width=595) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_14] @@ -543,7 +529,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] SHUFFLE [RS_17] PartitionCols:_col0, _col1 - Group By Operator [GBY_16] (rows=1 width=659) + Group By Operator [GBY_16] (rows=1 width=663) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) Select Operator [SEL_15] (rows=1 width=99) Output:["a","ccy","c"] @@ -1718,9 +1704,9 @@ Stage-3 Stage-1 Reducer 2 vectorized File Output Operator [FS_17] - Select Operator [SEL_16] (rows=5 width=574) + Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_15] (rows=5 width=336) + Group By Operator [GBY_15] (rows=3 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_11] @@ -1731,8 +1717,8 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_13] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Group By Operator [GBY_13] (rows=4 width=402) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_10] @@ -1812,25 +1798,25 @@ Stage-3 File Output Operator [FS_18] table:{"name:":"default.tbl_target_bucket"} Select Operator [SEL_17] - Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + Output:["_col0","_col1","iceberg_bucket(_col1, 3)"] <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_13] - PartitionCols:iceberg_bucket(_col1, 2) + PartitionCols:iceberg_bucket(_col1, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["_col0","_col1"] TableScan [TS_0] (rows=22 width=87) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] Reducer 3 vectorized File Output Operator [FS_21] - Select Operator [SEL_20] (rows=5 width=574) + Select Operator [SEL_20] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=5 width=336) + Group By Operator [GBY_19] (rows=3 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Group By Operator [GBY_15] (rows=4 width=402) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_12] @@ -1910,25 +1896,25 @@ Stage-3 File Output Operator [FS_18] table:{"name:":"default.tbl_target_bucket"} Select Operator [SEL_17] - Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + Output:["_col0","_col1","iceberg_bucket(_col1, 3)"] <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_13] - PartitionCols:iceberg_bucket(_col1, 2) + PartitionCols:iceberg_bucket(_col1, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["_col0","_col1"] TableScan [TS_0] (rows=22 width=87) default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] Reducer 3 vectorized File Output Operator [FS_21] - Select Operator [SEL_20] (rows=5 width=574) + Select Operator [SEL_20] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=5 width=336) + Group By Operator [GBY_19] (rows=3 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Group By Operator [GBY_15] (rows=4 width=402) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_12] @@ -1998,9 +1984,9 @@ Stage-3 Stage-1 Reducer 2 vectorized File Output Operator [FS_17] - Select Operator [SEL_16] (rows=5 width=574) + Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_15] (rows=5 width=336) + Group By Operator [GBY_15] (rows=3 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_11] @@ -2011,8 +1997,8 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_13] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Group By Operator [GBY_13] (rows=4 width=402) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] Please refer to the previous Select Operator [SEL_10] @@ -2069,14 +2055,14 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] Reducer 3 vectorized File Output Operator [FS_21] - Select Operator [SEL_20] (rows=5 width=574) + Select Operator [SEL_20] (rows=2 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=5 width=336) + Group By Operator [GBY_19] (rows=2 width=334) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=5 width=404) + Group By Operator [GBY_15] (rows=3 width=402) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out index fcd1c17b24ed..42f095dae620 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out @@ -2600,13 +2600,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Reducer 2 Execution mode: vectorized @@ -2616,7 +2616,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -2717,13 +2717,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Reducer 2 Execution mode: vectorized @@ -2733,7 +2733,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -2826,13 +2826,13 @@ STAGE PLANS: minReductionHashAggr: 0.75 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Execution mode: vectorized Reducer 2 @@ -2843,7 +2843,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -3039,13 +3039,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 - Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: decimal(38,0)), _col2 (type: decimal(38,0)), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary) Reducer 2 Execution mode: vectorized @@ -3070,7 +3070,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 - Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'DECIMAL' (type: string), _col1 (type: decimal(38,0)), _col2 (type: decimal(38,0)), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 602cb1a0f120..f5431fa34934 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.orc.OrcConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,11 +217,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } - // Mark that sorting will be applied with custom partition expressions, so the writer layer - // (e.g. Iceberg) knows the input is ordered and can use a clustered writer. - if (!customPartitionExprs.isEmpty()) { - dpCtx.setHasCustomPartitionOrSortExpression(true); - } // if RS is inserted by enforce bucketing or sorting, we need to remove it // since ReduceSinkDeDuplication will not merge them to single RS. // RS inserted by enforce bucketing/sorting will have bucketing column in @@ -235,6 +231,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } + // Mark that sorting will be applied with custom partition expressions, so the writer layer + // (e.g. Iceberg) knows the input is ordered and can use a clustered writer. + if (!customPartitionExprs.isEmpty()) { + dpCtx.setHasCustomPartitionOrSortExpression(true); + } + // unlink connection between FS and its parent fsParent = fsOp.getParentOperators().get(0); // store the index of the file sink operator to later insert the modified operator with RS at the same position @@ -927,37 +929,33 @@ private long computePartCardinality(List partitionPos, Statistics tStats, Operator fsParent, ArrayList allRSCols) { + long partCardinality = 1; + if (!partitionPos.isEmpty()) { - long cardinality = 1; for (Integer idx : partitionPos) { ColumnInfo ci = fsParent.getSchema().getSignature().get(idx); ColStatistics partStats = tStats.getColumnStatisticsFromColName(ci.getInternalName()); if (partStats == null) { return -1; } - cardinality *= partStats.getCountDistint(); + partCardinality *= partStats.getCountDistint(); } - return cardinality; + return partCardinality; } if (!customPartitionExprs.isEmpty()) { - // extract source column names from custom expressions (same approach as allStaticPartitions) - Set partColNames = new HashSet<>(); for (Function, ExprNodeDesc> expr : customPartitionExprs) { ExprNodeDesc resolved = expr.apply(allRSCols); - for (ExprNodeColumnDesc colDesc : ExprNodeDescUtils.findAllColumnDescs(resolved)) { - partColNames.add(colDesc.getColumn()); - } - } - long cardinality = 1; - for (String colName : partColNames) { - ColStatistics partStats = tStats.getColumnStatisticsFromColName(colName); - if (partStats == null) { + // Use StatsUtils to get accurate output stats, which leverages StatEstimator + // implementations on UDFs (e.g. iceberg_bucket reports min(inputNDV, numBuckets)) + ColStatistics exprStats = StatsUtils.getColStatisticsFromExpression( + this.parseCtx.getConf(), tStats, resolved); + if (exprStats == null) { return -1; } - cardinality *= partStats.getCountDistint(); + partCardinality *= exprStats.getCountDistint(); } - return cardinality; + return partCardinality; } return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java index 96865d194c6e..7c391332df6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java @@ -25,5 +25,5 @@ public interface StatEstimatorProvider { /** * Returns the {@link StatEstimator} for the given UDF instance. */ - public StatEstimator getStatEstimator(); + StatEstimator getStatEstimator(); } From 509b95a8b915689e7508c1b129270a58c856602d Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Wed, 8 Apr 2026 11:15:48 +0300 Subject: [PATCH 5/5] review comments #2 --- .../mr/hive/udf/GenericUDFIcebergBucket.java | 31 ++++++++-------- ...tGenericUDFIcebergBucketStatEstimator.java | 8 ++--- .../positive/dynamic_partition_writes.q.out | 36 +++++++++---------- ...erg_insert_into_partition_transforms.q.out | 24 ++++++------- 4 files changed, 50 insertions(+), 49 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java index 7ba3413820c7..0091887caa48 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator; import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -217,28 +218,30 @@ public String getDisplayString(String[] children) { @Override public StatEstimator getStatEstimator() { - return new BucketStatEstimator(); + return new BucketStatEstimator(numBuckets); } - private static class BucketStatEstimator implements StatEstimator { + static class BucketStatEstimator implements StatEstimator { + private final int numBuckets; + + BucketStatEstimator(int numBuckets) { + this.numBuckets = numBuckets; + } + @Override public Optional estimate(List argStats) { - if (argStats.size() != 2) { - return Optional.empty(); - } - ColStatistics inputStats = argStats.get(0); - ColStatistics bucketCountStats = argStats.get(1); - ColStatistics.Range bucketRange = bucketCountStats.getRange(); - if (bucketRange == null || bucketRange.minValue == null) { + if (argStats.isEmpty() || numBuckets <= 0) { return Optional.empty(); } - long numBuckets = bucketRange.minValue.longValue(); - if (numBuckets <= 0) { - return Optional.empty(); - } - ColStatistics result = inputStats.clone(); + ColStatistics inputStats = argStats.getFirst(); + + ColStatistics result = new ColStatistics(); result.setCountDistint(Math.min(inputStats.getCountDistint(), numBuckets)); + result.setNumNulls(inputStats.getNumNulls()); + result.setAvgColLen(JavaDataModel.get().primitive1()); result.setRange(0, numBuckets - 1); + result.setIsEstimated(true); + return Optional.of(result); } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java index 8b8f47e5af02..584d67be5004 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java @@ -61,13 +61,11 @@ public void testZeroBucketsReturnsEmpty() { Assert.assertFalse(result.isPresent()); } - private Optional estimateBucket(long sourceNdv, long numBuckets) { + private static Optional estimateBucket(long sourceNdv, int numBuckets) { ColStatistics sourceStats = new ColStatistics("col", "int"); sourceStats.setCountDistint(sourceNdv); - ColStatistics numBucketsStats = new ColStatistics("numBuckets", "int"); - numBucketsStats.setRange(numBuckets, numBuckets); - StatEstimator estimator = new GenericUDFIcebergBucket().getStatEstimator(); - return estimator.estimate(Arrays.asList(sourceStats, numBucketsStats)); + StatEstimator estimator = new GenericUDFIcebergBucket.BucketStatEstimator(numBuckets); + return estimator.estimate(Arrays.asList(sourceStats)); } } diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out index 01917600220a..0c58a118a7ce 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out @@ -240,7 +240,7 @@ Stage-3 File Output Operator [FS_17] Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_15] (rows=3 width=334) + Group By Operator [GBY_15] (rows=3 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_11] @@ -251,7 +251,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_13] (rows=4 width=402) + Group By Operator [GBY_13] (rows=4 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] @@ -341,12 +341,12 @@ Stage-3 File Output Operator [FS_21] Select Operator [SEL_20] (rows=11 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_19] (rows=11 width=595) + Group By Operator [GBY_19] (rows=11 width=591) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0, _col1 - Group By Operator [GBY_15] (rows=11 width=663) + Group By Operator [GBY_15] (rows=11 width=659) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) Select Operator [SEL_14] (rows=22 width=94) Output:["a","ccy","c"] @@ -464,7 +464,7 @@ Stage-3 File Output Operator [FS_20] Select Operator [SEL_19] (rows=3 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_18] (rows=3 width=595) + Group By Operator [GBY_18] (rows=3 width=591) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_14] @@ -477,7 +477,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] SHUFFLE [RS_17] PartitionCols:_col0, _col1 - Group By Operator [GBY_16] (rows=3 width=663) + Group By Operator [GBY_16] (rows=3 width=659) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) Select Operator [SEL_15] (rows=4 width=99) Output:["a","ccy","c"] @@ -516,7 +516,7 @@ Stage-3 File Output Operator [FS_20] Select Operator [SEL_19] (rows=1 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_18] (rows=1 width=595) + Group By Operator [GBY_18] (rows=1 width=591) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_14] @@ -529,7 +529,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] SHUFFLE [RS_17] PartitionCols:_col0, _col1 - Group By Operator [GBY_16] (rows=1 width=663) + Group By Operator [GBY_16] (rows=1 width=659) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) Select Operator [SEL_15] (rows=1 width=99) Output:["a","ccy","c"] @@ -1706,7 +1706,7 @@ Stage-3 File Output Operator [FS_17] Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_15] (rows=3 width=334) + Group By Operator [GBY_15] (rows=3 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_11] @@ -1717,7 +1717,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_13] (rows=4 width=402) + Group By Operator [GBY_13] (rows=4 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] @@ -1810,12 +1810,12 @@ Stage-3 File Output Operator [FS_21] Select Operator [SEL_20] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=3 width=334) + Group By Operator [GBY_19] (rows=3 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=4 width=402) + Group By Operator [GBY_15] (rows=4 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] @@ -1908,12 +1908,12 @@ Stage-3 File Output Operator [FS_21] Select Operator [SEL_20] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=3 width=334) + Group By Operator [GBY_19] (rows=3 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=4 width=402) + Group By Operator [GBY_15] (rows=4 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] @@ -1986,7 +1986,7 @@ Stage-3 File Output Operator [FS_17] Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_15] (rows=3 width=334) + Group By Operator [GBY_15] (rows=3 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized File Output Operator [FS_11] @@ -1997,7 +1997,7 @@ Stage-3 default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_13] (rows=4 width=402) + Group By Operator [GBY_13] (rows=4 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] @@ -2057,12 +2057,12 @@ Stage-3 File Output Operator [FS_21] Select Operator [SEL_20] (rows=2 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=2 width=334) + Group By Operator [GBY_19] (rows=2 width=336) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized SHUFFLE [RS_16] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=3 width=402) + Group By Operator [GBY_15] (rows=3 width=404) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) Select Operator [SEL_14] (rows=22 width=87) Output:["a","ccy"] diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out index 42f095dae620..fcd1c17b24ed 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out @@ -2600,13 +2600,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Reducer 2 Execution mode: vectorized @@ -2616,7 +2616,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -2717,13 +2717,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Reducer 2 Execution mode: vectorized @@ -2733,7 +2733,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -2826,13 +2826,13 @@ STAGE PLANS: minReductionHashAggr: 0.75 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 578 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: int), _col2 (type: struct), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary), _col6 (type: bigint), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 (type: bigint), _col13 (type: binary) Execution mode: vectorized Reducer 2 @@ -2843,7 +2843,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13 - Statistics: Num rows: 1 Data size: 510 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 504 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col1,0)) (type: bigint), COALESCE(_col2,0) (type: double), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 'LONG' (type: string), _col6 (type: bigint), _col7 (type: bigint), (_col3 - _col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: bigint), UDFToLong(_col11) (type: bigint), (_col3 - _col12) (type: bigint), COALESCE(ndv_compute_bit_vector(_col13),0) (type: bigint), _col13 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18 @@ -3039,13 +3039,13 @@ STAGE PLANS: minReductionHashAggr: 0.4 mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 - Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col0 (type: int) null sort order: z sort order: + Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: decimal(38,0)), _col2 (type: decimal(38,0)), _col3 (type: bigint), _col4 (type: bigint), _col5 (type: binary) Reducer 2 Execution mode: vectorized @@ -3070,7 +3070,7 @@ STAGE PLANS: keys: KEY._col0 (type: int) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 - Statistics: Num rows: 1 Data size: 496 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: 'DECIMAL' (type: string), _col1 (type: decimal(38,0)), _col2 (type: decimal(38,0)), (_col3 - _col4) (type: bigint), COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), named_struct('pcol_bucket',_col0) (type: struct) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6