diff --git a/lib/internal/Magento/Framework/Amqp/Queue.php b/lib/internal/Magento/Framework/Amqp/Queue.php index 721630a586d2..7a23494e04be 100644 --- a/lib/internal/Magento/Framework/Amqp/Queue.php +++ b/lib/internal/Magento/Framework/Amqp/Queue.php @@ -8,12 +8,12 @@ use Closure; use Exception; use Magento\Framework\MessageQueue\ConnectionLostException; +use Magento\Framework\MessageQueue\EnvelopeFactory; use Magento\Framework\MessageQueue\EnvelopeInterface; use Magento\Framework\MessageQueue\QueueInterface; use Magento\Framework\Phrase; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; -use Magento\Framework\MessageQueue\EnvelopeFactory; use Psr\Log\LoggerInterface; /** @@ -128,6 +128,67 @@ public function acknowledge(EnvelopeInterface $envelope) // @codingStandardsIgnoreEnd } + /** + * Subscribe using basic_consume and stop after processing $maxMessages. + * + * Uses prefetch (basic_qos) for efficient push-based delivery instead of + * per-message basic_get round-trips. Respects the wait timeout so the + * consumer exits cleanly when the queue empties before $maxMessages. + * + * @param callable $callback + * @param int $maxMessages + * @param int $waitTimeout Seconds to wait for the next message before exiting (0 = block forever) + * @return void + * @since 103.0.0 + */ + public function subscribeWithLimit(callable $callback, int $maxMessages, int $waitTimeout = 1): void + { + if ($maxMessages <= 0) { + return; + } + + $messagesProcessed = 0; + $consumerTag = ''; + + // @codingStandardsIgnoreStart + $callbackConverter = function (AMQPMessage $message) use ( + $callback, + &$messagesProcessed, + $maxMessages, + &$consumerTag + ) { + $envelope = $this->createEnvelopeFromAmqpMessage($message); + + try { + if ($callback instanceof Closure) { + $callback($envelope); + } else { + call_user_func($callback, $envelope); + } + } finally { + $messagesProcessed++; + if ($messagesProcessed >= $maxMessages) { + $message->getChannel()->basic_cancel($consumerTag); + } + } + }; + + $channel = $this->amqpConfig->getChannel(); + $channel->basic_qos(0, $this->prefetchCount, false); + $consumerTag = $channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter); + + $timeout = $waitTimeout > 0 ? $waitTimeout : null; + while (count($channel->callbacks)) { + try { + $channel->wait(null, false, $timeout); + } catch (AMQPTimeoutException $e) { + // No message arrived within the wait window — queue is empty or idle; exit cleanly. + break; + } + } + // @codingStandardsIgnoreEnd + } + /** * @inheritdoc * @since 103.0.0 @@ -135,16 +196,7 @@ public function acknowledge(EnvelopeInterface $envelope) public function subscribe($callback) { $callbackConverter = function (AMQPMessage $message) use ($callback) { - // @codingStandardsIgnoreStart - $properties = array_merge( - $message->get_properties(), - [ - 'topic_name' => $message->delivery_info['routing_key'], - 'delivery_tag' => $message->delivery_info['delivery_tag'], - ] - ); - // @codingStandardsIgnoreEnd - $envelope = $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]); + $envelope = $this->createEnvelopeFromAmqpMessage($message); if ($callback instanceof Closure) { $callback($envelope); @@ -163,6 +215,29 @@ public function subscribe($callback) } } + /** + * Build an EnvelopeInterface from a raw AMQPMessage. + * + * Centralises the property mapping used by both subscribe() and + * subscribeWithLimit() to avoid duplication. + * + * @param AMQPMessage $message + * @return EnvelopeInterface + */ + private function createEnvelopeFromAmqpMessage(AMQPMessage $message): EnvelopeInterface + { + // @codingStandardsIgnoreStart + $properties = array_merge( + $message->get_properties(), + [ + 'topic_name' => $message->delivery_info['routing_key'], + 'delivery_tag' => $message->delivery_info['delivery_tag'], + ] + ); + // @codingStandardsIgnoreEnd + return $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]); + } + /** * @inheritdoc * @since 103.0.0 diff --git a/lib/internal/Magento/Framework/Amqp/Test/Unit/QueueTest.php b/lib/internal/Magento/Framework/Amqp/Test/Unit/QueueTest.php index b4e0b6c1fa53..19189e6c1e05 100644 --- a/lib/internal/Magento/Framework/Amqp/Test/Unit/QueueTest.php +++ b/lib/internal/Magento/Framework/Amqp/Test/Unit/QueueTest.php @@ -11,6 +11,7 @@ use Magento\Framework\Amqp\Queue; use Magento\Framework\MessageQueue\EnvelopeFactory; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; @@ -71,4 +72,63 @@ public function testSubscribe() $this->model->subscribe($callback); } + + /** + * Test verifies subscribeWithLimit sets up basic_qos and basic_consume correctly. + */ + public function testSubscribeWithLimitCallsBasicQosAndBasicConsume(): void + { + $amqpChannel = $this->createMock(AMQPChannel::class); + $amqpChannel->expects($this->once()) + ->method('basic_qos') + ->with(0, self::PREFETCH_COUNT, false); + $amqpChannel->expects($this->once()) + ->method('basic_consume') + ->with('testQueue', '', false, false, false, false, $this->isType('callable')) + ->willReturn('test-consumer-tag'); + // callbacks is empty by default so the while loop exits immediately + $this->config->expects($this->once()) + ->method('getChannel') + ->willReturn($amqpChannel); + + $this->model->subscribeWithLimit(function () { + }, 10); + } + + /** + * Test verifies subscribeWithLimit does nothing when maxMessages is zero or negative, + * matching the original dequeue() loop behaviour of for ($i = 0; $i > 0; ...). + */ + public function testSubscribeWithLimitDoesNothingWhenMaxMessagesIsZero(): void + { + // getChannel must never be called — no AMQP interaction should occur. + $this->config->expects($this->never())->method('getChannel'); + + $this->model->subscribeWithLimit(function () { + }, 0); + } + + /** + * Test verifies subscribeWithLimit exits cleanly when AMQPTimeoutException is thrown, + * meaning the queue drained before $maxMessages were processed. + */ + public function testSubscribeWithLimitExitsCleanlyOnAMQPTimeout(): void + { + $amqpChannel = $this->createMock(AMQPChannel::class); + $amqpChannel->method('basic_qos'); + $amqpChannel->method('basic_consume') + ->willReturnCallback(function () use ($amqpChannel) { + $amqpChannel->callbacks = ['test-consumer-tag' => function () { + }]; + return 'test-consumer-tag'; + }); + $amqpChannel->expects($this->once()) + ->method('wait') + ->willThrowException(new AMQPTimeoutException()); + $this->config->method('getChannel')->willReturn($amqpChannel); + + // Must not throw; AMQPTimeoutException signals empty queue, not a failure. + $this->model->subscribeWithLimit(function () { + }, 10, 1); + } } diff --git a/lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php b/lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php index f84a4a660e52..10e73ee75657 100644 --- a/lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php +++ b/lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php @@ -6,6 +6,7 @@ namespace Magento\Framework\MessageQueue; +use Magento\Framework\Amqp\Queue as AmqpQueue; use Magento\Framework\App\DeploymentConfig; use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface; use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface; @@ -53,6 +54,11 @@ public function __construct( /** * Run short running process * + * For AMQP queues this uses basic_consume with prefetch so RabbitMQ pushes + * messages to the consumer rather than requiring a basic_get round-trip per + * message. For all other queue types the original dequeue() polling loop is + * used unchanged. + * * @param QueueInterface $queue * @param int $maxNumberOfMessages * @param \Closure $callback @@ -71,9 +77,15 @@ public function invoke( $sleep = null ) { $this->poisonPillVersion = $this->poisonPillRead->getLatestVersion(); + + if ($queue instanceof AmqpQueue && $maxNumberOfMessages !== null) { + $this->invokeAmqp($queue, (int) $maxNumberOfMessages, $callback); + return; + } + $sleep = (int) $sleep ?: 1; $maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX; - $connectionName = method_exists($queue, 'getConnectionName') ? $queue->getConnectionName(): null; + $connectionName = method_exists($queue, 'getConnectionName') ? $queue->getConnectionName() : null; if ($connectionName === 'stomp') { $queue->subscribeQueue(); } @@ -101,6 +113,36 @@ public function invoke( } } + /** + * Run AMQP consumer using push-based basic_consume instead of polling basic_get. + * + * @param AmqpQueue $queue + * @param int $maxNumberOfMessages + * @param callable $callback + * @return void + */ + private function invokeAmqp(AmqpQueue $queue, int $maxNumberOfMessages, callable $callback): void + { + $poisonPillVersion = $this->poisonPillVersion; + + $wrappedCallback = function ($envelope) use ($callback, $poisonPillVersion, $queue) { + if (false === $this->poisonPillCompare->isLatestVersion($poisonPillVersion)) { + $queue->reject($envelope); + // phpcs:ignore Magento2.Security.LanguageConstruct.ExitUsage + exit(0); + } + + $callback($envelope); + }; + + // Mirror the consumers_wait_for_messages behaviour for AMQP: + // 1 (default) → block indefinitely waiting for the next message (waitTimeout=0 → null) + // 0 → exit as soon as the queue is idle (waitTimeout=1 → 1-second channel timeout) + $waitTimeout = $this->isWaitingNextMessage() ? 0 : 1; + + $queue->subscribeWithLimit($wrappedCallback, $maxNumberOfMessages, $waitTimeout); + } + /** * Checks if consumers should wait for message from the queue * diff --git a/lib/internal/Magento/Framework/MessageQueue/Test/Unit/CallbackInvokerTest.php b/lib/internal/Magento/Framework/MessageQueue/Test/Unit/CallbackInvokerTest.php new file mode 100644 index 000000000000..17e28fb8472d --- /dev/null +++ b/lib/internal/Magento/Framework/MessageQueue/Test/Unit/CallbackInvokerTest.php @@ -0,0 +1,174 @@ +poisonPillRead = $this->createMock(PoisonPillReadInterface::class); + $this->poisonPillCompare = $this->createMock(PoisonPillCompareInterface::class); + $this->deploymentConfig = $this->createMock(DeploymentConfig::class); + + $this->invoker = new CallbackInvoker( + $this->poisonPillRead, + $this->poisonPillCompare, + $this->deploymentConfig + ); + } + + /** + * For AMQP queues with a message limit, subscribeWithLimit() must be used instead + * of the dequeue() polling loop to avoid per-message basic_get round-trips. + */ + public function testInvokeUsesSubscribeWithLimitForAmqpQueue(): void + { + $this->poisonPillRead->expects($this->once())->method('getLatestVersion')->willReturn('v1'); + $this->deploymentConfig->expects($this->once())->method('get') + ->with('queue/consumers_wait_for_messages', 1) + ->willReturn(1); + + $queue = $this->createMock(AmqpQueue::class); + $queue->expects($this->once()) + ->method('subscribeWithLimit') + ->with($this->isType('callable'), 50, 0); + $queue->expects($this->never())->method('dequeue'); + + $this->invoker->invoke($queue, 50, function () { + }); + } + + /** + * When consumers_wait_for_messages=0 the channel wait timeout must be 1 second + * so the consumer exits promptly once the queue empties. + */ + public function testInvokePassesWaitTimeoutOneWhenNotWaitingForMessages(): void + { + $this->poisonPillRead->method('getLatestVersion')->willReturn('v1'); + $this->deploymentConfig->expects($this->once())->method('get') + ->with('queue/consumers_wait_for_messages', 1) + ->willReturn(0); + + $queue = $this->createMock(AmqpQueue::class); + $queue->expects($this->once()) + ->method('subscribeWithLimit') + ->with($this->isType('callable'), 10, 1); + + $this->invoker->invoke($queue, 10, function () { + }); + } + + /** + * When consumers_wait_for_messages=1 the channel wait timeout must be 0 (block + * indefinitely) to match the existing behaviour for that setting. + */ + public function testInvokePassesWaitTimeoutZeroWhenWaitingForMessages(): void + { + $this->poisonPillRead->method('getLatestVersion')->willReturn('v1'); + $this->deploymentConfig->expects($this->once())->method('get') + ->with('queue/consumers_wait_for_messages', 1) + ->willReturn(1); + + $queue = $this->createMock(AmqpQueue::class); + $queue->expects($this->once()) + ->method('subscribeWithLimit') + ->with($this->isType('callable'), 10, 0); + + $this->invoker->invoke($queue, 10, function () { + }); + } + + /** + * Non-AMQP queues (e.g. MySQL-backed) must keep using the original dequeue() loop. + */ + public function testInvokeFallsBackToDequeueLoopForNonAmqpQueue(): void + { + $this->poisonPillRead->method('getLatestVersion')->willReturn('v1'); + $this->poisonPillCompare->method('isLatestVersion')->willReturn(true); + $this->deploymentConfig->method('get') + ->with('queue/consumers_wait_for_messages', 1) + ->willReturn(0); + + $queue = $this->createMock(QueueInterface::class); + $queue->expects($this->once())->method('dequeue')->willReturn(null); + + $invoked = false; + $this->invoker->invoke($queue, 1, function () use (&$invoked) { + $invoked = true; + }); + + $this->assertFalse($invoked, 'Callback must not be called when dequeue returns null'); + } + + /** + * The wrapped callback passed to subscribeWithLimit must delegate to the original + * callback when the poison pill version is current. + */ + public function testInvokeWrappedCallbackDelegatesToOriginalCallback(): void + { + $this->poisonPillRead->method('getLatestVersion')->willReturn('v1'); + $this->poisonPillCompare->method('isLatestVersion')->willReturn(true); + $this->deploymentConfig->method('get') + ->with('queue/consumers_wait_for_messages', 1) + ->willReturn(1); + + $capturedCallback = null; + $queue = $this->createMock(AmqpQueue::class); + $queue->expects($this->once()) + ->method('subscribeWithLimit') + ->willReturnCallback(function (callable $callback) use (&$capturedCallback) { + $capturedCallback = $callback; + }); + + $originalCalled = false; + $originalCallback = function () use (&$originalCalled) { + $originalCalled = true; + }; + + $this->invoker->invoke($queue, 5, $originalCallback); + + $this->assertIsCallable($capturedCallback); + $envelope = $this->createMock(EnvelopeInterface::class); + $capturedCallback($envelope); + + $this->assertTrue($originalCalled, 'Original callback must be invoked when poison pill is current'); + } +} diff --git a/lib/internal/Magento/Framework/MessageQueue/Test/Unit/Code/Generator/_files/TRemoteService.txt b/lib/internal/Magento/Framework/MessageQueue/Test/Unit/Code/Generator/_files/TRemoteService.txt index d2c2ecadb3e8..4bdded6837a0 100644 --- a/lib/internal/Magento/Framework/MessageQueue/Test/Unit/Code/Generator/_files/TRemoteService.txt +++ b/lib/internal/Magento/Framework/MessageQueue/Test/Unit/Code/Generator/_files/TRemoteService.txt @@ -36,7 +36,7 @@ class TRepositoryInterfaceRemote implements TRepositoryInterface /** * @inheritdoc */ - public function get(string $attribute, int $typeId = null): \Magento\Framework\MessageQueue\Code\Generator\TInterface + public function get(string $attribute, ?int $typeId = null): \Magento\Framework\MessageQueue\Code\Generator\TInterface { return $this->publisher->publish( 'magento.framework.messageQueue.code.generator.tRepositoryInterface.get',