-
Notifications
You must be signed in to change notification settings - Fork 1.1k
clean up output text of validator #1077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -272,56 +272,186 @@ def validate( | |
| root_id: Optional[str] = None, | ||
| strict_integrity: bool = True, | ||
| ) -> None: | ||
| """Validates an A2UI messages against the schema. | ||
|
|
||
| Args: | ||
| a2ui_json: The A2UI message(s) to validate. | ||
| root_id: Optional root component ID. | ||
| strict_integrity: If True, performs full topology and integrity checks. | ||
| If False, only performs schema validation and basic syntax checks. | ||
| """ | ||
| """Validates an A2UI messages against the schema.""" | ||
| messages = a2ui_json if isinstance(a2ui_json, list) else [a2ui_json] | ||
|
|
||
| # Basic schema validation | ||
| errors = list(self._validator.iter_errors(messages)) | ||
| if errors: | ||
| error = errors[0] | ||
| msg = f"Validation failed: {error.message}" | ||
| if error.context: | ||
| msg += "\nContext failures:" | ||
| for sub_error in error.context: | ||
| msg += f"\n - {sub_error.message}" | ||
| if self.version == VERSION_0_9: | ||
| self._validate_0_9_custom(messages, root_id, strict_integrity) | ||
| else: | ||
| # Fallback to old behavior for v0.8 | ||
| errors = list(self._validator.iter_errors(messages)) | ||
| if errors: | ||
| error = errors[0] | ||
| msg = f"Validation failed: {error.message}" | ||
| if error.context: | ||
| msg += "\nContext failures:" | ||
| for sub_error in error.context: | ||
| msg += f"\n - {sub_error.message}" | ||
| raise ValueError(msg) | ||
|
|
||
| for message in messages: | ||
| if not isinstance(message, dict): | ||
| continue | ||
|
|
||
| components = None | ||
| surface_id = None | ||
| if "surfaceUpdate" in message: # v0.8 | ||
| components = message["surfaceUpdate"].get(COMPONENTS) | ||
| surface_id = message["surfaceUpdate"].get("surfaceId") | ||
|
|
||
| if components: | ||
| ref_map = extract_component_ref_fields(self._catalog) | ||
| root_id = _find_root_id(messages, surface_id) | ||
| _validate_component_integrity( | ||
| root_id, components, ref_map, skip_root_check=not strict_integrity | ||
| ) | ||
| analyze_topology( | ||
| root_id, components, ref_map, raise_on_orphans=strict_integrity | ||
| ) | ||
|
|
||
| _validate_recursion_and_paths(message) | ||
|
|
||
| def _validate_0_9_custom( | ||
| self, | ||
| messages: List[Dict[str, Any]], | ||
| root_id: Optional[str] = None, | ||
| strict_integrity: bool = True, | ||
| ) -> None: | ||
| all_errors = [] | ||
| for idx, message in enumerate(messages): | ||
| if not isinstance(message, dict): | ||
| all_errors.append(f"messages[{idx}]: Is not an object") | ||
| continue | ||
|
|
||
| if "createSurface" in message: | ||
| val = self._get_sub_validator("CreateSurfaceMessage") | ||
| all_errors.extend(self._get_formatted_errors(val, message, f"messages[{idx}]")) | ||
| elif "updateComponents" in message: | ||
| all_errors.extend( | ||
| self._get_update_components_errors(message, f"messages[{idx}]") | ||
| ) | ||
| elif "updateDataModel" in message: | ||
| val = self._get_sub_validator("UpdateDataModelMessage") | ||
| all_errors.extend(self._get_formatted_errors(val, message, f"messages[{idx}]")) | ||
| elif "deleteSurface" in message: | ||
| val = self._get_sub_validator("DeleteSurfaceMessage") | ||
| all_errors.extend(self._get_formatted_errors(val, message, f"messages[{idx}]")) | ||
| else: | ||
| keys = list(message.keys()) | ||
| all_errors.append(f"messages[{idx}]: Unknown message type with keys {keys}") | ||
|
|
||
| if all_errors: | ||
| msg = "Validation failed:\n" + "\n".join(f" - {err}" for err in all_errors) | ||
| raise ValueError(msg) | ||
|
|
||
| # Integrity checks | ||
| for message in messages: | ||
| if not isinstance(message, dict): | ||
| continue | ||
|
|
||
| components = None | ||
| surface_id = None | ||
| if "surfaceUpdate" in message: # v0.8 | ||
| components = message["surfaceUpdate"].get(COMPONENTS) | ||
| surface_id = message["surfaceUpdate"].get("surfaceId") | ||
| elif "updateComponents" in message and isinstance( | ||
| if "updateComponents" in message and isinstance( | ||
| message["updateComponents"], dict | ||
| ): # v0.9 | ||
| ): | ||
| components = message["updateComponents"].get(COMPONENTS) | ||
| surface_id = message["updateComponents"].get("surfaceId") | ||
|
|
||
| if components: | ||
| ref_map = extract_component_ref_fields(self._catalog) | ||
| root_id = _find_root_id(messages, surface_id) | ||
| # Always check for basic integrity (duplicates) | ||
| _validate_component_integrity( | ||
| root_id, components, ref_map, skip_root_check=not strict_integrity | ||
| ) | ||
| # Always check topology (cycles), but only raise on orphans if strict_integrity is True | ||
| analyze_topology( | ||
| root_id, components, ref_map, raise_on_orphans=strict_integrity | ||
| ) | ||
|
|
||
| _validate_recursion_and_paths(message) | ||
|
Comment on lines
348
to
369
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This block for performing integrity and topology checks is nearly identical to the logic in the |
||
|
|
||
| def _get_sub_validator(self, def_name: str) -> Draft202012Validator: | ||
| sub_schema = self._catalog.s2c_schema.get("$defs", {}).get(def_name) | ||
| if not sub_schema: | ||
| raise ValueError(f"Definition {def_name} not found in schema") | ||
| return Draft202012Validator(sub_schema, registry=self._validator._registry) | ||
|
|
||
| def _get_formatted_errors( | ||
| self, validator: Draft202012Validator, instance: Any, base_path: str | ||
| ) -> List[str]: | ||
| errors = list(validator.iter_errors(instance)) | ||
| formatted = [] | ||
| for err in errors: | ||
| path_str = ".".join(str(p) for p in err.path) | ||
| full_path = f"{base_path}.{path_str}" if path_str else base_path | ||
|
|
||
| message = err.message | ||
| if ( | ||
| ( | ||
| "Unevaluated properties are not allowed" in message | ||
| or "Additional properties are not allowed" in message | ||
| ) | ||
| and "(" in message | ||
| and ")" in message | ||
| ): | ||
| message = message[message.find("(") + 1 : message.rfind(")")] | ||
|
|
||
| formatted.append(f"{full_path}: {message}") | ||
| return formatted | ||
|
|
||
| def _get_update_components_errors( | ||
| self, message: Dict[str, Any], path: str | ||
| ) -> List[str]: | ||
| errors = [] | ||
| if "version" not in message or message["version"] != "v0.9": | ||
| errors.append(f"{path}: Invalid version, expected 'v0.9'") | ||
|
|
||
| uc = message.get("updateComponents") | ||
| if not isinstance(uc, dict): | ||
| errors.append(f"{path}: Expected updateComponents to be an object") | ||
| return errors | ||
|
|
||
| if "surfaceId" not in uc or not isinstance(uc["surfaceId"], str): | ||
| errors.append(f"{path}.updateComponents: Invalid or missing surfaceId") | ||
|
|
||
| components = uc.get("components") | ||
| if not isinstance(components, list): | ||
| errors.append(f"{path}.updateComponents: Expected components to be an array") | ||
| return errors | ||
|
|
||
| for idx, comp in enumerate(components): | ||
| comp_id = comp.get("id") | ||
| comp_path = ( | ||
| f"{path}.updateComponents.components[id='{comp_id}']" | ||
| if comp_id | ||
| else f"{path}.updateComponents.components[{idx}]" | ||
| ) | ||
| errors.extend(self._get_single_component_errors(comp, comp_path)) | ||
|
|
||
| return errors | ||
|
|
||
| def _get_single_component_errors(self, comp: Dict[str, Any], path: str) -> List[str]: | ||
| if not isinstance(comp, dict): | ||
| return [f"{path}: Component is not an object"] | ||
|
|
||
| comp_type = comp.get("component") | ||
| if not comp_type: | ||
| return [f"{path}: Missing 'component' field"] | ||
|
|
||
| catalog = self._catalog.catalog_schema | ||
| if not catalog or "components" not in catalog: | ||
| return [f"{path}: Catalog schema or components missing"] | ||
|
|
||
| comp_schema = catalog["components"].get(comp_type) | ||
| if not comp_schema: | ||
| return [f"{path}: Unknown component: {comp_type}"] | ||
|
|
||
| temp_schema = { | ||
| "$schema": "https://json-schema.org/draft/2020-12/schema", | ||
| "$ref": f"catalog.json#/components/{comp_type}", | ||
| } | ||
|
|
||
| validator = Draft202012Validator(temp_schema, registry=self._validator._registry) | ||
|
Comment on lines
+447
to
+452
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Creating a new cache = self.__dict__.setdefault("_comp_validator_cache", {})
if comp_type not in cache:
temp_schema = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$ref": f"catalog.json#/components/{comp_type}"
}
cache[comp_type] = Draft202012Validator(
temp_schema, registry=self._validator._registry
)
validator = cache[comp_type] |
||
| return self._get_formatted_errors(validator, comp, path) | ||
|
|
||
|
|
||
| def _find_root_id( | ||
| messages: List[Dict[str, Any]], surface_id: Optional[str] = None | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.