This project is a Proof Of Concept of RabbitMQ, includes guidelines about installation/configuration and some basic examples or common design patterns in an Event Driver Architecture.
All the tests in this project assumes that RabbitMQ is installed and running in the same machine. For guidelines about the installation:
Installing in MAC using HomeBrew
and more links for other OS's can be found here
After the installation, the RabbitMQ scripts will be stored in the /usr/local/sbin folder, so adding that folder to the PATH system variabel might be a good idea.
Now we can enable the Management Plugin following the guidelines explained here: Management Plugin
To enable the Management Console (before running the server), run this command:
rabbitmq-plugins enable rabbitmq_management
After enabling the Management Console we can run the server now:
rabbitmq-server
. Now we can access the web console with a web browser:
- url: http://localhost:15672
- user: guest
- password: guest
According to the Official documentation, To stop the server run this command:
rabbitmq-server stop
but it sometimes triggers the error "ERROR: node with name 'rabbit' already running on 'localhost'". If that's the case, try running this command instead:
rabbitmqctl stop
- main classes: src/main/java/com/ilozanof/learning/rabbitMQ/helloWorld
- test classes: src/test/java/com/ilozanof/learning/rabbitMQ/helloWorld
This example is based on the example described in Java Tutorial - Hello World.
instead of just using main blocks of code, here we are putting the logic in separate classes, Sender.java and Receiver.java, and jUnit tests.
The "Sender" sends a text message to the QUEUE, and the "Receiver" consumes it.
- main classes: src/main/java/com/ilozanof/learning/rabbitMQ/workQueues
- test classes: src/test/java/com/ilozanof/learning/rabbitMQ/workQueues
This example is based on the example from Java Tutorial - Work Queues , but with many changes.
-
We are reusing the same Sender implemented for the previous example.
-
A new *Worker" has been developed, which is an evolved version of the Receiver class from the preivous example. In this case, the Worker can be parametrized to:
- perform auto acknowledge or not
- keep track of the numbner opf tasks done
- simulate an error throwing an Exception, based on a value in the Constructor.
The Test in `src/test/main/com/ilozanof/learning/rabbitMQ/workQueues' checks different scenearios, like:
- round-robin task assignment performed by RabbgitMQ
- Reassigned of tasks when some of the consumers fails.
- main classes: src/main/java/com/ilozanof/learning/rabbitMQ/publishSubscribe
- test classes: src/test/java/com/ilozanof/learning/rabbitMQ/publishSubscribe
This example is based on the example from Java Tutorial - Publish/Subscribe , but with some changes:
- The consumers are now declared in independent classes, as opposed to using anonymous classes.
- There is one sender, LogSender, and 2 different consumer (although they do the same thing), LogConsumer1 and LogConsumer2. The LogReceiver class (is that a good name?) takes care of creating one specifif queue for each consumer, this queue is a disposable one, so the name is generated by the server and each Logconsumer is "assigned" to each of these queues.
Both Topic and Headers examples represent the same scenario:
- We have a Signal broadcast system, where a Sender can send signals to different systems (like applications), and each system can "live" i a different environment (like "test", "dev", or "prod").
- We have one queue for each environment, and the name of the queue is the name of the enviroment, so we have the queues test, dev and prod.
- For each queue, we have a single Consumer getting signal out of it. So we have 3 Consumers in total.
The Topic and Headers examples further down test how to send signal to those systems, and how those signal are redirected to the right queue. The difference is in HOW to redirect the signals, by using the routingKey and patterns (Topics example), or by using Headers (Headers example).
Due to the fact that the functionality in both examples is similar, they both share some common classes/interfaces, defined in the common package.
- main classes: src/main/java/com/ilozanof/learning/rabbitMQ/topic
- test classes: src/test/java/com/ilozanof/learning/rabbitMQ/topic
- main classes: src/main/java/com/ilozanof/learning/rabbitMQ/headers
- test classes: src/test/java/com/ilozanof/learning/rabbitMQ/headers
So far,the examples provided are Java standalone-classes, making use of the rabbitMQ Java API.
Now, we are going to develop a SPring applicationn, where we'll develop the same examples as before, but making use of the abstraction provided by Spring.
In this example, we are publicshing and consuming messages using a Topic. The connection to the RabbitMQ server is declared in the main class a Configuration class can be also used, maybe even better in that case)
In this case, the Receiver is implemented in a complete separate class, which is not RabbitMQ-aware at all, it only provides the business logic to process the message. This class si binded to the queue un the main class (or in a Configuration class, as we mentioned).
The Consumer is implemented by a CommandRunner, which is a Spring interface that is executed right after the main class is executed, so we can put some initilization code there. In this case we do it here, instead of sending the messages to the Queue in the test. This way, we only have to run the Tests, and the messages will be automatically sent after the main application starts.
This example is similar to the previous one, with some differences:
- We are using more than one Queue, and one topic exchange that forward the messages to them accordingly.
- The Consumer/Receiver is implemented ina a seaprate class, same as in the previous example, but the class is not binded during the initilization process in the main class, instead we use some specific spring annotations in that class (@RabbitListener), so during strting this calss is automatically registered as the consumer for the Queue declared in the same annotation.
### Scaling Consumer dinamically
This example shows how we can change the number of Threads used by the MessageListenerContainer and how the performance is affected.
NOTE: Check out the tutorial on this page: Queue Consumer Scaling with AMQP. This example is based on tht wor, but with some small changes to it.
In my first attempt, I tried using the default implementation SimpleMessageListenerContainer, changin the number of threads by invoking the method setConcurrentConsumers, but I came accross some problems: As it happends, the ListenerContainer defined with a @Bean method in the @springApplication is managed by the Spring Lifecycle, which means that the Container is started/stopped automaticaly by Spring when needed. This is great, since we only have to delare the bean, and starting and stopping operation are automatically managed. But on the other hand, in this case calling the setConcurrentConsumers won´t make anny effect most of the time, because that method only orjs if the lsitener is ACtive at that time, and i fond out that most of the times it's not.
So in order to be able to raise or low the number of threads, we need to take control over when the listenerContainer is started or stopped. and in order to do that, we need to take that component out of the Spring lifecyle. To do that, we need to:
- We define a @Bean method in the @SpringApplication that returns NOT a class instance, but a List of Scalablecontainers. So each one of the containers in that list (even if there is only one) are not managed by Spring.
- As a consequence of the point above, we need to Start and Stop the _ScalableContiners: in our app. We start them in a @EventListener method, and we stop them in a @PreDestroy method. NOTE: If we use a @PostConstruct method to start the ScalableContainers it won´t Work, since that point in time is still to early, the rest of components (queues, exchanges, etc) are not created yet.
This chapter includes some notes about some aspects of RabbitMQ that might be interesting to know and remember:
- The order of Senders/Workers is important: If you send the meesages to the queue first, and then register the consumer, only the first consumer is used. But if you register first the consumers and then you send messages to the Queue (which seems to be the right order to follow), then the loaded is balanced between the consumers properly.