import json
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Dict, List, Optional, Type
from lxml.etree import Element, QName, iterparse
from xsdata.formats.dataclass.mixins import Field, ModelInspect
from xsdata.formats.mixins import AbstractParser
from xsdata.models.enums import EventType
@dataclass
class JsonParser(AbstractParser, ModelInspect):
    """Bind decoded JSON documents to model classes.

    Field metadata is obtained through ``self.fields``, which is not defined
    in this class (presumably provided by ``ModelInspect`` -- confirm).
    """

    def parse(self, source: BytesIO, clazz: Type) -> Type:
        """Parse the JSON input stream and return the resulting object tree.

        :param source: Readable stream passed to ``json.load``
        :param clazz: Model class to build from the decoded document
        :return: Whatever :meth:`parse_context` builds for ``clazz``
        """
        ctx = json.load(source)
        return self.parse_context(ctx, clazz)

    def parse_context(self, data: Dict, model: Type) -> Type:
        """
        Recursively build the given model from the input dict data.

        A list holding exactly one item is unwrapped to that item before the
        fields are looked up.

        :param data: Decoded JSON value for the current model
        :param model: Class to instantiate with the parsed field values
        :return: The result of calling ``model`` with the parsed values
        :raise TypeError: When ``model`` cannot be instantiated from the
            parsed values; the original error is attached as ``__cause__``.
            Errors raised while converting individual values are not wrapped.
        """
        if isinstance(data, list) and len(data) == 1:
            data = data[0]

        params = {}
        for field in self.fields(model):
            value = self.parse_value(data, field)

            if not value:
                # Falsy values (None, 0, "", empty containers) are handed to
                # the model untouched, without any type conversion.
                params[field.name] = value
            elif field.is_dataclass:
                params[field.name] = (
                    [self.parse_context(val, field.type) for val in value]
                    if field.is_list
                    else self.parse_context(value, field.type)
                )
            else:
                params[field.name] = (
                    [field.type(val) for val in value]
                    if field.is_list
                    else field.type(value)
                )

        try:
            return model(**params)
        except Exception as exc:
            # Keep the documented exception type but preserve the real cause
            # so failures remain debuggable.
            raise TypeError("Parsing failed") from exc

    @staticmethod
    def parse_value(data: Dict, field: Field):
        """Find the field value in the given dictionary or return the default
        field value.

        :param data: Container searched for ``field.local_name``
        :param field: Field descriptor; ``local_name``, ``is_list`` and
            ``default`` are read from it
        :return: The stored value (wrapped in a list when the field is a list
            field and the stored value is not a list already), otherwise the
            field default; a callable default is called to produce the value
        """
        if field.local_name in data:
            value = data[field.local_name]
            if field.is_list and not isinstance(value, list):
                value = [value]
            return value

        if callable(field.default):
            return field.default()
        return field.default
