Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,23 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int byteRead = stream.read();
if (byteRead == -1) {
// At EOF, the underlying stream returns -1. We must propagate that
// to the caller without advancing pos / next or counting a byte that
// wasn't actually read. Same pattern as the fix for GCS/S3/ADLS in
// #16055 — a sequential reader that loops until EOF would otherwise
// spin and the byte counter would drift past the file size. See
// #16062.
return -1;
}

pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();

return stream.read();
return byteRead;
}

@Override
Expand All @@ -96,6 +107,13 @@ public int read(byte[] b, int off, int len) throws IOException {
positionStream();

int bytesRead = stream.read(b, off, len);
if (bytesRead == -1) {
// Mirror the single-byte read above: don't advance pos / next or
// increment the metrics counters when EOF is reached. Without this
// guard, pos/next/readBytes would shift by -1 on every EOF call.
return -1;
}

pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,59 @@ public void testSeek() throws Exception {
}
}

@Test
public void testReadSingleByteAtEof() throws Exception {
// Regression test for #16062 — OSSInputStream#read() at EOF must
// return -1 and must not advance pos / next or count a byte that was
// never read. Without the fix, OSSInputStream forwarded the -1 to
// the caller while still incrementing pos and the readBytes /
// readOperations metrics.
OSSURI uri = new OSSURI(location("eof-single.dat"));
byte[] data = randomData(8);
writeOSSData(uri, data);

try (SeekableInputStream in = new OSSInputStream(ossClient().get(), uri)) {
// Drain to EOF byte-by-byte.
for (int i = 0; i < data.length; i++) {
assertThat(in.read()).isEqualTo(data[i] & 0xFF);
}
assertThat(in.getPos()).isEqualTo(data.length);

assertThat(in.read()).as("read() at EOF must return -1").isEqualTo(-1);
assertThat(in.getPos())
.as("pos must not advance past EOF on a -1 read")
.isEqualTo(data.length);

// Idempotent: a second EOF read also returns -1, pos unchanged.
assertThat(in.read()).isEqualTo(-1);
assertThat(in.getPos()).isEqualTo(data.length);
}
}

@Test
public void testReadBufferAtEof() throws Exception {
// Companion to testReadSingleByteAtEof: read(byte[], off, len) at EOF
// must return -1 without polluting pos. Without the fix,
// OSSInputStream added -1 to pos and to the metric counters on every
// EOF call.
OSSURI uri = new OSSURI(location("eof-buffer.dat"));
int dataSize = 8;
byte[] expected = randomData(dataSize);
byte[] actual = new byte[dataSize + 1];
writeOSSData(uri, expected);

try (SeekableInputStream in = new OSSInputStream(ossClient().get(), uri)) {
int bytesRead = in.read(actual, 0, dataSize + 1);
assertThat(bytesRead).isEqualTo(dataSize);
assertThat(Arrays.copyOfRange(actual, 0, bytesRead)).isEqualTo(expected);

assertThat(in.read(actual, 0, 10))
.as("read(byte[], off, len) at EOF must return -1")
.isEqualTo(-1);
assertThat(in.getPos()).as("pos must not advance past EOF on a -1 read").isEqualTo(dataSize);
}
}

private byte[] randomData(int size) {
byte[] data = new byte[size];
random.nextBytes(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,33 @@ public void seek(long inputNewPos) {
@Override
public int read() throws IOException {
checkAndUseNewPos();
int byteRead = internalStream.read();
if (byteRead == -1) {
// At EOF, the underlying stream returns -1. We must propagate that
// to the caller without advancing pos or counting a byte that was
// never read. Same pattern as the fix for GCS/S3/ADLS in #16055 —
// a sequential reader that loops until EOF would otherwise spin
// and the byte counter would drift past the file size. See #16062.
return -1;
}

pos += 1;
readBytes.increment();
readOperations.increment();
return internalStream.read();
return byteRead;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
checkAndUseNewPos();
int delta = internalStream.read(b, off, len);
if (delta == -1) {
// Mirror the single-byte read above: don't advance pos or
// increment metrics when EOF is reached. Without this guard,
// pos/readBytes would shift by -1 on every EOF call.
return -1;
}

pos += delta;
readBytes.increment(delta);
readOperations.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,61 @@ public void testReadBytes() throws IOException {
.isEqualTo("012");
}
}

@Test
public void testReadSingleByteAtEof() throws IOException {
// Regression test for #16062 — single-byte read() at EOF must return -1
// and must not advance pos. Without the fix, EcsSeekableInputStream
// forwarded the underlying -1 to the caller while still incrementing
// pos (and the readBytes / readOperations metrics) by 1.
String objectName = rule.randomObjectName();
byte[] data = "0123".getBytes(StandardCharsets.UTF_8);
rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, data));

try (EcsSeekableInputStream input =
new EcsSeekableInputStream(
rule.client(), new EcsURI(rule.bucket(), objectName), MetricsContext.nullMetrics())) {
// Drain the stream byte-by-byte so the next read sees EOF.
for (int i = 0; i < data.length; i++) {
assertThat(input.read()).isEqualTo(data[i] & 0xFF);
}
assertThat(input.getPos()).isEqualTo(data.length);

// First EOF read returns -1 and pos stays at the file size.
assertThat(input.read()).as("read() at EOF must return -1").isEqualTo(-1);
assertThat(input.getPos())
.as("pos must not advance past EOF on a -1 read")
.isEqualTo(data.length);

// Repeated EOF reads stay idempotent (no drift).
assertThat(input.read()).isEqualTo(-1);
assertThat(input.getPos()).isEqualTo(data.length);
}
}

@Test
public void testReadBufferAtEof() throws IOException {
// Companion to testReadSingleByteAtEof: read(byte[], off, len) at EOF
// must return -1 without advancing pos. Without the fix,
// EcsSeekableInputStream still added -1 to pos and to the metric
// counters on every EOF call.
String objectName = rule.randomObjectName();
byte[] data = "0123".getBytes(StandardCharsets.UTF_8);
rule.client().putObject(new PutObjectRequest(rule.bucket(), objectName, data));

try (EcsSeekableInputStream input =
new EcsSeekableInputStream(
rule.client(), new EcsURI(rule.bucket(), objectName), MetricsContext.nullMetrics())) {
byte[] buffer = new byte[data.length];
assertThat(input.read(buffer, 0, buffer.length)).isEqualTo(data.length);
assertThat(input.getPos()).isEqualTo(data.length);

assertThat(input.read(buffer, 0, buffer.length))
.as("read(byte[], off, len) at EOF must return -1")
.isEqualTo(-1);
assertThat(input.getPos())
.as("pos must not advance past EOF on a -1 read")
.isEqualTo(data.length);
}
}
}
Loading