Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.comet.parquet;

import java.util.Map;

public class ParquetColumnSpec {

private final String[] path;
Expand All @@ -27,20 +29,26 @@ public class ParquetColumnSpec {
private final boolean isRepeated;
private final int maxDefinitionLevel;
private final int maxRepetitionLevel;
private String logicalTypeName;
private Map<String, String> logicalTypeParams;

public ParquetColumnSpec(
String[] path,
String physicalType,
int typeLength,
boolean isRepeated,
int maxDefinitionLevel,
int maxRepetitionLevel) {
int maxRepetitionLevel,
String logicalTypeName,
Comment thread
kazuyukitanimura marked this conversation as resolved.
Map<String, String> logicalTypeParams) {
this.path = path;
this.physicalType = physicalType;
this.typeLength = typeLength;
this.isRepeated = isRepeated;
this.maxDefinitionLevel = maxDefinitionLevel;
this.maxRepetitionLevel = maxRepetitionLevel;
this.logicalTypeName = logicalTypeName;
this.logicalTypeParams = logicalTypeParams;
}

public String[] getPath() {
Expand All @@ -66,4 +74,12 @@ public int getMaxRepetitionLevel() {
/**
 * Returns the maximum definition level that was supplied when this spec was constructed.
 *
 * @return the stored {@code maxDefinitionLevel} value
 */
public int getMaxDefinitionLevel() {
  return this.maxDefinitionLevel;
}

/**
 * Returns the logical type name that was supplied when this spec was constructed.
 *
 * @return the stored {@code logicalTypeName}; NOTE(review): presumably {@code null} when the
 *     column carries no logical type annotation, since callers null-check it - confirm
 */
public String getLogicalTypeName() {
  return this.logicalTypeName;
}

/**
 * Returns the logical type parameters that were supplied when this spec was constructed.
 *
 * <p>The stored map instance itself is returned, not a copy.
 *
 * @return the stored {@code logicalTypeParams} map
 */
public Map<String, String> getLogicalTypeParams() {
  return this.logicalTypeParams;
}
}
119 changes: 117 additions & 2 deletions common/src/main/java/org/apache/comet/parquet/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.*;

import org.apache.comet.CometSchemaImporter;
Expand Down Expand Up @@ -290,15 +291,129 @@ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpe
}

String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
// Reconstruct the logical type from parameters
LogicalTypeAnnotation logicalType = null;
if (columnSpec.getLogicalTypeName() != null) {
logicalType =
reconstructLogicalType(
columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
}

PrimitiveType primitiveType;
if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
primitiveType = new PrimitiveType(repetition, primType, columnSpec.getTypeLength(), name);
primitiveType =
Types.primitive(primType, repetition)
.length(columnSpec.getTypeLength())
.as(logicalType)
.named(name);
} else {
primitiveType = new PrimitiveType(repetition, primType, name);
primitiveType = Types.primitive(primType, repetition).as(logicalType).named(name);
}

MessageType schema = new MessageType("root", primitiveType);
return schema.getColumnDescription(columnSpec.getPath());
}

private static LogicalTypeAnnotation reconstructLogicalType(
String logicalTypeName, java.util.Map<String, String> params) {

switch (logicalTypeName) {
// MAP
case "MapLogicalTypeAnnotation":
return LogicalTypeAnnotation.mapType();

// LIST
case "ListLogicalTypeAnnotation":
return LogicalTypeAnnotation.listType();

// STRING
case "StringLogicalTypeAnnotation":
return LogicalTypeAnnotation.stringType();

// MAP_KEY_VALUE
case "MapKeyValueLogicalTypeAnnotation":
return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();

// ENUM
case "EnumLogicalTypeAnnotation":
return LogicalTypeAnnotation.enumType();

// DECIMAL
case "DecimalLogicalTypeAnnotation":
int scale = Integer.parseInt(params.get("scale"));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add error handling to return a helpful error if these keys do not exist, rather than fail with NPE?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense to me. I have added a check.

int precision = Integer.parseInt(params.get("precision"));
return LogicalTypeAnnotation.decimalType(scale, precision);

// DATE
case "DateLogicalTypeAnnotation":
return LogicalTypeAnnotation.dateType();

// TIME
case "TimeLogicalTypeAnnotation":
boolean isUTC = Boolean.parseBoolean(params.getOrDefault("isAdjustedToUTC", "true"));
String timeUnitStr = params.getOrDefault("unit", "MICROS");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of default values seems like it could be dangerous if the caller forgets to pass a parameter or has a typo in the key?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks


LogicalTypeAnnotation.TimeUnit timeUnit;
switch (timeUnitStr) {
case "MILLIS":
timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
break;
case "MICROS":
timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
break;
case "NANOS":
timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
break;
default:
timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
}
return LogicalTypeAnnotation.timeType(isUTC, timeUnit);

// TIMESTAMP
case "TimestampLogicalTypeAnnotation":
boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
String unitStr = params.getOrDefault("unit", "MICROS");

LogicalTypeAnnotation.TimeUnit unit;
switch (unitStr) {
case "MILLIS":
unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
break;
case "MICROS":
unit = LogicalTypeAnnotation.TimeUnit.MICROS;
break;
case "NANOS":
unit = LogicalTypeAnnotation.TimeUnit.NANOS;
break;
default:
unit = LogicalTypeAnnotation.TimeUnit.MICROS;
}
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);

// INTEGER
case "IntLogicalTypeAnnotation":
boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
int bitWidth = Integer.parseInt(params.get("bitWidth"));
return LogicalTypeAnnotation.intType(bitWidth, isSigned);

// JSON
case "JsonLogicalTypeAnnotation":
return LogicalTypeAnnotation.jsonType();

// BSON
case "BsonLogicalTypeAnnotation":
return LogicalTypeAnnotation.bsonType();

// UUID
case "UUIDLogicalTypeAnnotation":
return LogicalTypeAnnotation.uuidType();

// INTERVAL
case "IntervalLogicalTypeAnnotation":
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();

default:
throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
}
}
}
Loading