@dataclass
class XmlParser(AbstractParser, ModelInspect):
    """Bind XML documents to model classes from lxml ``iterparse`` events.

    Relies on ``self.fields``, ``self.class_meta`` and ``self.is_dataclass``,
    none of which are defined in this class (presumably provided by
    ``ModelInspect`` -- confirm).
    """

    def parse(self, source: BytesIO, clazz: Type) -> Type:
        """Parse the XML input stream and return the resulting object tree.

        :param source: Stream handed to ``lxml.etree.iterparse``
        :param clazz: Model class the root element is bound to
        :return: Whatever :meth:`parse_context` builds for ``clazz``
        """
        ctx = iterparse(source=source, events=(EventType.START, EventType.END))
        return self.parse_context(ctx, clazz)

    def parse_context(self, context: iterparse, clazz: Type) -> Type:
        """Build the given model from the iterparse event data.

        The first event is consumed as the root element. Every following
        START event pushes a new object and every END event finalizes the
        most recent one.

        :param context: Iterator of ``(event, element)`` pairs
        :param clazz: Model class for the root element
        :return: The object finalized by the last END event
        """
        _, root = next(context)
        namespace = self.class_meta(clazz).namespace
        queue = [self.class_ns_fields(clazz, namespace)]
        objects = [self.build_object(clazz, namespace, root)]

        for event, element in context:
            if event == EventType.START:
                # NOTE(review): lxml documents that an element's text is not
                # guaranteed to be available on the start event; values are
                # read from ``element.text`` here regardless -- confirm this
                # is safe for the inputs this parser is used with.
                field = self.find_field(queue, namespace, element)
                obj = self.build_object_from_field(field, namespace, element)
                objects.append(obj)
            elif event == EventType.END:
                obj = self.end_element(objects, queue, element)
                # The element has been bound; reset it to drop its content.
                element.clear()

        return obj

    def find_field(
        self, queue: List[Dict], namespace: Optional[str], element: Element,
    ) -> Field:
        """
        Find the current field from the fields queue.

        If the next field is also a dataclass append its fields map to
        the queue for the next event

        :param queue: Stack of field maps; the last one belongs to the
            object currently being populated
        :param namespace: Namespace used to qualify the nested class fields
        :param element: Element whose tag is looked up in the last field map
        :return: The field registered under ``element.tag``
        :raise KeyError: When the tag is not present in the last field map
        """
        field = queue[-1][element.tag]
        if field.is_dataclass:
            queue.append(self.class_ns_fields(field.type, namespace))
        return field

    def build_object_from_field(
        self, field: Field, namespace: Optional[str], element: Element
    ) -> Type:
        """Bind the current element to a dataclass or simply parse its text
        value.

        :param field: Field the element was matched to
        :param namespace: Namespace forwarded to :meth:`build_object`
        :param element: Element providing the text or attributes
        :return: A ``field.type`` instance for dataclass fields, otherwise
            the element text converted by :meth:`parse_value`
        """
        if field.is_dataclass:
            return self.build_object(field.type, namespace, element)
        return self.parse_value(field.type, element.text)

    def build_object(
        self, clazz: Type, namespace: Optional[str], element: Element
    ) -> Type:
        """Create a new class instance by the current element attributes and
        text.

        Fields with neither a matching text value nor a matching attribute
        are left out of the constructor call.

        :param clazz: Class to instantiate
        :param namespace: Namespace used to qualify the class field names
        :param element: Element providing the text and attributes
        :return: ``clazz`` called with the collected keyword arguments
        """
        params = {}
        for qname, field in self.class_ns_fields(clazz, namespace).items():
            if field.is_text and element.text:
                params[field.name] = self.parse_value(field.type, element.text)
            elif qname in element.attrib:
                params[field.name] = self.parse_value(
                    field.type, element.attrib[qname]
                )
        return clazz(**params)

    def end_element(
        self, objects: List[Type], queue: List[Dict], element: Element
    ) -> Type:
        """
        Finalize and return the last item of the objects list.

        Steps:
            * Pop the last item of the objects
            * If the object is a dataclass pop the fields queue which should be
              the current object's fields map
            * If the object is not the last in the list assign or append it
              to the correct parent field

        :param objects: Stack of objects under construction (mutated)
        :param queue: Stack of field maps (mutated)
        :param element: Element whose END event is being handled
        :return: The object popped from ``objects``
        """
        obj = objects.pop()
        if self.is_dataclass(obj):
            queue.pop()

        if objects:
            parent = objects[-1]
            field = queue[-1][element.tag]
            if field.is_list:
                getattr(parent, field.name).append(obj)
            else:
                setattr(parent, field.name, obj)

        return obj

    def class_ns_fields(
        self, clazz: Type, namespace: Optional[str]
    ) -> Dict[str, Field]:
        """Returns the given class fields indexed by their namespace qualified
        names for easier match.

        Elements whose namespace is the empty string and attributes whose
        namespace is ``None`` are indexed by their local name only. Every
        other field is indexed by the qualified name built from its own
        namespace, falling back to ``namespace``.

        :param clazz: Class whose fields are indexed
        :param namespace: Fallback namespace for fields without their own
        :return: Mapping of lookup key to field
        """
        res: Dict = dict()
        for field in self.fields(clazz):
            if field.is_element and field.namespace == "":
                res[field.local_name] = field
            # ``elif`` is required: with a plain ``if`` an unqualified element
            # fell through to the ``else`` branch below and was additionally
            # registered under the namespace-qualified name.
            elif field.is_attribute and field.namespace is None:
                res[field.local_name] = field
            else:
                qname = QName(field.namespace or namespace, field.local_name)
                res[qname.text] = field
        return res

    @classmethod
    def parse_value(cls, tp: Type, value: Any) -> Any:
        """Convert xml string values to python primitive types.

        For types exposing ``__origin__`` (e.g. ``Optional``/``Union``) every
        type argument is tried in order and the first successful conversion
        wins; when none succeeds the value is returned unchanged.

        :param tp: Target type
        :param value: Raw value, typically element text or an attribute value
        :return: The converted value; for ``bool`` the XSD lexical forms
            ``"true"`` and ``"1"`` give ``True`` and anything else ``False``
        """
        if hasattr(tp, "__origin__"):
            for tp_arg in tp.__args__:
                try:
                    return cls.parse_value(tp_arg, value)
                except (ValueError, TypeError):
                    # TypeError covers non-callable or argument-less type
                    # arguments such as ``NoneType`` in ``Optional[...]``.
                    continue
            return value

        if tp is bool:
            return value in ("true", "1")
        return tp(value)