Skip to content

Commit cc7465a

Browse files
committed
HIVE-27190: Implement column stats cache for Hive Iceberg tables
1 parent d55885e commit cc7465a

5 files changed

Lines changed: 63 additions & 46 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,13 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
707707
return false;
708708
}
709709

710+
@Override
711+
public String getStatsCacheKeySuffix(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
712+
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
713+
Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
714+
return snapshot != null ? String.valueOf(snapshot.snapshotId()) : null;
715+
}
716+
710717
@Override
711718
public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
712719
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,16 @@ default boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table
334334
return false;
335335
}
336336

337+
/**
338+
* Returns an optional suffix to include in the column statistics cache key. This allows storage handlers to
339+
* differentiate cache entries based on table-specific context (e.g., snapshot ID for Iceberg tables).
340+
* @param table table object
341+
* @return a cache key suffix, or null if no additional differentiation is needed
342+
*/
343+
default String getStatsCacheKeySuffix(org.apache.hadoop.hive.ql.metadata.Table table) {
344+
return null;
345+
}
346+
337347
/**
338348
* Check if the storage handler can set col statistics.
339349
* @param table table object

ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import java.util.HashSet;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.Optional;
2526
import java.util.Set;
2627
import java.util.concurrent.atomic.AtomicInteger;
27-
import java.util.stream.Collectors;
2828

2929
import org.apache.calcite.linq4j.tree.Expression;
3030
import org.apache.calcite.plan.RelOptSchema;
@@ -82,7 +82,6 @@
8282
import com.google.common.collect.ImmutableList;
8383
import com.google.common.collect.ImmutableMap;
8484
import com.google.common.collect.Lists;
85-
import com.google.common.collect.Sets;
8685

8786
public class RelOptHiveTable implements RelOptTable {
8887

@@ -125,7 +124,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, RelDataTypeFactory typeFactor
125124
this.schema = calciteSchema;
126125
this.typeFactory = typeFactory;
127126
this.qualifiedTblName = ImmutableList.copyOf(qualifiedTblName);
128-
this.name = this.qualifiedTblName.stream().collect(Collectors.joining("."));
127+
this.name = String.join(".", this.qualifiedTblName);
129128
this.rowType = rowType;
130129
this.hiveTblMetadata = hiveTblMetadata;
131130
this.hiveColStatsMap = new HashMap<>();
@@ -192,15 +191,15 @@ public List<ColumnStrategy> getColumnStrategies() {
192191
public RelOptHiveTable copy(RelDataType newRowType) {
193192
// 1. Build map of column name to col index of original schema
194193
// Assumption: Hive Table can not contain duplicate column names
195-
Map<String, Integer> nameToColIndxMap = new HashMap<String, Integer>();
194+
Map<String, Integer> nameToColIndxMap = new HashMap<>();
196195
for (RelDataTypeField f : this.rowType.getFieldList()) {
197196
nameToColIndxMap.put(f.getName(), f.getIndex());
198197
}
199198

200199
// 2. Build nonPart/Part/Virtual column info for new RowSchema
201-
List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<ColumnInfo>();
202-
List<ColumnInfo> newHivePartitionCols = new ArrayList<ColumnInfo>();
203-
List<VirtualColumn> newHiveVirtualCols = new ArrayList<VirtualColumn>();
200+
List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<>();
201+
List<ColumnInfo> newHivePartitionCols = new ArrayList<>();
202+
List<VirtualColumn> newHiveVirtualCols = new ArrayList<>();
204203
Map<Integer, VirtualColumn> virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols,
205204
this.noOfNonVirtualCols);
206205
Integer originalColIndx;
@@ -329,8 +328,8 @@ private List<RelReferentialConstraint> generateReferentialConstraints() {
329328
ImmutableList.Builder<RelReferentialConstraint> builder = ImmutableList.builder();
330329
if (foreignKeyInfo != null && !foreignKeyInfo.getForeignKeys().isEmpty()) {
331330
for (List<ForeignKeyCol> fkCols : foreignKeyInfo.getForeignKeys().values()) {
332-
String parentDatabaseName = fkCols.get(0).parentDatabaseName;
333-
String parentTableName = fkCols.get(0).parentTableName;
331+
String parentDatabaseName = fkCols.getFirst().parentDatabaseName;
332+
String parentTableName = fkCols.getFirst().parentTableName;
334333
String qualifiedName;
335334
List<String> parentTableQualifiedName = new ArrayList<>();
336335
if (parentDatabaseName != null && !parentDatabaseName.isEmpty()) {
@@ -390,7 +389,7 @@ public <T> T unwrap(Class<T> arg0) {
390389

391390
@Override
392391
public List<RelCollation> getCollationList() {
393-
ImmutableList.Builder<RelFieldCollation> collationList = new ImmutableList.Builder<RelFieldCollation>();
392+
ImmutableList.Builder<RelFieldCollation> collationList = new ImmutableList.Builder<>();
394393
for (Order sortColumn : this.hiveTblMetadata.getSortCols()) {
395394
for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
396395
FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);
@@ -411,7 +410,7 @@ public List<RelCollation> getCollationList() {
411410

412411
@Override
413412
public RelDistribution getDistribution() {
414-
ImmutableList.Builder<Integer> columnPositions = new ImmutableList.Builder<Integer>();
413+
ImmutableList.Builder<Integer> columnPositions = new ImmutableList.Builder<>();
415414
for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) {
416415
for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
417416
FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);
@@ -435,7 +434,7 @@ public double getRowCount() {
435434
if (null == partitionList) {
436435
// we are here either unpartitioned table or partitioned table with no
437436
// predicates
438-
computePartitionList(hiveConf, null, new HashSet<Integer>());
437+
computePartitionList(hiveConf, null, new HashSet<>());
439438
}
440439
rowCount = StatsUtils.getNumRows(hiveConf, getNonPartColumns(), hiveTblMetadata,
441440
partitionList, noColsMissingStats);
@@ -465,7 +464,7 @@ private String getColNamesForLogging(Set<String> colLst) {
465464
public void computePartitionList(HiveConf conf, RexNode pruneNode, Set<Integer> partOrVirtualCols) {
466465
try {
467466
if (!hiveTblMetadata.isPartitioned() || pruneNode == null
468-
|| InputFinder.bits(pruneNode).length() == 0) {
467+
|| InputFinder.bits(pruneNode).isEmpty()) {
469468
// there is no predicate on partitioning column, we need all partitions
470469
// in this case.
471470
partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(),
@@ -485,11 +484,11 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode, Set<Integer>
485484
}
486485

487486
private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats) {
488-
List<String> nonPartColNamesThatRqrStats = new ArrayList<String>();
489-
List<Integer> nonPartColIndxsThatRqrStats = new ArrayList<Integer>();
490-
List<String> partColNamesThatRqrStats = new ArrayList<String>();
491-
List<Integer> partColIndxsThatRqrStats = new ArrayList<Integer>();
492-
Set<String> colNamesFailedStats = new HashSet<String>();
487+
List<String> nonPartColNamesThatRqrStats = new ArrayList<>();
488+
List<Integer> nonPartColIndxsThatRqrStats = new ArrayList<>();
489+
List<String> partColNamesThatRqrStats = new ArrayList<>();
490+
List<Integer> partColIndxsThatRqrStats = new ArrayList<>();
491+
Set<String> colNamesFailedStats = new HashSet<>();
493492

494493
// 1. Separate required columns to Non Partition and Partition Cols
495494
ColumnInfo tmp;
@@ -514,19 +513,24 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
514513
if (null == partitionList) {
515514
// We could be here either because its an unpartitioned table or because
516515
// there are no pruning predicates on a partitioned table.
517-
computePartitionList(hiveConf, null, new HashSet<Integer>());
516+
computePartitionList(hiveConf, null, new HashSet<>());
518517
}
519518

520-
String partitionListKey = partitionList.getKey().orElse(null);
521-
ColumnStatsList colStatsCached = colStatsCache.get(partitionListKey);
522-
if (colStatsCached == null) {
523-
colStatsCached = new ColumnStatsList();
524-
colStatsCache.put(partitionListKey, colStatsCached);
525-
}
519+
String key = partitionList.getKey();
520+
521+
String partitionListKey = Optional.ofNullable(hiveTblMetadata.getStorageHandler())
522+
.map(handler -> handler.getStatsCacheKeySuffix(hiveTblMetadata))
523+
.map(suffix -> key + ";" + suffix)
524+
.orElse(key);
525+
526+
ColumnStatsList colStatsCached = colStatsCache.computeIfAbsent(
527+
partitionListKey,
528+
k -> new ColumnStatsList()
529+
);
526530

527531
// 2. Obtain Col Stats for Non Partition Cols
528-
if (nonPartColNamesThatRqrStats.size() > 0) {
529-
List<ColStatistics> hiveColStats = new ArrayList<ColStatistics>();
532+
if (!nonPartColNamesThatRqrStats.isEmpty()) {
533+
List<ColStatistics> hiveColStats = new ArrayList<>();
530534

531535
if (!hiveTblMetadata.isPartitioned()) {
532536
// 2.1 Handle the case for unpartitioned table.
@@ -547,9 +551,9 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
547551
if (hiveColStats.isEmpty()) {
548552
colNamesFailedStats.addAll(nonPartColNamesThatRqrStats);
549553
} else if (hiveColStats.size() != nonPartColNamesThatRqrStats.size()) {
550-
Set<String> setOfFiledCols = new HashSet<String>(nonPartColNamesThatRqrStats);
554+
Set<String> setOfFiledCols = new HashSet<>(nonPartColNamesThatRqrStats);
551555

552-
Set<String> setOfObtainedColStats = new HashSet<String>();
556+
Set<String> setOfObtainedColStats = new HashSet<>();
553557
for (ColStatistics cs : hiveColStats) {
554558
setOfObtainedColStats.add(cs.getColumnName());
555559
}
@@ -561,7 +565,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
561565
// nonPartColNamesThatRqrStats. reorder hiveColStats so we can build hiveColStatsMap
562566
// using nonPartColIndxsThatRqrStats as below
563567
Map<String, ColStatistics> columnStatsMap =
564-
new HashMap<String, ColStatistics>(hiveColStats.size());
568+
new HashMap<>(hiveColStats.size());
565569
for (ColStatistics cs : hiveColStats) {
566570
columnStatsMap.put(cs.getColumnName(), cs);
567571
// even though the stats were estimated we need to warn user that
@@ -586,22 +590,21 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
586590
if (partitionList.getNotDeniedPartns().isEmpty()) {
587591
// no need to make a metastore call
588592
rowCount = 0;
589-
hiveColStats = new ArrayList<ColStatistics>();
593+
hiveColStats = new ArrayList<>();
590594
for (int i = 0; i < nonPartColNamesThatRqrStats.size(); i++) {
591595
// add empty stats object for each column
592596
hiveColStats.add(
593597
new ColStatistics(
594598
nonPartColNamesThatRqrStats.get(i),
595599
hiveNonPartitionColsMap.get(nonPartColIndxsThatRqrStats.get(i)).getTypeName()));
596600
}
597-
colNamesFailedStats.clear();
598601
colStatsCached.updateState(State.COMPLETE);
599602
} else {
600603
Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList,
601604
hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached,
602605
nonPartColNamesThatRqrStats, true);
603606
rowCount = stats.getNumRows();
604-
hiveColStats = new ArrayList<ColStatistics>();
607+
hiveColStats = new ArrayList<>();
605608
for (String c : nonPartColNamesThatRqrStats) {
606609
ColStatistics cs = stats.getColumnStatisticsFromColName(c);
607610
if (cs != null) {
@@ -622,7 +625,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
622625
}
623626
}
624627

625-
if (hiveColStats != null && hiveColStats.size() == nonPartColNamesThatRqrStats.size()) {
628+
if (hiveColStats.size() == nonPartColNamesThatRqrStats.size()) {
626629
for (int i = 0; i < hiveColStats.size(); i++) {
627630
// the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats
628631
// are in same order
@@ -754,7 +757,7 @@ public int hashCode() {
754757
}
755758

756759
public String getPartitionListKey() {
757-
return partitionList != null ? partitionList.getKey().orElse(null) : null;
760+
return partitionList != null ? partitionList.getKey() : null;
758761
}
759762

760763
}

ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public class ParseContext {
128128
private boolean needViewColumnAuthorization;
129129

130130
private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
131-
new LinkedHashMap<ReduceSinkOperator, RuntimeValuesInfo>();
131+
new LinkedHashMap<>();
132132
/**
133133
* Mapping holding information about semijoins.
134134
*
@@ -451,7 +451,7 @@ public Map<String, ColumnStatsList> getColStatsCache() {
451451
* @return col stats
452452
*/
453453
public ColumnStatsList getColStatsCached(PrunedPartitionList partList) {
454-
return ctx.getOpContext().getColStatsCache().get(partList.getKey().orElse(null));
454+
return ctx.getOpContext().getColStatsCache().get(partList.getKey());
455455
}
456456

