diff --git a/autogen/beta/tools/shell/environment/base.py b/autogen/beta/tools/shell/environment/base.py index 254c28a4884..acc7733ca91 100644 --- a/autogen/beta/tools/shell/environment/base.py +++ b/autogen/beta/tools/shell/environment/base.py @@ -45,6 +45,9 @@ ) +_SHELL_OPERATORS = (">", ">>", "|", ";", "&&", "||", "`") + + def matches(pattern: str, command: str) -> bool: """Return True if *command* starts with *pattern* as a whole word or prefix. @@ -58,6 +61,14 @@ def matches(pattern: str, command: str) -> bool: return rest == "" or rest[0] == " " +def contains_shell_operator(command: str) -> bool: + """Return True if *command* contains shell operators that could bypass + the allowed-command whitelist (redirection, pipes, chaining, or + backtick substitution). + """ + return any(op in command for op in _SHELL_OPERATORS) + + def check_ignore(command: str, workdir: Path, patterns: list[str]) -> str | None: """Return ``"Access denied: "`` if any literal path in *command* matches *patterns*. diff --git a/autogen/beta/tools/shell/environment/local.py b/autogen/beta/tools/shell/environment/local.py index 01dba032782..c25977c2edc 100644 --- a/autogen/beta/tools/shell/environment/local.py +++ b/autogen/beta/tools/shell/environment/local.py @@ -9,7 +9,7 @@ import tempfile from pathlib import Path -from .base import READONLY_COMMANDS, ShellEnvironment, check_ignore, matches +from .base import READONLY_COMMANDS, ShellEnvironment, check_ignore, contains_shell_operator, matches class LocalShellEnvironment(ShellEnvironment): @@ -119,8 +119,11 @@ def run(self, command: str) -> str: Applies allowed/blocked filtering, ignore-pattern checks, then runs the command via :func:`subprocess.run`. """ - if self._allowed is not None and not any(matches(p, command) for p in self._allowed): - return f"Command not allowed: {command!r}" + if self._allowed is not None: + if not any(matches(p, command) for p in self._allowed): + return f"Command not allowed: {command!r}" + if contains_shell_operator(command): + return f"Command not allowed (shell operators are not permitted in restricted mode): {command!r}" if self._blocked is not None and any(matches(p, command) for p in self._blocked): return f"Command not allowed: {command!r}" diff --git a/test/beta/tools/test_local_shell.py b/test/beta/tools/test_local_shell.py index dc26b308d62..14ba0f53761 100644 --- a/test/beta/tools/test_local_shell.py +++ b/test/beta/tools/test_local_shell.py @@ -128,13 +128,24 @@ def _make_config(self, command: str, final_reply: str = "done") -> TestConfig: @pytest.mark.asyncio async def test_allowed_permits_matching_command(self, tmp_path: Path) -> None: + output = tmp_path / "out.txt" + shell = LocalShellTool(environment=LocalShellEnvironment(path=tmp_path, allowed=["touch"])) + # "touch" is allowed and the command contains no shell operators — + # the file MUST be created. Mirrors test_allowed_blocks_non_matching_command, + # which uses the same touch pattern to assert rejection. + agent = Agent("a", config=self._make_config(f"touch {output}"), tools=[shell]) + await agent.ask("run it") + assert output.exists(), "touch was allowed but file was not created" + + @pytest.mark.asyncio + async def test_allowed_blocks_shell_redirect_bypass(self, tmp_path: Path) -> None: output = tmp_path / "out.txt" shell = LocalShellTool(environment=LocalShellEnvironment(path=tmp_path, allowed=["echo"])) + # Even though "echo" is allowed, shell redirection (`>`) must be blocked + # to prevent bypassing the whitelist by writing to arbitrary files. agent = Agent("a", config=self._make_config(f"echo hello > {output}"), tools=[shell]) await agent.ask("run it") - # Command was allowed — file must exist with expected content - assert output.exists(), "echo was allowed but file was not created" - assert output.read_text().strip() == "hello" + assert not output.exists(), "shell redirect bypass was not blocked" @pytest.mark.asyncio async def test_allowed_blocks_non_matching_command(self, tmp_path: Path) -> None: