Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 86 additions & 11 deletions lib/internal/Magento/Framework/Amqp/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
use Closure;
use Exception;
use Magento\Framework\MessageQueue\ConnectionLostException;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use Magento\Framework\MessageQueue\EnvelopeInterface;
use Magento\Framework\MessageQueue\QueueInterface;
use Magento\Framework\Phrase;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use Psr\Log\LoggerInterface;

/**
Expand Down Expand Up @@ -128,23 +128,75 @@ public function acknowledge(EnvelopeInterface $envelope)
// @codingStandardsIgnoreEnd
}

/**
* Subscribe using basic_consume and stop after processing $maxMessages.
*
* Uses prefetch (basic_qos) for efficient push-based delivery instead of
* per-message basic_get round-trips. Respects the wait timeout so the
* consumer exits cleanly when the queue empties before $maxMessages.
*
* @param callable $callback
* @param int $maxMessages
* @param int $waitTimeout Seconds to wait for the next message before exiting (0 = block forever)
* @return void
* @since 103.0.0
*/
public function subscribeWithLimit(callable $callback, int $maxMessages, int $waitTimeout = 1): void
{
if ($maxMessages <= 0) {
return;
}

$messagesProcessed = 0;
$consumerTag = '';

// @codingStandardsIgnoreStart
$callbackConverter = function (AMQPMessage $message) use (
$callback,
&$messagesProcessed,
$maxMessages,
&$consumerTag
) {
$envelope = $this->createEnvelopeFromAmqpMessage($message);

try {
if ($callback instanceof Closure) {
$callback($envelope);
} else {
call_user_func($callback, $envelope);
}
} finally {
$messagesProcessed++;
if ($messagesProcessed >= $maxMessages) {
$message->getChannel()->basic_cancel($consumerTag);
}
}
};

$channel = $this->amqpConfig->getChannel();
$channel->basic_qos(0, $this->prefetchCount, false);
$consumerTag = $channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter);

$timeout = $waitTimeout > 0 ? $waitTimeout : null;
while (count($channel->callbacks)) {
try {
$channel->wait(null, false, $timeout);
} catch (AMQPTimeoutException $e) {
// No message arrived within the wait window — queue is empty or idle; exit cleanly.
break;
}
}
// @codingStandardsIgnoreEnd
}

/**
* @inheritdoc
* @since 103.0.0
*/
public function subscribe($callback)
{
$callbackConverter = function (AMQPMessage $message) use ($callback) {
// @codingStandardsIgnoreStart
$properties = array_merge(
$message->get_properties(),
[
'topic_name' => $message->delivery_info['routing_key'],
'delivery_tag' => $message->delivery_info['delivery_tag'],
]
);
// @codingStandardsIgnoreEnd
$envelope = $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);
$envelope = $this->createEnvelopeFromAmqpMessage($message);

if ($callback instanceof Closure) {
$callback($envelope);
Expand All @@ -163,6 +215,29 @@ public function subscribe($callback)
}
}

/**
* Build an EnvelopeInterface from a raw AMQPMessage.
*
* Centralises the property mapping used by both subscribe() and
* subscribeWithLimit() to avoid duplication.
*
* @param AMQPMessage $message
* @return EnvelopeInterface
*/
private function createEnvelopeFromAmqpMessage(AMQPMessage $message): EnvelopeInterface
{
// @codingStandardsIgnoreStart
$properties = array_merge(
$message->get_properties(),
[
'topic_name' => $message->delivery_info['routing_key'],
'delivery_tag' => $message->delivery_info['delivery_tag'],
]
);
// @codingStandardsIgnoreEnd
return $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);
}

/**
* @inheritdoc
* @since 103.0.0
Expand Down
56 changes: 56 additions & 0 deletions lib/internal/Magento/Framework/Amqp/Test/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Magento\Framework\Amqp\Queue;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -71,4 +72,59 @@ public function testSubscribe()

$this->model->subscribe($callback);
}

/**
* Test verifies subscribeWithLimit sets up basic_qos and basic_consume correctly.
*/
public function testSubscribeWithLimitCallsBasicQosAndBasicConsume(): void
{
$amqpChannel = $this->createMock(AMQPChannel::class);
$amqpChannel->expects($this->once())
->method('basic_qos')
->with(0, self::PREFETCH_COUNT, false);
$amqpChannel->expects($this->once())
->method('basic_consume')
->with('testQueue', '', false, false, false, false, $this->isType('callable'))
->willReturn('test-consumer-tag');
// callbacks is empty by default so the while loop exits immediately
$this->config->expects($this->once())
->method('getChannel')
->willReturn($amqpChannel);

$this->model->subscribeWithLimit(function () {}, 10);
}

/**
* Test verifies subscribeWithLimit does nothing when maxMessages is zero or negative,
* matching the original dequeue() loop behaviour of for ($i = 0; $i > 0; ...).
*/
public function testSubscribeWithLimitDoesNothingWhenMaxMessagesIsZero(): void
{
// getChannel must never be called — no AMQP interaction should occur.
$this->config->expects($this->never())->method('getChannel');

$this->model->subscribeWithLimit(function () {}, 0);
}

/**
* Test verifies subscribeWithLimit exits cleanly when AMQPTimeoutException is thrown,
* meaning the queue drained before $maxMessages were processed.
*/
public function testSubscribeWithLimitExitsCleanlyOnAMQPTimeout(): void
{
$amqpChannel = $this->createMock(AMQPChannel::class);
$amqpChannel->method('basic_qos');
$amqpChannel->method('basic_consume')
->willReturnCallback(function () use ($amqpChannel) {
$amqpChannel->callbacks = ['test-consumer-tag' => function () {}];
return 'test-consumer-tag';
});
$amqpChannel->expects($this->once())
->method('wait')
->willThrowException(new AMQPTimeoutException());
$this->config->method('getChannel')->willReturn($amqpChannel);

// Must not throw; AMQPTimeoutException signals empty queue, not a failure.
$this->model->subscribeWithLimit(function () {}, 10, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

namespace Magento\Framework\MessageQueue;

use Magento\Framework\Amqp\Queue as AmqpQueue;
use Magento\Framework\App\DeploymentConfig;
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface;
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface;
Expand Down Expand Up @@ -53,6 +54,11 @@ public function __construct(
/**
* Run short running process
*
* For AMQP queues this uses basic_consume with prefetch so RabbitMQ pushes
* messages to the consumer rather than requiring a basic_get round-trip per
* message. For all other queue types the original dequeue() polling loop is
* used unchanged.
*
* @param QueueInterface $queue
* @param int $maxNumberOfMessages
* @param \Closure $callback
Expand All @@ -71,9 +77,15 @@ public function invoke(
$sleep = null
) {
$this->poisonPillVersion = $this->poisonPillRead->getLatestVersion();

if ($queue instanceof AmqpQueue && $maxNumberOfMessages !== null) {
$this->invokeAmqp($queue, (int) $maxNumberOfMessages, $callback);
return;
}

$sleep = (int) $sleep ?: 1;
$maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX;
$connectionName = method_exists($queue, 'getConnectionName') ? $queue->getConnectionName(): null;
$connectionName = method_exists($queue, 'getConnectionName') ? $queue->getConnectionName() : null;
if ($connectionName === 'stomp') {
$queue->subscribeQueue();
}
Expand Down Expand Up @@ -101,6 +113,36 @@ public function invoke(
}
}

/**
* Run AMQP consumer using push-based basic_consume instead of polling basic_get.
*
* @param AmqpQueue $queue
* @param int $maxNumberOfMessages
* @param callable $callback
* @return void
*/
private function invokeAmqp(AmqpQueue $queue, int $maxNumberOfMessages, callable $callback): void
{
$poisonPillVersion = $this->poisonPillVersion;

$wrappedCallback = function ($envelope) use ($callback, $poisonPillVersion, $queue) {
if (false === $this->poisonPillCompare->isLatestVersion($poisonPillVersion)) {
$queue->reject($envelope);
// phpcs:ignore Magento2.Security.LanguageConstruct.ExitUsage
exit(0);
}

$callback($envelope);
};

// Mirror the consumers_wait_for_messages behaviour for AMQP:
// 1 (default) → block indefinitely waiting for the next message (waitTimeout=0 → null)
// 0 → exit as soon as the queue is idle (waitTimeout=1 → 1-second channel timeout)
$waitTimeout = $this->isWaitingNextMessage() ? 0 : 1;

$queue->subscribeWithLimit($wrappedCallback, $maxNumberOfMessages, $waitTimeout);
}

/**
* Checks if consumers should wait for message from the queue
*
Expand Down
Loading
Loading