457457
/**
@@ -515,8 +515,7 @@ public Set<ReadEntity> getSemanticInputs() {
515515
return semanticInputs;
516516
}
517517

518-
public void replaceRootTask(Task<?> rootTask,
519-
List<? extends Task<?>> tasks) {
518+
public void replaceRootTask(Task<?> rootTask, List<? extends Task<?>> tasks) {
520519
this.rootTasks.remove(rootTask);
521520
this.rootTasks.addAll(tasks);
522521
}
@@ -663,7 +662,7 @@ public void setColumnStatsAutoGatherContexts(
663662

664663
public Collection<Operator> getAllOps() {
665664
List<Operator> ops = new ArrayList<>();
666-
Set<Operator> visited = new HashSet<Operator>();
665+
Set<Operator> visited = new HashSet<>();
667666
for (Operator<?> op : getTopOps().values()) {
668667
getAllOps(ops, visited, op);
669668
}

ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818

1919
package org.apache.hadoop.hive.ql.parse;
2020

21-
import java.util.ArrayList;
2221
import java.util.Collections;
2322
import java.util.List;
2423
import java.util.Objects;
25-
import java.util.Optional;
2624
import java.util.Set;
2725

2826
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -37,7 +35,7 @@ public class PrunedPartitionList {
3735
private final Table source;
3836

3937
/** Key to identify this partition list. */
40-
private final Optional<String> ppListKey;
38+
private final String ppListKey;
4139

