import sys
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Optional
from urllib.parse import urljoin
from lxml.etree import Element
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.parsers.nodes import ParsedObjects
from xsdata.formats.dataclass.parsers.nodes import XmlNodes
from xsdata.formats.dataclass.parsers.xml import XmlParser
from xsdata.models import xsd
from xsdata.models.enums import FormType
from xsdata.models.enums import Mode
from xsdata.models.enums import Namespace
[docs]@dataclass
class SchemaParser(XmlParser):
"""
A simple parser to convert an xsd schema to an easy to handle data
structure based on dataclasses.
The parser is a dummy as possible but it will try to normalize
certain things like apply parent properties to children.
:param location:
:param element_form:
:param attribute_form:
:param target_namespace:
:param default_attributes:
:param default_open_content:
"""
location: Optional[str] = field(default=None)
element_form: Optional[FormType] = field(init=False, default=None)
attribute_form: Optional[FormType] = field(init=False, default=None)
target_namespace: Optional[str] = field(default=None)
default_attributes: Optional[str] = field(default=None)
default_open_content: Optional[xsd.DefaultOpenContent] = field(default=None)
[docs] def dequeue(self, element: Element, queue: XmlNodes, objects: ParsedObjects) -> Any:
"""Override parent method to set element index and namespaces map."""
obj: Any = super().dequeue(element, queue, objects)
self.set_index(element, obj)
self.set_namespace_map(element, obj)
return obj
[docs] def start_schema(self, element: Element):
"""Collect the schema's default form for attributes and elements for
later usage."""
self.element_form = element.attrib.get("elementFormDefault", None)
self.attribute_form = element.attrib.get("attributeFormDefault", None)
self.default_attributes = element.attrib.get("defaultAttributes", None)
[docs] def set_schema_namespaces(self, obj: xsd.Schema, element: Element):
"""Set the given schema's target namespace and add the default
namespaces if the are missing xsi, xlink, xml, xs."""
obj.target_namespace = obj.target_namespace or self.target_namespace
self.set_namespace_map(element, obj)
[docs] @staticmethod
def set_namespace_map(element: Element, obj: Any):
"""Add common namespaces like xml, xsi, xlink if they are missing."""
if hasattr(obj, "ns_map"):
obj.ns_map = {prefix: uri for prefix, uri in element.nsmap.items() if uri}
ns_list = obj.ns_map.values()
ns_common = (
Namespace.XS,
Namespace.XSI,
Namespace.XML,
Namespace.XLINK,
)
obj.ns_map.update(
{ns.prefix: ns.uri for ns in ns_common if ns.uri not in ns_list}
)
[docs] @staticmethod
def set_index(element: Element, obj: Any):
if hasattr(obj, "index"):
obj.index = element.sourceline
[docs] @staticmethod
def add_default_imports(obj: xsd.Schema):
"""Add missing imports to the standard schemas if the namespace is
declared and."""
imp_namespaces = [imp.namespace for imp in obj.imports]
xsi_ns = Namespace.XSI.uri
if xsi_ns in obj.ns_map.values() and xsi_ns not in imp_namespaces:
obj.imports.insert(0, xsd.Import(namespace=xsi_ns))
[docs] def resolve_schemas_locations(self, obj: xsd.Schema):
"""Resolve the locations of the schema overrides, redefines, includes
and imports relatively to the schema location."""
if not self.location:
return
obj.location = self.location
for over in obj.overrides:
over.location = self.resolve_path(over.schema_location)
for red in obj.redefines:
red.location = self.resolve_path(red.schema_location)
for inc in obj.includes:
inc.location = self.resolve_path(inc.schema_location)
for imp in obj.imports:
imp.location = self.resolve_local_path(imp.schema_location, imp.namespace)
[docs] def resolve_path(self, location: Optional[str]) -> Optional[str]:
"""Resolve the given location string relatively the schema location
path."""
return urljoin(self.location, location) if self.location and location else None
[docs] def resolve_local_path(
self, location: Optional[str], namespace: Optional[str]
) -> Optional[str]:
"""Resolve the given namespace to one of the local standard schemas or
fallback to the external file path."""
common_ns = Namespace.get_enum(namespace)
local_path = common_ns.location if common_ns else None
return local_path if local_path else self.resolve_path(location)
[docs] def end_attribute(self, obj: T, element: Element):
"""Assign the schema's default form for attributes if the given
attribute form is None."""
if isinstance(obj, xsd.Attribute) and obj.form is None and self.attribute_form:
obj.form = FormType(self.attribute_form)
[docs] def end_complex_type(self, obj: T, element: Element):
"""Prepend an attribute group reference when default attributes
apply."""
if not isinstance(obj, xsd.ComplexType):
return
if obj.default_attributes_apply and self.default_attributes:
attribute_group = xsd.AttributeGroup(ref=self.default_attributes)
obj.attribute_groups.insert(0, attribute_group)
if not obj.open_content:
obj.open_content = self.default_open_content
[docs] def end_default_open_content(self, obj: T, element: Element):
"""Set the instance default open content to be used later as a property
for all extensions and restrictions."""
if isinstance(obj, xsd.DefaultOpenContent):
if obj.any and obj.mode == Mode.SUFFIX:
obj.any.index = sys.maxsize
self.default_open_content = obj
[docs] def end_element(self, obj: T, element: Element):
"""Assign the schema's default form for elements if the given element
form is None."""
if isinstance(obj, xsd.Element) and obj.form is None and self.element_form:
obj.form = FormType(self.element_form)
[docs] def end_extension(self, obj: T, element: Element):
"""Set the open content if any to the given extension."""
if isinstance(obj, xsd.Extension) and not obj.open_content:
obj.open_content = self.default_open_content
[docs] @classmethod
def end_open_content(cls, obj: T, element: Element):
"""Adjust the index to trick later processors into putting attributes
derived from this open content last in classes."""
if isinstance(obj, xsd.OpenContent):
if obj.any and obj.mode == Mode.SUFFIX:
obj.any.index = sys.maxsize
[docs] def end_restriction(self, obj: T, element: Element):
"""Set the open content if any to the given restriction."""
if isinstance(obj, xsd.Restriction) and not obj.open_content:
obj.open_content = self.default_open_content
[docs] def end_schema(self, obj: T, element: Element):
"""Normalize various properties for the schema and it's children."""
if isinstance(obj, xsd.Schema):
self.set_schema_forms(obj)
self.set_schema_namespaces(obj, element)
self.add_default_imports(obj)
self.resolve_schemas_locations(obj)