Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions libs/python/agent/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Optional,
Set,
Tuple,
Type,
Union,
cast,
)
Expand All @@ -26,6 +27,7 @@
import litellm.utils
from core.telemetry import is_telemetry_enabled, record_event
from litellm.responses.utils import Usage
from pydantic import BaseModel

from .adapters import (
AzureMLAdapter,
Expand Down Expand Up @@ -254,6 +256,28 @@ async def _predict_step_with_retry(
raise last_exc # unreachable, but satisfies type checkers


def _pydantic_model_to_response_format(output_type: Type[BaseModel]) -> dict:
"""Convert a Pydantic BaseModel class to a response_format dict for LLM APIs."""
return {
"type": "json_schema",
"json_schema": {
"name": output_type.__name__,
"schema": output_type.model_json_schema(),
},
}


def _extract_text_from_output_item(item: dict) -> Optional[str]:
"""Extract text content from a ResponseOutputMessageParam item."""
content = item.get("content", [])
for block in content:
if isinstance(block, dict) and block.get("type") == "output_text":
return block.get("text", "")
elif hasattr(block, "type") and getattr(block, "type", None) == "output_text":
return getattr(block, "text", "")
return None


class ComputerAgent:
"""
Main agent class that automatically selects the appropriate agent loop
Expand All @@ -278,6 +302,7 @@ def __init__(
trust_remote_code: Optional[bool] = False,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
output_type: Optional[Type[BaseModel]] = None,
**additional_generation_kwargs,
):
"""
Expand All @@ -300,6 +325,7 @@ def __init__(
trust_remote_code: If set, trust remote code when loading local models. Disabled by default.
api_key: Optional API key override for the model provider
api_base: Optional API base URL override for the model provider
output_type: Optional Pydantic BaseModel class for structured outputs. When set, the LLM is instructed to return JSON conforming to the schema, and the final text response is parsed into an instance of this model.
**additional_generation_kwargs: Additional arguments passed to the model provider
"""
# If the loop is "human/human", we need to prefix a grounding model fallback
Expand All @@ -322,6 +348,7 @@ def __init__(
self.trust_remote_code = trust_remote_code
self.api_key = api_key
self.api_base = api_base
self.output_type = output_type

# == Add built-in callbacks ==

Expand Down Expand Up @@ -905,6 +932,7 @@ async def run(
stream: bool = False,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
output_type: Optional[Type[BaseModel]] = None,
**additional_generation_kwargs,
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Expand All @@ -915,6 +943,7 @@ async def run(
stream: Whether to stream the response
api_key: Optional API key override for the model provider
api_base: Optional API base URL override for the model provider
output_type: Optional Pydantic BaseModel class for structured outputs. Overrides the constructor-level output_type if provided.
**additional_generation_kwargs: Additional arguments passed to the model provider

Returns:
Expand All @@ -938,6 +967,13 @@ async def run(
if (api_base is not None) or (self.api_base is not None):
merged_kwargs["api_base"] = api_base if api_base is not None else self.api_base

# Resolve structured output type (run-level overrides constructor-level)
effective_output_type = output_type if output_type is not None else self.output_type
if effective_output_type is not None:
merged_kwargs["response_format"] = _pydantic_model_to_response_format(
effective_output_type
)

old_items = self._process_input(messages)
new_items = []

Expand Down Expand Up @@ -1063,6 +1099,24 @@ def contains_image_content(msgs):

await self._on_run_end(loop_kwargs, old_items, new_items)

# Parse structured output from the final assistant message
if effective_output_type is not None and new_items:
for item in reversed(new_items):
if item.get("role") == "assistant":
text_content = _extract_text_from_output_item(item)
if text_content:
parsed = effective_output_type.model_validate_json(text_content)
yield {
"output_parsed": parsed,
"output": [],
"usage": Usage(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
),
}
break
Comment on lines +1102 to +1118
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and examine the main file
find . -type f -name "agent.py" | grep -E "agent/agent\.py$"

Repository: trycua/cua

Length of output: 88


🏁 Script executed:

# Check if the file exists and examine the specific lines
if [ -f "libs/python/agent/agent/agent.py" ]; then
  echo "=== Lines 1102-1118 in agent.py ==="
  sed -n '1102,1118p' "libs/python/agent/agent/agent.py"
  echo -e "\n=== Context: imports at top of file ==="
  head -50 "libs/python/agent/agent/agent.py" | grep -E "^from|^import"
fi

Repository: trycua/cua

Length of output: 1433


🏁 Script executed:

# Also check the gemini.py file around lines 712-713
if [ -f "libs/python/agent/agent/loops/gemini.py" ]; then
  echo "=== Lines 710-715 in gemini.py ==="
  sed -n '710,715p' "libs/python/agent/agent/loops/gemini.py"
fi

Repository: trycua/cua

Length of output: 329


🏁 Script executed:

# Search for model_validate_json usage in the agent.py file
rg "model_validate_json" "libs/python/agent/agent/agent.py" -B 3 -A 3

Repository: trycua/cua

Length of output: 430


🏁 Script executed:

# Check what ValidationError is being imported (if at all)
rg "ValidationError" "libs/python/agent/agent/agent.py"

Repository: trycua/cua

Length of output: 36


🏁 Script executed:

# Check the full context around the problematic code
if [ -f "libs/python/agent/agent/agent.py" ]; then
  echo "=== Broader context (lines 1090-1130) ==="
  sed -n '1090,1130p' "libs/python/agent/agent/agent.py"
fi

Repository: trycua/cua

Length of output: 1717


🏁 Script executed:

# Check what exceptions are imported/used in the file
rg "except|from pydantic|import.*Error" "libs/python/agent/agent/agent.py" | head -20

Repository: trycua/cua

Length of output: 724


🏁 Script executed:

# Check if there's a pattern of error handling elsewhere in the file for model_validate_json
rg "model_validate_json" "libs/python/agent/agent/" -B 2 -A 5

Repository: trycua/cua

Length of output: 735


🏁 Script executed:

# Verify the method signature of _extract_text_from_output_item to understand what it returns
rg "_extract_text_from_output_item" "libs/python/agent/agent/agent.py" -B 2 -A 10

Repository: trycua/cua

Length of output: 1172


🏁 Script executed:

# Let's verify what exception pydantic's model_validate_json raises
python3 << 'EOF'
from pydantic import BaseModel, ValidationError

class TestModel(BaseModel):
    name: str
    age: int

# Try invalid JSON
try:
    TestModel.model_validate_json("not valid json")
except Exception as e:
    print(f"Exception type: {type(e).__name__}")
    print(f"Exception module: {type(e).__module__}")
    print(f"Is ValidationError: {isinstance(e, ValidationError)}")
EOF

Repository: trycua/cua

Length of output: 157


🏁 Script executed:

# Check if there are any tests that verify this behavior
rg "model_validate_json" "libs/python/agent/" --type py -l

Repository: trycua/cua

Length of output: 86


🏁 Script executed:

# Look for any test files that test the structured output parsing
find "libs/python/agent" -type f -name "*test*.py" | xargs grep -l "output_parsed\|effective_output_type" 2>/dev/null

Repository: trycua/cua

Length of output: 38


🏁 Script executed:

# Check the function signature and broader error handling context
rg "async def.*predict" "libs/python/agent/agent/agent.py" -A 100 | head -150

Repository: trycua/cua

Length of output: 6508


🏁 Script executed:

# Look specifically for the function containing lines 1102-1118
sed -n '1050,1130p' "libs/python/agent/agent/agent.py"

Repository: trycua/cua

Length of output: 3191


🏁 Script executed:

# Check if there's a try/except wrapping the structured output parsing
sed -n '1000,1130p' "libs/python/agent/agent/agent.py" | grep -E "try:|except|def "

Repository: trycua/cua

Length of output: 132


🏁 Script executed:

# Verify what specific exception is raised and check imports more carefully
python3 << 'EOF'
from pydantic import ValidationError, BaseModel

class TestModel(BaseModel):
    name: str
    age: int

# Try invalid JSON to see exception details
try:
    TestModel.model_validate_json("not valid json")
except ValidationError as e:
    print(f"ValidationError imported from: pydantic")
    print(f"Exception path: {e.__class__.__module__}.{e.__class__.__name__}")
EOF

Repository: trycua/cua

Length of output: 154


🏁 Script executed:

# Check if ValidationError is imported in agent.py
grep -n "ValidationError" "libs/python/agent/agent/agent.py"

Repository: trycua/cua

Length of output: 36


🏁 Script executed:

# Check what happens when non-JSON text is sent to model_validate_json
python3 << 'EOF'
from pydantic import BaseModel

class TestModel(BaseModel):
    name: str
    age: int

# Try with plain text
try:
    result = TestModel.model_validate_json("this is not json")
except Exception as e:
    print(f"Exception raised: {type(e).__name__}")
    print(f"Can it be caught with 'except ValidationError'? {e.__class__.__name__}")
EOF

Repository: trycua/cua

Length of output: 151


🏁 Script executed:

# Look at the entire function to understand the control flow better
sed -n '980,1120p' "libs/python/agent/agent/agent.py"

Repository: trycua/cua

Length of output: 6016


Guard structured parsing when the final assistant text isn't valid JSON.

This block unconditionally calls model_validate_json(...) without error handling. When a provider like Gemini ignores or drops the schema request (as confirmed in libs/python/agent/agent/loops/gemini.py at lines 712-713), the run fails at the very end with an unhandled ValidationError, even though prior results have already been yielded to the caller. Catch the validation failure and skip parsing when the last assistant message is not valid JSON.

Proposed fix
-from pydantic import BaseModel
+from pydantic import BaseModel, ValidationError
@@
         if effective_output_type is not None and new_items:
             for item in reversed(new_items):
                 if item.get("role") == "assistant":
                     text_content = _extract_text_from_output_item(item)
-                    if text_content:
-                        parsed = effective_output_type.model_validate_json(text_content)
-                        yield {
-                            "output_parsed": parsed,
-                            "output": [],
-                            "usage": Usage(
-                                prompt_tokens=0,
-                                completion_tokens=0,
-                                total_tokens=0,
-                            ),
-                        }
+                    if not text_content:
+                        break
+                    try:
+                        parsed = effective_output_type.model_validate_json(text_content)
+                    except ValidationError:
+                        break
+                    yield {
+                        "output_parsed": parsed,
+                        "output": [],
+                        "usage": Usage(
+                            prompt_tokens=0,
+                            completion_tokens=0,
+                            total_tokens=0,
+                        ),
+                    }
                     break
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Parse structured output from the final assistant message
if effective_output_type is not None and new_items:
for item in reversed(new_items):
if item.get("role") == "assistant":
text_content = _extract_text_from_output_item(item)
if text_content:
parsed = effective_output_type.model_validate_json(text_content)
yield {
"output_parsed": parsed,
"output": [],
"usage": Usage(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
),
}
break
# Parse structured output from the final assistant message
if effective_output_type is not None and new_items:
for item in reversed(new_items):
if item.get("role") == "assistant":
text_content = _extract_text_from_output_item(item)
if not text_content:
break
try:
parsed = effective_output_type.model_validate_json(text_content)
except ValidationError:
break
yield {
"output_parsed": parsed,
"output": [],
"usage": Usage(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
),
}
break
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/python/agent/agent/agent.py` around lines 1102 - 1118, The
final-assistant structured parsing calls
effective_output_type.model_validate_json(...) without handling failures; wrap
that call in a try/except so a malformed/non-JSON assistant text does not raise
out of the generator—catch the validation/parsing exception (e.g.,
pydantic.ValidationError or JSON/ValueError) around the model_validate_json call
in the block that iterates reversed(new_items) and, on exception, skip yielding
"output_parsed" for that item (allow the function to continue/return normally);
you can optionally log the error but must not re-raise it so previous yields
remain valid.


async def predict_click(
self, instruction: str, image_b64: Optional[str] = None
) -> Optional[Tuple[int, int]]:
Expand Down
3 changes: 3 additions & 0 deletions libs/python/agent/agent/loops/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,9 @@ async def predict_step(
# Create client with CUA routing support (detects cua/ prefix automatically)
client, model = _create_gemini_client(model, genai, kwargs)

# Pop response_format — structured outputs not yet supported for Gemini
kwargs.pop("response_format", None)

# Extract Gemini 3-specific parameters
# thinking_level: Use types.ThinkingLevel enum values (e.g., "LOW", "HIGH", "MEDIUM", "MINIMAL")
# media_resolution: Use types.MediaResolution enum values (e.g., "MEDIA_RESOLUTION_LOW", "MEDIA_RESOLUTION_HIGH")
Expand Down
10 changes: 10 additions & 0 deletions libs/python/agent/agent/loops/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ async def predict_step(
# Prepare tools for OpenAI API
openai_tools = await _prepare_tools_for_openai(tools, model=model)

# Translate response_format to Responses API text.format parameter
response_format = kwargs.pop("response_format", None)
text_format = None
if response_format is not None:
text_format = {"format": response_format}

# Prepare API call kwargs
api_kwargs = {
"model": model,
Expand All @@ -237,6 +243,10 @@ async def predict_step(
**kwargs,
}

# Add text format for structured outputs if specified
if text_format is not None:
api_kwargs["text"] = text_format

# Call API start hook
if _on_api_start:
await _on_api_start(api_kwargs)
Expand Down
72 changes: 72 additions & 0 deletions libs/python/agent/tests/test_computer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,78 @@ def test_agent_response_type_exists(self):
assert AgentResponse is not None


class TestComputerAgentStructuredOutputs:
    """Test ComputerAgent structured outputs support."""

    # litellm is patched at the agent module level so constructing the agent
    # makes no real provider calls. disable_telemetry is presumably a shared
    # pytest fixture defined elsewhere in the test suite — TODO confirm.
    @patch("agent.agent.litellm")
    def test_agent_initialization_with_output_type(self, mock_litellm, disable_telemetry):
        """Test that agent can be initialized with output_type parameter."""
        from pydantic import BaseModel

        from agent import ComputerAgent

        class MyOutput(BaseModel):
            title: str
            score: int

        agent = ComputerAgent(
            model="anthropic/claude-sonnet-4-5-20250929",
            output_type=MyOutput,
        )

        assert agent is not None
        # Identity check: the constructor must store the class itself, not a copy.
        assert agent.output_type is MyOutput

    @patch("agent.agent.litellm")
    def test_agent_initialization_without_output_type(self, mock_litellm, disable_telemetry):
        """Test that output_type defaults to None."""
        from agent import ComputerAgent

        agent = ComputerAgent(model="anthropic/claude-sonnet-4-5-20250929")
        assert agent.output_type is None

    def test_pydantic_model_to_response_format(self):
        """Test conversion of Pydantic model to response_format dict."""
        from pydantic import BaseModel

        from agent.agent import _pydantic_model_to_response_format

        class TestModel(BaseModel):
            name: str
            value: int

        result = _pydantic_model_to_response_format(TestModel)

        # The helper must emit the OpenAI-style json_schema response format,
        # named after the model class and carrying each declared field.
        assert result["type"] == "json_schema"
        assert result["json_schema"]["name"] == "TestModel"
        assert "properties" in result["json_schema"]["schema"]
        assert "name" in result["json_schema"]["schema"]["properties"]
        assert "value" in result["json_schema"]["schema"]["properties"]

    def test_extract_text_from_output_item_dict(self):
        """Test extracting text from a dict-based output item."""
        from agent.agent import _extract_text_from_output_item

        # Dict-shaped content block with type "output_text" is the common case.
        item = {
            "role": "assistant",
            "type": "message",
            "content": [
                {"type": "output_text", "text": '{"title": "test", "score": 42}'}
            ],
        }

        result = _extract_text_from_output_item(item)
        assert result == '{"title": "test", "score": 42}'

    def test_extract_text_from_output_item_no_text(self):
        """Test extracting text from an item with no text content."""
        from agent.agent import _extract_text_from_output_item

        # An empty content list yields None rather than an empty string.
        item = {"role": "assistant", "type": "message", "content": []}
        result = _extract_text_from_output_item(item)
        assert result is None


class TestComputerAgentIntegration:
"""Test ComputerAgent integration with Computer tool (SRP: Integration within package)."""

Expand Down
Loading