import io
import json
import re
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
from lxml.etree import Element
from lxml.etree import iterparse
from lxml.etree import QName
from lxml.etree import tostring
from xsdata.formats.dataclass.mixins import ClassMeta
from xsdata.formats.dataclass.mixins import ClassVar
from xsdata.formats.dataclass.mixins import ModelInspect
from xsdata.formats.dataclass.models import AnyElement
from xsdata.formats.mixins import AbstractParser
from xsdata.formats.mixins import AbstractXmlParser
from xsdata.models.enums import EventType
# Generic placeholder for the model class a parser is asked to build; it ties
# the ``clazz`` argument of the parse methods to their return type.
T = TypeVar("T")
@dataclass
class JsonParser(AbstractParser, ModelInspect):
    """
    Bind JSON documents to dataclass models.

    Field metadata is read through ``class_meta`` and primitive values are
    converted through ``parse_value``; neither is defined in this class, so
    both are presumably provided by the base classes (confirm there).
    """

    def parse(self, source: io.BytesIO, clazz: Type[T]) -> T:
        """Parse the JSON input stream and return the resulting object tree."""
        ctx = json.load(source)
        return self.parse_context(ctx, clazz)

    def parse_context(self, data: Dict, clazz: Type[T]) -> T:
        """
        Recursively build the given model from the input dict data.

        A list holding exactly one item is unwrapped and its single item is
        used as the data. Fields whose resolved value is ``None`` are not
        passed to the model constructor.

        :param data: The decoded JSON data for this model
        :param clazz: The dataclass type to instantiate
        :raise TypeError: When the model constructor fails for any reason;
            the original exception is attached as ``__cause__``
        """
        params = {}

        if isinstance(data, list) and len(data) == 1:
            data = data[0]

        for var in self.class_meta(clazz).vars.values():
            value = self.get_value(data, var)

            if value is None:
                continue

            if var.is_list:
                params[var.name] = [self.bind_value(var, val) for val in value]
            else:
                params[var.name] = self.bind_value(var, value)

        try:
            return clazz(**params)  # type: ignore
        except Exception as exc:
            # Keep the public TypeError contract but chain the real cause so
            # the failing field/constructor error is not lost.
            raise TypeError("Parsing failed") from exc

    def bind_value(self, var: ClassVar, value) -> Any:
        """
        Bind value according to the class var.

        The return value can be:
        - a dataclass instance
        - a dictionary with unknown attributes
        - a list of unknown elements
        - an enumeration
        - a primitive value
        """
        if var.is_dataclass:
            return self.parse_context(value, var.type)

        if var.is_any_attribute:
            return dict(value)

        if var.is_any_element:
            # Plain strings are kept verbatim, anything else is bound to the
            # generic AnyElement model.
            if isinstance(value, str):
                return value
            return self.parse_context(value, AnyElement)

        return self.parse_value(var.type, value)

    @staticmethod
    def get_value(data: Dict, field: ClassVar):
        """
        Find the field value in the given dictionary or return the default
        field value.

        Lookup order: the field's qualified local name, then the field name,
        then the field default (called first when it is callable). A
        non-list value for a list field is wrapped in a single item list.
        ``None`` is never wrapped, so callers can still detect a missing
        value for list fields.
        """
        if field.qname.localname in data:
            value = data[field.qname.localname]
        elif field.name in data:
            value = data[field.name]
        elif callable(field.default):
            value = field.default()
        else:
            value = field.default

        # Wrapping None would produce [None] and force the caller to bind a
        # non-existent item instead of skipping the field.
        if value is not None and field.is_list and not isinstance(value, list):
            value = [value]

        return value
@dataclass(frozen=True)
class QueueItem:
    # The python type the xml parser instantiates or parses text into when
    # the matching element ends.
    type: Type
    # The parser's running index at the time the item was queued.
    index: int
    # Class metadata of ``type``; the xml parser leaves it as None when the
    # var is not a dataclass.
    meta: Optional[ClassMeta] = None
    # Length of the parser's pending objects list when the item was queued.
    position: int = 0