4240
/** Partitions that either satisfy the partition criteria, or may satisfy it. */
4341
private final Set<Partition> partitions;
@@ -56,7 +54,7 @@ public PrunedPartitionList(Table source, Set<Partition> partitions,
5654
public PrunedPartitionList(Table source, String key, Set<Partition> partitions,
5755
List<String> referred, boolean hasUnknowns) {
5856
this.source = Objects.requireNonNull(source);
59-
this.ppListKey = Optional.ofNullable(key);
57+
this.ppListKey = key;
6058
this.referred = Objects.requireNonNull(referred);
6159
this.partitions = Objects.requireNonNull(partitions);
6260
this.hasUnknowns = hasUnknowns;
@@ -66,7 +64,7 @@ public Table getSourceTable() {
6664
return source;
6765
}
6866

69-
public Optional<String> getKey() {
67+
public String getKey() {
7068
return ppListKey;
7169
}
7270

@@ -82,7 +80,7 @@ public Set<Partition> getPartitions() {
8280
* @return all partitions.
8381
*/
8482
public List<Partition> getNotDeniedPartns() {
85-
return Collections.unmodifiableList(new ArrayList<>(partitions));
83+
return List.copyOf(partitions);
8684
}
8785

8886
/**

0 commit comments

Comments
 (0)