Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions kombu/transport/azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
* ``retry_backoff_factor`` - Azure SDK exponential backoff factor.
Default ``0.8``
* ``retry_backoff_max`` - Azure SDK retry total time. Default ``120``
* ``use_lock_renewal`` - Use Azure SDK Auto Lock Renewal. Works only if receive mode ``PEEK_LOCK`` is in use.
* ``max_lock_renewal_duration`` - Azure SDK time in seconds that locks registered to a renewer
should be maintained for. Default ``3600.0`` (1 hour)
"""

from __future__ import annotations
Expand All @@ -64,9 +67,9 @@
import azure.core.exceptions
import azure.servicebus.exceptions
import isodate
from azure.servicebus import (ServiceBusClient, ServiceBusMessage,
ServiceBusReceiveMode, ServiceBusReceiver,
ServiceBusSender)
from azure.servicebus import (AutoLockRenewer, ServiceBusClient,
ServiceBusMessage, ServiceBusReceiveMode,
ServiceBusReceiver, ServiceBusSender)
from azure.servicebus.management import ServiceBusAdministrationClient

try:
Expand Down Expand Up @@ -120,6 +123,11 @@ class Channel(virtual.Channel):
default_retry_backoff_factor: float = 0.8
# Max time to backoff (is the default from service bus repo)
default_retry_backoff_max: int = 120
#: .. versionadded:: 5.7
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure If this is the right place to add the versionadded annotation here

default_use_lock_renewal: bool = False
Comment thread
tuschla marked this conversation as resolved.
#: .. versionadded:: 5.7
default_max_lock_renewal_duration: float = 3600.0 # in seconds (1 hour)

domain_format: str = 'kombu%(vhost)s'
_queue_cache: dict[str, SendReceive] = {}
_noack_queues: set[str] = set()
Expand All @@ -136,6 +144,8 @@ def __init__(self, *args, **kwargs):

self.qos.restore_at_shutdown = False

self._renewer = None

def _try_parse_connection_string(self) -> None:
self._namespace, self._credential = Transport.parse_uri(
self.conninfo.hostname)
Expand Down Expand Up @@ -202,9 +212,16 @@ def _get_asb_receiver(
cache_key = queue_cache_key or queue
queue_obj = self._queue_cache.get(cache_key, None)
if queue_obj is None or queue_obj.receiver is None:
auto_lock_renewer = None
if self.use_lock_renewal and recv_mode == ServiceBusReceiveMode.PEEK_LOCK:
if self._renewer is None:
self._renewer = AutoLockRenewer(
max_lock_renewal_duration=self.max_lock_renewal_duration
)
auto_lock_renewer = self._renewer
receiver = self.queue_service.get_queue_receiver(
queue_name=queue, receive_mode=recv_mode,
keep_alive=self.uamqp_keep_alive_interval)
keep_alive=self.uamqp_keep_alive_interval, auto_lock_renewer=auto_lock_renewer)
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
return queue_obj

Expand Down Expand Up @@ -346,6 +363,9 @@ def close(self) -> None:
# receivers and senders spawn threads so clean them up
if not self.closed:
self.closed = True
if self._renewer:
self._renewer.close()
self._renewer = None
for queue_obj in self._queue_cache.values():
queue_obj.close()
self._queue_cache.clear()
Expand Down Expand Up @@ -427,6 +447,20 @@ def retry_backoff_max(self) -> int:
return self.transport_options.get(
'retry_backoff_max', self.default_retry_backoff_max)

@cached_property
def use_lock_renewal(self) -> bool:
return self.transport_options.get(
'use_lock_renewal', self.default_use_lock_renewal
)

@cached_property
def max_lock_renewal_duration(self) -> float:
return float(
self.transport_options.get(
'max_lock_renewal_duration', self.default_max_lock_renewal_duration
)
)


class Transport(virtual.Transport):
"""Azure Service Bus transport."""
Expand Down
56 changes: 56 additions & 0 deletions t/unit/transport/test_azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,59 @@ def test_returning_da():
def test_returning_mi():
conn = Connection(URL_CREDS_MI, transport=azureservicebus.Transport)
assert conn.as_uri(True) == URL_CREDS_MI_FQ


def test_lock_renewal_config_initialization():
options = {
"use_lock_renewal": True,
"max_lock_renewal_duration": 1234.0
}
conn = Connection(URL_CREDS_MI_FQ, transport=azureservicebus.Transport, transport_options=options)
channel = conn.channel()

assert channel.use_lock_renewal is True
assert channel.max_lock_renewal_duration == 1234.0


@patch('kombu.transport.azureservicebus.AutoLockRenewer')
def test_get_asb_receiver_logic(mock_renewer_cls, mock_queue):
"""Test for creation, reuse, and only present in peek lock mode."""
# Create a fresh Connection/channel with lock renewal enabled via transport_options,
# mirroring real usage instead of mutating transport_options after creation.
conn = Connection(
URL_CREDS_MI_FQ,
transport=azureservicebus.Transport,
transport_options={"use_lock_renewal": True},
)
channel = conn.channel()
channel.queue_service.get_queue_receiver = MagicMock()

# test if renewer is created
channel._get_asb_receiver("first_queue", recv_mode=ServiceBusReceiveMode.PEEK_LOCK)
mock_renewer_cls.assert_called_once()
Comment thread
tuschla marked this conversation as resolved.
mock_renewer_cls.assert_called_once_with(max_lock_renewal_duration=channel.max_lock_renewal_duration)
assert channel.queue_service.get_queue_receiver.call_args.kwargs[
"auto_lock_renewer"
] == mock_renewer_cls.return_value

# test for reuse of the first AutoLockRenewer
channel._get_asb_receiver("second_queue", recv_mode=ServiceBusReceiveMode.PEEK_LOCK)
assert mock_renewer_cls.call_count == 1

# check if renewer is not present if recv_mode is not peek lock
channel._get_asb_receiver("third_queue", recv_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE)
assert channel.queue_service.get_queue_receiver.call_args.kwargs["auto_lock_renewer"] is None


def test_channel_close_is_safe(mock_queue):
"""Make sure renewer close is safe to call multiple times."""
channel = mock_queue.channel
renewer_mock = MagicMock()
channel._renewer = renewer_mock

channel.close()
renewer_mock.close.assert_called_once()
assert channel._renewer is None

channel.close()
assert renewer_mock.close.call_count == 1