Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
*.iml
# gradle build
.gradle
build
build
out
1 change: 1 addition & 0 deletions api/src/main/java/com/netflix/iceberg/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Enum of supported file formats.
*/
public enum FileFormat {
ORC("orc"),
PARQUET("parquet"),
AVRO("avro");

Expand Down
6 changes: 6 additions & 0 deletions api/src/main/java/com/netflix/iceberg/UpdateProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public interface UpdateProperties extends PendingUpdate<Map<String, String>> {
*/
UpdateProperties remove(String key);

/**
* Set the default file format for the table.
*
* @param format the file format to use as the table's default
* @return this for method chaining
*/
UpdateProperties defaultFormat(FileFormat format);
}
17 changes: 17 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ subprojects {

ext {
avroVersion = '1.8.2'
orcVersion = '1.4.2'
parquetVersion = '1.9.1-SNAPSHOT'

jacksonVersion = '2.6.7'
Expand Down Expand Up @@ -114,6 +115,19 @@ project(':iceberg-core') {
}
}

// Build configuration for the iceberg-orc subproject.
project(':iceberg-orc') {
dependencies {
// Sibling Iceberg modules this subproject compiles against.
compile project(':iceberg-api')
compile project(':iceberg-core')

// ORC core library at the shared orcVersion, resolved with the 'nohive' classifier.
compile "org.apache.orc:orc-core:$orcVersion:nohive"

// Hadoop client is on the compile classpath only (compileOnly); its Avro module is excluded.
compileOnly('org.apache.hadoop:hadoop-client:2.7.3') {
exclude group: 'org.apache.avro', module: 'avro'
}
}
}

project(':iceberg-parquet') {
dependencies {
compile project(':iceberg-api')
Expand All @@ -137,6 +151,7 @@ project(':iceberg-spark') {
compile project(':iceberg-common')
compile project(':iceberg-avro')
compile project(':iceberg-core')
compile project(':iceberg-orc')
compile project(':iceberg-parquet')

compileOnly "org.apache.avro:avro:$avroVersion"
Expand Down Expand Up @@ -174,10 +189,12 @@ project(':iceberg-runtime') {
shadow project(':iceberg-common')
shadow project(':iceberg-avro')
shadow project(':iceberg-core')
shadow project(':iceberg-orc')
shadow project(':iceberg-parquet')
shadow project(':iceberg-spark')

shadow "org.apache.avro:avro:$avroVersion"
shadow "org.apache.orc:orc-core:$orcVersion:nohive"
shadow "org.apache.parquet:parquet-avro:$parquetVersion"
}

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/com/netflix/iceberg/PropertiesUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public UpdateProperties remove(String key) {
return this;
}

/**
 * Records the given format as the table's default file format by setting the
 * {@code TableProperties.DEFAULT_FILE_FORMAT} property to the format's enum name.
 *
 * @param format the file format whose enum name is stored as the default
 * @return this for method chaining
 */
@Override
public UpdateProperties defaultFormat(FileFormat format) {
  final String formatName = format.name();
  set(TableProperties.DEFAULT_FILE_FORMAT, formatName);
  return this;
}

@Override
public Map<String, String> apply() {
this.base = ops.refresh();
Expand Down
126 changes: 126 additions & 0 deletions orc/src/main/java/com/netflix/iceberg/orc/ColumnIdMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2018 Hortonworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.iceberg.orc;

import org.apache.orc.TypeDescription;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

/**
 * The mapping from ORC's TypeDescription to the Iceberg column ids.
 *
 * Keep the API limited to Map rather than a concrete type so that we can
 * change it later.
 *
 * The mapping is backed by an {@link IdentityHashMap}, so keys are matched by
 * reference rather than by {@code equals}.
 */
public class ColumnIdMap implements Map<TypeDescription, Integer> {

  private final IdentityHashMap<TypeDescription, Integer> idMap =
      new IdentityHashMap<>();

  @Override
  public int size() {
    return idMap.size();
  }

  @Override
  public boolean isEmpty() {
    return idMap.isEmpty();
  }

  @Override
  public boolean containsKey(Object key) {
    return idMap.containsKey(key);
  }

  @Override
  public boolean containsValue(Object value) {
    return idMap.containsValue(value);
  }

  @Override
  public Integer get(Object key) {
    return idMap.get(key);
  }

  @Override
  public Integer put(TypeDescription key, Integer value) {
    return idMap.put(key, value);
  }

  @Override
  public Integer remove(Object key) {
    return idMap.remove(key);
  }

  @Override
  public void putAll(Map<? extends TypeDescription, ? extends Integer> map) {
    idMap.putAll(map);
  }

  @Override
  public void clear() {
    idMap.clear();
  }

  @Override
  public Set<TypeDescription> keySet() {
    return idMap.keySet();
  }

  @Override
  public Collection<Integer> values() {
    return idMap.values();
  }

  @Override
  public Set<Entry<TypeDescription, Integer>> entrySet() {
    return idMap.entrySet();
  }

  /**
   * Serializes the mapping as UTF-8 text of the form
   * {@code orcTypeId:columnId,orcTypeId:columnId,...}.
   *
   * An empty mapping serializes to an empty buffer.
   *
   * @return a buffer wrapping the encoded mapping
   */
  public ByteBuffer serialize() {
    StringBuilder buffer = new StringBuilder();
    boolean needComma = false;
    // Iterate entries directly instead of looking each key up a second time.
    for (Map.Entry<TypeDescription, Integer> entry : idMap.entrySet()) {
      if (needComma) {
        buffer.append(',');
      } else {
        needComma = true;
      }
      buffer.append(entry.getKey().getId());
      buffer.append(':');
      buffer.append(entry.getValue().intValue());
    }
    return ByteBuffer.wrap(buffer.toString().getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Rebuilds a mapping from the text produced by {@link #serialize()}.
   *
   * Each ORC type id is resolved against the given schema with
   * {@code findSubtype}. The caller's buffer position is not modified.
   *
   * @param schema the ORC schema whose subtypes the serialized ids refer to
   * @param serial the UTF-8 encoded mapping
   * @return the decoded mapping; empty when the buffer holds no entries
   * @throws IllegalArgumentException if an entry is not of the form
   *         {@code id:id} (this includes NumberFormatException for
   *         non-numeric ids)
   */
  public static ColumnIdMap deserialize(TypeDescription schema,
                                        ByteBuffer serial) {
    ColumnIdMap result = new ColumnIdMap();
    // Decode a duplicate so the caller can still read the buffer afterwards.
    String text = StandardCharsets.UTF_8.decode(serial.duplicate()).toString();
    // "".split(",") yields [""], which would fail to parse; an empty mapping
    // must round-trip through serialize()/deserialize().
    if (text.isEmpty()) {
      return result;
    }
    for (String part : text.split(",")) {
      String[] subparts = part.split(":");
      if (subparts.length < 2) {
        throw new IllegalArgumentException(
            "Malformed column id mapping entry: '" + part + "'");
      }
      result.put(schema.findSubtype(Integer.parseInt(subparts[0])),
          Integer.parseInt(subparts[1]));
    }
    return result;
  }
}
146 changes: 146 additions & 0 deletions orc/src/main/java/com/netflix/iceberg/orc/ORC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2018 Hortonworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.iceberg.orc;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.hadoop.HadoopInputFile;
import com.netflix.iceberg.hadoop.HadoopOutputFile;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Static entry points for building ORC file writers and readers.
 *
 * Use {@link #write(OutputFile)} or {@link #read(InputFile)} to obtain a
 * builder, configure it, and call {@code build()}.
 */
public class ORC {
  private ORC() {
  }

  /**
   * Starts building an ORC appender for the given output file.
   *
   * @param file the file to write to
   * @return a new write builder
   */
  public static WriteBuilder write(OutputFile file) {
    return new WriteBuilder(file);
  }

  /**
   * Builder for {@link OrcFileAppender} instances.
   */
  public static class WriteBuilder {
    private final OutputFile file;
    private final Configuration conf;
    private final Map<String, byte[]> metadata = new HashMap<>();
    private Schema schema = null;

    private WriteBuilder(OutputFile file) {
      // Fail fast, matching the check performed by ReadBuilder.
      Preconditions.checkNotNull(file, "Output file cannot be null");
      this.file = file;
      if (file instanceof HadoopOutputFile) {
        // Copy so that config() calls do not mutate the file's own configuration.
        conf = new Configuration(((HadoopOutputFile) file).getConf());
      } else {
        conf = new Configuration();
      }
    }

    /**
     * Adds a metadata entry that is handed to the appender; the value is
     * stored as UTF-8 bytes.
     *
     * @param property the metadata key
     * @param value the metadata value
     * @return this builder for method chaining
     */
    public WriteBuilder metadata(String property, String value) {
      metadata.put(property, value.getBytes(StandardCharsets.UTF_8));
      return this;
    }

    /**
     * Sets a property on the Hadoop configuration used for the ORC writer options.
     *
     * @param property the configuration key
     * @param value the configuration value
     * @return this builder for method chaining
     */
    public WriteBuilder config(String property, String value) {
      conf.set(property, value);
      return this;
    }

    /**
     * Sets the Iceberg schema of the rows to be written.
     *
     * @param schema the Iceberg schema
     * @return this builder for method chaining
     */
    public WriteBuilder schema(Schema schema) {
      this.schema = schema;
      return this;
    }

    /**
     * Creates the appender from the configured schema, file, writer options
     * and metadata.
     *
     * @return a new appender
     * @throws NullPointerException if no schema was set
     */
    public OrcFileAppender build() {
      // Same requirement that ReadBuilder.build() enforces.
      Preconditions.checkNotNull(schema, "Schema is required");
      OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
      return new OrcFileAppender(schema, file, options, metadata);
    }
  }

  /**
   * Starts building an ORC row iterator for the given input file.
   *
   * @param file the file to read from
   * @return a new read builder
   */
  public static ReadBuilder read(InputFile file) {
    return new ReadBuilder(file);
  }

  /**
   * Builder for {@link OrcIterator} instances.
   */
  public static class ReadBuilder {
    private final InputFile file;
    private final Configuration conf;
    private Schema schema = null;
    private Long start = null;
    private Long length = null;

    private ReadBuilder(InputFile file) {
      Preconditions.checkNotNull(file, "Input file cannot be null");
      this.file = file;
      if (file instanceof HadoopInputFile) {
        // Copy so that config() calls do not mutate the file's own configuration.
        conf = new Configuration(((HadoopInputFile) file).getConf());
      } else {
        conf = new Configuration();
      }
    }

    /**
     * Restricts the read to the given range: [start, start + length).
     *
     * @param start the start position for this read
     * @param length the length of the range this read should scan
     * @return this builder for method chaining
     */
    public ReadBuilder split(long start, long length) {
      this.start = start;
      this.length = length;
      return this;
    }

    /**
     * Sets the Iceberg schema to read with; required before {@link #build()}.
     *
     * @param schema the Iceberg schema
     * @return this builder for method chaining
     */
    public ReadBuilder schema(Schema schema) {
      this.schema = schema;
      return this;
    }

    /**
     * Sets a property on the Hadoop configuration used for the ORC reader options.
     *
     * @param property the configuration key
     * @param value the configuration value
     * @return this builder for method chaining
     */
    public ReadBuilder config(String property, String value) {
      conf.set(property, value);
      return this;
    }

    /**
     * Opens the file at {@code file.location()} with an ORC reader and returns
     * an iterator over its rows, read with the ORC schema converted from the
     * configured Iceberg schema.
     *
     * @return a new row iterator
     * @throws NullPointerException if no schema was set
     * @throws RuntimeException if the file cannot be opened
     */
    public OrcIterator build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      try {
        Path path = new Path(file.location());
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        ColumnIdMap columnIds = new ColumnIdMap();
        TypeDescription orcSchema = TypeConversion.toOrc(schema, columnIds);
        Reader.Options options = reader.options();
        // A range is applied only when split() was called.
        if (start != null) {
          options.range(start, length);
        }
        options.schema(orcSchema);
        return new OrcIterator(path, orcSchema, reader.rows(options));
      } catch (IOException e) {
        throw new RuntimeException("Can't open " + file.location(), e);
      }
    }
  }
}
Loading