From be34ea1344d3e03cf3a6bb0b844f5d5ebd514309 Mon Sep 17 00:00:00 2001 From: Rob Jarawan <32302742+robjarawan@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:01:48 -0400 Subject: [PATCH 1/3] Fix rabbitmq_admin __main__ using stdlib urlparse for percent-encoded passwords After commit 72a8d193 removed unquote() calls from exec_rabbitmqadmin, the function relies on UrlParseResult.password to return the decoded value. The __main__ block used urllib.parse.urlparse() (plain ParseResult) which does not decode percent-encoding, so passwords like 'pass%23word' were passed verbatim to rabbitmqadmin instead of 'pass#word', breaking auth for anyone invoking the script directly with an encoded password. Switch __main__ to _urlparse() so url.password returns the decoded value consistently with the non-main code paths. Add three tests: two confirming decoded passwords reach the subprocess, one pinning the stdlib urlparse behaviour to document why _urlparse is needed. --- sarracenia/rabbitmq_admin.py | 3 +- tests/sarracenia/rabbitmq_admin_test.py | 48 +++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sarracenia/rabbitmq_admin.py b/sarracenia/rabbitmq_admin.py index ed25201d1..997742932 100755 --- a/sarracenia/rabbitmq_admin.py +++ b/sarracenia/rabbitmq_admin.py @@ -291,7 +291,8 @@ def user_access(url, user): if __name__ == "__main__": - url = urllib.parse.urlparse(sys.argv[1]) + from sarracenia.config.credentials import _urlparse + url = _urlparse(sys.argv[1]) print(exec_rabbitmqadmin(url, "list queue names")[1]) import json diff --git a/tests/sarracenia/rabbitmq_admin_test.py b/tests/sarracenia/rabbitmq_admin_test.py index 0de998cd0..dcf62c149 100644 --- a/tests/sarracenia/rabbitmq_admin_test.py +++ b/tests/sarracenia/rabbitmq_admin_test.py @@ -1,6 +1,50 @@ import pytest from tests.conftest import * -#from unittest.mock import Mock +from unittest.mock import patch, MagicMock import sarracenia.config -import sarracenia.rabbitmq_admin \ No newline at end of file +import sarracenia.rabbitmq_admin +from sarracenia.rabbitmq_admin import exec_rabbitmqadmin +from sarracenia.config.credentials import _urlparse + + +class Test_ExecRabbitmqadmin: + """Regression tests for PR #989 / #1677 credential handling in rabbitmq_admin.""" + + def test_plain_password_reaches_command(self): + """Plain password is passed to exec_rabbitmqadmin without modification.""" + url = _urlparse('amqp://admin:secret@localhost/') + with patch('sarracenia.rabbitmq_admin.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout=b'[]') + exec_rabbitmqadmin(url, 'list exchanges name') + call_args = mock_run.call_args[0][0] + assert 'secret' in call_args + + def test_encoded_hash_password_decoded_for_command(self): + """Password with '%23' must be decoded to '#' before being passed to rabbitmqadmin. + + Regression: exec_rabbitmqadmin used urllib.parse.urlparse() in __main__ + (plain ParseResult) and then dropped the unquote() call, so percent-encoded + passwords were passed verbatim ('pass%23word') instead of decoded ('pass#word'). + """ + url = _urlparse('amqp://admin:pass%23word@localhost/') + assert url.password == 'pass#word', "UrlParseResult must decode %23 to #" + + with patch('sarracenia.rabbitmq_admin.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout=b'[]') + exec_rabbitmqadmin(url, 'list exchanges name') + call_args = mock_run.call_args[0][0] + # Decoded password must appear; encoded form must not + assert 'pass#word' in ' '.join(call_args) or 'pass#word' in call_args + assert 'pass%23word' not in ' '.join(call_args) + + def test_plain_urlparse_would_pass_encoded_password(self): + """Demonstrates the bug: stdlib urlparse returns encoded password from url.password. + + This test documents why the __main__ block must use _urlparse, not urlparse. + Plain ParseResult.password does NOT decode percent-encoding. + """ + import urllib.parse + url_broken = urllib.parse.urlparse('amqp://admin:pass%23word@localhost/') + assert url_broken.password == 'pass%23word', \ + "stdlib urlparse must NOT decode — confirms why _urlparse is needed in __main__" From e9b0037e0f52cb1d9ff6c0430ece4f629bbc7632 Mon Sep 17 00:00:00 2001 From: Rob Jarawan <32302742+robjarawan@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:04:18 -0400 Subject: [PATCH 2/3] Fix shell injection and password corruption in exec_rabbitmqadmin Two bugs in exec_rabbitmqadmin: 1. The legacy (Python < 3.5) path used subprocess.getstatusoutput() which runs with shell=True. url.hostname, url.username, url.password, and options were all string-concatenated into the shell command. A password containing ';', '$(', or backticks executed arbitrary commands. 2. The modern path did command.replace("'", "").split(): this stripped ALL single quotes from the entire command string (including those inside the options value) then split on whitespace. A password with a single quote was silently corrupted; a password or option value with a space was split into multiple arguments. Fix: build a list from the start and pass it to subprocess.run() directly. The password is always a standalone list element, never shell-parsed. shlex.split() is used only for the options string (operator-controlled values like "list exchanges name" or "declare user name=X"). Drop the Python < 3.5 code path; Sarracenia requires Python 3.6+. Add 7 tests covering: list-based dispatch, decoded passwords, passwords with single quotes, passwords with shell metacharacters, option tokenisation, AMQPS flag injection, and the stdlib-urlparse regression. --- sarracenia/rabbitmq_admin.py | 70 +++++++++-------------- tests/sarracenia/rabbitmq_admin_test.py | 76 ++++++++++++++++++------- 2 files changed, 83 insertions(+), 63 deletions(-) diff --git a/sarracenia/rabbitmq_admin.py b/sarracenia/rabbitmq_admin.py index 997742932..10de7bdb6 100755 --- a/sarracenia/rabbitmq_admin.py +++ b/sarracenia/rabbitmq_admin.py @@ -3,6 +3,7 @@ rabbitmq administration bindings, to allow sr to invoke broker management functions. """ +import shlex import sys import urllib, urllib.parse import base64 @@ -28,51 +29,34 @@ def exec_rabbitmqadmin(url, options, simulate=False): """ invoke rabbitmqadmin using a sub-process, with the given options. """ + cmdlst = [ + rabbitmqadmin, + '--host', url.hostname, + '--user', url.username, + '-p', url.password, + '--format', 'raw_json', + ] + if url.scheme == 'amqps': + cmdlst += ['--ssl', '--port=15671'] + cmdlst += shlex.split(options) + + logger.debug('exec_rabbitmqadmin host=%s options=%s', url.hostname, options) + + if simulate: + print(f"dry_run: {' '.join(shlex.quote(a) for a in cmdlst)}") + return 0, None try: - command = rabbitmqadmin - command += ' --host \'' + url.hostname - command += '\' --user \'' + url.username - command += '\' -p \'' + url.password - command += '\' --format raw_json ' - if url.scheme == 'amqps': - command += ' --ssl --port=15671 ' - command += ' ' + options - - logger.debug('command = %s', command) - if sys.version_info.major < 3 or (sys.version_info.major == 3 - and sys.version_info.minor < 5): - if logger: logger.debug("using subprocess.getstatusoutput") - - if simulate: - print(f"dry_run: {' '.join(command)}") - return 0, None - - return subprocess.getstatusoutput(command) - else: - cmdlin = command.replace("'", '') - cmdlst = cmdlin.split() - if logger: - logger.debug('using subprocess.run cmdlst=%s', ' '.join(cmdlst)) - - if simulate: - print(f"dry_run: {cmdlin}") - return 0, None - - rclass = subprocess.run(cmdlst, stdout=subprocess.PIPE) - if rclass.returncode == 0: - output = rclass.stdout - if type(output) == bytes: output = output.decode("utf-8") - return rclass.returncode, output - return rclass.returncode, None - except: - if sys.version_info.major < 3 or (sys.version_info.major == 3 - and sys.version_info.minor < 5): - if logger: logger.error( f"trying run command {command}" ) - else: - if logger: - logger.error( f"trying run command {' '.join(cmdlst)}" ) - if logger: logger.debug('Exception details:', exc_info=True) + rclass = subprocess.run(cmdlst, stdout=subprocess.PIPE) + if rclass.returncode == 0: + output = rclass.stdout + if isinstance(output, bytes): + output = output.decode("utf-8") + return rclass.returncode, output + return rclass.returncode, None + except Exception: + logger.error('exec_rabbitmqadmin failed for host=%s options=%s', url.hostname, options) + logger.debug('Exception details:', exc_info=True) return 0, None diff --git a/tests/sarracenia/rabbitmq_admin_test.py b/tests/sarracenia/rabbitmq_admin_test.py index dcf62c149..77f672904 100644 --- a/tests/sarracenia/rabbitmq_admin_test.py +++ b/tests/sarracenia/rabbitmq_admin_test.py @@ -11,32 +11,68 @@ class Test_ExecRabbitmqadmin: """Regression tests for PR #989 / #1677 credential handling in rabbitmq_admin.""" - def test_plain_password_reaches_command(self): - """Plain password is passed to exec_rabbitmqadmin without modification.""" - url = _urlparse('amqp://admin:secret@localhost/') + def _run_and_get_cmdlst(self, url, options='list exchanges name'): with patch('sarracenia.rabbitmq_admin.subprocess.run') as mock_run: mock_run.return_value = MagicMock(returncode=0, stdout=b'[]') - exec_rabbitmqadmin(url, 'list exchanges name') - call_args = mock_run.call_args[0][0] - assert 'secret' in call_args + exec_rabbitmqadmin(url, options) + return mock_run.call_args[0][0] - def test_encoded_hash_password_decoded_for_command(self): - """Password with '%23' must be decoded to '#' before being passed to rabbitmqadmin. + def test_password_passed_as_separate_argument(self): + """Password must be a standalone element in the argument list, not shell-interpolated. - Regression: exec_rabbitmqadmin used urllib.parse.urlparse() in __main__ - (plain ParseResult) and then dropped the unquote() call, so percent-encoded - passwords were passed verbatim ('pass%23word') instead of decoded ('pass#word'). + The old implementation built a shell command string and used getstatusoutput + (shell=True) or replace()+split(), both of which allowed injection or corruption. + The fix uses subprocess.run with a list — password is never shell-parsed. """ + url = _urlparse('amqp://admin:secret@localhost/') + cmdlst = self._run_and_get_cmdlst(url) + assert isinstance(cmdlst, list), "subprocess.run must receive a list, not a string" + # '-p' followed immediately by the password as its own element + assert '-p' in cmdlst + password_index = cmdlst.index('-p') + 1 + assert cmdlst[password_index] == 'secret' + + def test_encoded_hash_password_decoded_for_command(self): + """Password '%23' must be decoded to '#' as a separate argument.""" url = _urlparse('amqp://admin:pass%23word@localhost/') - assert url.password == 'pass#word', "UrlParseResult must decode %23 to #" + cmdlst = self._run_and_get_cmdlst(url) + password_index = cmdlst.index('-p') + 1 + assert cmdlst[password_index] == 'pass#word' + assert 'pass%23word' not in cmdlst - with patch('sarracenia.rabbitmq_admin.subprocess.run') as mock_run: - mock_run.return_value = MagicMock(returncode=0, stdout=b'[]') - exec_rabbitmqadmin(url, 'list exchanges name') - call_args = mock_run.call_args[0][0] - # Decoded password must appear; encoded form must not - assert 'pass#word' in ' '.join(call_args) or 'pass#word' in call_args - assert 'pass%23word' not in ' '.join(call_args) + def test_password_with_single_quote_not_corrupted(self): + """Password containing a single quote must not be stripped or split. + + The old replace("'","").split() approach silently dropped the quote, + causing authentication to fail for any password containing "'". + """ + url = _urlparse("amqp://admin:it's@localhost/") + cmdlst = self._run_and_get_cmdlst(url) + password_index = cmdlst.index('-p') + 1 + assert cmdlst[password_index] == "it's" + + def test_no_shell_injection_via_password(self): + """A password containing shell metacharacters is passed as a single safe argument.""" + url = _urlparse('amqp://admin:secret;id@localhost/') + cmdlst = self._run_and_get_cmdlst(url) + assert isinstance(cmdlst, list) + password_index = cmdlst.index('-p') + 1 + assert cmdlst[password_index] == 'secret;id' + + def test_options_tokenised_correctly(self): + """Options string is split into tokens, not concatenated into a shell string.""" + url = _urlparse('amqp://admin:secret@localhost/') + cmdlst = self._run_and_get_cmdlst(url, 'list exchanges name') + assert 'list' in cmdlst + assert 'exchanges' in cmdlst + assert 'name' in cmdlst + + def test_amqps_adds_ssl_flag(self): + """AMQPS scheme adds --ssl and --port=15671 to the argument list.""" + url = _urlparse('amqps://admin:secret@broker.example.com/') + cmdlst = self._run_and_get_cmdlst(url) + assert '--ssl' in cmdlst + assert '--port=15671' in cmdlst def test_plain_urlparse_would_pass_encoded_password(self): """Demonstrates the bug: stdlib urlparse returns encoded password from url.password. @@ -47,4 +83,4 @@ def test_plain_urlparse_would_pass_encoded_password(self): import urllib.parse url_broken = urllib.parse.urlparse('amqp://admin:pass%23word@localhost/') assert url_broken.password == 'pass%23word', \ - "stdlib urlparse must NOT decode — confirms why _urlparse is needed in __main__" + "stdlib urlparse must NOT decode -- confirms why _urlparse is needed in __main__" From 04d77371421844035ad807ceabc7b6d3fea7faaa Mon Sep 17 00:00:00 2001 From: Rob Jarawan <32302742+robjarawan@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:07:32 -0400 Subject: [PATCH 3/3] Replace eval() with json.loads() in run_rabbitmqadmin eval() on broker-supplied output is a remote code execution vector: a compromised broker or network MITM can return a Python expression instead of JSON and execute arbitrary code in the sr3 process. exec_rabbitmqadmin already passes --format raw_json, so the output is always a JSON array. json.loads() is the correct parser; it raises JSONDecodeError on non-JSON input rather than executing it. Also: remove the dead duplicate None/empty check above the parse block (already handled by the earlier guard), remove the now-redundant local 'import json' from the __main__ block, and promote json to a module-level import. Bare except: in run_rabbitmqadmin tightened to except Exception:. Add 4 tests: valid JSON round-trip, malicious eval payload safely rejected, non-JSON error text returns [], and failed subprocess exit code returns []. --- sarracenia/rabbitmq_admin.py | 19 +++++------- tests/sarracenia/rabbitmq_admin_test.py | 39 ++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/sarracenia/rabbitmq_admin.py b/sarracenia/rabbitmq_admin.py index 10de7bdb6..5ed6bf081 100755 --- a/sarracenia/rabbitmq_admin.py +++ b/sarracenia/rabbitmq_admin.py @@ -3,6 +3,7 @@ rabbitmq administration bindings, to allow sr to invoke broker management functions. """ +import json import shlex import sys import urllib, urllib.parse @@ -279,8 +280,6 @@ def user_access(url, user): url = _urlparse(sys.argv[1]) print(exec_rabbitmqadmin(url, "list queue names")[1]) - import json - lex = list( map(lambda x: x['name'], json.loads(exec_rabbitmqadmin(url, "list exchanges name")[1]))) @@ -309,17 +308,13 @@ def run_rabbitmqadmin(url, options, simulate=False): logger.error("run_rabbitmqadmin invocation failed") return [] - if answer == None or len(answer) == 0: return [] - - lst = [] try: - lst = eval(answer) - except: - pass - - return lst + return json.loads(answer) + except json.JSONDecodeError: + logger.error('run_rabbitmqadmin: non-JSON response from rabbitmqadmin: %s', answer[:200]) + return [] - except: - logger.error(f"sr_rabbit/run_rabbitmqadmin failed with option '{options}'") + except Exception: + logger.error("sr_rabbit/run_rabbitmqadmin failed with option '%s'", options) logger.debug('Exception details: ', exc_info=True) return [] diff --git a/tests/sarracenia/rabbitmq_admin_test.py b/tests/sarracenia/rabbitmq_admin_test.py index 77f672904..c5c7dd9bf 100644 --- a/tests/sarracenia/rabbitmq_admin_test.py +++ b/tests/sarracenia/rabbitmq_admin_test.py @@ -4,7 +4,7 @@ import sarracenia.config import sarracenia.rabbitmq_admin -from sarracenia.rabbitmq_admin import exec_rabbitmqadmin +from sarracenia.rabbitmq_admin import exec_rabbitmqadmin, run_rabbitmqadmin from sarracenia.config.credentials import _urlparse @@ -74,6 +74,43 @@ def test_amqps_adds_ssl_flag(self): assert '--ssl' in cmdlst assert '--port=15671' in cmdlst +class Test_RunRabbitmqadmin: + """Tests for run_rabbitmqadmin JSON parsing (eval -> json.loads fix).""" + + def _run_with_response(self, stdout_bytes, returncode=0): + url = _urlparse('amqp://admin:secret@localhost/') + with patch('sarracenia.rabbitmq_admin.subprocess.run') as mock_run: + mock_run.return_value = MagicMock(returncode=returncode, stdout=stdout_bytes) + return run_rabbitmqadmin(url, 'list exchanges name') + + def test_valid_json_parsed_to_list(self): + """Valid JSON array from rabbitmqadmin is returned as a Python list.""" + result = self._run_with_response(b'[{"name": "xpublic"}, {"name": "xs_user"}]') + assert result == [{'name': 'xpublic'}, {'name': 'xs_user'}] + + def test_malicious_eval_payload_not_executed(self): + """A response that would execute code under eval() is safe under json.loads(). + + eval('[x for x in ().__class__.__bases__[0].__subclasses__()]') would + return a list of all Python classes — a code-execution gadget. + json.loads() raises JSONDecodeError instead. + """ + malicious = b'[x for x in ().__class__.__bases__[0].__subclasses__()]' + result = self._run_with_response(malicious) + assert result == [] + + def test_non_json_response_returns_empty_list(self): + """Non-JSON output (e.g. rabbitmqadmin error text) returns [] without raising.""" + result = self._run_with_response(b'Error: blah blah blah') + assert result == [] + + def test_failed_subprocess_returns_empty_list(self): + """Non-zero exit code from rabbitmqadmin returns [] without raising.""" + result = self._run_with_response(b'', returncode=1) + assert result == [] + + +class Test_StdlibUrlparseRegression: def test_plain_urlparse_would_pass_encoded_password(self): """Demonstrates the bug: stdlib urlparse returns encoded password from url.password.