Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 133 additions & 25 deletions agent_sdks/python/src/a2ui/core/schema/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,56 +272,164 @@ def validate(
root_id: Optional[str] = None,
strict_integrity: bool = True,
) -> None:
"""Validates an A2UI messages against the schema.

Args:
a2ui_json: The A2UI message(s) to validate.
root_id: Optional root component ID.
strict_integrity: If True, performs full topology and integrity checks.
If False, only performs schema validation and basic syntax checks.
"""
"""Validates an A2UI messages against the schema."""
messages = a2ui_json if isinstance(a2ui_json, list) else [a2ui_json]

# Basic schema validation
errors = list(self._validator.iter_errors(messages))
if errors:
error = errors[0]
msg = f"Validation failed: {error.message}"
if error.context:
msg += "\nContext failures:"
for sub_error in error.context:
msg += f"\n - {sub_error.message}"
if self.version == VERSION_0_9:
self._validate_0_9_custom(messages, root_id, strict_integrity)
else:
# Fallback to old behavior for v0.8
errors = list(self._validator.iter_errors(messages))
if errors:
error = errors[0]
msg = f"Validation failed: {error.message}"
if error.context:
msg += "\nContext failures:"
for sub_error in error.context:
msg += f"\n - {sub_error.message}"
raise ValueError(msg)

for message in messages:
if not isinstance(message, dict):
continue

components = None
surface_id = None
if "surfaceUpdate" in message: # v0.8
components = message["surfaceUpdate"].get(COMPONENTS)
surface_id = message["surfaceUpdate"].get("surfaceId")

if components:
ref_map = extract_component_ref_fields(self._catalog)
root_id = _find_root_id(messages, surface_id)
_validate_component_integrity(
root_id, components, ref_map, skip_root_check=not strict_integrity
)
analyze_topology(
root_id, components, ref_map, raise_on_orphans=strict_integrity
)

_validate_recursion_and_paths(message)

def _validate_0_9_custom(
    self,
    messages: List[Dict[str, Any]],
    root_id: Optional[str] = None,
    strict_integrity: bool = True,
) -> None:
  """Validates v0.9 messages individually, then checks component integrity.

  The first pass dispatches each message on its top-level key
  (`createSurface`, `updateComponents`, `updateDataModel`) and collects every
  formatted error instead of stopping at the first one. Only when that pass
  is clean does the second pass run the integrity, topology and
  recursion/path checks.

  Args:
    messages: The list of messages to validate.
    root_id: Optional root component ID.
      NOTE(review): this argument is overwritten by `_find_root_id(...)` for
      every message that carries components, so the caller-supplied value is
      never used here - confirm whether that is intended.
    strict_integrity: Forwarded as `skip_root_check=not strict_integrity` to
      `_validate_component_integrity` and as `raise_on_orphans` to
      `analyze_topology`.

  Raises:
    ValueError: If the first pass collected any error. The message lists all
      of them, one per line.
  """
  all_errors = []
  for idx, message in enumerate(messages):
    path = f"messages[{idx}]"
    if not isinstance(message, dict):
      all_errors.append(f"{path}: Is not an object")
      continue

    if "createSurface" in message:
      val = self._get_sub_validator("CreateSurfaceMessage")
      all_errors.extend(self._get_formatted_errors(val, message, path))
    elif "updateComponents" in message:
      all_errors.extend(self._get_update_components_errors(message, path))
    elif "updateDataModel" in message:
      val = self._get_sub_validator("UpdateDataModelMessage")
      all_errors.extend(self._get_formatted_errors(val, message, path))
    else:
      keys = list(message.keys())
      all_errors.append(f"{path}: Unknown message type with keys {keys}")

  if all_errors:
    msg = "Validation failed:\n" + "\n".join(f" - {err}" for err in all_errors)
    raise ValueError(msg)

  # Integrity checks
  for message in messages:
    if not isinstance(message, dict):
      continue

    components = None
    surface_id = None
    # Only v0.9 `updateComponents` payloads carry components on this path;
    # the v0.8 `surfaceUpdate` shape is not inspected here.
    update = message.get("updateComponents")
    if isinstance(update, dict):
      components = update.get(COMPONENTS)
      surface_id = update.get("surfaceId")

    if components:
      ref_map = extract_component_ref_fields(self._catalog)
      root_id = _find_root_id(messages, surface_id)
      # Always check for basic integrity (duplicates)
      _validate_component_integrity(
          root_id, components, ref_map, skip_root_check=not strict_integrity
      )
      # Always check topology (cycles), but only raise on orphans if
      # strict_integrity is True
      analyze_topology(
          root_id, components, ref_map, raise_on_orphans=strict_integrity
      )

    _validate_recursion_and_paths(message)
Comment on lines 348 to 369
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block for performing integrity and topology checks is nearly identical to the logic in the validate method (lines 292-312). To improve maintainability and reduce duplication, consider refactoring this into a shared helper method that handles component extraction and validation for both protocol versions.


