|
35 | 35 | import org.apache.activemq.ActiveMQConnectionFactory; |
36 | 36 | import org.apache.activemq.broker.BrokerRegistry; |
37 | 37 | import org.apache.activemq.broker.BrokerService; |
| 38 | +import org.apache.activemq.broker.region.Destination; |
| 39 | +import org.apache.activemq.broker.region.RegionBroker; |
38 | 40 | import org.apache.activemq.command.ActiveMQQueue; |
39 | 41 | import org.apache.activemq.network.NetworkConnector; |
40 | 42 | import org.apache.activemq.store.memory.MemoryPersistenceAdapter; |
| 43 | +import org.apache.activemq.util.Wait; |
41 | 44 | import org.junit.Ignore; |
42 | 45 | import org.junit.Test; |
43 | 46 | import org.slf4j.Logger; |
@@ -336,26 +339,27 @@ public Message createMessage(final Session session) |
336 | 339 | broker.stop(); |
337 | 340 | } |
338 | 341 |
|
339 | | - // need to ensure broker bridge is alive before starting the consumer |
340 | | - // peeking at the internals will give us this info |
| 342 | + // Wait until both brokers have their local consumer AND the remote demand subscription |
| 343 | + // from the other broker's bridge (>= 2 consumers per queue). This guarantees: |
| 344 | + // 1. Both local consumers (container1, container2) are truly subscribed |
| 345 | + // 2. The network bridges are fully started and have propagated demand subscriptions |
341 | 346 | private void waitForBridgeFormation() throws Exception { |
342 | | - long done = System.currentTimeMillis() + 30000; |
343 | | - while (done > System.currentTimeMillis()) { |
344 | | - if (hasBridge("one") && hasBridge("two")) { |
345 | | - return; |
346 | | - } |
347 | | - Thread.sleep(1000); |
348 | | - } |
| 347 | + assertTrue("Both brokers should have local + bridge demand consumers for " + TESTING_QUEUE, |
| 348 | + Wait.waitFor(() -> getQueueConsumerCount("one") >= 2 && getQueueConsumerCount("two") >= 2, |
| 349 | + 30000, 100)); |
349 | 350 | } |
350 | 351 |
|
351 | | - private boolean hasBridge(String name) { |
352 | | - boolean result = false; |
353 | | - BrokerService broker = BrokerRegistry.getInstance().lookup(name); |
354 | | - if (broker != null && !broker.getNetworkConnectors().isEmpty()) { |
355 | | - if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { |
356 | | - result = true; |
357 | | - } |
| 352 | + private int getQueueConsumerCount(String brokerName) { |
| 353 | + try { |
| 354 | + final BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); |
| 355 | + if (broker == null) { |
| 356 | + return 0; |
358 | 357 | } |
359 | | - return result; |
| 358 | + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); |
| 359 | + final Destination dest = regionBroker.getDestinationMap().get(new ActiveMQQueue(TESTING_QUEUE)); |
| 360 | + return dest != null ? dest.getConsumers().size() : 0; |
| 361 | + } catch (Exception ignored) { |
| 362 | + return 0; |
| 363 | + } |
360 | 364 | } |
361 | 365 | } |
0 commit comments