feat(transport-tcp): replace NIO selector with per-connection virtual-thread blocking I/O #2612
…-thread blocking I/O Each connection runs a blocking SocketChannel.read() loop on its own virtual thread instead of a per-connection NIO Selector. On Java 21 blocking-mode reads/writes park the virtual thread and release the carrier, so the selector (which pins the carrier in select()) and the OP_WRITE + Thread.sleep(1) write busy-wait are removed. A full ring buffer applies backpressure (park-and-retry) instead of toggling OP_READ. Public surface, readLock, RingBuffer, and the AsyncTransportInstance callback contract are unchanged: the existing TcpTransportInstanceTest (31 tests) passes unmodified. Scaling probe (TcpTransportInstanceScalingTest): 200 idle connections use 2 carrier threads with the blocking model vs 201 with the selector model.
Pull request overview
This PR redesigns the plc4j TCP transport implementation to replace the per-connection NIO Selector loop with a per-connection virtual-thread read loop using blocking SocketChannel.read() / write(), simplifying the transport while preserving the existing AsyncTransportInstance callback contract.
Changes:
- Replaced selector-driven async I/O with a blocking read loop on a per-connection virtual thread, writing into the existing `RingBuffer` and invoking the data listener.
- Simplified write-side behavior by relying on blocking `SocketChannel.write()` for natural backpressure (removing `OP_WRITE` + sleep loop).
- Added an `@Disabled` scaling/probe test to help validate carrier-thread usage characteristics under high idle-connection counts.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| plc4j/transports/tcp/src/main/java/org/apache/plc4x/java/transport/tcp/TcpTransportInstance.java | Replaces selector loop with blocking vthread read loop; updates close/write behavior and listener guarding. |
| plc4j/transports/tcp/src/test/java/org/apache/plc4x/java/transport/tcp/TcpTransportInstanceScalingTest.java | Adds a disabled probe test for observing carrier-thread scaling with many idle connections. |
            }
        } finally {
            writeLock.unlock();
            LOGGER.debug("TCP connection closed");
            getAuditLog().write(AuditLogEventType.CLOSE, "Closed");
Fixed in 469e1f5. Moved the "TCP connection closed" debug line and the CLOSE audit event into the success path so they only fire when socketChannel.close() succeeds; on failure the catch reports the ERROR event and rethrows. readThread.join() stays in finally so the read loop is always awaited.
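The ordering described in this reply can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `CloseOrderingSketch`, its `audit` list (a stand-in for the audit log), and the one-second join timeout are all assumptions made for the example. The CAS guard and the self-join skip follow the later commit notes on this PR.

```java
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the transport; only the ordering inside close() is illustrated. */
final class CloseOrderingSketch {
    private final AtomicBoolean open = new AtomicBoolean(true);
    private final Channel channel;
    private final Thread readThread;
    /** Stand-in for the audit log: records event names in the order they fire. */
    final List<String> audit = new CopyOnWriteArrayList<>();

    CloseOrderingSketch(Channel channel, Thread readThread) {
        this.channel = channel;
        this.readThread = readThread;
    }

    void close() throws IOException {
        // CAS guard: only the first caller performs the close.
        if (!open.compareAndSet(true, false)) {
            return;
        }
        try {
            channel.close();
            // Success path only: a failed close must not also report "Closed".
            audit.add("CLOSE");
        } catch (IOException e) {
            audit.add("ERROR");
            throw e;
        } finally {
            // Always await the read loop, unless close() is running on the read thread itself.
            if (Thread.currentThread() != readThread) {
                try {
                    readThread.join(1_000L); // timeout value is an assumption
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

With this shape a failed `channel.close()` records only the ERROR event, and a second call to `close()` is a no-op.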
    // tiny holder so the daemon accept loop can observe a stop flag without a field on the test
    private static final class Flag { volatile boolean running; }
    private final Flag volatileFlag = new Flag();
Fixed in 469e1f5. Reworded the comment to describe the holder accurately (it carries a volatile stop flag the daemon accept loop polls); the previous "without a field" wording was inaccurate since the holder is itself a field.
Move the "TCP connection closed" debug line and CLOSE audit event out of the finally block and into the success path of close(). Previously they ran even when socketChannel.close() threw and the method rethrew, so a failed close logged both an ERROR audit event and a misleading CLOSE "Closed" event. The readThread.join() stays in finally so the read loop is always awaited. Also correct an inaccurate comment in the scaling test (the stop-flag holder is a field, not a way to avoid one).
     while (writeBuffer.hasRemaining()) {
         int written = socketChannel.write(writeBuffer);
         if (written == -1) {
    -        open = false;
    +        open.set(false);
             throw new TransportException("Connection closed while writing");
         }
Fixed in 0a1e480. Removed the dead written == -1 branch — the blocking write loop now just calls socketChannel.write(writeBuffer) until the buffer is drained. A broken or closed connection still surfaces as IOException/AsynchronousCloseException, both already handled below.
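For reference, the simplified drain loop might look roughly like this. It is a sketch under assumptions: `BlockingWriteSketch` and `writeFully` are illustrative names, and the real method also takes `writeLock` and maps exceptions, which is omitted here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper illustrating the drain loop without the dead -1 check. */
final class BlockingWriteSketch {
    private BlockingWriteSketch() {}

    /**
     * Writes until the buffer is drained. write() returns the number of bytes
     * written (possibly zero) and never -1, so no end-of-stream branch is needed.
     * A closed or broken channel surfaces as an IOException from write().
     */
    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}
```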
    long carriers = Thread.getAllStackTraces().keySet().stream()
            .filter(t -> !t.isVirtual())
            .filter(t -> t.getName().contains("ForkJoinPool"))
            .count();
    long total = Thread.getAllStackTraces().size();
Fixed in 0a1e480. Now takes a single Thread.getAllStackTraces() snapshot and derives both carriers and total from it, so the two counts come from the same instant. (Also excluded ForkJoinPool.commonPool workers from the carrier count so unrelated parallel-stream workers cannot inflate it.)
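A single-snapshot version of the count could be sketched as below. `CarrierSnapshotSketch` and its `Counts` record are hypothetical names, and matching carriers by thread name (`ForkJoinPool`, excluding `commonPool`) is a heuristic carried over from the snippet under review rather than a documented JDK contract. It assumes Java 21 for `Thread.isVirtual()`.

```java
import java.util.Set;

/** Hypothetical probe helper: derives both counts from one thread snapshot. */
final class CarrierSnapshotSketch {
    private CarrierSnapshotSketch() {}

    record Counts(long carriers, long total) {}

    static Counts snapshot() {
        // One call, so carriers and total describe the same instant.
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        long carriers = threads.stream()
                .filter(t -> !t.isVirtual())
                // Name-based heuristic (assumption): scheduler workers carry "ForkJoinPool" in their name.
                .filter(t -> t.getName().contains("ForkJoinPool"))
                // Exclude common-pool workers so parallel streams cannot inflate the count.
                .filter(t -> !t.getName().contains("commonPool"))
                .count();
        return new Counts(carriers, threads.size());
    }
}
```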
- write(): blocking SocketChannel.write() never returns -1 (that signals read EOF), so the `written == -1` branch was dead code. A broken or closed connection already surfaces as IOException/AsynchronousCloseException, both handled below. Remove the check.
- constructor: errorMsg already embeds e.getMessage(), so the second ERROR audit event duplicated the first. Emit a single event.
- constructor: start the read-loop virtual thread last (after the INFO log and CONNECT audit), so an unchecked throw from logging/audit cannot leak an already-running read thread and the open SocketChannel; the catch only handles IOException and does not stop the read loop.
- close(): skip readThread.join() when close() runs on the read thread itself (a disconnect/data listener calling close()), since joining yourself only stalls for the timeout and the loop already exits once open is false.
- scaling test: take one Thread.getAllStackTraces() snapshot so carriers and total are counted from the same instant instead of two separate calls.
- scaling test: exclude ForkJoinPool.commonPool workers from the carrier count so unrelated parallel-stream workers cannot inflate it.
Summary
Replaces the per-connection NIO `Selector` in `TcpTransportInstance` with one virtual thread per connection doing blocking `SocketChannel.read()`/`write()`. Stays fully Netty-free. Follows the SPI3 transport layer (commit `372501287d`); @chrisdutz greenlit a redesign of this transport and offered bench testing on real devices. This is that redesign, TCP-only as a first step.

Scope is confined to `plc4j/transports/tcp` (`TcpTransportInstance`). No public API / SPI / driver changes.
Motivation (verified against current code)
- `Selector.select()` does not release its carrier on Java 21. Two reasons: (a) pre-JEP-491 the selection path synchronizes on the selector monitor (a monitor pin, fixed in JDK 24); and (b) more fundamentally, `select()`'s native poll is not a carrier-unmounting / poller-managed operation, so the carrier stays blocked even after JEP 491. The scheduler compensates up to `maxPoolSize` (default 256), so we pay vthread overhead but get platform-thread behavior plus a hidden ~256 ceiling, and this does not improve on newer JDKs.
- `write()` registers `OP_WRITE`, `wakeup()`s, then `Thread.sleep(1)` in a loop while holding `writeLock` (never consumes the event).
- … (`interestOps`, `reEnableReadIfNeeded`, OP_READ toggling).

On Java 21 a vthread blocked in a blocking-mode `SocketChannel.read()`/`write()` parks and releases its carrier (JDK parks it on the shared NIO poller), with no pin. So one-vthread-per-connection blocking reads is both simpler and avoids the ceiling.
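To make the model concrete, here is a minimal sketch of a per-connection read loop on a virtual thread, including park-and-retry backpressure and a guarded listener call. It is not the PR's `runReadLoop()`: the class name, the `ArrayBlockingQueue` used in place of the real `RingBuffer`, the `drain` helper, the scratch-buffer size, and the 1 ms park interval are all assumptions chosen to keep the example self-contained. It requires Java 21 for `Thread.ofVirtual()`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical sketch of a blocking read loop running on its own virtual thread. */
final class BlockingReadLoopSketch {
    private final AtomicBoolean open = new AtomicBoolean(true);
    private final ReadableByteChannel channel;
    private final ArrayBlockingQueue<Byte> ring; // stand-in for the real RingBuffer
    private final Runnable dataListener;
    volatile IOException failure;

    BlockingReadLoopSketch(ReadableByteChannel channel, int capacity, Runnable dataListener) {
        this.channel = channel;
        this.ring = new ArrayBlockingQueue<>(capacity);
        this.dataListener = dataListener;
    }

    Thread start() {
        return Thread.ofVirtual().name("tcp-read-sketch").start(this::runReadLoop);
    }

    private void runReadLoop() {
        ByteBuffer scratch = ByteBuffer.allocate(256);
        try {
            while (open.get()) {
                int free = ring.remainingCapacity();
                if (free == 0) {
                    // Backpressure: park and retry instead of disconnecting.
                    LockSupport.parkNanos(1_000_000L);
                    continue;
                }
                // Bound the read to the free space so nothing is dropped.
                scratch.clear();
                scratch.limit(Math.min(scratch.capacity(), free));
                int n = channel.read(scratch); // blocks; parks the virtual thread on a SocketChannel
                if (n == -1) {
                    break; // peer closed the connection
                }
                scratch.flip();
                while (scratch.hasRemaining()) {
                    ring.add(scratch.get());
                }
                safeRun(dataListener);
            }
        } catch (AsynchronousCloseException e) {
            // Expected when close() closed the channel under a blocked read.
        } catch (IOException e) {
            failure = e;
        } finally {
            open.set(false);
        }
    }

    /** A throwing listener must not kill the read loop. */
    private static void safeRun(Runnable listener) {
        try {
            listener.run();
        } catch (RuntimeException e) {
            // A real implementation would log here.
        }
    }

    /** Closing the channel is what unblocks a parked read. */
    void close() throws IOException {
        if (open.compareAndSet(true, false)) {
            channel.close();
        }
    }

    /** Consumer-side helper for the example: takes n bytes or fails after the timeout. */
    byte[] drain(int n, long timeoutMillis) {
        byte[] out = new byte[n];
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        int i = 0;
        while (i < n) {
            Byte b = ring.poll();
            if (b != null) {
                out[i++] = b;
            } else if (System.nanoTime() > deadline) {
                throw new IllegalStateException("timed out after " + i + " bytes");
            } else {
                LockSupport.parkNanos(100_000L);
            }
        }
        return out;
    }
}
```

Because reads are capped at the buffer's free space, a slow consumer only delays the reader; the connection is never torn down for being full.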
What changed
- `select()` loop → `runReadLoop()` doing blocking `read()` into the existing `RingBuffer`.
- Removed `Selector`, `SelectionKey`, `interestOps`, `reEnableReadIfNeeded`, and the `OP_WRITE` + `Thread.sleep(1)` write path. Blocking `write()` now provides natural backpressure.
- A full ring buffer applies backpressure (`parkNanos` park-and-retry, bounded to free space), never a disconnect (only the codec knows frame boundaries; COTP can legitimately drain cross-thread).
- `close()` is lock-free CAS (`AtomicBoolean`): closing the channel is what unblocks a parked read/write; `AsynchronousCloseException` with `open==false` is treated as a normal shutdown.
- Listener callbacks are guarded (`safeRun`) so a misbehaving listener can't silently kill the read loop.

Zero downstream impact (audited)
Public surface, `readLock`, `RingBuffer`, and the `AsyncTransportInstance` callback contract are unchanged. `readLock` is deliberately kept because `CotpTransportInstance` calls the read-side methods cross-thread during the S7/COTP handshake; it is a load-bearing guard, not removable.
- `MessageCodecBase` + driver codecs (read-thread)
- `ConnectionBase.startReceiving` (`registerDataListener`)
- `CotpTransportInstance` (cross-thread read-side, concurrent): `readLock` + read-side thread-safety preserved
- `OpcuaConnection` (`instanceof` + `getRemoteAddress`)

Evidence
`TcpTransportInstanceTest` (31 tests) passes unmodified on the new implementation → behavior-equivalent.

Scaling: carrier (OS) threads for 200 idle connections (measured; `@Disabled` probe, run manually):

| JDK | Selector model | Blocking model |
|---|---|---|
| 21 | 201 | 2 |
| 25 | 201 | 2 |

The selector inflates to ~1 carrier per connection on both JDKs, so JEP 491 (JDK 24, removes `synchronized` pinning) does not help here: the cost is `select()` being a non-unmounting blocking call, not monitor pinning. The blocking model stays flat (bounded by CPU count, not connection count), so the win does not erode as Java advances. (All four cells reproduced with the same probe: selector `CARRIER_COUNT=201` and blocking `CARRIER_COUNT=2` for 200 connections, on JDK 21 and JDK 25; no pinned-thread traces for the blocking model under `-Djdk.tracePinnedThreads=full`.)

End-to-end regression: `ModbusDockerIT` (pymodbus container), all cases green. The `modbus-tcp://`, `modbus-rtu:tcp://`, and `modbus-ascii:tcp://` cases (~38) exercise the new `TcpTransportInstance` over a real socket; the UDP and TLS cases use the separate UDP / TLS transports and are unaffected by this change.
Scope / non-goals
- … different reader-thread models.
- The `AsyncTransportInstance` callback is intentionally kept as a thin shim on the blocking core; migrating to a blocking-pull contract is intentionally out of scope.
Testing notes
- `TcpTransportInstanceScalingTest` is an `@Disabled` evidence probe (opens 200 sockets + sleeps; not a CI regression test); run manually, ideally with `-Djdk.tracePinnedThreads=full`.
- `mvn -pl :plc4j-transports-tcp -am verify` is green (tests + apache-rat + jacoco).