Skip to content

Commit 012a094

Browse files
authored
[change] Send VPN sync errors/recovery as generic_message notifications #1049
Aimed at zerotier and wireguard. Made existing code reusable. Closes #1049
1 parent 7d94540 commit 012a094

5 files changed

Lines changed: 132 additions & 80 deletions

File tree

openwisp_controller/config/tasks.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
from openwisp_utils.tasks import OpenwispCeleryTask
1111

12+
from .utils import handle_error_notification, handle_recovery_notification
13+
1214
logger = logging.getLogger(__name__)
1315

1416

@@ -130,18 +132,35 @@ def trigger_vpn_server_endpoint(endpoint, auth_token, vpn_id):
130132

131133
# Cache the configuration here makes downloading the configuration faster.
132134
vpn.get_cached_configuration()
133-
response = requests.post(
134-
endpoint,
135-
params={"key": auth_token},
136-
verify=False if getattr(settings, "DEBUG") else True,
137-
)
138-
if response.status_code == 200:
139-
logger.info(f"Triggered update webhook of VPN Server UUID: {vpn_id}")
135+
task_key = f"vpn_update_task:{vpn_id}"
136+
try:
137+
response = requests.post(
138+
endpoint,
139+
params={"key": auth_token},
140+
verify=not settings.DEBUG,
141+
timeout=30,
142+
)
143+
response.raise_for_status()
144+
except requests.RequestException as e:
145+
logger.warning(
146+
f"Failed to update VPN Server configuration. "
147+
f"Error: {str(e)}, "
148+
f"VPN Server UUID: {vpn_id}"
149+
)
150+
handle_error_notification(
151+
task_key,
152+
sleep_time=5,
153+
exception=e,
154+
instance=vpn,
155+
action="update",
156+
)
140157
else:
141-
logger.error(
142-
"Failed to update VPN Server configuration. "
143-
f"Response status code: {response.status_code}, "
144-
f"VPN Server UUID: {vpn_id}",
158+
logger.info(f"Triggered update webhook of VPN Server UUID: {vpn_id}")
159+
handle_recovery_notification(
160+
task_key,
161+
sleep_time=5,
162+
instance=vpn,
163+
action="update",
145164
)
146165

147166

openwisp_controller/config/tasks_zerotier.py

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
import logging
22
from http import HTTPStatus
3-
from time import sleep
43

54
from celery import shared_task
6-
from django.core.cache import cache
75
from django.core.exceptions import ObjectDoesNotExist
8-
from django.utils.translation import gettext as _
9-
from openwisp_notifications.signals import notify
106
from requests.exceptions import RequestException
117
from swapper import load_model
128

139
from openwisp_controller.config.api.zerotier_service import ZerotierService
1410
from openwisp_utils.tasks import OpenwispCeleryTask
1511

1612
from .settings import API_TASK_RETRY_OPTIONS
13+
from .utils import handle_error_notification, handle_recovery_notification
1714

1815
logger = logging.getLogger(__name__)
1916

@@ -27,48 +24,6 @@ class OpenwispApiTask(OpenwispCeleryTask):
2724
HTTPStatus.GATEWAY_TIMEOUT, # 504
2825
]
2926

