-
Notifications
You must be signed in to change notification settings - Fork 947
Netty 4.2 upgrade + IO_URING support #6021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,7 @@ if [ -z "$LOGGING_ARGS" ]; then | |
| fi | ||
|
|
||
| if [ -z "$JAVA_ARGS" ]; then | ||
| JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-utility-opts}" | ||
| JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-utility-opts}" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At least it's the case that --enable-native-access was already in JDK17 (as the initial FFM in 17 was an incubator addition there), so hopefully that means it works on all versions of JDK17+ as opposed to just post-GA backport releases, saving us from doing any checking. |
||
| fi | ||
|
|
||
| # Uncomment to enable remote debugging | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,27 +15,26 @@ | |
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.activemq.artemis.core.remoting.impl.netty; | ||
| package org.apache.activemq.artemis.utils; | ||
|
|
||
| import io.netty.channel.epoll.Epoll; | ||
| import io.netty.channel.kqueue.KQueue; | ||
| import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; | ||
| import org.apache.activemq.artemis.utils.Env; | ||
| import io.netty.channel.uring.IoUring; | ||
| import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; | ||
|
|
||
| /** | ||
| * This class will check for Epoll or KQueue is available, and return false in case of NoClassDefFoundError it could be | ||
| * improved to check for other cases eventually. | ||
| * This class will check if certain dependencies are available, and return false in case of NoClassDefFoundError | ||
| */ | ||
| public class CheckDependencies { | ||
|
|
||
| public static final boolean isEpollAvailable() { | ||
| try { | ||
| return Env.isLinuxOs() && Epoll.isAvailable(); | ||
| } catch (NoClassDefFoundError noClassDefFoundError) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailabilitynoClass(); | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailabilityNoClass(); | ||
| return false; | ||
| } catch (Throwable e) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailability(e); | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailability(e); | ||
| return false; | ||
| } | ||
| } | ||
|
|
@@ -44,11 +43,24 @@ public static final boolean isKQueueAvailable() { | |
| try { | ||
| return Env.isMacOs() && KQueue.isAvailable(); | ||
| } catch (NoClassDefFoundError noClassDefFoundError) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass(); | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass(); | ||
| return false; | ||
| } catch (Throwable e) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e); | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailability(e); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| public static final boolean isIoUringAvailable() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added via netty/netty#15785 a way to verify the status of the different optimizations made by io_uring on the OS - you can use it for reporting/debugging 👍
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was about to use it, but I see now that it's so recent that it's not in a release yet. 😆 |
||
| try { | ||
| return Env.isLinuxOs() && IoUring.isAvailable(); | ||
| } catch (NoClassDefFoundError noClassDefFoundError) { | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailabilityNoClass(); | ||
| return false; | ||
| } catch (Throwable e) { | ||
| ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailability(e); | ||
| return false; | ||
| } | ||
| } | ||
|
Comment on lines
+55
to
+64
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This also feels like it would make more sense in commons (I know, the previous checks are in here; same comment to them) |
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -223,6 +223,7 @@ public void shouldZeroesDirectByteBuffer() { | |
|
|
||
| @Test | ||
| public void shouldZeroesLimitedDirectByteBuffer() { | ||
| assumeTrue(PlatformDependent.hasUnsafe()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is possibly covering up a bug in either the test or, more likely, in the ByteUtil that was exposed by Unsafe not being available, and so this doesn't seem like the solution. I don't think this test should need Unsafe; the behaviour just seems questionable. The very next test calls the same utility method, without Unsafe. It only behaves differently as the buffer passed isn't limited. Real code could call the ByteUtil method later without Unsafe and get unexpected behaviour; it won't have the benefit of covering itself with an assume. EDIT: after typing that, I reminded myself of the time that this essentially already happened and I fixed it in the calling code: If Franz doesn't know why he did it, and it doesn't work without Unsafe anyway, I think it might be time we stopped it doing what it's doing, or figure out why it did and how to adapt it to not having Unsafe. |
||
| final byte one = (byte) 1; | ||
| final int capacity = 64; | ||
| final int bytes = 32; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,6 +89,15 @@ | |
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-classes-kqueue</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-native-io_uring</artifactId> | ||
| <classifier>${netty-transport-native-io_uring-classifier}</classifier> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-classes-io_uring</artifactId> | ||
| </dependency> | ||
|
Comment on lines
+92
to
+100
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I've commented on all the other io_uring PRs, I'd personally prefer folks had to add the deps if they want them, especially given it's off by default, and it's entirely untested. Or at least for the client, if not both. The way its availability+use is done would need to change slightly to facilitate it working without the classes dep though (elsewhere, I use a class per native type, and that's the only one to directly reference the types). |
||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec-http</artifactId> | ||
|
|
@@ -109,10 +118,6 @@ | |
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-handler-proxy</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec-socks</artifactId> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,12 @@ | |
| */ | ||
| package org.apache.activemq.artemis.core.remoting.impl.netty; | ||
|
|
||
| import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; | ||
|
|
||
| import javax.net.ssl.SNIHostName; | ||
| import javax.net.ssl.SSLContext; | ||
| import javax.net.ssl.SSLEngine; | ||
| import javax.net.ssl.SSLParameters; | ||
| import java.io.IOException; | ||
| import java.lang.invoke.MethodHandles; | ||
| import java.net.ConnectException; | ||
| import java.net.InetAddress; | ||
| import java.net.InetSocketAddress; | ||
|
|
@@ -63,16 +62,19 @@ | |
| import io.netty.channel.ChannelPipeline; | ||
| import io.netty.channel.ChannelPromise; | ||
| import io.netty.channel.EventLoopGroup; | ||
| import io.netty.channel.MultiThreadIoEventLoopGroup; | ||
| import io.netty.channel.SimpleChannelInboundHandler; | ||
| import io.netty.channel.WriteBufferWaterMark; | ||
| import io.netty.channel.epoll.EpollEventLoopGroup; | ||
| import io.netty.channel.epoll.EpollIoHandler; | ||
| import io.netty.channel.epoll.EpollSocketChannel; | ||
| import io.netty.channel.group.ChannelGroup; | ||
| import io.netty.channel.group.DefaultChannelGroup; | ||
| import io.netty.channel.kqueue.KQueueEventLoopGroup; | ||
| import io.netty.channel.kqueue.KQueueIoHandler; | ||
| import io.netty.channel.kqueue.KQueueSocketChannel; | ||
| import io.netty.channel.nio.NioEventLoopGroup; | ||
| import io.netty.channel.nio.NioIoHandler; | ||
| import io.netty.channel.socket.nio.NioSocketChannel; | ||
| import io.netty.channel.uring.IoUringIoHandler; | ||
| import io.netty.channel.uring.IoUringSocketChannel; | ||
| import io.netty.handler.codec.base64.Base64; | ||
| import io.netty.handler.codec.http.DefaultFullHttpRequest; | ||
| import io.netty.handler.codec.http.DefaultHttpRequest; | ||
|
|
@@ -92,11 +94,11 @@ | |
| import io.netty.handler.codec.http.LastHttpContent; | ||
| import io.netty.handler.codec.http.cookie.ClientCookieDecoder; | ||
| import io.netty.handler.codec.http.cookie.Cookie; | ||
| import io.netty.handler.ssl.SslContext; | ||
| import io.netty.handler.codec.socksx.SocksVersion; | ||
| import io.netty.handler.proxy.ProxyHandler; | ||
| import io.netty.handler.proxy.Socks4ProxyHandler; | ||
| import io.netty.handler.proxy.Socks5ProxyHandler; | ||
| import io.netty.handler.ssl.SslContext; | ||
| import io.netty.handler.ssl.SslHandler; | ||
| import io.netty.resolver.NoopAddressResolverGroup; | ||
| import io.netty.util.AttributeKey; | ||
|
|
@@ -122,21 +124,23 @@ | |
| import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider; | ||
| import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig; | ||
| import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider; | ||
| import org.apache.activemq.artemis.utils.CheckDependencies; | ||
| import org.apache.activemq.artemis.utils.ConfigurationHelper; | ||
| import org.apache.activemq.artemis.utils.FutureLatch; | ||
| import org.apache.activemq.artemis.utils.IPV6Util; | ||
| import org.apache.activemq.artemis.utils.PasswordMaskingUtil; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import java.lang.invoke.MethodHandles; | ||
|
|
||
| import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; | ||
| import static org.apache.activemq.artemis.utils.Base64.encodeBytes; | ||
|
|
||
| public class NettyConnector extends AbstractConnector { | ||
|
|
||
| public static String NIO_CONNECTOR_TYPE = "NIO"; | ||
| public static String EPOLL_CONNECTOR_TYPE = "EPOLL"; | ||
| public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE"; | ||
| public static String IOURING_CONNECTOR_TYPE = "IO_URING"; | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
|
|
||
|
|
@@ -297,6 +301,8 @@ public class NettyConnector extends AbstractConnector { | |
|
|
||
| private boolean useKQueue; | ||
|
|
||
| private boolean useIoUring; | ||
|
|
||
| private int remotingThreads; | ||
|
|
||
| private boolean useGlobalWorkerPool; | ||
|
|
@@ -402,6 +408,7 @@ public NettyConnector(final Map<String, Object> configuration, | |
|
|
||
| useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); | ||
| useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); | ||
| useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); | ||
|
|
||
| useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); | ||
| host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); | ||
|
|
@@ -543,38 +550,54 @@ public synchronized void start() { | |
| return; | ||
| } | ||
|
|
||
| if (remotingThreads == -1) { | ||
| boolean defaultRemotingThreads = remotingThreads == -1; | ||
|
|
||
| if (defaultRemotingThreads) { | ||
| // Default to number of cores * 3 | ||
| remotingThreads = Runtime.getRuntime().availableProcessors() * 3; | ||
| } | ||
|
|
||
| String connectorType; | ||
|
|
||
| if (useEpoll && CheckDependencies.isEpollAvailable()) { | ||
| if (useIoUring && CheckDependencies.isIoUringAvailable()) { | ||
| //IO_URING should default to 1 remotingThread unless specified in config | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should use fewer threads than we use with epoll too - but 1 could be quite low. Also check netty/netty#15524 to see if it can help scale up the number of event loops if required
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC, setting it to 1 was a recommendation I found back from when netty io_uring was still an incubator project. I did some testing on this when I submitted the previous PR and saw that using 1 thread performed best up to something on the order of a few thousand connections and messages per second. After that it had to be increased but as I recall it, I saw no measurable improvement using more than just a few dedicated threads. Again, this was some time ago but I would guess up to ~5 threads or so on a decently sized server, say 16 cores. Perhaps it should be set to something like |
||
| remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; | ||
|
|
||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory()))); | ||
| } else { | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory()); | ||
| } | ||
|
|
||
| connectorType = IOURING_CONNECTOR_TYPE; | ||
| channelClazz = IoUringSocketChannel.class; | ||
|
|
||
| logger.debug("Connector {} using native io_uring", this); | ||
| } else if (useEpoll && CheckDependencies.isEpollAvailable()) { | ||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()))); | ||
| } else { | ||
| group = new EpollEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory()); | ||
| } | ||
| connectorType = EPOLL_CONNECTOR_TYPE; | ||
| channelClazz = EpollSocketChannel.class; | ||
| logger.debug("Connector {} using native epoll", this); | ||
| } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { | ||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()))); | ||
| } else { | ||
| group = new KQueueEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory()); | ||
| } | ||
| connectorType = KQUEUE_CONNECTOR_TYPE; | ||
| channelClazz = KQueueSocketChannel.class; | ||
| logger.debug("Connector {} using native kqueue", this); | ||
| } else { | ||
| if (useGlobalWorkerPool) { | ||
| channelClazz = NioSocketChannel.class; | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()))); | ||
| } else { | ||
| channelClazz = NioSocketChannel.class; | ||
| group = new NioEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory()); | ||
| } | ||
| connectorType = NIO_CONNECTOR_TYPE; | ||
| channelClazz = NioSocketChannel.class; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If netty is the only thing we need/decide to enable Unsafe for (don't know) on 25+, then I wonder if going with the io.netty.noUnsafe sys prop might be preferable, as it at least currently seems like it won't require a second JVM startup and there would be no harm in always specifying it on either older JDKs or future JDKs after it's irrelevant.
Though, that would leave the warning on 25..
I still wonder about just leaving it not using Unsafe on 25. Seems like that should be possible, and that is the future beyond 25. If that's still not viable, perhaps we stick with 4.1 using Unsafe and its warnings for a bit longer until it is, and just try to ensure compatibility if an end user wants to use 4.2 (the milestone/RC builds of spring using Artemis with 4.2 suggest it mostly already is...that's the only 4.2 usage I'm personally aware of). Anyone not satisfied with no-Unsafe can always add the config themselves.