diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 3639c5ac991..26b22506595 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -244,7 +244,7 @@ protected synchronized void setup() throws Exception { ClientSessionFactory cf = null; for (int i = 0; i < spec.getMaxSession(); i++) { - //if we are sharing the ceonnection only create 1 + //if we are sharing the connection only create 1 if (!spec.isSingleConnection()) { cf = null; } @@ -259,10 +259,9 @@ protected synchronized void setup() throws Exception { handler.setup(); handlers.add(handler); } catch (Exception e) { + logger.trace("Failed to setup session {} for activation {}", i, spec, e); if (cf != null) { - if (!spec.isSingleConnection()) { - cf.close(); - } + cf.close(); } if (session != null) { session.close(); @@ -270,6 +269,12 @@ protected synchronized void setup() throws Exception { if (firstException == null) { firstException = e; } + if (spec.isSingleConnection()) { + // The shared ClientSessionFactory is in a broken state; stop the loop + // all remaining sessions would fail with "ClientSession closed while + // creating session", masking the real error. + break; + } } } //if we have any exceptions close all the handlers and throw the first exception. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java new file mode 100644 index 00000000000..1a9b8b02254 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQMessageHandlerSingleConnectionTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.ra; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.junit.jupiter.api.Test; + +/** + * Tests for single connection mode exception handling in ActiveMQActivation. + * Related to ARTEMIS-5987: Properly handling exceptions with single connection. + */ +public class ActiveMQMessageHandlerSingleConnectionTest extends ActiveMQRATestBase { + + @Override + public boolean useSecurity() { + return false; + } + + /** + * Test that when using single connection mode and session creation fails (server is down), + * the activation setup fails immediately with the original exception and does not continue + * creating more sessions that would mask the real error with a "ClientSessionFactory is closed" + * error. + * + * Without the fix, the loop would continue after closing the shared ClientSessionFactory, + * and subsequent sessions would fail with a different exception, hiding the root cause. + */ + @Test + public void testSingleConnectionSessionCreationFailurePropagatesOriginalException() throws Exception { + server.stop(); + + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + spec.setMaxSession(5); // Multiple sessions to exercise the loop + spec.setSingleConnection(true); + spec.setSetupAttempts(1); // Only try once, no reconnection + + CountDownLatch latch = new CountDownLatch(1); + DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); + + try { + qResourceAdapter.endpointActivation(endpointFactory, spec); + fail("Expected activation to fail because server is down"); + } catch (Exception e) { + // The thrown exception must be the original connection failure, not a secondary + // "ClientSessionFactory is closed" / "IllegalStateException" that would appear + // if the loop continued past the first failure in single connection mode. + assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage()); + assertEquals(ActiveMQExceptionType.NOT_CONNECTED, ((ActiveMQException) e).getType(), + "Expected NOT_CONNECTED exception type but got: " + ((ActiveMQException) e).getType()); + } finally { + qResourceAdapter.stop(); + // Restart the server so that tearDown() can cleanly stop jmsServer + server.start(); + } + } + + /** + * Test that when using single connection mode and session creation fails mid-loop (after some + * sessions have already been set up successfully), the original exception is still propagated + * and does not get masked by a secondary "ClientSessionFactory is closed" error. + * + * This exercises the handler teardown path: the already-created handlers must be torn down + * and the loop must break immediately on the first failure. + */ + @Test + public void testSingleConnectionMidLoopFailurePropagatesOriginalException() throws Exception { + // Allow the first 2 CreateSession packets through, fail on the 3rd. + // This means sessions 0 and 1 succeed, session 2 fails mid-loop. + AtomicInteger sessionCreateCount = new AtomicInteger(0); + Interceptor interceptor = (packet, connection) -> { + if (packet instanceof CreateSessionMessage && sessionCreateCount.incrementAndGet() > 2) { + throw new ActiveMQException(ActiveMQExceptionType.NOT_CONNECTED, "injected mid-loop failure"); + } + return true; + }; + server.getRemotingService().addIncomingInterceptor(interceptor); + + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + spec.setMaxSession(5); // 2 succeed, then fail mid-loop + spec.setSingleConnection(true); + spec.setSetupAttempts(1); + + CountDownLatch latch = new CountDownLatch(1); + DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); + + try { + qResourceAdapter.endpointActivation(endpointFactory, spec); + fail("Expected activation to fail due to injected mid-loop failure"); + } catch (Exception e) { + assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage()); + // Without the fix, the loop would continue past the failure and subsequent sessions + // would fail with "ClientSessionFactory is closed", masking the original error. + assertFalse(e.getMessage().contains("closed"), "Got a masked 'factory closed' exception instead of the original error: " + e.getMessage()); + } finally { + server.getRemotingService().removeIncomingInterceptor(interceptor); + qResourceAdapter.stop(); + } + } + + /** + * Test that single connection mode works correctly under normal operation — all sessions + * share one underlying connection and messages are delivered. + */ + @Test + public void testSingleConnectionNormalOperation() throws Exception { + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + spec.setMaxSession(3); + spec.setSingleConnection(true); + + CountDownLatch latch = new CountDownLatch(3); + DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); + + qResourceAdapter.endpointActivation(endpointFactory, spec); + + try (ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession()) { + ClientProducer producer = session.createProducer(MDBQUEUEPREFIXED); + for (int i = 0; i < 3; i++) { + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString("test-message-" + i); + producer.send(message); + } + } + + assertTrue(latch.await(5, TimeUnit.SECONDS), "All 3 messages should be received within 5s"); + assertNotNull(endpoint.lastMessage, "At least one message should have been received"); + + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + qResourceAdapter.stop(); + } + +}