Source code for xsdata.formats.dataclass.parsers.nodes

from dataclasses import dataclass
from dataclasses import field
from dataclasses import fields
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union

from lxml.etree import _Element
from lxml.etree import _ElementTree
from lxml.etree import Element
from lxml.etree import iterwalk
from lxml.etree import QName

from xsdata.exceptions import ParserError
from xsdata.exceptions import XmlContextError
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.models.elements import FindMode
from xsdata.formats.dataclass.models.elements import XmlMeta
from xsdata.formats.dataclass.models.elements import XmlVar
from xsdata.formats.dataclass.models.generics import Namespaces
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.parsers.utils import ParserUtils
from xsdata.models.enums import EventType

# Return type of the nodes' ``parse_element``: a pair of
# (qualified name or None, parsed value).
Parsed = Tuple[Optional[QName], Any]
# Type of the list the parser accumulates parsed results in, as
# (qualified name, value) pairs.
ParsedObjects = List[Tuple[QName, Any]]
# Type of the parser's queue of pending xml nodes.
XmlNodes = List["XmlNode"]


@dataclass(frozen=True)
class XmlNode:
    """
    Base interface for the nodes used by an event based xml parser.

    Concrete nodes override the two public methods, which are driven by
    the start/end element events. The parser keeps a queue of these nodes
    and a list of the objects the nodes return.

    :param position: The size of the objects list at the moment the node
        was created.
    """

    position: int

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> "XmlNode":
        """
        Create the node to be queued when a new xml element starts.

        Implementations decide which node type comes next and hand it all
        the information needed to bind the incoming data.

        :param element: The xml element that just started.
        :param position: The current size of the parsed objects list.
        :param ctx: The xml context used to look up model metadata.
        :raises NotImplementedError: Always, subclasses must override.
        """
        raise NotImplementedError("Not Implemented")

    def parse_element(self, element: Element, objects: List[Any]) -> Parsed:
        """
        Build the result for an xml element that just ended.

        Implementations parse the element attributes/text, bind any child
        objects and initialize a new object.

        :param element: The xml element that just ended.
        :param objects: The objects parsed so far.
        :return: A tuple of the object's qualified name and the new object.
        :raises NotImplementedError: Always, subclasses must override.
        """
        message = f"Not Implemented {element.tag}."
        raise NotImplementedError(message)
@dataclass(frozen=True)
class ElementNode(XmlNode):
    """
    Node for xml elements that are bound to user defined dataclasses.

    :param meta: xml metadata of a dataclass model.
    :param config: Parser config instance passed down from the root node.
    """

    meta: XmlMeta
    config: ParserConfig

    def parse_element(self, element: Element, objects: List[Any]) -> Parsed:
        """
        Bind the element to a new instance of the node's dataclass.

        Attributes are bound first. When the model declares a mixed
        content variable the children and text are bound through it,
        otherwise the regular children/text binding is used.

        :param element: The xml element that just ended.
        :param objects: The objects parsed so far.
        :return: A tuple of the object's qualified name and the new object.
        """
        params: Dict = {}
        ParserUtils.bind_element_attrs(params, self.meta, element)

        mixed_var = self.meta.find_var(mode=FindMode.MIXED_CONTENT)
        if not mixed_var:
            ParserUtils.bind_element_children(
                params, self.meta, self.position, objects
            )
            ParserUtils.bind_element_text(params, self.meta, element)
        else:
            ParserUtils.bind_mixed_content(params, mixed_var, self.position, objects)
            ParserUtils.bind_wildcard_text(params, mixed_var, element)

        return QName(element.tag), self.meta.clazz(**params)

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Create the node to be queued for the given starting element.

        The element tag is matched against the model variables, named
        variables first and wildcards second, and the node type follows
        from the matched variable.

        :param element: The xml element that just started.
        :param position: The current size of the parsed objects list.
        :param ctx: The xml context used to fetch the child metadata.
        :return: The next node to be queued.
        :raises ParserError: If the element is unknown and the parser
            config has ``fail_on_unknown_properties`` enabled.
        """
        qname = QName(element.tag)
        var = self.meta.find_var(qname, FindMode.NOT_WILDCARD) or self.meta.find_var(
            qname, FindMode.WILDCARD
        )

        if not var:
            # Unknown child: skip it silently unless the config is strict.
            if not self.config.fail_on_unknown_properties:
                return SkipNode(position=position)
            raise ParserError(f"Unknown property {self.meta.qname}:{qname}")

        node: XmlNode
        if var.is_clazz_union:
            node = UnionNode(position=position, var=var, ctx=ctx)
        elif var.clazz:
            xsi_type = ParserUtils.parse_xsi_type(element)
            child_meta = ctx.fetch(var.clazz, self.meta.qname.namespace, xsi_type)
            node = ElementNode(position=position, meta=child_meta, config=self.config)
        elif var.is_any_type:
            node = WildcardNode(position=position, var=var)
        else:
            node = PrimitiveNode(position=position, var=var)
        return node
@dataclass(frozen=True)
class RootNode(ElementNode):
    """Element node placed at the bottom of the parser queue for the root."""

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Return this node for the root element, defer to the parent otherwise.

        An element without a parent is treated as the document root.

        :param element: The xml element that just started.
        :param position: The current size of the parsed objects list.
        :param ctx: The xml context used to fetch the child metadata.
        :return: The next node to be queued.
        """
        is_root = element.getparent() is None
        return self if is_root else super().next_node(element, position, ctx)
