|
| 1 | +"""JSON schema support for Odin.""" |
| 2 | +import json |
| 3 | +from typing import Any, Dict, Final, List, Sequence, TextIO, Tuple, Type, Union |
| 4 | + |
| 5 | +import odin |
| 6 | +import odin.validators |
| 7 | +from odin.registration import get_child_resources |
| 8 | +from odin.resources import ResourceBase, ResourceOptions |
| 9 | +from odin.utils import getmeta |
| 10 | + |
| 11 | +SCHEMA_DIALECT: Final[str] = "https://json-schema.org/draft/2020-12/schema" |
| 12 | +FIELD_SCHEMAS = { |
| 13 | + odin.StringField: ("string", {}), |
| 14 | + odin.BooleanField: ("boolean", {}), |
| 15 | + odin.IntegerField: ("integer", {}), |
| 16 | + odin.FloatField: ("number", {}), |
| 17 | + odin.ListField: ("array", {}), |
| 18 | + odin.DictField: ("object", {}), |
| 19 | + odin.DateField: ("string", {"format": "date"}), |
| 20 | + odin.TimeField: ("string", {"format": "time"}), |
| 21 | + odin.DateTimeField: ("string", {"format": "date-time"}), |
| 22 | + odin.EmailField: ("string", {"format": "email"}), |
| 23 | + odin.IPv4Field: ("string", {"format": "ipv4"}), |
| 24 | + odin.IPv6Field: ("string", {"format": "ipv6"}), |
| 25 | + odin.IPv46Field: ("string", {"format": ["ipv4", "ipv6"]}), |
| 26 | + odin.PathField: ("string", {}), |
| 27 | + odin.RegexField: ("string", {"format": "regex"}), |
| 28 | + odin.UrlField: ("string", {"format": "uri"}), |
| 29 | + odin.UUIDField: ("string", {"format": "uuid"}), |
| 30 | +} |
| 31 | +VALIDATOR_SCHEMAS = { |
| 32 | + odin.validators.MaxValueValidator: {}, |
| 33 | + odin.validators.MinValueValidator: {}, |
| 34 | + odin.validators.LengthValidator: {}, |
| 35 | + odin.validators.MaxLengthValidator: {}, |
| 36 | + odin.validators.MinLengthValidator: {}, |
| 37 | +} |
| 38 | + |
| 39 | + |
| 40 | +class JSONSchema: |
| 41 | + """JSON Schema representation of an Odin resource.""" |
| 42 | + |
| 43 | + def __init__( |
| 44 | + self, resource: Type[ResourceBase], *, require_type_field: bool = True |
| 45 | + ): |
| 46 | + self.resource = resource |
| 47 | + self.require_type_field = require_type_field |
| 48 | + |
| 49 | + self.defs = {} |
| 50 | + |
| 51 | + def to_dict(self) -> Dict[str, Any]: |
| 52 | + """Convert the schema to a dictionary.""" |
| 53 | + meta = getmeta(self.resource) |
| 54 | + |
| 55 | + schema = { |
| 56 | + "$schema": SCHEMA_DIALECT, |
| 57 | + "$id": f"urn:jsonschema:{meta.resource_name}", |
| 58 | + } |
| 59 | + schema.update(self._resource_to_schema(meta)) |
| 60 | + schema["$defs"] = self.defs |
| 61 | + |
| 62 | + return schema |
| 63 | + |
| 64 | + def _resource_to_schema(self, meta: ResourceOptions) -> Dict[str, Any]: |
| 65 | + """Convert a resource to a JSON schema.""" |
| 66 | + schema = { |
| 67 | + "type": "object", |
| 68 | + "properties": self._fields_to_properties(meta), |
| 69 | + "required": self._required_fields(meta), |
| 70 | + "additionalProperties": False, |
| 71 | + } |
| 72 | + return schema |
| 73 | + |
| 74 | + def _required_fields(self, meta: ResourceOptions) -> Sequence[str]: |
| 75 | + """Get a list of required fields.""" |
| 76 | + required = [field.name for field in meta.fields if not field.null] |
| 77 | + if self.require_type_field: |
| 78 | + required.append(meta.type_field) |
| 79 | + return required |
| 80 | + |
| 81 | + def _fields_to_properties(self, meta: ResourceOptions) -> Dict[str, Any]: |
| 82 | + """Convert a set of fields to JSON schema properties.""" |
| 83 | + properties = {meta.type_field: {"const": meta.resource_name}} |
| 84 | + for field in meta.fields: |
| 85 | + properties[field.name] = self._field_to_schema(field) |
| 86 | + return properties |
| 87 | + |
| 88 | + def _field_to_schema(self, field: odin.Field) -> Dict[str, Any]: |
| 89 | + """Convert a field to a JSON schema.""" |
| 90 | + if isinstance(field, odin.CompositeField): |
| 91 | + schema = self._composite_field_to_schema(field) |
| 92 | + |
| 93 | + else: |
| 94 | + type_def, extra_schema = self._field_type(field) |
| 95 | + schema = {"type": type_def} |
| 96 | + schema.update(extra_schema) |
| 97 | + |
| 98 | + if field.doc_text: |
| 99 | + schema["description"] = field.doc_text |
| 100 | + if field.choices: |
| 101 | + schema.setdefault("enum", field.choice_values) |
| 102 | + |
| 103 | + return schema |
| 104 | + |
| 105 | + def _field_type( |
| 106 | + self, field: odin.Field |
| 107 | + ) -> Tuple[Union[str, List[str]], Dict[str, Any]]: |
| 108 | + """Get the type of a field.""" |
| 109 | + |
| 110 | + field_type = type(field) |
| 111 | + if field_type in FIELD_SCHEMAS: |
| 112 | + type_name, schema = FIELD_SCHEMAS[field_type] |
| 113 | + |
| 114 | + elif isinstance(field, odin.EnumField): |
| 115 | + type_name = "string" |
| 116 | + schema = {"enum": tuple(str(item.value) for item in field.enum_type)} |
| 117 | + |
| 118 | + elif isinstance(field, odin.TypedListField): |
| 119 | + type_name = "array" |
| 120 | + schema = {"items": self._field_to_schema(field.field)} |
| 121 | + |
| 122 | + elif isinstance(field, odin.TypedDictField): |
| 123 | + type_name = "object" |
| 124 | + schema = {"additionalProperties": self._field_to_schema(field.value_field)} |
| 125 | + |
| 126 | + else: |
| 127 | + for field_type, field_info in FIELD_SCHEMAS.items(): |
| 128 | + if isinstance(field, field_type): |
| 129 | + type_name, schema = field_info |
| 130 | + break |
| 131 | + |
| 132 | + else: |
| 133 | + raise ValueError(f"Unknown field type: {field_type}") |
| 134 | + |
| 135 | + return ([type_name, "null"] if field.null else type_name), schema |
| 136 | + |
| 137 | + def _composite_field_to_schema(self, field: odin.CompositeField) -> Dict[str, Any]: |
| 138 | + """Convert a composite field to a JSON schema.""" |
| 139 | + |
| 140 | + # Handle abstract resources |
| 141 | + child_resources = get_child_resources(field.of) |
| 142 | + if child_resources: |
| 143 | + schema = { |
| 144 | + "oneOf": [ |
| 145 | + self._schema_def(child_resource) |
| 146 | + for child_resource in child_resources |
| 147 | + ] |
| 148 | + } |
| 149 | + else: |
| 150 | + schema = self._schema_def(field.of) |
| 151 | + |
| 152 | + if isinstance(field, odin.ListOf): |
| 153 | + schema = {"type": "array", "items": schema} |
| 154 | + |
| 155 | + elif isinstance(field, odin.DictOf): |
| 156 | + schema = {"type": "object", "additionalProperties": schema} |
| 157 | + |
| 158 | + return schema |
| 159 | + |
| 160 | + def _schema_def(self, resource: Type[ResourceBase]) -> Dict[str, str]: |
| 161 | + """Convert a resource to a JSON schema definition.""" |
| 162 | + meta = getmeta(resource) |
| 163 | + ref = meta.resource_name |
| 164 | + if ref not in self.defs: |
| 165 | + self.defs[ref] = None # Placeholder to prevent recursion |
| 166 | + self.defs[ref] = self._resource_to_schema(meta) |
| 167 | + return {"$ref": f"#/$defs/{ref}"} |
| 168 | + |
| 169 | + |
| 170 | +def dumps(resource: Type[ResourceBase]) -> str: |
| 171 | + """Dump a JSON schema for the given resource.""" |
| 172 | + schema = JSONSchema(resource).to_dict() |
| 173 | + return json.dumps(schema, indent=2) |
| 174 | + |
| 175 | + |
| 176 | +def dump(resource: Type[ResourceBase], fp: TextIO): |
| 177 | + """Dump a JSON schema for the given resource.""" |
| 178 | + schema = JSONSchema(resource).to_dict() |
| 179 | + json.dump(schema, fp, indent=2) |
0 commit comments