import copy
from dataclasses import dataclass
from dataclasses import field
from dataclasses import fields
from typing import Any
from typing import ClassVar
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from xsdata.exceptions import ParserError
from xsdata.exceptions import XmlContextError
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.models.elements import FindMode
from xsdata.formats.dataclass.models.elements import XmlMeta
from xsdata.formats.dataclass.models.elements import XmlVar
from xsdata.formats.dataclass.models.generics import AnyElement
from xsdata.formats.dataclass.models.generics import Namespaces
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.parsers.mixins import EventsHandler
from xsdata.formats.dataclass.parsers.mixins import PushParser
from xsdata.formats.dataclass.parsers.mixins import XmlHandler
from xsdata.formats.dataclass.parsers.mixins import XmlNode
from xsdata.formats.dataclass.parsers.utils import ParserUtils
from xsdata.models.enums import EventType
# An entry of the parser objects cache: the (optional) qualified name paired
# with the bound value.
Parsed = Tuple[Optional[str], Any]
# Shorthand used for the optional element text and tail arguments.
NoneStr = Optional[str]
@dataclass
class ElementNode(XmlNode):
    """
    Node for xml elements that are bound to user defined dataclasses.

    :param meta: Model xml metadata
    :param attrs: Key-value attribute mapping
    :param ns_map: Prefix-URI Namespace mapping
    :param config: Parser configuration
    :param context: Model xml metadata builder
    :param position: The node position of objects cache
    :param mixed: Set by the parent node when the parent model has a mixed
        content var; the element tail is then emitted as a separate object
    """

    meta: XmlMeta
    attrs: Dict
    ns_map: Dict
    config: ParserConfig
    context: XmlContext
    position: int
    mixed: bool = False

    # Lookup order used by :meth:`find_var`: named vars first, wildcards last.
    FIND_MODES_ORDERED: ClassVar[Iterable[FindMode]] = (
        FindMode.NOT_WILDCARD,
        FindMode.WILDCARD,
    )

    def bind(self, qname: str, text: NoneStr, tail: NoneStr, objects: List) -> bool:
        """
        Build a new instance of the model class from the element
        attributes, text and the already bound child objects, and append it
        to the objects cache together with its qualified name.

        When the parent model is mixed and this model has no mixed content
        var of its own, the normalized tail is appended as an extra
        ``(None, tail)`` entry right after the new object.

        :return: Always True, the element was bound.
        """
        params: Dict = {}
        ParserUtils.bind_element_attrs(params, self.meta, self.attrs, self.ns_map)

        mixed_var = self.meta.find_var(mode=FindMode.MIXED_CONTENT)
        if not mixed_var:
            ParserUtils.bind_element_children(params, self.meta, self.position, objects)
            ParserUtils.bind_element(
                params, self.meta, text, tail, self.attrs, self.ns_map
            )
        else:
            ParserUtils.bind_mixed_content(params, mixed_var, self.position, objects)
            ParserUtils.bind_wildcard_element(
                params, mixed_var, text, tail, self.attrs, self.ns_map
            )

        instance = self.meta.clazz(**params)
        objects.append((qname, instance))

        if self.mixed and not mixed_var:
            trailing = ParserUtils.normalize_content(tail)
            if trailing:
                objects.append((None, trailing))

        return True

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """
        Create the node that will handle the given starting child element.

        The node type depends on the model var matching the element
        qualified name: union, dataclass, wildcard or primitive.

        :return: The next node to be queued.
        :raises: ParserError if the element is unknown and parser config is strict.
        """
        matched = self.find_var(qname)

        if not matched:
            if not self.config.fail_on_unknown_properties:
                return SkipNode()
            raise ParserError(f"Unknown property {self.meta.qname}:{qname}")

        node: XmlNode
        if matched.is_clazz_union:
            node = UnionNode(
                var=matched,
                attrs=attrs,
                ns_map=ns_map,
                context=self.context,
                position=position,
            )
        elif matched.clazz:
            xsi_type = ParserUtils.parse_xsi_type(attrs, ns_map)
            child_meta = self.context.fetch(
                matched.clazz, self.meta.namespace, xsi_type
            )
            # The child needs to know if this (parent) model is mixed.
            parent_mixed_var = self.meta.find_var(mode=FindMode.MIXED_CONTENT)
            node = ElementNode(
                meta=child_meta,
                config=self.config,
                attrs=attrs,
                ns_map=ns_map,
                context=self.context,
                position=position,
                mixed=parent_mixed_var is not None,
            )
        elif matched.is_any_type:
            node = WildcardNode(
                var=matched, attrs=attrs, ns_map=ns_map, position=position
            )
        else:
            node = PrimitiveNode(var=matched, ns_map=ns_map)

        return node

    def find_var(self, qname: str) -> Optional[XmlVar]:
        """
        Look up the model var for the given qualified name, trying every
        mode of ``FIND_MODES_ORDERED`` in order.

        :return: The first var found or None.
        """
        lookups = (self.meta.find_var(qname, mode) for mode in self.FIND_MODES_ORDERED)
        return next((found for found in lookups if found), None)