@dataclass(frozen=True)
class WildcardNode(XmlNode):
    """
    Node for extensible elements that can hold any attribute and content
    and have neither a specific dataclass nor a primitive type.

    Notes:
        In the future this node should check all known user defined models
        in the target namespace and use that instead of the generic.

    :param var: xml var instance
    """

    var: XmlVar

    def parse_element(self, element: Element, objects: List[Any]) -> Parsed:
        """
        Build a generic element out of the given xml element.

        The element itself is converted by
        ``ParserUtils.parse_any_element`` and the children parsed since
        this node was queued are attached to the result.

        :param element: The xml element that just ended.
        :param objects: The objects parsed so far.
        :return: A tuple of the var's qualified name and a new
            :class:`xsdata.formats.dataclass.models.generics.AnyElement`
            instance.
        """
        generic = ParserUtils.parse_any_element(element)
        generic.children = ParserUtils.fetch_any_children(self.position, objects)
        return self.var.qname, generic

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Create the wildcard node to be queued for the given starting element.

        Notes:
            Wildcard nodes can only queue other wildcard nodes.

        :param element: The xml element that just started.
        :param position: The current size of the parsed objects list.
        :param ctx: The xml context, unused by this node.
        :return: A new wildcard node sharing this node's var.
        """
        shared_var = self.var
        return WildcardNode(position=position, var=shared_var)
@dataclass(frozen=True)
class UnionNode(XmlNode):
    """
    Union nodes are used for variables with more than one possible types
    where at least one of them is a dataclass.

    :param var: xml var instance
    :param ctx: xml context handed to the nested parsers
    """

    var: XmlVar
    ctx: XmlContext

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Skip all child nodes as the complete element tree is parsed again
        when the union element ends.

        :return: A skip node for the child element.
        """
        return SkipNode(position=position)

    def parse_element(self, element: Element, objects: List[Any]) -> Parsed:
        """
        Try to bind the given element to every type of the var and keep
        the candidate with the highest score.

        Every attempt goes through :meth:`parse_class`; failed attempts
        count as ``None``. On equal scores the earliest type wins.

        :param element: The xml element that just ended.
        :param objects: The objects parsed so far, unused by this node.
        :raises ParserError: When all attempts fail
        :return: A tuple of the var's qualified name and the new object.
        """
        parent = element.getparent()
        if parent is not None:
            # Detach the element from its parent; presumably so the nested
            # parser's root node, which checks ``getparent() is None``,
            # treats it as a document root.
            parent.remove(element)

        best = None
        best_score = -1
        for clazz in self.var.types:
            candidate = self.parse_class(element, clazz)
            score = self.score_object(candidate)
            if score > best_score:
                best, best_score = candidate, score

        # Compare against None explicitly: a successfully parsed instance
        # may be falsy (e.g. it defines ``__bool__`` or ``__len__``).
        if best is None:
            raise ParserError(f"Failed to parse union node: {self.var.qname}")

        return self.var.qname, best

    def parse_class(self, element: Element, clazz: Type[T]) -> Optional[T]:
        """
        Run a fresh NodeParser over the given element for the given type.

        :return: The parsed instance, or None if the attempt raised.
        """
        try:
            parser = NodeParser(context=self.ctx)
            return parser.parse(element, clazz)
        except Exception:
            # Deliberate best effort: a failure only means that this
            # candidate type doesn't fit the element.
            return None

    @classmethod
    def score_object(cls, obj: Any) -> int:
        """
        Count the fields of the given object whose value is not None.

        :param obj: A dataclass instance or None.
        :return: The number of not None field values, or -1 for None.
        """
        if obj is None:
            return -1

        return sum(1 for var in fields(obj) if getattr(obj, var.name) is not None)
