Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ protected synchronized void setup() throws Exception {
ClientSessionFactory cf = null;

for (int i = 0; i < spec.getMaxSession(); i++) {
//if we are sharing the ceonnection only create 1
//if we are sharing the connection only create 1
if (!spec.isSingleConnection()) {
cf = null;
}
Expand All @@ -259,17 +259,22 @@ protected synchronized void setup() throws Exception {
handler.setup();
handlers.add(handler);
} catch (Exception e) {
logger.trace("Failed to setup session {} for activation {}", i, spec, e);
if (cf != null) {
if (!spec.isSingleConnection()) {
cf.close();
}
cf.close();
}
if (session != null) {
session.close();
}
if (firstException == null) {
firstException = e;
}
if (spec.isSingleConnection()) {
// The shared ClientSessionFactory is in a broken state; stop the loop
// all remaining sessions would fail with "ClientSession closed while
// creating session", masking the real error.
break;
}
}
}
//if we have any exceptions close all the handlers and throw the first exception.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.ra;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.junit.jupiter.api.Test;

/**
* Tests for single connection mode exception handling in ActiveMQActivation.
* Related to ARTEMIS-5987: Properly handling exceptions with single connection.
*/
public class ActiveMQMessageHandlerSingleConnectionTest extends ActiveMQRATestBase {

@Override
public boolean useSecurity() {
return false;
}

/**
* Test that when using single connection mode and session creation fails (server is down),
* the activation setup fails immediately with the original exception and does not continue
* creating more sessions that would mask the real error with a "ClientSessionFactory is closed"
* error.
*
* Without the fix, the loop would continue after closing the shared ClientSessionFactory,
* and subsequent sessions would fail with a different exception, hiding the root cause.
*/
@Test
public void testSingleConnectionSessionCreationFailurePropagatesOriginalException() throws Exception {
server.stop();

ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);

ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setMaxSession(5); // Multiple sessions to exercise the loop
spec.setSingleConnection(true);
spec.setSetupAttempts(1); // Only try once, no reconnection

CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);

try {
qResourceAdapter.endpointActivation(endpointFactory, spec);
fail("Expected activation to fail because server is down");
} catch (Exception e) {
// The thrown exception must be the original connection failure, not a secondary
// "ClientSessionFactory is closed" / "IllegalStateException" that would appear
// if the loop continued past the first failure in single connection mode.
assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage());
assertEquals(ActiveMQExceptionType.NOT_CONNECTED, ((ActiveMQException) e).getType(),
"Expected NOT_CONNECTED exception type but got: " + ((ActiveMQException) e).getType());
} finally {
qResourceAdapter.stop();
// Restart the server so that tearDown() can cleanly stop jmsServer
server.start();
}
}

/**
* Test that when using single connection mode and session creation fails mid-loop (after some
* sessions have already been set up successfully), the original exception is still propagated
* and does not get masked by a secondary "ClientSessionFactory is closed" error.
*
* This exercises the handler teardown path: the already-created handlers must be torn down
* and the loop must break immediately on the first failure.
*/
@Test
public void testSingleConnectionMidLoopFailurePropagatesOriginalException() throws Exception {
// Allow the first 2 CreateSession packets through, fail on the 3rd.
// This means sessions 0 and 1 succeed, session 2 fails mid-loop.
AtomicInteger sessionCreateCount = new AtomicInteger(0);
Interceptor interceptor = (packet, connection) -> {
if (packet instanceof CreateSessionMessage && sessionCreateCount.incrementAndGet() > 2) {
throw new ActiveMQException(ActiveMQExceptionType.NOT_CONNECTED, "injected mid-loop failure");
}
return true;
};
server.getRemotingService().addIncomingInterceptor(interceptor);

ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);

ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setMaxSession(5); // 2 succeed, then fail mid-loop
spec.setSingleConnection(true);
spec.setSetupAttempts(1);

CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);

try {
qResourceAdapter.endpointActivation(endpointFactory, spec);
fail("Expected activation to fail due to injected mid-loop failure");
} catch (Exception e) {
assertTrue(e instanceof ActiveMQException, "Expected ActiveMQException but got: " + e.getClass().getName() + ": " + e.getMessage());
// Without the fix, the loop would continue past the failure and subsequent sessions
// would fail with "ClientSessionFactory is closed", masking the original error.
assertFalse(e.getMessage().contains("closed"), "Got a masked 'factory closed' exception instead of the original error: " + e.getMessage());
} finally {
server.getRemotingService().removeIncomingInterceptor(interceptor);
qResourceAdapter.stop();
}
}

/**
* Test that single connection mode works correctly under normal operation — all sessions
* share one underlying connection and messages are delivered.
*/
@Test
public void testSingleConnectionNormalOperation() throws Exception {
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);

ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setMaxSession(3);
spec.setSingleConnection(true);

CountDownLatch latch = new CountDownLatch(3);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);

qResourceAdapter.endpointActivation(endpointFactory, spec);

try (ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession()) {
ClientProducer producer = session.createProducer(MDBQUEUEPREFIXED);
for (int i = 0; i < 3; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("test-message-" + i);
producer.send(message);
}
}

assertTrue(latch.await(5, TimeUnit.SECONDS), "All 3 messages should be received within 5s");
assertNotNull(endpoint.lastMessage, "At least one message should have been received");

qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}

}