Skip to content

Commit 70e309a

Browse files
committed
test(unit-tests): improve reliability of message consumption in tests
1 parent 50119e0 commit 70e309a

3 files changed

Lines changed: 18 additions & 13 deletions

File tree

activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public void stopBroker() throws Exception {
120120
}
121121
if (broker != null) {
122122
broker.stop();
123+
broker.waitUntilStopped();
123124
}
124125
}
125126

@@ -257,6 +258,7 @@ protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exce
257258
configureBroker(newBroker);
258259
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
259260
newBroker.start();
261+
newBroker.waitUntilStarted();
260262
return newBroker;
261263
}
262264

activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,7 @@ public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
275275

276276
dumpMessages();
277277

278-
Wait.waitFor(new Wait.Condition() {
279-
@Override
280-
public boolean isSatisified() throws Exception {
281-
return proxy.getInFlightCount() == 0l;
282-
}
283-
});
278+
Wait.waitFor(() -> proxy.getInFlightCount() == 0L && proxy.cursorSize() == 0);
284279
assertEquals("prefetch", 0, proxy.getInFlightCount());
285280
assertEquals("size", 10, proxy.getQueueSize());
286281
assertEquals("cursor size", 0, proxy.cursorSize());

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,15 +199,23 @@ public void testPriorityMessagesWithJmsBrowser() throws Exception {
199199
assertNotNull(message);
200200
assertEquals(5, message.getJMSPriority());
201201

202-
// consume messages
203-
final ArrayList<Message> consumeList = consumeMessages("TestQ");
202+
// Wait for remaining messages to be fully available after consumeOneMessage closes its connection.
203+
// With lazyDispatch=true + optimizedDispatch=true, messages may briefly be in-flight
204+
// during connection teardown and not yet re-queued for dispatch to a new consumer.
205+
final int remaining = numToSend - 1;
206+
assertTrue("Remaining messages available for dispatch", Wait.waitFor(() -> {
207+
final Queue q = (Queue) broker.getDestination(destination);
208+
return q != null
209+
&& q.getDestinationStatistics().getMessages().getCount() == remaining
210+
&& q.getDestinationStatistics().getInflight().getCount() == 0;
211+
}, 5000, 100));
212+
213+
// consume messages (use timeout-based overload for reliable dispatch on slow CI)
214+
final ArrayList<Message> consumeList = consumeMessages("TestQ", remaining, TimeUnit.SECONDS.toMillis(30));
204215
LOG.info("Consumed list {}", consumeList.size());
205216

206-
// compare lists
207-
// assertEquals("Iteration: " + i
208-
// +", message 1 should be priority high", 5,
209-
// consumeList.get(0).getJMSPriority());
210-
for (int j = 1; j < (numToSend - 1); j++) {
217+
assertEquals("Iteration: " + i + ", all remaining messages consumed", remaining, consumeList.size());
218+
for (int j = 0; j < consumeList.size(); j++) {
211219
assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority());
212220
}
213221
}

0 commit comments

Comments
 (0)