@dataclass(frozen=True)
class PrimitiveNode(XmlNode):
    """
    XmlNode for text elements with primitive values eg str, int, float.

    :param var: xml var instance
    """

    var: XmlVar

    def parse_element(self, element: Element, objects: List) -> Parsed:
        """
        Convert the element text through ``ParserUtils.parse_value`` using
        the var's types, default value and tokens flag.

        :param element: The xml element that just ended.
        :param objects: The objects parsed so far, unused by this node.
        :return: A tuple of the element's qualified name and the new object.
        """
        var = self.var
        qname = QName(element.tag)
        parsed = ParserUtils.parse_value(
            element.text, var.types, var.default, element.nsmap, var.is_tokens
        )
        return qname, parsed

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Reject child elements.

        :raises XmlContextError: Always, primitive nodes have no children.
        """
        raise XmlContextError("Primitive node doesn't support child nodes!")
@dataclass(frozen=True)
class SkipNode(XmlNode):
    """The skip node should be used when we want to skip parsing child
    elements."""

    def next_node(self, element: Element, position: int, ctx: XmlContext) -> XmlNode:
        """
        Skip the current child as well.

        :return: A new skip node at the given position.
        """
        skipped_child = SkipNode(position=position)
        return skipped_child

    def parse_element(self, element: Element, objects: List[Any]) -> Parsed:
        """
        Skip parsing the current element.

        :return: A ``(None, None)`` tuple.
        """
        nothing: Parsed = (None, None)
        return nothing
@dataclass
class NodeParser:
    """
    Xml parsing and binding for dataclasses.

    :param config: Parser configuration
    :param context: Model metadata builder
    :param namespaces: Store the prefix/namespace as they are parsed.
    """

    config: ParserConfig = field(default_factory=ParserConfig)
    context: XmlContext = field(default_factory=XmlContext)
    namespaces: Namespaces = field(init=False, default_factory=Namespaces)

    def parse(self, source: Union[_Element, _ElementTree], clazz: Type[T]) -> T:
        """
        Walk the given element or tree and bind it to the given class.

        :param source: An lxml element or element tree.
        :param clazz: The target dataclass type.
        :return: An instance of the target class.
        :raises ParserError: When no object could be created.
        """
        events = EventType.START, EventType.END, EventType.START_NS
        context = iterwalk(source, events=events)
        return self.parse_context(context, clazz)

    def parse_context(self, context: Iterable, clazz: Type[T]) -> T:
        """
        Dispatch elements to handlers as they arrive and are fully parsed.

        :param context: An iterable of ``(event, element)`` pairs; for
            namespace events the second item is a prefix/uri pair.
        :param clazz: The target dataclass type.
        :return: The object produced by the last end event.
        :raises ParserError: When no object was produced for the target
            class.
        """
        obj = None
        meta = self.context.build(clazz)
        objects: ParsedObjects = []
        queue: XmlNodes = [RootNode(position=0, meta=meta, config=self.config)]
        self.namespaces.clear()

        for event, element in context:
            if event == EventType.START_NS:
                self.add_namespace(element)
            elif event == EventType.START:
                self.queue(element, queue, objects)
            elif event == EventType.END:
                obj = self.dequeue(element, queue, objects)

        # Compare against None explicitly: a valid root instance may be
        # falsy (e.g. it defines ``__bool__`` or ``__len__``).
        if obj is None:
            raise ParserError(f"Failed to create target class `{clazz.__name__}`")

        return obj

    def add_namespace(self, namespace: Tuple):
        """
        Add the given namespace in the registry.

        :param namespace: A ``(prefix, uri)`` pair.
        """
        prefix, uri = namespace
        self.namespaces.add(uri, prefix)

    def queue(self, element: Element, queue: XmlNodes, objects: ParsedObjects):
        """
        Queue the next xml node for parsing based on the given element
        qualified name.

        The node on top of the queue decides which node comes next; the
        new node records the current size of the objects list.
        """
        item = queue[-1]
        position = len(objects)
        queue.append(item.next_node(element, position, self.context))

    def dequeue(self, element: Element, queue: XmlNodes, objects: ParsedObjects) -> Any:
        """
        Use the last xml node to parse the given element and bind any child
        objects.

        Results of skip nodes are not added to the objects list.

        :return: Any: A dataclass instance or a python primitive value or None
        """
        item = queue.pop()
        result = item.parse_element(element, objects)

        if not isinstance(item, SkipNode):
            objects.append(result)

        return result[1]