Skip to content
Open

[WIP] #6413

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public static boolean isSchemaEvolutionEnabled(Table table, Configuration conf)
}

public static boolean isFullPartitionSpec(Table table, Map<String, String> partitionSpec) {
if (table.hasNonNativePartitionSupport()) {
return true;
}
for (FieldSchema partitionCol : table.getPartCols()) {
if (partitionSpec.get(partitionCol.getName()) == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ private List<FieldSchema> getColumnsByPattern() throws HiveException {

private List<FieldSchema> getCols() throws HiveException {
Table table = context.getDb().getTable(desc.getTableName());
List<FieldSchema> allColumns = new ArrayList<>();
allColumns.addAll(table.getCols());
allColumns.addAll(table.getPartCols());
return allColumns;
return new ArrayList<>(table.getAllCols());
}

private Matcher getMatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private Deserializer getDeserializer(Table table) throws SQLException {
private void getColumnsNoColumnPath(Table table, Partition partition, List<FieldSchema> cols) throws HiveException {
cols.addAll(partition == null || table.getTableType() == TableType.VIRTUAL_VIEW ?
table.getCols() : partition.getCols());
if (!desc.isFormatted()) {
if (!desc.isFormatted() && !table.hasNonNativePartitionSupport()) {
cols.addAll(table.getPartCols());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ private void addPartitionData(DataOutputStream out, HiveConf conf, String column
List<FieldSchema> partitionColumns = null;
// TODO (HIVE-29413): Refactor to a generic getPartCols() implementation
if (table.isPartitioned()) {
partitionColumns = table.hasNonNativePartitionSupport() ?
table.getStorageHandler().getPartitionKeys(table) :
table.getPartCols();
partitionColumns = table.getPartCols();
}
if (CollectionUtils.isNotEmpty(partitionColumns) &&
conf.getBoolVar(ConfVars.HIVE_DISPLAY_PARTITION_COLUMNS_SEPARATELY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ public List<String> getValues() {
values = new ArrayList<>();

// TODO (HIVE-29413): Refactor to a generic getPartCols() implementation
for (FieldSchema fs : table.hasNonNativePartitionSupport()
? table.getStorageHandler().getPartitionKeys(table)
: table.getPartCols()) {
for (FieldSchema fs : table.getPartCols()) {
String val = partSpec.get(fs.getName());
values.add(val);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ protected void initialize(Table table,
// set default if location is not set and this is a physical
// table partition (not a view partition)
if (table.getDataLocation() != null) {
Path partPath = new Path(table.getDataLocation(), Warehouse.makePartName(table.getPartCols(), tPartition.getValues()));
Path partPath = new Path(table.getDataLocation(),
Warehouse.makePartName(table.getPartCols(), tPartition.getValues()));
tPartition.getSd().setLocation(partPath.toString());
}
}
Expand Down
66 changes: 50 additions & 16 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
Expand Down Expand Up @@ -118,6 +119,8 @@ public class Table implements Serializable {
private transient StorageHandlerInfo storageHandlerInfo;
private transient MaterializedViewMetadata materializedViewMetadata;

private List<FieldSchema> cachedPartCols;

private TableSpec tableSpec;

private boolean materializedTable;
Expand Down Expand Up @@ -193,6 +196,9 @@ public Table makeCopy() {

newTab.setMetaTable(this.getMetaTable());
newTab.setSnapshotRef(this.getSnapshotRef());
if (this.cachedPartCols != null) {
newTab.cachedPartCols = new ArrayList<>(this.cachedPartCols);
}
return newTab;
}

Expand All @@ -214,6 +220,7 @@ public org.apache.hadoop.hive.metastore.api.Table getTTable() {
*/
public void setTTable(org.apache.hadoop.hive.metastore.api.Table tTable) {
this.tTable = tTable;
clearCachedPartCols();
}

/**
Expand Down Expand Up @@ -595,24 +602,43 @@ public boolean equals(Object obj) {
}

public List<FieldSchema> getPartCols() {
List<FieldSchema> partKeys = tTable.getPartitionKeys();
if (partKeys == null) {
partKeys = new ArrayList<>();
tTable.setPartitionKeys(partKeys);
if (cachedPartCols != null) {
return cachedPartCols;
}
List<FieldSchema> partKeys;
if (isTableTypeSet() && hasNonNativePartitionSupport()) {
partKeys = getStorageHandler().getPartitionKeys(this);
} else {
partKeys = tTable.getPartitionKeys();
if (partKeys == null) {
partKeys = new ArrayList<>();
tTable.setPartitionKeys(partKeys);
}
}
cachedPartCols = partKeys;
return partKeys;
}

private void clearCachedPartCols() {
cachedPartCols = null;
}

private boolean isTableTypeSet() {
if (tTable.getParameters() == null) {
return false;
}
String tableType = tTable.getParameters().get(HiveMetaHook.TABLE_TYPE);
return tableType != null;
}

public FieldSchema getPartColByName(String colName) {
return getPartCols().stream()
.filter(key -> key.getName().toLowerCase().equals(colName))
.findFirst().orElse(null);
return hasNonNativePartitionSupport() ? null : getPartCols().stream()
.filter(key -> key.getName().toLowerCase().equals(colName))
.findFirst().orElse(null);
}

public List<String> getPartColNames() {
List<FieldSchema> partCols = hasNonNativePartitionSupport() ?
getStorageHandler().getPartitionKeys(this) : getPartCols();
return partCols.stream().map(FieldSchema::getName)
return getPartCols().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -761,14 +787,22 @@ private List<FieldSchema> getColsInternal(boolean forMs) {
* @return List&lt;FieldSchema&gt;
*/
public List<FieldSchema> getAllCols() {
ArrayList<FieldSchema> f_list = new ArrayList<FieldSchema>();
f_list.addAll(getCols());
f_list.addAll(getPartCols());
return f_list;
List<FieldSchema> allCols = new ArrayList<>(getCols());
Set<String> colNames = new HashSet<>();
for (FieldSchema col : allCols) {
colNames.add(col.getName());
}
for (FieldSchema col : getPartCols()) {
if (!colNames.contains(col.getName())) {
allCols.add(col);
}
}
return allCols;
}

public void setPartCols(List<FieldSchema> partCols) {
tTable.setPartitionKeys(partCols);
clearCachedPartCols();
}

public String getCatName() {
Expand Down Expand Up @@ -812,7 +846,7 @@ public void setOutputFormatClass(String name) throws HiveException {
}

public boolean isPartitioned() {
return hasNonNativePartitionSupport() ? getStorageHandler().isPartitioned(this) :
return hasNonNativePartitionSupport() ? getStorageHandler().isPartitioned(this) :
CollectionUtils.isNotEmpty(getPartCols());
}

Expand Down Expand Up @@ -1153,7 +1187,7 @@ public static void validateColumns(List<FieldSchema> columns, List<FieldSchema>
}
colNames.add(colName);
}
if (partCols != null) {
if (partCols != null && !icebergTable) {
// there is no overlap between columns and partitioning columns
for (FieldSchema partCol: partCols) {
String colName = normalize(partCol.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
for (FieldNode col : cols) {
int index = originalOutputColumnNames.indexOf(col.getFieldName());
Table tab = cppCtx.getParseContext().getViewProjectToTableSchema().get(op);
List<FieldSchema> fullFieldList = new ArrayList<FieldSchema>(tab.getCols());
fullFieldList.addAll(tab.getPartCols());
List<FieldSchema> fullFieldList = new ArrayList<>(tab.getAllCols());
cppCtx.getParseContext().getColumnAccessInfo()
.add(tab.getCompleteName(), fullFieldList.get(index).getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private void analyzeAcidExport(ASTNode ast, Table exportTable, ASTNode tokRefOrN
//now generate insert statement
//insert into newTableName select * from ts <where partition spec>
StringBuilder rewrittenQueryStr = generateExportQuery(
newTable.getPartCols(), tokRefOrNameExportTable, (ASTNode) tokRefOrNameExportTable.parent, newTableName);
newTable.getPartCols(),
tokRefOrNameExportTable, (ASTNode) tokRefOrNameExportTable.parent, newTableName);
ReparseResult rr = ParseUtils.parseRewrittenQuery(ctx, rewrittenQueryStr);
Context rewrittenCtx = rr.rewrittenCtx;
rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3048,6 +3048,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
final NotNullConstraint nnc = tabMetaData.getNotNullConstraint();
final PrimaryKeyInfo pkc = tabMetaData.getPrimaryKeyInfo();

Set<String> alreadyAdded = new HashSet<>();
for (StructField structField : fields) {
colName = structField.getFieldName();
colInfo = new ColumnInfo(
Expand All @@ -3056,6 +3057,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
isNullable(colName, nnc, pkc), tableAlias, false);
colInfo.setSkewedCol(isSkewedCol(tableAlias, qb, colName));
rr.put(tableAlias, colName, colInfo);
alreadyAdded.add(colName);
cInfoLst.add(colInfo);
}
// TODO: Fix this
Expand All @@ -3065,6 +3067,9 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
// 3.2 Add column info corresponding to partition columns
for (FieldSchema part_col : tabMetaData.getPartCols()) {
colName = part_col.getName();
if (tabMetaData.hasNonNativePartitionSupport()) {
break;
}
colInfo = new ColumnInfo(colName,
TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()),
isNullable(colName, nnc, pkc), tableAlias, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf,
this.isInsertInto = isInsertInto;
this.origCtx = ctx;
columns = tbl.getCols();
partitionColumns = tbl.getPartCols();
// current behaviour intact until we have getCols() giving only non-partition columns for non native tables as well
partitionColumns = tbl.hasNonNativePartitionSupport() ? new ArrayList<>() : tbl.getPartCols();
}

public List<LoadFileDesc> getLoadFileWork() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ private static CharSequence genPartitionClause(Table tbl, List<TransformSpec> pa


private static String getColTypeOf(Table tbl, String partKey) {
for (FieldSchema fs : tbl.hasNonNativePartitionSupport() ?
tbl.getStorageHandler().getPartitionKeys(tbl) : tbl.getPartitionKeys()) {
for (FieldSchema fs : tbl.getPartCols()) {
if (partKey.equalsIgnoreCase(fs.getName())) {
return fs.getType().toLowerCase();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.QueryState;
Expand Down Expand Up @@ -89,7 +88,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -511,7 +512,7 @@ private void reparseAndSuperAnalyze(Table table, URI fromURI) throws SemanticExc

// Partition spec was already validated by caller when create TableSpec object.
// So, need not validate inpPartSpec here.
List<FieldSchema> parts = table.getPartCols();
List<FieldSchema> parts = table.hasNonNativePartitionSupport() ? Collections.emptyList() : table.getPartCols();
if (tableTree.getChildCount() >= 2) {
ASTNode partSpecNode = (ASTNode) tableTree.getChild(1);
inpPartSpec = new HashMap<>(partSpecNode.getChildCount());
Expand Down Expand Up @@ -561,7 +562,7 @@ private void reparseAndSuperAnalyze(Table table, URI fromURI) throws SemanticExc
}

rewrittenQueryStr.append(getFullTableNameForSQL((ASTNode)(tableTree.getChild(0))));
addPartitionColsToInsert(table.getPartCols(), inpPartSpec, rewrittenQueryStr);
addPartitionColsToInsert(parts, inpPartSpec, rewrittenQueryStr);
rewrittenQueryStr.append(" select * from ");
rewrittenQueryStr.append(tempTblName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.rewrite.MergeStatement;
import org.apache.hadoop.hive.ql.parse.rewrite.RewriterFactory;
import org.apache.hadoop.hive.ql.plan.HiveOperation;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -230,7 +229,7 @@ private MergeStatement.UpdateClause handleUpdate(ASTNode whenMatchedUpdateClause
String deleteExtraPredicate) throws SemanticException {
assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
Map<String, String> newValuesMap = new HashMap<>(targetTable.getCols().size() + targetTable.getPartCols().size());
Map<String, String> newValuesMap = new HashMap<>(targetTable.getAllCols().size());
ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
//columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
//before re-parsing, i.e. they are known to SemanticAnalyzer logic
Expand Down
3 changes: 1 addition & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,7 @@ public static Map<Integer, List<ExprNodeGenericFuncDesc>> getFullPartitionSpecs(
CommonTree ast, Table table, Configuration conf, boolean canGroupExprs) throws SemanticException {
String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULT_PARTITION_NAME);
Map<String, String> colTypes = new HashMap<>();
List<FieldSchema> partitionKeys = table.hasNonNativePartitionSupport() ?
table.getStorageHandler().getPartitionKeys(table) : table.getPartitionKeys();
List<FieldSchema> partitionKeys = table.getPartCols();
for (FieldSchema fs : partitionKeys) {
colTypes.put(fs.getName().toLowerCase(), fs.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void checkValidSetClauseTarget(ASTNode colName, Table targetTable) thr

// Make sure this isn't one of the partitioning columns, that's not supported.
for (FieldSchema fschema : targetTable.getPartCols()) {
if (fschema.getName().equalsIgnoreCase(columnName)) {
if (fschema.getName().equalsIgnoreCase(columnName) && !targetTable.hasNonNativePartitionSupport()) {
throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12001,6 +12001,8 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException {
// Determine row schema for TSOP.
// Include column names from SerDe, the partition and virtual columns.
rwsch = new RowResolver();
Set<String> partCols = tab.hasNonNativePartitionSupport() ?
Sets.newHashSet(tab.getPartColNames()) : Collections.emptySet();
try {
// Including parameters passed in the query
if (properties != null) {
Expand All @@ -12018,8 +12020,6 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException {
deserializer.handleJobLevelConfiguration(conf);
List<? extends StructField> fields = rowObjectInspector
.getAllStructFieldRefs();
Set<String> partCols = tab.hasNonNativePartitionSupport() ?
Sets.newHashSet(tab.getPartColNames()) : Collections.emptySet();
for (int i = 0; i < fields.size(); i++) {
/**
* if the column is a skewed column, use ColumnInfo accordingly
Expand All @@ -12039,6 +12039,9 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException {
// Hack!! - refactor once the metadata APIs with types are ready
// Finally add the partitioning columns
for (FieldSchema part_col : tab.getPartCols()) {
if(partCols.contains(part_col.getName())){
break;
}
LOG.trace("Adding partition col: " + part_col);
rwsch.put(alias, part_col.getName(), new ColumnInfo(part_col.getName(),
TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void appendWhenMatchedUpdateClause(MergeStatement.UpdateClause updateClau
sqlGenerator.append(hintStr);
hintStr = null;
}
List<String> values = new ArrayList<>(targetTable.getCols().size() + targetTable.getPartCols().size());
List<String> values = new ArrayList<>(targetTable.getAllCols().size());
values.addAll(sqlGenerator.getDeleteValues(Context.Operation.MERGE));
addValues(targetTable, targetAlias, updateClause.getNewValuesMap(), values);
addValuesForRowLineageForCopyOnMerge(isRowLineageSupported, values,
Expand Down
Loading
Loading