Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions autogen/beta/tools/shell/environment/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
)


_SHELL_OPERATORS = (">", ">>", "|", ";", "&&", "||", "`")


def matches(pattern: str, command: str) -> bool:
"""Return True if *command* starts with *pattern* as a whole word or prefix.

Expand All @@ -58,6 +61,14 @@ def matches(pattern: str, command: str) -> bool:
return rest == "" or rest[0] == " "


def contains_shell_operator(command: str) -> bool:
"""Return True if *command* contains shell operators that could bypass
the allowed-command whitelist (redirection, pipes, chaining, or
backtick substitution).
"""
return any(op in command for op in _SHELL_OPERATORS)


def check_ignore(command: str, workdir: Path, patterns: list[str]) -> str | None:
"""Return ``"Access denied: <path>"`` if any literal path in *command* matches *patterns*.

Expand Down
9 changes: 6 additions & 3 deletions autogen/beta/tools/shell/environment/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import tempfile
from pathlib import Path

from .base import READONLY_COMMANDS, ShellEnvironment, check_ignore, matches
from .base import READONLY_COMMANDS, ShellEnvironment, check_ignore, contains_shell_operator, matches


class LocalShellEnvironment(ShellEnvironment):
Expand Down Expand Up @@ -119,8 +119,11 @@ def run(self, command: str) -> str:
Applies allowed/blocked filtering, ignore-pattern checks, then runs
the command via :func:`subprocess.run`.
"""
if self._allowed is not None and not any(matches(p, command) for p in self._allowed):
return f"Command not allowed: {command!r}"
if self._allowed is not None:
if not any(matches(p, command) for p in self._allowed):
return f"Command not allowed: {command!r}"
if contains_shell_operator(command):
return f"Command not allowed (shell operators are not permitted in restricted mode): {command!r}"

if self._blocked is not None and any(matches(p, command) for p in self._blocked):
return f"Command not allowed: {command!r}"
Expand Down
17 changes: 14 additions & 3 deletions test/beta/tools/test_local_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,24 @@ def _make_config(self, command: str, final_reply: str = "done") -> TestConfig:

@pytest.mark.asyncio
async def test_allowed_permits_matching_command(self, tmp_path: Path) -> None:
output = tmp_path / "out.txt"
shell = LocalShellTool(environment=LocalShellEnvironment(path=tmp_path, allowed=["touch"]))
# "touch" is allowed and the command contains no shell operators —
# the file MUST be created. Mirrors test_allowed_blocks_non_matching_command,
# which uses the same touch pattern to assert rejection.
agent = Agent("a", config=self._make_config(f"touch {output}"), tools=[shell])
await agent.ask("run it")
assert output.exists(), "touch was allowed but file was not created"

@pytest.mark.asyncio
async def test_allowed_blocks_shell_redirect_bypass(self, tmp_path: Path) -> None:
output = tmp_path / "out.txt"
shell = LocalShellTool(environment=LocalShellEnvironment(path=tmp_path, allowed=["echo"]))
# Even though "echo" is allowed, shell redirection (`>`) must be blocked
# to prevent bypassing the whitelist by writing to arbitrary files.
agent = Agent("a", config=self._make_config(f"echo hello > {output}"), tools=[shell])
await agent.ask("run it")
# Command was allowed — file must exist with expected content
assert output.exists(), "echo was allowed but file was not created"
assert output.read_text().strip() == "hello"
assert not output.exists(), "shell redirect bypass was not blocked"

@pytest.mark.asyncio
async def test_allowed_blocks_non_matching_command(self, tmp_path: Path) -> None:
Expand Down
Loading