Skip to content

Replace basic_get polling with basic_consume push for AMQP consumers (cron path)#213

Open
thebraziliandeveloper wants to merge 2 commits intomage-os:release/3.xfrom
thebraziliandeveloper:feature/amqp-consumer-prefetch-subscribe
Open

Replace basic_get polling with basic_consume push for AMQP consumers (cron path)#213
thebraziliandeveloper wants to merge 2 commits intomage-os:release/3.xfrom
thebraziliandeveloper:feature/amqp-consumer-prefetch-subscribe

Conversation

@thebraziliandeveloper
Copy link
Copy Markdown

@thebraziliandeveloper thebraziliandeveloper commented Mar 31, 2026

Description

Magento already uses basic_consume (push-based) when running a consumer in daemon mode from the CLI — Consumer::process() calls Queue::subscribe() directly when no message limit is set, and Queue::subscribe() has always used basic_consume + basic_qos prefetch:

// Consumer::process() — lib/internal/Magento/Framework/MessageQueue/Consumer.php
if (!isset($maxNumberOfMessages)) {
    $queue->subscribe($this->getTransactionCallback($queue)); // ← basic_consume ✓
} else {
    $this->invoker->invoke($queue, $maxNumberOfMessages, ...); // ← basic_get ✗
}

However, when --max-messages is set — which is the path taken by every cron-spawned consumer via ConsumersRunner — execution falls into CallbackInvoker::invoke(), which uses a dequeue() loop. dequeue() calls basic_get() once per message: a synchronous request/response that requires one full network round-trip to RabbitMQ per message.

This means the same consumer that efficiently subscribes when run manually in the CLI degrades to sequential polling the moment cron spawns it with --max-messages=10000. RabbitMQ's own documentation explicitly discourages this pattern:

"BasicGet is not suitable for high throughput consumption … prefer Basic.Consume."
https://www.rabbitmq.com/docs/consumers#basics

This PR makes the cron path consistent with the CLI daemon path. It adds Queue::subscribeWithLimit(callable $callback, int $maxMessages, int $waitTimeout) — structurally identical to the existing Queue::subscribe(), with the addition of a basic_cancel call once $maxMessages have been processed. CallbackInvoker::invoke() now routes AMQP queues with a message limit through this method instead of the dequeue() loop.

As part of this change, the duplicated AMQP message-to-envelope property mapping that existed separately in subscribe() and the new method has been extracted into a private helper createEnvelopeFromAmqpMessage().

The existing dequeue() polling loop is preserved unchanged for all other queue backends (MySQL, STOMP).

Behaviour matrix:

Scenario Before After
AMQP + --max-messages (cron) basic_get loop — 1 RTT/message basic_consume + prefetch — batch push
AMQP + no limit (CLI daemon) basic_consume via subscribe() basic_consume via subscribe() (unchanged)
MySQL / STOMP + --max-messages dequeue() loop dequeue() loop (unchanged)

The consumers_wait_for_messages deployment config is respected:

  • 1 (default): channel->wait() blocks indefinitely — same as original
  • 0: 1-second idle timeout on channel->wait(), exits promptly when the queue drains — matches original immediate-exit behaviour

The poison pill check runs before each message callback, consistent with the original implementation.


Benchmark

Environment: Warden / Docker on Apple M-series, RabbitMQ co-located in the same Docker network (~0.1 ms RTT).

Method: A self-contained PHP script bootstraps Magento to obtain the AMQP connection config, declares a temporary durable queue, publishes N messages, then times each approach independently before deleting the queue. Both rounds use identical message payloads and per-message basic_ack.

Benchmark script (click to expand)
<?php
// Usage: warden env exec php-fpm php dev/tools/amqp-benchmark.php [--messages=5000] [--prefetch=100]
declare(strict_types=1);

use Magento\Framework\Amqp\Config as AmqpConfig;
use Magento\Framework\App\Bootstrap;
use Magento\Framework\App\State;

require __DIR__ . '/../../app/bootstrap.php';

$opts          = getopt('', ['messages::', 'prefetch::']);
$totalMessages = (int) ($opts['messages'] ?? 5000);
$prefetchCount = (int) ($opts['prefetch'] ?? 100);

$bootstrap  = Bootstrap::create(BP, $_SERVER);
$om         = $bootstrap->getObjectManager();
$om->get(State::class)->setAreaCode('global');

/** @var AmqpConfig $amqpConfig */
$amqpConfig = $om->get(AmqpConfig::class);

const QUEUE_NAME = 'amqp_benchmark_tmp';

function getChannel(AmqpConfig $cfg): \PhpAmqpLib\Channel\AMQPChannel
{
    $ch = $cfg->getChannel();
    $ch->queue_declare(QUEUE_NAME, false, true, false, false);
    return $ch;
}