@dataclass
class WildcardNode(XmlNode):
    """
    Node for extensible elements that accept any attribute and content and
    are bound to the generic ``AnyElement`` instead of a specific dataclass
    or primitive type.

    Notes:
        In the future this node should check all known user defined models
        in the target namespace and use that instead of the generic.

    :param var: Class field xml var instance
    :param attrs: Key-value attribute mapping
    :param ns_map: Prefix-URI Namespace mapping
    :param position: The node position of objects cache
    """

    var: XmlVar
    attrs: Dict
    ns_map: Dict
    position: int

    def bind(self, qname: str, text: NoneStr, tail: NoneStr, objects: List) -> bool:
        """
        Build a generic
        :class:`xsdata.formats.dataclass.models.generics.AnyElement` from
        the element text, tail, attributes and children, and append it to
        the objects cache under the wildcard var qualified name.

        :return: Always True, the element was bound.
        """
        normalized_text = ParserUtils.normalize_content(text)
        normalized_tail = ParserUtils.normalize_content(tail)
        attributes = ParserUtils.parse_any_attributes(self.attrs, self.ns_map)
        children = ParserUtils.fetch_any_children(self.position, objects)

        element = AnyElement(
            qname=qname,
            text=normalized_text,
            tail=normalized_tail,
            ns_map=self.ns_map,
            attributes=attributes,
            children=children,
        )
        objects.append((self.var.qname, element))

        return True

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """Queue another wildcard node, bound to the same var, for the given
        starting element."""
        return WildcardNode(var=self.var, attrs=attrs, ns_map=ns_map, position=position)
@dataclass
class UnionNode(XmlNode):
    """
    Node for variables with more than one possible types where at least one
    of them is a dataclass.

    The node records the events of the whole element subtree and replays
    them against every candidate type once the element ends.

    :param var: Class field xml var instance
    :param attrs: Key-value attribute mapping
    :param ns_map: Prefix-URI Namespace mapping
    :param position: The node position of objects cache
    :param context: Model xml context cache
    :param level: Current node level
    :param events: Record node events
    """

    var: XmlVar
    attrs: Dict
    ns_map: Dict
    position: int
    context: XmlContext
    level: int = field(default_factory=int)
    events: List = field(default_factory=list)

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """Record the start event of a nested element and keep this node in
        charge, the complete element tree is parsed later on bind."""
        start_event = ("start", qname, copy.deepcopy(attrs), ns_map)
        self.level += 1
        self.events.append(start_event)
        return self

    def bind(self, qname: str, text: NoneStr, tail: NoneStr, objects: List) -> bool:
        """
        Record the end event and, once the outermost element ends, try to
        parse the recorded events with each of the var types.

        The candidate with the highest score wins, the first one on ties.

        :raise ParserError: When all attempts fail
        :return: False while nested elements are still closing, True when
            the winning object was appended to the objects cache.
        """
        self.events.append(("end", qname, text, tail))

        # Still inside a nested element, just unwind one level.
        if self.level > 0:
            self.level -= 1
            return False

        # The start event of the union element itself was never recorded.
        root_event = ("start", qname, copy.deepcopy(self.attrs), self.ns_map)
        self.events.insert(0, root_event)

        best = None
        best_score = -1
        for clazz in self.var.types:
            candidate = self.parse_class(clazz)
            candidate_score = self.score_object(candidate)
            if candidate_score > best_score:
                best, best_score = candidate, candidate_score

        if not best:
            raise ParserError(f"Failed to parse union node: {self.var.qname}")

        objects.append((self.var.qname, best))
        return True

    def parse_class(self, clazz: Type[T]) -> Optional[T]:
        """
        Replay the recorded events through a fresh NodeParser for the given
        type.

        :return: The parsed object or None if parsing failed for any reason.
        """
        try:
            return NodeParser(context=self.context).parse(self.events, clazz)
        except Exception:
            return None

    @classmethod
    def score_object(cls, obj: Any) -> int:
        """Count the fields of the given object that are not None, falsy
        objects score -1."""
        if not obj:
            return -1

        return sum(getattr(obj, attr.name) is not None for attr in fields(obj))
@dataclass
class PrimitiveNode(XmlNode):
    """
    Node for text elements with primitive values eg str, int, float.

    :param var: Class field xml var instance
    :param ns_map: Prefix-URI Namespace mapping
    """

    var: XmlVar
    ns_map: Dict

    def bind(self, qname: str, text: NoneStr, tail: NoneStr, objects: List) -> bool:
        """
        Parse the element text with the var types, default value and tokens
        flag, and append the value to the objects cache under the element
        qualified name.

        :return: Always True, the element was bound.
        """
        var = self.var
        value = ParserUtils.parse_value(
            text, var.types, var.default, self.ns_map, var.tokens
        )
        objects.append((qname, value))
        return True

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """
        Primitive nodes can't have child elements.

        :raises XmlContextError: Always
        """
        raise XmlContextError("Primitive node doesn't support child nodes!")
@dataclass
class SkipNode(XmlNode):
    """Node that ignores an element and all of its descendants."""

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """Reuse this node for the child element, so that it is skipped as
        well."""
        return self

    def bind(self, qname: str, text: NoneStr, tail: NoneStr, objects: List) -> bool:
        """
        Do nothing for the current element.

        :return: Always False, nothing was bound.
        """
        return False
@dataclass
class NodeParser(PushParser):
    """
    Bind xml nodes to dataclasses.

    :param config: Parser configuration
    :param context: Model metadata builder
    :param handler: Override default XmlHandler
    :param namespaces: Namespace registry for prefix-URI mappings
    """

    config: ParserConfig = field(default_factory=ParserConfig)
    context: XmlContext = field(default_factory=XmlContext)
    handler: Type[XmlHandler] = field(default=EventsHandler)
    namespaces: Namespaces = field(init=False, default_factory=Namespaces)

    def parse(self, source: Any, clazz: Type[T]) -> T:
        """
        Parse the XML input stream and return the resulting object tree.

        :param source: The input handed over to the configured handler
        :param clazz: The target class of the root element
        :raises ParserError: When the handler result is not an instance of
            the target class
        :return: The bound instance of the target class.
        """
        handler = self.handler(clazz=clazz, parser=self)
        result = handler.parse(source)

        if isinstance(result, clazz):
            return result

        raise ParserError(f"Failed to create target class `{clazz.__name__}`")

    def start(
        self,
        queue: List[XmlNode],
        qname: str,
        attrs: Dict,
        ns_map: Dict,
        objects: List[Parsed],
        clazz: Type[T],
    ):
        """
        Queue the next xml node for parsing.

        The last queued node creates the node of its child element. With an
        empty queue the element is the document root and is bound to the
        given target class.
        """
        child: XmlNode
        if queue:
            # Test the queue explicitly instead of catching IndexError, so
            # an IndexError raised inside ``child()`` is never mistaken for
            # an empty queue.
            child = queue[-1].child(qname, attrs, ns_map, len(objects))
        else:
            child = ElementNode(
                position=0,
                meta=self.context.build(clazz),
                config=self.config,
                attrs=attrs,
                ns_map=ns_map,
                context=self.context,
            )

        queue.append(child)

    def end(
        self,
        queue: List[XmlNode],
        qname: str,
        text: Optional[str],
        tail: Optional[str],
        objects: List[Parsed],
    ) -> Any:
        """
        Parse the last xml node and bind any intermediate objects.

        :return: The value of the last entry of the objects cache when the
            node reports a successful bind, otherwise None.
        """
        item = queue.pop()
        if item.bind(qname, text, tail, objects):
            return objects[-1][1]

        return None

    def start_prefix_mapping(self, prefix: Optional[str], uri: str):
        """
        Add the given prefix-URI to the namespaces registry.

        Default namespaces have an empty prefix.
        """
        self.namespaces.add(uri, prefix or "")
@dataclass
class RecordParser(NodeParser):
    """
    Bind xml nodes to dataclasses with an events recorder.

    Every pushed event is stored before it is forwarded to the base parser.

    :param events: List of pushed events
    """

    events: List = field(default_factory=list)

    def start(
        self,
        queue: List[XmlNode],
        qname: str,
        attrs: Dict,
        ns_map: Dict,
        objects: List[Parsed],
        clazz: Type[T],
    ):
        """Record the start event, with a deep copy of the attributes, and
        queue the next xml node."""
        recorded = (EventType.START, qname, copy.deepcopy(attrs), ns_map)
        self.events.append(recorded)
        super().start(queue, qname, attrs, ns_map, objects, clazz)

    def end(
        self,
        queue: List[XmlNode],
        qname: str,
        text: Optional[str],
        tail: Optional[str],
        objects: List[Parsed],
    ) -> Any:
        """
        Record the end event and bind the last xml node.

        :return: The result of the base parser binding process.
        """
        recorded = (EventType.END, qname, text, tail)
        self.events.append(recorded)
        return super().end(queue, qname, text, tail, objects)

    def start_prefix_mapping(self, prefix: Optional[str], uri: str):
        """Record the namespace event and add the prefix-URI to the
        namespaces registry."""
        recorded = (EventType.START_NS, prefix, uri)
        self.events.append(recorded)
        super().start_prefix_mapping(prefix, uri)