diff --git a/kombu/transport/gcpubsub.py b/kombu/transport/gcpubsub.py index b47f620837..b4594f2d07 100644 --- a/kombu/transport/gcpubsub.py +++ b/kombu/transport/gcpubsub.py @@ -356,6 +356,7 @@ def _put(self, queue, message, **kwargs): qdesc.topic_path, encoded_message.encode("utf-8"), routing_key=routing_key, + retry=Retry(deadline=self.retry_timeout_seconds), ) def _put_fanout(self, exchange, message, routing_key, **kwargs): diff --git a/t/unit/transport/test_gcpubsub.py b/t/unit/transport/test_gcpubsub.py index 504eb50a4e..1c0e81b939 100644 --- a/t/unit/transport/test_gcpubsub.py +++ b/t/unit/transport/test_gcpubsub.py @@ -3,12 +3,13 @@ from concurrent.futures import Future from datetime import datetime from queue import Empty -from unittest.mock import MagicMock, call, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest from _socket import timeout as socket_timeout from google.api_core.exceptions import (AlreadyExists, DeadlineExceeded, PermissionDenied) +from google.api_core.retry import Retry from google.pubsub_v1.types.pubsub import Subscription from kombu.transport.gcpubsub import (AtomicCounter, Channel, QueueDescriptor, @@ -328,8 +329,37 @@ def test_put(self, channel): ) channel._get_routing_key = MagicMock(return_value="test_key") channel.publisher.publish = MagicMock() + channel.retry_timeout_seconds = 300 channel._put(queue, message) - channel.publisher.publish.assert_called_once() + channel.publisher.publish.assert_called_once_with( + "topic_path", + ANY, + routing_key="test_key", + retry=ANY, + ) + call_kwargs = channel.publisher.publish.call_args[1] + assert isinstance(call_kwargs['retry'], Retry) + assert call_kwargs['retry']._timeout == 300 + + def test_put_uses_custom_retry_timeout(self, channel): + queue = "test_queue" + message = { + "properties": {"delivery_info": {"routing_key": "test_key"}} + } + channel.entity_name = MagicMock(return_value=queue) + channel._queue_cache[channel.entity_name(queue)] = QueueDescriptor( + name=queue, + topic_path="topic_path", + subscription_id=queue, + subscription_path="subscription_path", + ) + channel._get_routing_key = MagicMock(return_value="test_key") + channel.publisher.publish = MagicMock() + channel.retry_timeout_seconds = 60 + channel._put(queue, message) + call_kwargs = channel.publisher.publish.call_args[1] + assert isinstance(call_kwargs['retry'], Retry) + assert call_kwargs['retry']._timeout == 60 def test_put_fanout(self, channel): exchange = "test_exchange"