@dataclass
class XmlParser(AbstractXmlParser, ModelInspect):
    """
    Bind XML documents to dataclass models using start/end element handlers.

    ``class_meta`` and ``parse_value`` are not defined in this class and are
    presumably provided by the base classes; likewise ``start_node`` and
    ``end_node`` are presumably driven by the base ``parse_context`` loop
    (confirm against the mixins).
    """

    # Running counter, incremented for every bound element that starts.
    index: int = field(default_factory=int)
    # Stack of metadata for the currently open elements; a ``None`` entry
    # marks an element that is skipped by direct data binding.
    queue: List[Optional[QueueItem]] = field(init=False, default_factory=list)
    namespace: Optional[str] = field(init=False, default=None)
    # Finished objects waiting to be claimed by their parent element.
    objects: List[Tuple[QName, Any]] = field(init=False, default_factory=list)

    def parse_context(self, context: iterparse, clazz: Type[T]) -> T:
        """
        Dispatch elements to handlers as they arrive and are fully parsed.

        Initialize queue with clazz metadata and reset pending objects list.

        :raises ValueError: When the requested type doesn't match the result object
        """
        meta = self.class_meta(clazz)
        self.objects = []
        self.queue = [QueueItem(type=clazz, index=0, meta=meta)]

        return super().parse_context(context, clazz)

    def start_node(self, element: Element):
        """
        Prepare metadata queue to bind the given element to a dataclass object.

        In order:
        - If last item in queue is None or has no class metadata assume we
          are inside mixed content and queue a None placeholder.
        - If element qname is not a var in the last item in the queue.
            - Check if the element is the root itself and skip the rest.
            - Check if the last queue item supports mixed content and setup
              the queue to bypass direct data binding to a new dataclass
              object.
        - If element qname is a var in the last item in the queue append its
          class metadata to the queue for the upcoming data binding.

        :raises ValueError: When the parser doesn't know how to handle the
            given element.
        """
        qname = element.tag
        item = self.queue[-1]

        if item is None or not item.meta:
            self.queue.append(None)
            return None

        if qname not in item.meta.vars:
            if item.meta.qname == qname:
                # The element is the root: its queue item already exists.
                self.index += 1
                self.emit_event(EventType.START, qname, item=item, element=element)
                return None

            if item.meta.mixed:
                self.queue.append(None)
                return None

            raise ValueError(
                f"{item.meta.qname} does not support mixed content: {qname}"
            )

        var = item.meta.vars[qname]
        meta = self.class_meta(var.type, item.meta.qname) if var.is_dataclass else None
        queue_item = QueueItem(
            type=var.type, index=self.index, meta=meta, position=len(self.objects)
        )
        self.queue.append(queue_item)
        self.index += 1
        self.emit_event(EventType.START, qname, item=item, element=element)
        return None

    def end_node(self, element: Element) -> Optional[T]:
        """
        Build an objects tree for the given element.

        Construct a dataclass instance with the attributes of the given
        element and any pending objects that belong to the model. Otherwise
        parse the element's text content as a primitive type. The result is
        appended to the pending objects list under the element's qname.

        :returns object: A dataclass object or a python primitive value, or
            None when the element was queued as skipped content.
        :raises ValueError: When parser has no data bind strategy for the given object.
        """
        item = self.queue.pop()

        if item is None:
            return None

        if item.meta:
            attr_params = self.bind_element_attrs(item.meta, element)
            text_params = self.bind_element_text(item.meta, element)
            any_params = self.bind_element_any(item.meta, element)
            children = self.fetch_class_children(item)
            obj = item.type(**attr_params, **text_params, **any_params, **children)
        elif item.type:
            obj = self.parse_value(item.type, element.text)
        else:
            raise ValueError(f"Failed to create object from {element.tag}")

        self.objects.append((QName(element.tag), obj))
        self.emit_event(EventType.END, element.tag, obj=obj, element=element)

        return obj

    def emit_event(self, event: str, name: str, **kwargs):
        """
        Call if exist the parser's hook for the given element and event.

        The hook is looked up on the parser as ``{event}_{local name}`` and
        receives the extra keyword arguments.
        """
        local_name = QName(name).localname
        method_name = f"{event}_{local_name}"
        if hasattr(self, method_name):
            getattr(self, method_name)(**kwargs)

    def fetch_class_children(self, item: QueueItem) -> Dict[str, Any]:
        """
        Return a dictionary of field names and their values for the given
        queue item.

        Every pending object from the item's position onwards is removed
        from the pending list and assigned to the matching class var; values
        of list vars are accumulated in arrival order.

        :raises ValueError: if queue item type is primitive.
        """
        if not item.meta:
            raise ValueError("Queue item is not a dataclass!")

        # Detach the whole tail in one step instead of popping item by item
        # from the middle of the list.
        pending = self.objects[item.position :]
        del self.objects[item.position :]

        params: Dict[str, Any] = defaultdict(list)
        for qname, value in pending:
            arg = item.meta.vars[qname]
            if arg.is_list:
                params[arg.name].append(value)
            else:
                params[arg.name] = value

        return params

    def bind_element_attrs(self, metadata: ClassMeta, element: Element) -> Dict:
        """
        Parse the given element's attributes and return a dictionary of
        field names and values based on the given class metadata.

        Attributes that are not class vars are collected as raw strings
        under the any-attribute field when the metadata has one, otherwise
        they are ignored.
        """
        params: Dict[str, Any] = {}
        any_attr = metadata.any_attribute

        for qname, value in element.attrib.items():
            if qname in metadata.vars:
                var = metadata.vars[qname]
                params[var.name] = self.parse_value(var.type, value)
            elif any_attr:
                params.setdefault(any_attr.name, {})[qname] = value

        return params

    def bind_element_text(self, metadata: ClassMeta, element: Element):
        """
        Parse the element's text into the metadata's text field.

        Return an empty dictionary when the metadata has no text field or
        the element has no text.
        """
        params = {}
        text_var = metadata.any_text

        if text_var and element.text is not None:
            params[text_var.name] = self.parse_value(text_var.type, element.text)

        return params

    def bind_element_any(self, metadata: ClassMeta, element: Element):
        """
        Collect the element's wildcard content for the any-element field.

        The values are, in order: the element's stripped text, one generic
        object per direct child and the element's stripped tail. Empty
        text/tail are left out and nothing is returned when there are no
        values or the metadata has no any-element field.
        """
        params = {}
        any_var = metadata.any_element

        if any_var:
            text = element.text.strip() if element.text else None
            tail = element.tail.strip() if element.tail else None

            any_values = list(map(self.parse_any_element, element))
            if text:
                any_values.insert(0, text)
            if tail:
                any_values.append(tail)

            if any_values:
                params[any_var.name] = any_values

        return params

    @classmethod
    def parse_any_element(cls, element: Element):
        """
        Recursively convert the element into a generic AnyElement object.

        Text and tail are stripped and stored as None when empty.
        """
        text = element.text.strip() if element.text else None
        tail = element.tail.strip() if element.tail else None

        return AnyElement(
            qname=element.tag,
            text=text or None,
            tail=tail or None,
            children=list(map(cls.parse_any_element, element)),
            attributes=dict(element.attrib.items()),
        )

    @classmethod
    def parse_mixed_content(cls, element: Element):
        """
        Parse element mixed content by preserving the raw string.

        The element is serialized, everything up to the first ``>`` and from
        the last ``<`` onwards is cut off and whitespace runs are collapsed
        to single spaces.
        """
        xml = tostring(element, pretty_print=True).decode()
        start_root = xml.find(">")
        end_root = xml.rfind("<")

        return re.sub(r"\s+", " ", xml[start_root + 1 : end_root]).strip()