diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java index 376b391d9c24..0e938322d30e 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -154,6 +154,39 @@ public static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFil return new BitmapPositionDeleteIndex(bitmap, deleteFile); } + /** + * Extracts the raw Roaring bitmap bytes from the envelope, validating magic and CRC. Returns a + * zero-copy {@link ByteSlice} pointing into the original byte array. + * + *

Envelope format: [4B length BE][4B magic LE][roaring LE][4B CRC BE] + * + *

Returns a slice over just the "roaring" portion. + */ + static ByteSlice extractRoaringBitmap(byte[] bytes, DeleteFile deleteFile) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + int bitmapDataLength = readBitmapDataLength(buffer, deleteFile); + + // validate CRC + int crc = computeChecksum(bytes, bitmapDataLength); + int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength; + int expectedCrc = buffer.getInt(crcOffset); + Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC"); + + // validate magic + ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength); + int magicNumber = bitmapData.getInt(); + Preconditions.checkArgument( + magicNumber == MAGIC_NUMBER, + "Invalid magic number: %s, expected %s", + magicNumber, + MAGIC_NUMBER); + + // the Roaring bytes start right after the magic, and run to end of bitmap data + int roaringOffset = LENGTH_SIZE_BYTES + MAGIC_NUMBER_SIZE_BYTES; + int roaringLength = bitmapDataLength - MAGIC_NUMBER_SIZE_BYTES; + return new ByteSlice(bytes, roaringOffset, roaringLength); + } + // computes and validates the length of the bitmap data (magic bytes + bitmap) private static int computeBitmapDataLength(RoaringPositionBitmap bitmap) { long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes(); diff --git a/core/src/main/java/org/apache/iceberg/deletes/ByteSlice.java b/core/src/main/java/org/apache/iceberg/deletes/ByteSlice.java new file mode 100644 index 000000000000..45a5f80e9503 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/ByteSlice.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +/** A zero-copy view over a region of a byte array. */ +public class ByteSlice { + private final byte[] data; + private final int offset; + private final int length; + + public ByteSlice(byte[] data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + } + + public byte[] data() { + return data; + } + + public int offset() { + return offset; + } + + public int length() { + return length; + } + + /** Returns a copy of just the slice. */ + public byte[] toByteArray() { + byte[] copy = new byte[length]; + System.arraycopy(data, offset, copy, 0, length); + return copy; + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java index 6f97b3a6ac87..cea13b2def58 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -113,6 +113,19 @@ static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) { return BitmapPositionDeleteIndex.deserialize(bytes, deleteFile); } + /** + * Extracts the raw portable Roaring bitmap bytes from an Iceberg deletion vector envelope, + * validating the magic number and CRC. Returns a zero-copy {@link ByteSlice} pointing into the + * original byte array. + * + * @param bytes the full DV blob (length + magic + bitmap + CRC) + * @param deleteFile the DV file for validation + * @return a slice over the raw Roaring bitmap bytes (little-endian, portable format) + */ + static ByteSlice extractRoaringBitmap(byte[] bytes, DeleteFile deleteFile) { + return BitmapPositionDeleteIndex.extractRoaringBitmap(bytes, deleteFile); + } + /** Returns an empty immutable position delete index. */ static PositionDeleteIndex empty() { return EmptyPositionDeleteIndex.get(); diff --git a/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java index 2809750970a7..95c8f84b0ea5 100644 --- a/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java +++ b/core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.ByteSlice; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mapping.NameMapping; @@ -119,6 +120,19 @@ default ReadBuilder setAll(Map properties) { /** Sets a mapping from external schema names to Iceberg type IDs. */ ReadBuilder withNameMapping(NameMapping nameMapping); + /** + * Pushes position deletes into the reader so that deleted rows are excluded during scanning. The + * bitmap is a portable Roaring bitmap (little-endian) where each set bit represents a deleted row + * position. Formats that support this can skip deleted rows at the scan level rather than + * filtering them after the fact. Formats that do not support this can safely ignore the bitmap. + * + * @param bitmap a slice over portable Roaring bitmap bytes representing deleted row positions + * @return this for method chaining + */ + default ReadBuilder positionDeleteBitmap(ByteSlice bitmap) { + return this; + } + /** Builds the reader. */ CloseableIterable build(); } diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java index 02b06b70e483..b2b64f8a922b 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -30,6 +32,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.ByteSlice; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.deletes.PositionDeleteIndexUtil; @@ -50,6 +53,7 @@ import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +172,54 @@ public PositionDeleteIndex loadPositionDeletes( } } + /** + * Loads position deletes and returns the deleted positions as a portable Roaring bitmap + * (little-endian, per the Roaring + * format spec). For deletion vectors, the raw bitmap bytes are extracted directly from the + * file without deserializing, returned as a zero-copy {@link ByteSlice}. For position delete + * files, positions are loaded and a new bitmap is built. + * + * @param deleteFiles position delete files or a deletion vector + * @param filePath the data file path for which to load deletes + * @return a slice over portable Roaring bitmap bytes, or null if there are no deletes + */ + public ByteSlice loadPositionDeleteBitmap( + Iterable deleteFiles, CharSequence filePath) { + if (ContentFileUtil.containsSingleDV(deleteFiles)) { + DeleteFile dv = Iterables.getOnlyElement(deleteFiles); + validateDV(dv, filePath); + return readDVBitmap(dv); + } + + PositionDeleteIndex index = getOrReadPosDeletes(deleteFiles, filePath); + if (index == null || index.isEmpty()) { + return null; + } + + RoaringBitmap bitmap = new RoaringBitmap(); + index.forEach(pos -> bitmap.add((int) pos)); + bitmap.runOptimize(); + ByteBuffer buf = ByteBuffer.allocate(bitmap.serializedSizeInBytes()); + buf.order(ByteOrder.LITTLE_ENDIAN); + bitmap.serialize(buf); + byte[] bytes = buf.array(); + return new ByteSlice(bytes, 0, bytes.length); + } + + private ByteSlice readDVBitmap(DeleteFile dv) { + LOG.trace("Reading DV bitmap bytes without deserializing {}", dv.location()); + InputFile inputFile = loadInputFile.apply(dv); + long offset = dv.contentOffset(); + int length = dv.contentSizeInBytes().intValue(); + byte[] bytes = new byte[length]; + try { + IOUtil.readFully(inputFile, offset, bytes, 0, length); + return PositionDeleteIndex.extractRoaringBitmap(bytes, dv); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private PositionDeleteIndex readDV(DeleteFile dv) { LOG.trace("Opening DV file {}", dv.location()); InputFile inputFile = loadInputFile.apply(dv); diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 69236cee8f9b..a2ff65100667 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -24,11 +24,13 @@ import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; +import java.util.Optional; import org.apache.iceberg.Accessor; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.ByteSlice; import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; @@ -65,6 +67,7 @@ public abstract class DeleteFilter { private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = null; private Predicate eqDeleteRows = null; + private boolean posDeletesPushedDown = false; protected DeleteFilter( String filePath, @@ -258,8 +261,31 @@ public PositionDeleteIndex deletedRowPositions() { return deleteRowPositions; } - private CloseableIterable applyPosDeletes(CloseableIterable records) { + /** + * Returns the position deletes as a portable Roaring bitmap {@link ByteSlice} for pushdown into a + * format-native scanner. For deletion vectors this is zero-copy from the on-disk bytes. When this + * returns a non-empty value, position deletes are marked as handled and {@link + * #filter(CloseableIterable)} will not re-apply them. + */ + public Optional bitmapBytes() { if (posDeletes.isEmpty()) { + return Optional.empty(); + } + + DeleteLoader loader = deleteLoader(); + if (loader instanceof BaseDeleteLoader) { + ByteSlice slice = ((BaseDeleteLoader) loader).loadPositionDeleteBitmap(posDeletes, filePath); + if (slice != null) { + this.posDeletesPushedDown = true; + return Optional.of(slice); + } + } + + return Optional.empty(); + } + + private CloseableIterable applyPosDeletes(CloseableIterable records) { + if (posDeletes.isEmpty() || posDeletesPushedDown) { return records; } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java index efbdcb70aa4a..d8d1fa45772a 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java @@ -73,6 +73,8 @@ public static void register() { (icebergSchema, fileSchema, engineSchema) -> GenericVortexWriter.buildWriter(icebergSchema), (VortexFormatModel.ReaderFunction) GenericVortexReader::buildReader)); + + FormatModelRegistry.register(VortexFormatModel.forPositionDeletes()); } private GenericFormatModels() {} diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java index f18f5785105f..090d5bf8e759 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java @@ -20,10 +20,12 @@ import java.io.Serializable; import java.util.Map; +import java.util.Optional; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.TableScan; +import org.apache.iceberg.deletes.ByteSlice; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -66,7 +68,12 @@ public CloseableIterable open(FileScanTask task) { DeleteFilter deletes = new GenericDeleteFilter(io, task, tableSchema, projection); Schema readSchema = deletes.requiredSchema(); - CloseableIterable records = openFile(task, readSchema); + // Try to extract position deletes as bitmap bytes for pushdown into the format reader. + // If the format supports it, position deletes are handled at scan level and the filter + // will skip re-applying them. + Optional bitmapBytes = deletes.bitmapBytes(); + + CloseableIterable records = openFile(task, readSchema, bitmapBytes.orElse(null)); records = deletes.filter(records); records = applyResidual(records, readSchema, task.residual()); @@ -84,7 +91,8 @@ private CloseableIterable applyResidual( return records; } - private CloseableIterable openFile(FileScanTask task, Schema fileProjection) { + private CloseableIterable openFile( + FileScanTask task, Schema fileProjection, ByteSlice posDeleteBitmap) { InputFile input = io.newInputFile(task.file()); Map partition = PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant); @@ -95,6 +103,10 @@ private CloseableIterable openFile(FileScanTask task, Schema fileProject builder = builder.reuseContainers(); } + if (posDeleteBitmap != null) { + builder = builder.positionDeleteBitmap(posDeleteBitmap); + } + return builder .project(fileProjection) .idToConstant(partition) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/PositionDeleteVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/PositionDeleteVortexWriter.java new file mode 100644 index 000000000000..994214d76b3c --- /dev/null +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/PositionDeleteVortexWriter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.vortex; + +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.vortex.VortexValueWriter; + +/** + * Writes {@link PositionDelete} objects to Arrow vectors for Vortex position delete file output. + * + *

The output schema is [file_path: string, pos: long]. + */ +public class PositionDeleteVortexWriter implements VortexValueWriter> { + @Override + public void write(PositionDelete datum, VectorSchemaRoot root, int rowIndex) { + VarCharVector pathVector = (VarCharVector) root.getVector(0); + byte[] pathBytes = datum.path().toString().getBytes(StandardCharsets.UTF_8); + pathVector.setSafe(rowIndex, pathBytes); + + BigIntVector posVector = (BigIntVector) root.getVector(1); + posVector.setSafe(rowIndex, datum.pos()); + } + + @Override + public Stream> metrics() { + return Stream.empty(); + } +} diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java index 3bfccd3a01ce..16f366820a10 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java @@ -31,12 +31,15 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.data.vortex.PositionDeleteVortexWriter; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.formats.BaseFormatModel; import org.apache.iceberg.formats.ModelWriteBuilder; import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -73,6 +76,12 @@ public static VortexFormatModel> create( false); } + public static VortexFormatModel, Void, VortexRowReader> + forPositionDeletes() { + return new VortexFormatModel<>( + PositionDelete.deleteClass(), Void.class, null, null, false); + } + public static VortexFormatModel> create( Class type, Class schemaType, @@ -195,17 +204,43 @@ public ModelWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { @Override public FileAppender build() throws IOException { - Preconditions.checkNotNull(schema, "Schema is required"); Preconditions.checkNotNull(content, "Content type is required"); return switch (content) { - case DATA, EQUALITY_DELETES -> buildAppender(schema); - case POSITION_DELETES -> - throw new UnsupportedOperationException( - "Position deletes are not yet supported for Vortex format"); + case DATA, EQUALITY_DELETES -> { + Preconditions.checkNotNull(schema, "Schema is required"); + yield buildAppender(schema); + } + case POSITION_DELETES -> buildPosDeleteAppender(); }; } + @SuppressWarnings("unchecked") + private FileAppender buildPosDeleteAppender() throws IOException { + org.apache.iceberg.Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema(); + DType dtype = VortexSchemas.toDType(posDeleteSchema); + Schema arrowSchema = VortexSchemas.toArrowSchema(posDeleteSchema); + + VortexValueWriter valueWriter = + (VortexValueWriter) new PositionDeleteVortexWriter<>(); + + OutputFile rawOutputFile = outputFile.encryptingOutputFile(); + String uri = VortexFileUtil.resolveUri(rawOutputFile.location()); + Map properties = + Maps.newHashMap(VortexFileUtil.resolveOutputProperties(rawOutputFile)); + properties.putAll(metadata); + + VortexWriter vortexWriter = VortexWriter.create(uri, dtype, properties); + return new VortexFileAppender<>( + vortexWriter, + valueWriter, + arrowSchema, + VortexFileAppender.DEFAULT_BATCH_SIZE, + rawOutputFile, + posDeleteSchema, + metricsConfig); + } + @SuppressWarnings("unchecked") private FileAppender buildAppender(org.apache.iceberg.Schema writeSchema) throws IOException { @@ -242,6 +277,7 @@ private static class ReadBuilderWrapper implements ReadBuilder { private Map idToConstant; private Optional filterPredicate = Optional.empty(); private long[] rowRange; + private org.apache.iceberg.deletes.ByteSlice posDeleteBitmap; private ReadBuilderWrapper( InputFile inputFile, @@ -312,6 +348,12 @@ public ReadBuilder withNameMapping(NameMapping nameMapping) { return this; } + @Override + public ReadBuilder positionDeleteBitmap(org.apache.iceberg.deletes.ByteSlice bitmap) { + this.posDeleteBitmap = bitmap; + return this; + } + @Override @SuppressWarnings("unchecked") public CloseableIterable build() { @@ -340,7 +382,13 @@ public CloseableIterable build() { } return new VortexIterable<>( - inputFile, readSchema, filterPredicate, rowRange, readerFunc, batchReaderFunc); + inputFile, + readSchema, + filterPredicate, + rowRange, + posDeleteBitmap, + readerFunc, + batchReaderFunc); } } } diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java index 2e2d2fa76c36..e8cf6ac8cfc3 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java @@ -30,14 +30,16 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.ByteSlice; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,7 @@ public class VortexIterable extends CloseableGroup implements CloseableIterab private final InputFile inputFile; private final Optional filterPredicate; private final long[] rowRange; + private final ByteSlice posDeleteBitmap; private final Function> rowReaderFunc; private final Function> batchReaderFunction; private final List projection; @@ -57,13 +60,20 @@ public class VortexIterable extends CloseableGroup implements CloseableIterab Schema icebergSchema, Optional filterPredicate, long[] rowRange, + ByteSlice posDeleteBitmap, Function> readerFunction, Function> batchReaderFunction) { this.inputFile = inputFile; - // We have the file schema, we need to assign Iceberg IDs to the entire file schema - this.projection = Lists.transform(icebergSchema.columns(), Types.NestedField::name); + // Strip metadata columns (e.g. _pos, _deleted) from the projection sent to the Vortex scan, + // since these are computed columns that don't exist in the physical file. + this.projection = + icebergSchema.columns().stream() + .filter(field -> !MetadataColumns.isMetadataColumn(field.fieldId())) + .map(Types.NestedField::name) + .collect(Collectors.toList()); this.filterPredicate = filterPredicate; this.rowRange = rowRange; + this.posDeleteBitmap = posDeleteBitmap; this.rowReaderFunc = readerFunction; this.batchReaderFunction = batchReaderFunction; } @@ -82,13 +92,18 @@ public CloseableIterator iterator() { Optional optRange = Optional.ofNullable(this.rowRange); - ArrayIterator batchStream = - vortexFile.newScan( - ScanOptions.builder() - .addAllColumns(projection) - .predicate(scanPredicate) - .rowRange(optRange) - .build()); + ScanOptions.Builder scanBuilder = + ScanOptions.builder() + .addAllColumns(projection) + .predicate(scanPredicate) + .rowRange(optRange); + + if (posDeleteBitmap != null) { + scanBuilder.deletePositions( + posDeleteBitmap.data(), posDeleteBitmap.offset(), posDeleteBitmap.length()); + } + + ArrayIterator batchStream = vortexFile.newScan(scanBuilder.build()); Preconditions.checkNotNull(batchStream, "batchStream"); DType dtype = batchStream.getDataType();