Skip to content

Commit 1f3db07

Browse files
authored
feat(#1739): Add requestTimeout to ConnectionFactory to prevent hanging threads in syncSendPacket() (#1740)
* Add requestTimeout to ConnectionFactory to prevent hanging threads in syncSendPacket() When the broker becomes unreachable without the TCP connection being properly closed (network partition, half-open connection), FutureResponse.getResult() calls ArrayBlockingQueue.take() which blocks indefinitely, causing threads to hang forever. Introduce a configurable requestTimeout (default 0, no timeout) on ActiveMQConnectionFactory and ActiveMQConnection, similar to the existing sendTimeout. When set, syncSendPacket(Command) uses poll(timeout) instead of take(), throwing RequestTimedOutIOException when the timeout expires. Can be configured programmatically via factory.setRequestTimeout(ms) or via URL parameter jms.requestTimeout=ms. * Add consumer creation in without timeout test * Fix flaky testSyncSendPacketFailFromTimeout by using BrokerPlugin to delay responses Use a BrokerFilter that delays addConsumer for the test queue by 5s, making the 500ms requestTimeout fire deterministically instead of relying on a 1ms race condition.
1 parent 0fe85b3 commit 1f3db07

4 files changed

Lines changed: 228 additions & 2 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
164164
private boolean watchTopicAdvisories = true;
165165
private long warnAboutUnstartedConnectionTimeout = 500L;
166166
private int sendTimeout =0;
167+
private int requestTimeout =0;
167168
private boolean sendAcksAsync=true;
168169
private boolean checkForDuplicates = true;
169170
private boolean queueOnlyConnection = false;
@@ -1526,7 +1527,7 @@ public Response syncSendPacket(Command command, int timeout) throws JMSException
15261527
* @throws JMSException
15271528
*/
15281529
public Response syncSendPacket(Command command) throws JMSException {
1529-
return syncSendPacket(command, 0);
1530+
return syncSendPacket(command, requestTimeout);
15301531
}
15311532

15321533
/**
@@ -1855,6 +1856,20 @@ public void setSendTimeout(int sendTimeout) {
18551856
this.sendTimeout = sendTimeout;
18561857
}
18571858

1859+
/**
1860+
* @return the requestTimeout (in milliseconds)
1861+
*/
1862+
public int getRequestTimeout() {
1863+
return requestTimeout;
1864+
}
1865+
1866+
/**
1867+
* @param requestTimeout the requestTimeout to set (in milliseconds)
1868+
*/
1869+
public void setRequestTimeout(int requestTimeout) {
1870+
this.requestTimeout = requestTimeout;
1871+
}
1872+
18581873
/**
18591874
* @return the sendAcksAsync
18601875
*/

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
147147
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
148148
private long warnAboutUnstartedConnectionTimeout = 500L;
149149
private int sendTimeout = 0;
150+
private int requestTimeout = 0;
150151
private int connectResponseTimeout = 0;
151152
private boolean sendAcksAsync=true;
152153
private TransportListener transportListener;
@@ -444,6 +445,7 @@ protected void configureConnection(ActiveMQConnection connection) throws JMSExce
444445
connection.setProducerWindowSize(getProducerWindowSize());
445446
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
446447
connection.setSendTimeout(getSendTimeout());
448+
connection.setRequestTimeout(getRequestTimeout());
447449
connection.setCloseTimeout(getCloseTimeout());
448450
connection.setSendAcksAsync(isSendAcksAsync());
449451
connection.setAuditDepth(getAuditDepth());
@@ -748,6 +750,20 @@ public void setSendTimeout(int sendTimeout) {
748750
this.sendTimeout = sendTimeout;
749751
}
750752

753+
/**
754+
* @return the requestTimeout (in milliseconds)
755+
*/
756+
public int getRequestTimeout() {
757+
return requestTimeout;
758+
}
759+
760+
/**
761+
* @param requestTimeout the requestTimeout to set (in milliseconds)
762+
*/
763+
public void setRequestTimeout(int requestTimeout) {
764+
this.requestTimeout = requestTimeout;
765+
}
766+
751767
/**
752768
* @return the sendAcksAsync
753769
*/
@@ -874,6 +890,7 @@ public void populateProperties(Properties props) {
874890
props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
875891
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
876892
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
893+
props.setProperty("requestTimeout", Integer.toString(getRequestTimeout()));
877894
props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout()));
878895
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
879896
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));

activemq-client/src/main/java/org/apache/activemq/TransactionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public void rollback() throws JMSException {
281281
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
282282
this.transactionId = null;
283283
//make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
284-
this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
284+
this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : this.connection.getRequestTimeout());
285285
// Notify the listener that the tx was rolled back
286286
if (localTransactionEventListener != null) {
287287
localTransactionEventListener.rollbackEvent();
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.fail;
22+
23+
import jakarta.jms.JMSException;
24+
import jakarta.jms.MessageConsumer;
25+
import jakarta.jms.Session;
26+
27+
import org.apache.activemq.broker.Broker;
28+
import org.apache.activemq.broker.BrokerFilter;
29+
import org.apache.activemq.broker.BrokerPlugin;
30+
import org.apache.activemq.broker.BrokerService;
31+
import org.apache.activemq.broker.ConnectionContext;
32+
import org.apache.activemq.broker.TransportConnector;
33+
import org.apache.activemq.broker.region.Subscription;
34+
import org.apache.activemq.command.ActiveMQQueue;
35+
import org.apache.activemq.command.ConsumerInfo;
36+
import org.apache.activemq.transport.RequestTimedOutIOException;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
41+
/**
42+
* Tests that {@link ActiveMQConnection#syncSendPacket(org.apache.activemq.command.Command)}
43+
* applies the configured {@code requestTimeout}.
44+
*/
45+
public class SyncSendPacketTimeoutTest {
46+
47+
private BrokerService broker;
48+
private String brokerUrl;
49+
50+
@Before
51+
public void setUp() throws Exception {
52+
broker = new BrokerService();
53+
broker.setPersistent(false);
54+
broker.setUseJmx(false);
55+
broker.addConnector("tcp://localhost:0");
56+
broker.start();
57+
broker.waitUntilStarted();
58+
brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString();
59+
}
60+
61+
@After
62+
public void tearDown() throws Exception {
63+
if (broker != null) {
64+
broker.stop();
65+
broker.waitUntilStopped();
66+
}
67+
}
68+
69+
@Test
70+
public void testRequestTimeoutDefaultIsZero() throws Exception {
71+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
72+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
73+
assertEquals("Default requestTimeout should be 0", 0, connection.getRequestTimeout());
74+
}
75+
}
76+
77+
@Test
78+
public void testRequestTimeoutConfiguredViaFactory() throws Exception {
79+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
80+
factory.setRequestTimeout(5000);
81+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
82+
assertEquals("requestTimeout should be propagated from factory", 5000, connection.getRequestTimeout());
83+
}
84+
}
85+
86+
@Test
87+
public void testSyncSendPacketSucceedsWithRequestTimeout() throws Exception {
88+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
89+
factory.setRequestTimeout(5000);
90+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
91+
connection.start();
92+
// Creating a session triggers syncSendPacket internally — should succeed within
93+
// timeout
94+
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
95+
// Creating a consumer triggers syncSendPacket internally — should succeed
96+
// within timeout
97+
MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) {
98+
assertNotNull("Session should be created successfully", session);
99+
assertNotNull("Consumer should be created successfully", consumer);
100+
}
101+
}
102+
}
103+
104+
@Test
105+
public void testSyncSendPacketSucceedsWithoutRequestTimeout() throws Exception {
106+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
107+
// requestTimeout=0 means no timeout (default)
108+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
109+
connection.start();
110+
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
111+
// Creating a consumer triggers syncSendPacket internally — should succeed
112+
// no timeout
113+
MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) {
114+
assertNotNull("Session should be created successfully with no timeout", session);
115+
assertNotNull("Consumer should be created successfully", consumer);
116+
}
117+
}
118+
}
119+
120+
@Test
121+
public void testRequestTimeoutConfiguredViaUrl() throws Exception {
122+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl + "?jms.requestTimeout=3000");
123+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
124+
assertEquals("requestTimeout should be set via URL parameter", 3000, connection.getRequestTimeout());
125+
}
126+
}
127+
128+
@Test
129+
public void testSyncSendPacketFailFromTimeout() throws Exception {
130+
// Restart the broker with a plugin that delays addConsumer responses
131+
broker.stop();
132+
broker.waitUntilStopped();
133+
134+
broker = new BrokerService();
135+
broker.setPersistent(false);
136+
broker.setUseJmx(false);
137+
broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
138+
@Override
139+
public Broker installPlugin(Broker broker) {
140+
return new BrokerFilter(broker) {
141+
@Override
142+
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
143+
// Only delay consumers on our test queue, not advisory consumers
144+
if (info.getDestination().getPhysicalName().equals("test")) {
145+
Thread.sleep(5000);
146+
}
147+
return super.addConsumer(context, info);
148+
}
149+
};
150+
}
151+
}});
152+
broker.addConnector("tcp://localhost:0");
153+
broker.start();
154+
broker.waitUntilStarted();
155+
brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString();
156+
157+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
158+
factory.setWatchTopicAdvisories(false);
159+
factory.setRequestTimeout(500);
160+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
161+
connection.start();
162+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
163+
164+
Exception exception = null;
165+
try {
166+
session.createConsumer(session.createQueue("test"));
167+
fail("Expected JMSException due to request timeout");
168+
} catch (JMSException expected) {
169+
exception = expected;
170+
}
171+
assertNotNull("Should have caught a JMSException", exception);
172+
assertEquals(RequestTimedOutIOException.class,
173+
TransportConnector.getRootCause(exception).getClass());
174+
}
175+
}
176+
177+
@Test
178+
public void testSyncSendPacketOverrideDefaultRequestTimeout() throws Exception {
179+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
180+
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
181+
connection.start();
182+
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
183+
// After session creation set the timeout default to be very short to test that
184+
// overriding directly works
185+
connection.setRequestTimeout(1);
186+
ConsumerInfo info = new ConsumerInfo(session.getSessionInfo(),
187+
session.getNextConsumerId().getValue());
188+
info.setDestination(new ActiveMQQueue("test"));
189+
// Send info packet with timeout override
190+
assertNotNull("Consumer should be created successfully with no timeout",
191+
connection.syncSendPacket(info, 5000));
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)