diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java index b530f24783d2..96c97bbb233a 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java @@ -82,12 +82,23 @@ public int read() throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); + int byteRead = stream.read(); + if (byteRead == -1) { + // At EOF, the underlying stream returns -1. We must propagate that + // to the caller without advancing pos / next or counting a byte that + // wasn't actually read. Same pattern as the fix for GCS/S3/ADLS in + // #16055 — a sequential reader that loops until EOF would otherwise + // spin and the byte counter would drift past the file size. See + // #16062. + return -1; + } + pos += 1; next += 1; readBytes.increment(); readOperations.increment(); - return stream.read(); + return byteRead; } @Override @@ -96,6 +107,13 @@ public int read(byte[] b, int off, int len) throws IOException { positionStream(); int bytesRead = stream.read(b, off, len); + if (bytesRead == -1) { + // Mirror the single-byte read above: don't advance pos / next or + // increment the metrics counters when EOF is reached. Without this + // guard, pos/next/readBytes would shift by -1 on every EOF call. + return -1; + } + pos += bytesRead; next += bytesRead; readBytes.increment(bytesRead); diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java index 053610983c10..edfa13748b9c 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java @@ -119,6 +119,59 @@ public void testSeek() throws Exception { } } + @Test + public void testReadSingleByteAtEof() throws Exception { + // Regression test for #16062 — OSSInputStream#read() at EOF must + // return -1 and must not advance pos / next or count a byte that was + // never read. Without the fix, OSSInputStream forwarded the -1 to + // the caller while still incrementing pos and the readBytes / + // readOperations metrics. + OSSURI uri = new OSSURI(location("eof-single.dat")); + byte[] data = randomData(8); + writeOSSData(uri, data); + + try (SeekableInputStream in = new OSSInputStream(ossClient().get(), uri)) { + // Drain to EOF byte-by-byte. + for (int i = 0; i < data.length; i++) { + assertThat(in.read()).isEqualTo(data[i] & 0xFF); + } + assertThat(in.getPos()).isEqualTo(data.length); + + assertThat(in.read()).as("read() at EOF must return -1").isEqualTo(-1); + assertThat(in.getPos()) + .as("pos must not advance past EOF on a -1 read") + .isEqualTo(data.length); + + // Idempotent: a second EOF read also returns -1, pos unchanged. + assertThat(in.read()).isEqualTo(-1); + assertThat(in.getPos()).isEqualTo(data.length); + } + } + + @Test + public void testReadBufferAtEof() throws Exception { + // Companion to testReadSingleByteAtEof: read(byte[], off, len) at EOF + // must return -1 without polluting pos. Without the fix, + // OSSInputStream added -1 to pos and to the metric counters on every + // EOF call. + OSSURI uri = new OSSURI(location("eof-buffer.dat")); + int dataSize = 8; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize + 1]; + writeOSSData(uri, expected); + + try (SeekableInputStream in = new OSSInputStream(ossClient().get(), uri)) { + int bytesRead = in.read(actual, 0, dataSize + 1); + assertThat(bytesRead).isEqualTo(dataSize); + assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected); + + assertThat(in.read(actual, 0, 10)) + .as("read(byte[], off, len) at EOF must return -1") + .isEqualTo(-1); + assertThat(in.getPos()).as("pos must not advance past EOF on a -1 read").isEqualTo(dataSize); + } + } + private byte[] randomData(int size) { byte[] data = new byte[size]; random.nextBytes(data); diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java index 6d32d3ba981d..edbd2c978e94 100644 --- a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsSeekableInputStream.java @@ -79,16 +79,33 @@ public void seek(long inputNewPos) { @Override public int read() throws IOException { checkAndUseNewPos(); + int byteRead = internalStream.read(); + if (byteRead == -1) { + // At EOF, the underlying stream returns -1. We must propagate that + // to the caller without advancing pos or counting a byte that was + // never read. Same pattern as the fix for GCS/S3/ADLS in #16055 — + // a sequential reader that loops until EOF would otherwise spin + // and the byte counter would drift past the file size. See #16062. + return -1; + } + pos += 1; readBytes.increment(); readOperations.increment(); - return internalStream.read(); + return byteRead; } @Override public int read(byte[] b, int off, int len) throws IOException { checkAndUseNewPos(); int delta = internalStream.read(b, off, len); + if (delta == -1) { + // Mirror the single-byte read above: don't advance pos or + // increment metrics when EOF is reached. Without this guard, + // pos/readBytes would shift by -1 on every EOF call. + return -1; + } + pos += delta; readBytes.increment(delta); readOperations.increment(); diff --git a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java index de97aa8d6f96..903f4d958c20 100644 --- a/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java +++ b/dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsSeekableInputStream.java @@ -90,4 +90,61 @@ public void testReadBytes() throws IOException { .isEqualTo("012"); } } + + @Test + public void testReadSingleByteAtEof() throws IOException { + // Regression test for #16062 — single-byte read() at EOF must return -1 + // and must not advance pos. Without the fix, EcsSeekableInputStream + // forwarded the underlying -1 to the caller while still incrementing + // pos (and the readBytes / readOperations metrics) by 1. + String objectName = rule.randomObjectName(); + byte[] data = "0123".getBytes(StandardCharsets.UTF_8); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, data)); + + try (EcsSeekableInputStream input = + new EcsSeekableInputStream( + rule.client(), new EcsURI(rule.bucket(), objectName), MetricsContext.nullMetrics())) { + // Drain the stream byte-by-byte so the next read sees EOF. + for (int i = 0; i < data.length; i++) { + assertThat(input.read()).isEqualTo(data[i] & 0xFF); + } + assertThat(input.getPos()).isEqualTo(data.length); + + // First EOF read returns -1 and pos stays at the file size. + assertThat(input.read()).as("read() at EOF must return -1").isEqualTo(-1); + assertThat(input.getPos()) + .as("pos must not advance past EOF on a -1 read") + .isEqualTo(data.length); + + // Repeated EOF reads stay idempotent (no drift). + assertThat(input.read()).isEqualTo(-1); + assertThat(input.getPos()).isEqualTo(data.length); + } + } + + @Test + public void testReadBufferAtEof() throws IOException { + // Companion to testReadSingleByteAtEof: read(byte[], off, len) at EOF + // must return -1 without advancing pos. Without the fix, + // EcsSeekableInputStream still added -1 to pos and to the metric + // counters on every EOF call. + String objectName = rule.randomObjectName(); + byte[] data = "0123".getBytes(StandardCharsets.UTF_8); + rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, data)); + + try (EcsSeekableInputStream input = + new EcsSeekableInputStream( + rule.client(), new EcsURI(rule.bucket(), objectName), MetricsContext.nullMetrics())) { + byte[] buffer = new byte[data.length]; + assertThat(input.read(buffer, 0, buffer.length)).isEqualTo(data.length); + assertThat(input.getPos()).isEqualTo(data.length); + + assertThat(input.read(buffer, 0, buffer.length)) + .as("read(byte[], off, len) at EOF must return -1") + .isEqualTo(-1); + assertThat(input.getPos()) + .as("pos must not advance past EOF on a -1 read") + .isEqualTo(data.length); + } + } }