function publish(AmqpConfig $cfg, int $n): void
{
    $ch      = getChannel($cfg);
    $payload = json_encode(['product_ids' => [1], 'attributes' => ['meta_description' => 'bench'], 'store_id' => 0]);
    echo "  Publishing $n messages... ";
    $t = microtime(true);
    for ($i = 0; $i < $n; $i++) {
        $ch->basic_publish(new \PhpAmqpLib\Message\AMQPMessage($payload, ['delivery_mode' => 2]), '', QUEUE_NAME);
    }
    printf("done [%.3fs]\n", microtime(true) - $t);
}

function benchGet(AmqpConfig $cfg, int $total): array
{
    $ch = getChannel($cfg); $done = 0; $t = microtime(true);
    while ($done < $total) {
        $msg = $ch->basic_get(QUEUE_NAME);
        if ($msg === null) { usleep(1000); continue; }
        $ch->basic_ack($msg->delivery_info['delivery_tag']);
        $done++;
    }
    $e = microtime(true) - $t;
    return ['elapsed' => $e, 'msg_per_s' => round($done / $e)];
}

function benchConsume(AmqpConfig $cfg, int $total, int $prefetch): array
{
    $ch = getChannel($cfg); $done = 0; $t = microtime(true);
    $ch->basic_qos(0, $prefetch, false);
    $ch->basic_consume(QUEUE_NAME, '', false, false, false, false,
        function ($msg) use ($ch, &$done, $total) {
            $ch->basic_ack($msg->delivery_info['delivery_tag']);
            if (++$done >= $total) { $ch->basic_cancel($msg->delivery_info['consumer_tag']); }
        });
    while (count($ch->callbacks)) { $ch->wait(null, false, 5.0); }
    $e = microtime(true) - $t;
    return ['elapsed' => $e, 'msg_per_s' => round($done / $e)];
}

printf("\nMessages: %d | Prefetch: %d\n\n", $totalMessages, $prefetchCount);
echo "[1/2] basic_get\n"; getChannel($amqpConfig)->queue_purge(QUEUE_NAME); publish($amqpConfig, $totalMessages);
$rGet = benchGet($amqpConfig, $totalMessages);
printf("  %.3fs — %d msg/s\n\n", $rGet['elapsed'], $rGet['msg_per_s']);

echo "[2/2] basic_consume + prefetch\n"; getChannel($amqpConfig)->queue_purge(QUEUE_NAME); publish($amqpConfig, $totalMessages);
$rCon = benchConsume($amqpConfig, $totalMessages, $prefetchCount);
printf("  %.3fs — %d msg/s\n\n", $rCon['elapsed'], $rCon['msg_per_s']);

printf("Speedup: %.1fx | Time saved: %.3fs\n", $rGet['elapsed'] / $rCon['elapsed'], $rGet['elapsed'] - $rCon['elapsed']);
getChannel($amqpConfig)->queue_delete(QUEUE_NAME);

Results — 5,000 messages, prefetch=100 (3 runs):

Run basic_get before basic_consume after Speedup
1 0.720s (6,945 msg/s) 0.302s (16,535 msg/s) 2.4×
2 0.674s (7,415 msg/s) 0.307s (16,285 msg/s) 2.2×
3 0.884s (5,659 msg/s) 0.336s (14,871 msg/s) 2.6×

Prefetch value comparison (5,000 messages, single run each):

Prefetch Time Throughput
10 0.363s 13,791 msg/s
50 0.373s 13,414 msg/s
100 (default) 0.337s 14,859 msg/s
200 0.444s 11,267 msg/s
500 0.299s 16,727 msg/s

These numbers are from a co-located Docker environment where RTT is ~0.1 ms. In a production deployment with a dedicated RabbitMQ host the difference is proportionally larger — at a typical 2 ms RTT:

  • basic_get: 5,000 × 2 ms = ~10 seconds of pure queue overhead per cron consumer run
  • basic_consume: ~50 round-trips × 2 ms = ~0.1 seconds

Manual testing scenarios

  1. Reproduce the benchmark

    # Create 5,000 fixture products (run once)
    warden env exec php-fpm php dev/tools/amqp-benchmark-fixtures.php
    
    # Run the benchmark — prints before/after comparison
    warden env exec php-fpm php dev/tools/amqp-benchmark.php --messages=5000 --prefetch=100
  2. Real consumer end-to-end — trigger a mass product attribute update from Admin to populate the product_action_attribute.update queue, then:

    php bin/magento queue:consumers:start product_action_attribute.update --max-messages=100 --single-thread

    Verify products are updated and var/log/ is clean.

  3. MySQL queue unaffected — configure a consumer on a db connection and confirm the original dequeue() loop is still used (no subscribeWithLimit call).

  4. consumers_wait_for_messages=0 — set in env.php, run a consumer against an empty queue, confirm it exits within ~1 second instead of blocking indefinitely.

  5. --max-messages=0 edge case — confirm the consumer exits immediately without connecting to RabbitMQ (guarded by if ($maxMessages <= 0) { return; }).


