Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive.serde.objectinspector;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
Expand Down Expand Up @@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {

switch (field.getFieldID()) {
case 0: // "metadata" field (binary)
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);
return metadata.array();
case 1: // "value" field (binary)
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);
return value.array();
default:
Expand All @@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
return null;
}
Variant variant = (Variant) data;
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);

ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
.order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);

// Return the data for our fields in the correct order: metadata, value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.mr.hive.writer;

import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand All @@ -31,9 +32,13 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {

private final Map<String, String> properties;
private Record sampleRecord = null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is null assignment redundant?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not final, sonar doesn't report this as well


HiveFileWriterFactory(
Table table,
FileFormat dataFileFormat,
Expand All @@ -54,6 +59,7 @@
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema);
properties = table.properties();
}

static Builder builderFor(Table table) {
Expand All @@ -78,6 +84,11 @@
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
// Configure variant shredding function if conditions are met:
if (hasVariantColumns(dataSchema()) && isVariantShreddingEnabled(properties)) {
var shreddingFunction = Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema());
builder.variantShreddingFunc(shreddingFunction);
}
}

@Override
Expand Down Expand Up @@ -149,4 +160,30 @@
positionDeleteRowSchema);
}
}

/**
* Check if the schema contains any variant columns.
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is a Javadoc-style comment, but the content is not a Javadoc. Maybe it is better to remove one * at the first line to create a regular non-Javadoc comment

private static boolean hasVariantColumns(Schema schema) {
return schema.columns().stream()
.anyMatch(field -> field.type() instanceof Types.VariantType);
}

/**
* Check if variant shredding is enabled via table properties.
*/
private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
String shreddingEnabled = properties.get("variant.shredding.enabled");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to use constant string for property

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added constant

return "true".equalsIgnoreCase(shreddingEnabled);
}

/**
* Set a sample record to use for data-driven variant shredding schema generation.
* Should be called before the Parquet writer is created.
*/
public void initialize(Record record) {

Check warning on line 184 in iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this variable to not match a restricted identifier.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZru_m4pJB0kz5OX_r2z&open=AZru_m4pJB0kz5OX_r2z&pullRequest=6152
if (this.sampleRecord == null) {
this.sampleRecord = record;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the use of this is not necessary here because there is no conflict between class members and local variables.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
private final Set<String> missingColumns;
private final List<Types.NestedField> missingOrStructFields;

private final HiveFileWriterFactory fileWriterFactory;

HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
OutputFileFactory dataFileFactory, Context context) {
super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
Expand All @@ -48,17 +50,18 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
this.missingColumns = context.missingColumns();
this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
.filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
this.fileWriterFactory = fileWriterFactory;
}

@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
fileWriterFactory.initialize(record);

writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
}


@Override
public FilesForCommit files() {
List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
Expand Down
Loading