
Add linked buffers for S3OutputStream #28488

Merged
pettyjamesm merged 1 commit into trinodb:master from tbaeg:feat/s3-linked-buffer on Apr 1, 2026


Conversation

@tbaeg (Member) commented Mar 1, 2026

Description

Additional context and related issues

Quick tests from an M1 Max MacBook Pro with:

  1. Local MinIO.
  2. Iceberg connector + JDBC catalog.
  3. Queries are from a cold start and run in succession.

With linked buffer

trino> INSERT INTO iceberg.tpch_test.orders_test
    -> SELECT * FROM tpch.sf1000.orders LIMIT 100000000;
INSERT: 100000000 rows

Query 20260327_132501_00002_99kw5, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
39.39 [100M rows, 22.5GiB] [2.55M rows/s, 586MiB/s]

Query 20260327_132542_00003_99kw5, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
36.54 [102M rows, 22.5GiB] [2.79M rows/s, 631MiB/s]

Query 20260327_132620_00004_99kw5, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
36.46 [103M rows, 22.6GiB] [2.82M rows/s, 635MiB/s]

Query 20260327_132700_00005_99kw5, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
35.69 [102M rows, 22.6GiB] [2.87M rows/s, 647MiB/s]

Without linked buffer (release 480)

trino> INSERT INTO iceberg.tpch_test.orders_test
    -> SELECT * FROM tpch.sf1000.orders LIMIT 100000000;
INSERT: 100000000 rows

Query 20260327_133103_00000_gin83, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
44.08 [101M rows, 22.5GiB] [2.29M rows/s, 523MiB/s]

Query 20260327_133209_00001_gin83, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
40.41 [102M rows, 22.5GiB] [2.53M rows/s, 571MiB/s]

Query 20260327_133309_00002_gin83, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
38.23 [101M rows, 22.5GiB] [2.65M rows/s, 603MiB/s]

Query 20260327_133350_00003_gin83, FINISHED, 1 node
Splits: 86 total, 86 done (100.00%)
37.61 [101M rows, 22.5GiB] [2.68M rows/s, 613MiB/s]

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(X) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Delta Lake connector
* Reduce memory fragmentation and improve memory tracking for S3 file write
  operations. ({issue}`28488`)  

## Hive connector
* Reduce memory fragmentation and improve memory tracking for S3 file write
  operations. ({issue}`28488`)  

## Hudi connector
* Reduce memory fragmentation and improve memory tracking for S3 file write
  operations. ({issue}`28488`)  

## Iceberg connector
* Reduce memory fragmentation and improve memory tracking for S3 file write
  operations. ({issue}`28488`)  

## Lakehouse connector
* Reduce memory fragmentation and improve memory tracking for S3 file write
  operations. ({issue}`28488`)  

cla-bot added the cla-signed label on Mar 1, 2026
tbaeg force-pushed the feat/s3-linked-buffer branch 2 times, most recently from 86d3d38 to c5fd132 on March 1, 2026 05:01
tbaeg force-pushed the feat/s3-linked-buffer branch 5 times, most recently from b84e338 to ab20c7f on March 1, 2026 19:09
tbaeg force-pushed the feat/s3-linked-buffer branch 3 times, most recently from 19d7b18 to 64d5dad on March 2, 2026 15:39
wendigo force-pushed the feat/s3-linked-buffer branch from 64d5dad to 5bcedd7 on March 2, 2026 21:49
@wendigo (Contributor) commented Mar 2, 2026

CI hit: #22455

@wendigo (Contributor) commented Mar 2, 2026

I've reorganized code a little and fixed a small bug that was causing invalid writes. Now tests are passing :)

@tbaeg (Member, Author) commented Mar 2, 2026

> I've reorganized code a little and fixed a small bug that was causing invalid writes. Now tests are passing :)

Thank you for the updates! They all make sense and look cleaner.

Copilot AI left a comment

Pull request overview

This PR addresses an issue with humongous heap allocations in S3OutputStream when writing to S3 with large part sizes. Instead of allocating a single large byte[] for the entire part buffer (defaulting to 32MB, which causes G1GC humongous allocation problems), it introduces a LinkedBuffer that chains smaller byte[] arrays (starting at 8KB, doubling to a max of 512KB) to hold the buffered data.

Changes:

  • S3OutputStream.java: Replaces the monolithic byte[] buffer with a new inner class LinkedBuffer that chains smaller arrays. Uploads now use InputStream-based RequestBody instead of ByteBuffer-based.
  • S3OutputFile.java: Updates putObject callers to pass ByteArrayInputStream instead of byte[] + offset.
  • TestLinkedBuffer.java: Adds unit tests for the new LinkedBuffer class.
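To make the buffering scheme concrete, here is a minimal sketch of the chaining idea described in the overview above. This is an illustration only, not the code merged in this PR: the class and member names are hypothetical, and the real LinkedBuffer differs in detail.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: chain small arrays that double in size up to a
// ceiling, instead of allocating one contiguous part-sized byte[].
final class ChainedBuffer
{
    private static final int INITIAL_LINK_SIZE = 8 * 1024;  // first link: 8KB
    private static final int MAX_LINK_SIZE = 512 * 1024;    // ceiling: 512KB

    private final List<byte[]> links = new ArrayList<>();
    private int positionInCurrentLink;
    private long allocatedBytes;

    void write(byte[] source, int offset, int length)
    {
        while (length > 0) {
            if (links.isEmpty() || positionInCurrentLink == currentLink().length) {
                // grow geometrically: double the previous link, capped at the ceiling
                int nextSize = links.isEmpty()
                        ? INITIAL_LINK_SIZE
                        : Math.min(currentLink().length * 2, MAX_LINK_SIZE);
                links.add(new byte[nextSize]);
                allocatedBytes += nextSize;
                positionInCurrentLink = 0;
            }
            int toCopy = Math.min(length, currentLink().length - positionInCurrentLink);
            System.arraycopy(source, offset, currentLink(), positionInCurrentLink, toCopy);
            positionInCurrentLink += toCopy;
            offset += toCopy;
            length -= toCopy;
        }
    }

    long allocated()
    {
        return allocatedBytes; // what the memory context should account for
    }

    private byte[] currentLink()
    {
        return links.get(links.size() - 1);
    }
}

Geometric growth keeps the number of links logarithmic in the buffered size, while the 512KB cap keeps individual allocations below G1's humongous threshold (half a region size) at typical region sizes.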

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.

  • S3OutputStream.java: core change; introduces the LinkedBuffer inner class and updates write/flush/upload logic to use an InputStream-based RequestBody
  • S3OutputFile.java: updates callers of putObject to pass ByteArrayInputStream to match the new signature
  • TestLinkedBuffer.java: new unit tests covering LinkedBuffer basic operations, expansion, ceiling, and reset
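Since a buffered part is no longer one contiguous array, the upload side has to expose the chain as a stream. Below is a hedged sketch of how that adaptation might look; RequestBody.fromInputStream is the actual AWS SDK v2 API, but the link-list shape is carried over from the hypothetical ChainedBuffer sketch above, not taken from the merged code.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import software.amazon.awssdk.core.sync.RequestBody;

final class ChainedBufferStreams
{
    static RequestBody toRequestBody(List<byte[]> links, int bytesInLastLink)
    {
        List<InputStream> streams = new ArrayList<>();
        long totalBytes = 0;
        for (int i = 0; i < links.size(); i++) {
            byte[] link = links.get(i);
            // only the written prefix of the final link is real data
            int valid = (i == links.size() - 1) ? bytesInLastLink : link.length;
            streams.add(new ByteArrayInputStream(link, 0, valid));
            totalBytes += valid;
        }
        // concatenate the links without copying them into a single array
        InputStream stream = new SequenceInputStream(Collections.enumeration(streams));
        // fromInputStream needs the exact content length up front, which the buffer tracks
        return RequestBody.fromInputStream(stream, totalBytes);
    }
}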


tbaeg force-pushed the feat/s3-linked-buffer branch from 5a19d16 to 659be03 on March 4, 2026 13:32
@tbaeg (Member, Author) commented Mar 4, 2026

@wendigo Any thoughts on these updates?

tbaeg force-pushed the feat/s3-linked-buffer branch 2 times, most recently from 058b3aa to 79ebc04 on March 6, 2026 20:05
tbaeg requested review from chenjian2664 and wendigo on March 6, 2026 21:33
tbaeg force-pushed the feat/s3-linked-buffer branch from 0a257a2 to 8c8da23 on March 30, 2026 15:51
tbaeg force-pushed the feat/s3-linked-buffer branch 4 times, most recently from 29c77a0 to be0e8e7 on March 30, 2026 17:03
@pettyjamesm (Member) commented:

@coderabbitai review

coderabbitai (Bot) commented Mar 30, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai (Bot) left a comment

Caution: some comments are outside the diff and can't be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java (2)

211-250: ⚠️ Potential issue | 🟠 Major

Keep the flushed buffer reserved until the async upload completes.

After Line 211 snapshots buffer into dataStreamProvider and Line 250 hands it to uploadPage(...), that flushed LinkedBuffer is still strongly reachable through inProgressUploadFuture. Line 239 immediately drops the reservation to the new active buffer (or 0 on the final part), so slow uploads can under-report retained heap by roughly one whole part per stream.

💡 Suggested direction
     private Future<CompletedPart> inProgressUploadFuture;
+    private long inFlightUploadBytes;
-            memoryContext.setBytes(buffer == null ? 0 : buffer.allocated());
+            updateMemoryUsage();
             multipartUploadStarted = true;
+            inFlightUploadBytes = dataStreamProvider.allocated();
+            updateMemoryUsage();
             inProgressUploadFuture = supplyAsync(() -> uploadPage(dataStreamProvider), uploadExecutor);
private void updateMemoryUsage()
{
    memoryContext.setBytes((buffer == null ? 0 : buffer.allocated()) + inFlightUploadBytes);
}

Use the same helper for the write-path updates on Lines 125, 139, and 143, and clear inFlightUploadBytes after waitForPreviousUploadFinish() returns. memoryContext.close() also needs to stay after the final wait, otherwise the last part is dropped from accounting before close() returns.


130-147: ⚠️ Potential issue | 🟡 Minor

Add precondition validation to preserve the OutputStream.write(byte[], off, len) contract.

The current implementation silently accepts invalid inputs: write(bytes, 0, -1) becomes a no-op, and write(null, 0, 0) returns without throwing NullPointerException. The JDK contract requires throwing NullPointerException if the array is null and IndexOutOfBoundsException if the offset or length are invalid. Add validation before the multipart-splitting loop.

Suggested fix
+import static java.util.Objects.checkFromIndexSize;
     public void write(byte[] bytes, int offset, int length)
             throws IOException
     {
         ensureOpen();
+        requireNonNull(bytes, "bytes is null");
+        checkFromIndexSize(offset, length, bytes.length);
+        if (length == 0) {
+            return;
+        }
         // make sure we don't exceed the part size
         while (length > 0) {


@pettyjamesm (Member) commented:

@tbaeg - expand the above AI review comments, they look relevant. One for handling negative indexes (sure, why not) but the other about memory tracking for the background upload looks right (and also hasn't been handled in the past, but we might as well fix that while we're here)

tbaeg force-pushed the feat/s3-linked-buffer branch 3 times, most recently from ad4ebfb to 62b9e9d on March 31, 2026 14:46
Inline review thread on S3OutputStream.java, at the non-multipart putObject path that releases the buffer (buffer = null):

Member:

Missing memoryContext update on this path. The memory tracking code is now non-trivial, so maybe worth making it a separate method

Member Author (tbaeg):

Unless I am missing something, calls to write will already track all the memory that is being used since this is not a multipart upload. Any update to the memoryContext would be redundant.

Member:

The memory consumed by buffer would have been previously tracked, but at this moment when we release the buffer our memory consumption should go to 0 after the putObject call completes.

Member Author (tbaeg):

I can add, but felt like an additional call setting it to 0 was moot since immediately after we close the memoryContext. Thoughts?

Member:

I would make this a call to updateMemory() (even though we expect a subsequent call to close()) just so that all of the memory update logic is in one place and that the accounting is updated any time that inFlightBytes or buffer changes.
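A compact sketch of the direction agreed on in this thread. The field and method names follow the review discussion (inFlightUploadBytes, updateMemoryUsage()) rather than the merged commit, and a plain long stands in for Trino's memory context.

// Stand-in sketch: re-derive the accounting in one place whenever
// `buffer` or `inFlightUploadBytes` changes, including the non-multipart
// path where the buffer is released after putObject.
final class MemoryAccountingSketch
{
    private ChainedBuffer buffer = new ChainedBuffer(); // active write buffer (sketch above)
    private long inFlightUploadBytes;                   // flushed part still uploading
    private long reservedBytes;                         // stands in for memoryContext.setBytes(...)

    private void updateMemoryUsage()
    {
        reservedBytes = (buffer == null ? 0 : buffer.allocated()) + inFlightUploadBytes;
    }

    // Non-multipart path discussed above: after handing the data to
    // putObject, drop the buffer and let the shared helper zero the
    // accounting, even though memoryContext.close() follows shortly after.
    void afterPutObject()
    {
        buffer = null;
        updateMemoryUsage();
    }
}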

tbaeg force-pushed the feat/s3-linked-buffer branch 3 times, most recently from a35533f to 361c639 on March 31, 2026 17:36
tbaeg force-pushed the feat/s3-linked-buffer branch from 361c639 to c5364c7 on April 1, 2026 00:23
@pettyjamesm (Member) commented:

@coderabbitai review

coderabbitai (Bot) commented Apr 1, 2026

✅ Actions performed

Review triggered.

coderabbitai (Bot) left a comment

Caution: some comments are outside the diff and can't be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java (1)

174-188: ⚠️ Potential issue | 🟠 Major

Close memoryContext on the failed-close path.

When failed is true, close() returns after abortUpload() and skips memoryContext.close(). This can leak retained memory accounting on error paths.

🛠️ Proposed fix
         if (failed) {
             try {
                 abortUpload();
-                return;
             }
             catch (SdkException e) {
                 throw new IOException(e);
             }
+            finally {
+                memoryContext.close();
+            }
+            return;
         }


tbaeg force-pushed the feat/s3-linked-buffer branch from c5364c7 to f00515e on April 1, 2026 14:00
pettyjamesm merged commit aef5fa3 into trinodb:master on Apr 1, 2026
70 checks passed
@pettyjamesm (Member) commented:

Merged, thanks for the contribution @tbaeg!

github-actions added this to the 481 milestone on Apr 1, 2026
ebyhr mentioned this pull request on Apr 2, 2026

Development

Successfully merging this pull request may close these issues.

Humongous allocations and low heap saturation when writing to S3

7 participants