30-
def _send_api_task_notification(self, type, **kwargs):
31-
vpn = kwargs.get("instance")
32-
action = kwargs.get("action").replace("_", " ")
33-
status_code = kwargs.get("status_code")
34-
# Adding some delay here to prevent overlapping
35-
# of the django success message container
36-
# with the ow-notification container
37-
# https://github.com/openwisp/openwisp-notifications/issues/264
38-
sleep(2)
39-
message_map = {
40-
"error": {
41-
"verb": _("encountered an unrecoverable error"),
42-
"message": _(
43-
"Unable to perform {action} operation on the "
44-
"{target} VPN server due to an "
45-
"unrecoverable error "
46-
"(status code: {status_code})"
47-
),
48-
"level": "error",
49-
},
50-
"recovery": {
51-
"verb": _("has been completed successfully"),
52-
"message": _("The {action} operation on {target} {verb}."),
53-
"level": "info",
54-
},
55-
}
56-
meta = message_map[type]
57-
notify.send(
58-
type="generic_message",
59-
sender=vpn,
60-
target=vpn,
61-
action=action,
62-
verb=meta["verb"],
63-
message=meta["message"].format(
64-
action=action,
65-
target=str(vpn),
66-
status_code=status_code,
67-
verb=meta["verb"],
68-
),
69-
level=meta["level"],
70-
)
71-
7227
def handle_api_call(self, fn, *args, send_notification=True, **kwargs):
7328
"""
7429
This method handles API calls and their responses
@@ -105,13 +60,10 @@ def handle_api_call(self, fn, *args, send_notification=True, **kwargs):
10560
response.raise_for_status()
10661
logger.info(info_msg)
10762
if send_notification:
108-
task_result = cache.get(task_key)
109-
if task_result == "error":
110-
self._send_api_task_notification("recovery", **kwargs)
111-
cache.set(task_key, "success", None)
63+
handle_recovery_notification(task_key, **kwargs)
11264
except RequestException as e:
11365
if response.status_code in self._RECOVERABLE_API_CODES:
114-
retry_logger = logger.warn
66+
retry_logger = logger.warning
11567
# When retry limit is reached, use error logging
11668
if self.request.retries == self.max_retries:
11769
retry_logger = logger.error
@@ -122,12 +74,7 @@ def handle_api_call(self, fn, *args, send_notification=True, **kwargs):
12274
raise e
12375
logger.error(f"{err_msg}, Error: {e}")
12476
if send_notification:
125-
task_result = cache.get(task_key)
126-
if task_result in (None, "success"):
127-
cache.set(task_key, "error", None)
128-
self._send_api_task_notification(
129-
"error", status_code=response.status_code, **kwargs
130-
)
77+
handle_error_notification(task_key, exception=e, **kwargs)
13178
return (response, updated_config) if updated_config else response
13279

13380

@@ -150,6 +97,8 @@ def trigger_zerotier_server_update(self, config, vpn_id):
15097
network_id,
15198
instance=vpn,
15299
action="update",
100+
# notification kwargs
101+
sleep_time=5,
153102
info=(
154103
f"Successfully updated the configuration of "
155104
f"ZeroTier VPN Server with UUID: {vpn_id}"
@@ -189,6 +138,8 @@ def trigger_zerotier_server_update_member(self, vpn_id, ip=None, node_id=None):
189138
member_ip,
190139
instance=vpn,
191140
action="update_member",
141+
# notification kwargs
142+
sleep_time=5,
192143
info=(
193144
f"Successfully updated ZeroTier network member: {node_id}, "
194145
f"ZeroTier network: {network_id}, "
@@ -216,7 +167,7 @@ def trigger_zerotier_server_remove_member(self, node_id=None, **vpn_kwargs):
216167
network_id = vpn_kwargs.get("network_id")
217168
try:
218169
vpn = Vpn.objects.get(pk=vpn_id)
219-
notification_kwargs = dict(instance=vpn, action="remove_member")
170+
notification_kwargs = dict(instance=vpn, action="remove_member", sleep_time=5)
220171
# When a ZeroTier VPN server is deleted
221172
# and this is followed by the deletion of ZeroTier VPN clients
222173
# we won't have access to the VPN server instance. Therefore, we should
@@ -264,6 +215,7 @@ def trigger_zerotier_server_join(self, vpn_id):
264215
network_id,
265216
instance=vpn,
266217
action="network_join",
218+
sleep_time=5,
267219
info=(
268220
f"Successfully joined the ZeroTier network: {network_id}, "
269221
f"ZeroTier VPN Server UUID: {vpn_id}"

openwisp_controller/config/tests/test_notifications.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class TestNotifications(
3131
app_label = "config"
3232
_ZT_SERVICE_REQUESTS = "openwisp_controller.config.api.zerotier_service.requests"
3333
_ZT_API_TASKS_INFO_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.info"
34-
_ZT_API_TASKS_WARN_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.warn"
34+
_ZT_API_TASKS_WARN_LOGGER = (
35+
"openwisp_controller.config.tasks_zerotier.logger.warning"
36+
)
3537
_ZT_API_TASKS_ERR_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.error"
3638
# As the locmem cache does not support the redis backend cache.keys() method
3739
_ZT_API_TASKS_LOCMEM_CACHE_KEYS = f"{settings.CACHES['default']['BACKEND']}.keys"

openwisp_controller/config/tests/test_vpn.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from subprocess import CalledProcessError, TimeoutExpired
44
from unittest import mock
55

6+
import requests
67
from celery.exceptions import Retry, SoftTimeLimitExceeded
78
from django.conf import settings
89
from django.core.exceptions import ValidationError
910
from django.db.models.signals import post_save
1011
from django.db.utils import IntegrityError
11-
from django.http.response import HttpResponse, HttpResponseNotFound
1212
from django.test import TestCase, TransactionTestCase
1313
from requests.exceptions import ConnectionError, RequestException, Timeout
1414
from swapper import load_model
@@ -728,9 +728,12 @@ def test_trigger_vpn_server_endpoint_invalid_vpn_id(self):
728728

729729

730730
class TestWireguardTransaction(BaseTestVpn, TestWireguardVpnMixin, TransactionTestCase):
731+
mock_response = mock.Mock(spec=requests.Response)
732+
mock_response.status_code = 200
733+
mock_response.raise_for_status = mock.Mock()
734+
731735
@mock.patch(
732-
"openwisp_controller.config.tasks.requests.post",
733-
return_value=HttpResponse(status=200),
736+
"openwisp_controller.config.tasks.requests.post", return_value=mock_response
734737
)
735738
def test_auto_peer_configuration(self, *args):
736739
self.assertEqual(IpAddress.objects.count(), 0)
@@ -824,6 +827,9 @@ def test_update_vpn_server_configuration(self):
824827
f"Cannot update configuration of {vpn.name} VPN server, "
825828
"webhook endpoint and authentication token are empty."
826829
)
830+
success_response = mock.Mock(spec=requests.Response)
831+
success_response.status_code = 200
832+
success_response.raise_for_status = mock.Mock()
827833

828834
with self.subTest("Webhook endpoint and authentication endpoint is present"):
829835
vpn.webhook_endpoint = "https://example.com"
@@ -834,24 +840,29 @@ def test_update_vpn_server_configuration(self):
834840
with mock.patch(
835841
"openwisp_controller.config.tasks.logger.info"
836842
) as mocked_logger, mock.patch(
837-
"requests.post", return_value=HttpResponse()
843+
"requests.post", return_value=success_response
838844
):
839845
post_save.send(
840846
instance=vpn_client, sender=vpn_client._meta.model, created=False
841847
)
842848
mocked_logger.assert_called_once_with(
843849
f"Triggered update webhook of VPN Server UUID: {vpn.pk}"
844850
)
851+
fail_response = mock.Mock(spec=requests.Response)
852+
fail_response.status_code = 404
853+
fail_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
854+
"Not Found"
855+
)
845856

846-
with mock.patch("logging.Logger.error") as mocked_logger, mock.patch(
847-
"requests.post", return_value=HttpResponseNotFound()
857+
with mock.patch("logging.Logger.warning") as mocked_logger, mock.patch(
858+
"requests.post", return_value=fail_response
848859
):
849860
post_save.send(
850861
instance=vpn_client, sender=vpn_client._meta.model, created=False
851862
)
852863
mocked_logger.assert_called_once_with(
853864
"Failed to update VPN Server configuration. "
854-
f"Response status code: 404, VPN Server UUID: {vpn.pk}"
865+
f"Error: Not Found, VPN Server UUID: {vpn.pk}"
855866
)
856867

857868
def test_vpn_peers_changed(self):
@@ -985,9 +996,12 @@ def test_auto_client(self):
985996
class TestVxlanTransaction(
986997
BaseTestVpn, TestVxlanWireguardVpnMixin, TransactionTestCase
987998
):
999+
mock_response = mock.Mock(spec=requests.Response)
1000+
mock_response.status_code = 200
1001+
mock_response.raise_for_status = mock.Mock()
1002+
9881003
@mock.patch(
989-
"openwisp_controller.config.tasks.requests.post",
990-
return_value=HttpResponse(status=200),
1004+
"openwisp_controller.config.tasks.requests.post", return_value=mock_response
9911005
)
9921006
def test_auto_peer_configuration(self, *args):
9931007
self.assertEqual(IpAddress.objects.count(), 0)
@@ -1460,7 +1474,9 @@ class TestZeroTierTransaction(
14601474
):
14611475
_ZT_SERVICE_REQUESTS = "openwisp_controller.config.api.zerotier_service.requests"
14621476
_ZT_API_TASKS_INFO_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.info"
1463-
_ZT_API_TASKS_WARN_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.warn"
1477+
_ZT_API_TASKS_WARN_LOGGER = (
1478+
"openwisp_controller.config.tasks_zerotier.logger.warning"
1479+
)
14641480
_ZT_API_TASKS_ERR_LOGGER = "openwisp_controller.config.tasks_zerotier.logger.error"
14651481
# As the locmem cache does not support the redis backend cache.keys() method
14661482
_ZT_API_TASKS_LOCMEM_CACHE_KEYS = f"{settings.CACHES['default']['BACKEND']}.keys"

openwisp_controller/config/utils.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import logging
2+
import time
23

4+
from django.core.cache import cache
35
from django.core.exceptions import ValidationError
46
from django.db.models import Q
57
from django.http import Http404, HttpResponse
68
from django.shortcuts import get_object_or_404 as base_get_object_or_404
79
from django.urls import path, re_path
10+
from django.utils.translation import gettext_lazy as _
11+
from openwisp_notifications.signals import notify
812
from openwisp_notifications.utils import _get_object_link
913

1014
logger = logging.getLogger(__name__)
@@ -206,3 +210,62 @@ def get_default_templates_queryset(
206210
def get_config_error_notification_target_url(obj, field, absolute_url=True):
207211
url = _get_object_link(obj._related_object(field), absolute_url)
208212
return f"{url}#config-group"
213+
214+
215+
def send_api_task_notification(type, sleep_time=False, **kwargs):
216+
"""
217+
The sleep_time argument is needed to avoid triggering the toast
218+
notification in the admin while the page is reloading.
219+
"""
220+
if sleep_time:
221+
time.sleep(sleep_time)
222+
vpn = kwargs.get("instance")
223+
action = kwargs.get("action", "").replace("_", " ")
224+
exception = kwargs.get("exception")
225+
message_map = {
226+
"error": {
227+
"verb": _("encountered an unrecoverable error"),
228+
"message": _(
229+
"Unable to perform {action} operation on the "
230+
"{target} VPN server due to an "
231+
"unrecoverable error "
232+
"({error_type})"
233+
),
234+
"level": "error",
235+
},
236+
"success": {
237+
"verb": _("has been completed successfully"),
238+
"message": _("The {action} operation on {target} {verb}."),
239+
"level": "info",
240+
},
241+
}
242+
meta = message_map[type]
243+
notify.send(
244+
sender=vpn,
245+
target=vpn,
246+
type="generic_message",
247+
action_object=vpn,
248+
verb=meta["verb"],
249+
message=meta["message"].format(
250+
action=action,
251+
target=str(vpn),
252+
error_type=exception.__class__.__name__ if exception else "",
253+
verb=meta["verb"],
254+
),
255+
description=str(exception) if exception else "",
256+
level=meta["level"],
257+
)
258+
259+
260+
def handle_recovery_notification(task_key, sleep_time=False, **kwargs):
261+
task_result = cache.get(task_key)
262+
if task_result == "error":
263+
send_api_task_notification("success", sleep_time=sleep_time, **kwargs)
264+
cache.set(task_key, "success", timeout=None)
265+
266+
267+
def handle_error_notification(task_key, sleep_time=False, **kwargs):
268+
cached_value = cache.get(task_key)
269+
if cached_value != "error":
270+
cache.set(task_key, "error", timeout=None)
271+
send_api_task_notification("error", sleep_time=sleep_time, **kwargs)

0 commit comments

Comments
 (0)