def _get_sub_validator(self, def_name: str) -> Draft202012Validator:
  """Returns a Draft 2020-12 validator for one named schema definition.

  Args:
    def_name: Key to look up under `"$defs"` in `self._catalog.s2c_schema`.

  Returns:
    A new validator for that definition. It reuses the registry of
    `self._validator`, presumably so that `$ref`s inside the definition
    resolve the same way as for the main validator - TODO confirm.
  """
  definitions = self._catalog.s2c_schema["$defs"]
  return Draft202012Validator(
      definitions[def_name], registry=self._validator._registry
  )

def _get_formatted_errors(self, validator: Draft202012Validator, instance: Any, base_path: str) -> List[str]:
  """Runs `validator` over `instance` and renders each error as one line.

  Args:
    validator: Validator whose `iter_errors` yields the errors to format.
    instance: The value to validate.
    base_path: Prefix identifying `instance`, e.g. `messages[0]`.

  Returns:
    One `"<path>: <message>"` string per error. `<path>` is `base_path`,
    extended with the dot-joined error path when that path is non-empty.
  """
  unexpected_markers = (
      "Unevaluated properties are not allowed",
      "Additional properties are not allowed",
  )

  lines = []
  for error in validator.iter_errors(instance):
    location = ".".join(map(str, error.path))
    prefix = f"{base_path}.{location}" if location else base_path

    text = error.message
    # For "... not allowed (<details>)" messages keep only what is inside the
    # outermost parentheses.
    # NOTE(review): this depends on the wording of the validation library's
    # messages and may need adjusting if that wording changes.
    has_marker = any(marker in text for marker in unexpected_markers)
    if has_marker and "(" in text and ")" in text:
      text = text[text.find("(") + 1 : text.rfind(")")]

    lines.append(f"{prefix}: {text}")
  return lines

def _get_update_components_errors(self, message: Dict[str, Any], path: str) -> List[str]:
  """Collects validation errors for one `updateComponents` message.

  Checks the envelope (`version`, `updateComponents`, `surfaceId`,
  `components`) by hand and validates each component individually, so the
  returned errors point at a specific component rather than at the whole
  message.

  Args:
    message: The message object containing the `updateComponents` key.
    path: Prefix used in every error string, e.g. `messages[0]`.

  Returns:
    A list of error strings; empty when nothing was found.
  """
  errors = []
  if message.get("version") != "v0.9":
    errors.append(f"{path}: Invalid version, expected 'v0.9'")

  uc = message.get("updateComponents")
  if not isinstance(uc, dict):
    errors.append(f"{path}: Expected updateComponents to be an object")
    return errors

  if not isinstance(uc.get("surfaceId"), str):
    errors.append(f"{path}.updateComponents: Invalid or missing surfaceId")

  components = uc.get("components")
  if not isinstance(components, list):
    errors.append(f"{path}.updateComponents: Expected components to be an array")
    return errors

  for idx, comp in enumerate(components):
    indexed_path = f"{path}.updateComponents.components[{idx}]"
    if not isinstance(comp, dict):
      # A non-object entry has no `.get`; report it instead of crashing with
      # AttributeError while looking up its id.
      errors.append(f"{indexed_path}: Component is not an object")
      continue

    comp_id = comp.get("id")
    # Prefer addressing the component by id; fall back to its index.
    comp_path = (
        f"{path}.updateComponents.components[id='{comp_id}']"
        if comp_id
        else indexed_path
    )
    errors.extend(self._get_single_component_errors(comp, comp_path))

  return errors

def _get_single_component_errors(self, comp: Dict[str, Any], path: str) -> List[str]:
  """Validates one component against its entry in the catalog schema.

  Args:
    comp: The component to validate; expected to be an object with a
      `component` field naming its type.
    path: Prefix used in every returned error string.

  Returns:
    A list of error strings; empty when the component is valid.
  """
  if not isinstance(comp, dict):
    return [f"{path}: Component is not an object"]

  comp_type = comp.get("component")
  if not comp_type:
    return [f"{path}: Missing 'component' field"]

  catalog = self._catalog.catalog_schema
  if not catalog or "components" not in catalog:
    return [f"{path}: Catalog schema or components missing"]

  # Only look up string names: an unhashable value (e.g. a list) would raise
  # TypeError in the dict lookup instead of yielding a validation error.
  comp_schema = (
      catalog["components"].get(comp_type)
      if isinstance(comp_type, str)
      else None
  )
  if not comp_schema:
    return [f"{path}: Unknown component: {comp_type}"]

  # Build the validator once per component type and reuse it; constructing a
  # new one for every component is wasteful on large payloads.
  # NOTE(review): assumes the catalog and registry do not change during the
  # lifetime of this instance - confirm.
  cache = getattr(self, "_comp_validator_cache", None)
  if cache is None:
    cache = self._comp_validator_cache = {}

  validator = cache.get(comp_type)
  if validator is None:
    temp_schema = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$ref": f"catalog.json#/components/{comp_type}",
    }
    validator = Draft202012Validator(
        temp_schema, registry=self._validator._registry
    )
    cache[comp_type] = validator

  return self._get_formatted_errors(validator, comp, path)


def _find_root_id(
messages: List[Dict[str, Any]], surface_id: Optional[str] = None
Expand Down
Loading