Related Pull Requests

Fixed Issues (if relevant)

Questions or comments

This PR is intentionally narrow in scope: only CallbackInvoker and Queue are changed, and only for the AMQP + limited-message path. The dequeue() method is left untouched. Happy to split into smaller commits or add integration test coverage if the maintainers prefer.

Contribution checklist

  • Pull request has a meaningful description of its purpose
  • All commits are accompanied by meaningful commit messages
  • All new or changed code is covered with unit/integration tests (if applicable)
  • All automated tests passed successfully (all builds are green)

Magento already uses basic_consume (push-based) when running a consumer
in daemon mode via Queue::subscribe(). However, when --max-messages is
set — the path taken by every cron-spawned consumer via ConsumersRunner
— CallbackInvoker falls into a dequeue() loop that calls basic_get()
once per message, requiring one full network round-trip per message.
RabbitMQ explicitly discourages basic_get for continuous consumption.

This change adds Queue::subscribeWithLimit(), structurally identical to
the existing Queue::subscribe() but cancelling the consumer via
basic_cancel() once $maxMessages have been processed. CallbackInvoker
now routes AMQP queues with a message limit through this method, making
the cron path consistent with the CLI daemon path.

The dequeue() polling loop is preserved unchanged for MySQL and STOMP
backends. The consumers_wait_for_messages config is respected: value 1
blocks indefinitely (matching original behaviour), value 0 uses a
1-second idle timeout to exit promptly on an empty queue.

Benchmark (5,000 messages, Docker/local RabbitMQ ~0.1ms RTT):
  basic_get before:  ~0.72s  (~6,900 msg/s)
  basic_consume after: ~0.30s  (~16,500 msg/s)  — 2.4x faster

At a typical production RTT of 2ms the improvement is ~100x:
  5,000 x 2ms = 10s before vs ~50 round-trips x 2ms = 0.1s after.
@thebraziliandeveloper thebraziliandeveloper marked this pull request as ready for review March 31, 2026 17:37
@thebraziliandeveloper thebraziliandeveloper requested a review from a team as a code owner March 31, 2026 17:37
@rhoerr rhoerr changed the base branch from main to release/3.x April 1, 2026 01:54
@rhoerr
Copy link
Copy Markdown
Contributor

rhoerr commented Apr 1, 2026

Hi @gowrizrh -- I thought of you given your AMQP experience. Any chance you can review this?

Thanks @thebraziliandeveloper for the PR

@gowrizrh
Copy link
Copy Markdown
Member

gowrizrh commented Apr 2, 2026

This is a nice addition and it looks good overall, but I'll need more time to dive deeper.

@rhoerr It looks like there are additional changes apart from the author's original modifications, is this expected? Should I ignore everything except the changes part of 3ea0cc0

@rhoerr rhoerr changed the base branch from release/3.x to main April 2, 2026 13:29
@rhoerr rhoerr changed the base branch from main to release/3.x April 2, 2026 13:29
@rhoerr
Copy link
Copy Markdown
Contributor

rhoerr commented Apr 2, 2026

This is a nice addition and it looks good overall, but I'll need more time to dive deeper.

@rhoerr It looks like there are additional changes apart from the author's original modifications, is this expected? Should I ignore everything except the changes part of 3ea0cc0

Pardon that -- I changed the PR target and that brought some other work along. I updated the target branch to fix that. This PR is isolated to the single commit now. Thank you

@rhoerr rhoerr added the enhancement New feature or request label Apr 7, 2026
Copy link
Copy Markdown
Member

@gowrizrh gowrizrh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a neat change, thanks. I've approved it

@thebraziliandeveloper
Copy link
Copy Markdown
Author

Thanks @gowrizrh

I see some failing checks, should I fix them so the team can merge the PR?

- Expand empty closures `function () {}` to multi-line form to satisfy
  the Magento2 coding standard (closing brace must be on its own line)
  in QueueTest.php and CallbackInvokerTest.php
- Update TRemoteService.txt fixture to use nullable type `?int $typeId`
  matching the TRepositoryInterface source; the old fixture used `int`
  which caused RemoteServiceGeneratorTest::testGenerate mage-os#1 to fail
@thebraziliandeveloper thebraziliandeveloper force-pushed the feature/amqp-consumer-prefetch-subscribe branch from a660746 to 73c7bb6 Compare April 7, 2026 13:05
@rhoerr
Copy link
Copy Markdown
Contributor

rhoerr commented Apr 7, 2026

Thanks @thebraziliandeveloper , I reran PR checks. Can you look at the AMQP unit test and integration test errors?

@thebraziliandeveloper
Copy link
Copy Markdown
Author

@rhoerr will check this week

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants