Source code for xsdata.formats.dataclass.parsers

import io
import json
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar

from lxml.etree import Element
from lxml.etree import iterparse
from lxml.etree import QName

from xsdata.formats.dataclass.mixins import ClassMeta
from xsdata.formats.dataclass.mixins import ClassVar
from xsdata.formats.dataclass.mixins import ModelInspect
from xsdata.formats.dataclass.models import AnyElement
from xsdata.formats.mixins import AbstractParser
from xsdata.formats.mixins import AbstractXmlParser
from xsdata.models.enums import EventType
from xsdata.utils import text

T = TypeVar("T")


[docs]@dataclass class JsonParser(AbstractParser, ModelInspect):
[docs] def parse(self, source: io.BytesIO, clazz: Type[T]) -> T: """Parse the JSON input stream and return the resulting object tree.""" ctx = json.load(source) return self.parse_context(ctx, clazz)
[docs] def parse_context(self, data: Dict, clazz: Type[T]) -> T: """ Recursively build the given model from the input dict data. :raise TypeError: When parsing fails for any reason """ params = {} if isinstance(data, list) and len(data) == 1: data = data[0] for var in self.class_meta(clazz).vars.values(): value = self.get_value(data, var) if value is None: continue elif var.is_list: params[var.name] = [self.bind_value(var, val) for val in value] else: params[var.name] = self.bind_value(var, value) try: return clazz(**params) # type: ignore except Exception: raise TypeError("Parsing failed")
[docs] def bind_value(self, var: ClassVar, value) -> Any: """ Bind value according to the class var. The return value can be: - a dataclass instance - a dictionary with unknown attributes - a list of unknown elements - an enumeration - a primitive value """ if var.is_dataclass: return self.parse_context(value, var.clazz) elif var.is_any_attribute: return dict(value) elif var.is_any_element: return ( value if isinstance(value, str) else self.parse_context(value, AnyElement) ) else: return self.parse_value(var.types, value, var.default)
[docs] @staticmethod def get_value(data: Dict, field: ClassVar): """Find the field value in the given dictionary or return the default field value.""" if field.qname.localname in data: value = data[field.qname.localname] elif field.name in data: value = data[field.name] else: return None if field.is_list and not isinstance(value, list): value = [value] return value
[docs]@dataclass(frozen=True) class QueueItem: index: int position: int default: Any = field(default=None) qname: Optional[str] = field(default=None) meta: Optional[ClassMeta] = field(default=None) types: List[Type] = field(default_factory=list)
[docs]@dataclass class XmlParser(AbstractXmlParser, ModelInspect): index: int = field(default_factory=int) queue: List[Optional[QueueItem]] = field(init=False, default_factory=list) namespace: Optional[str] = field(init=False, default=None) objects: List[Tuple[QName, Any]] = field(init=False, default_factory=list)
[docs] def parse_context(self, context: iterparse, clazz: Type[T]) -> T: """ Dispatch elements to handlers as they arrive and are fully parsed. Initialize queue with clazz metadata and reset pending objects list. :raises ValueError: When the requested type doesn't match the result object """ meta = self.class_meta(clazz) self.objects = [] self.queue = [QueueItem(index=0, position=0, meta=meta, default=None)] return super(XmlParser, self).parse_context(context, clazz)
[docs] def start_node(self, element: Element): """ Queue the necessary metadata to bind the given element when it's fully parsed. Checks for the last item in queue: - Is none or has none meta -> inside a wildcard - Element tag exists in known variables -> dataclass or primitive - Element tag equals meta qualified name -> root element - Element tag unknown but queue supports wildcard -> start a wildcard :raises Value: When the parser doesn't know how to handle the given element. """ qname = element.tag item = self.queue[-1] if not item or not item.meta: # Assuming inside a wildcard return self.queue.append(None) elif qname in item.meta.vars: # A dataclass or a primitive field queue_item = self.create_queue_item(item.meta.vars[qname], item.meta.qname) self.queue.append(queue_item) elif item.meta.qname == qname: # root pass elif item.meta.any_element: # Wildcard jackpot queue_item = self.create_wildcard_queue_item(item.meta.any_element.qname) self.queue.append(queue_item) else: # Unknown :) raise ValueError( f"{item.meta.qname} does not support mixed content: {qname}" ) self.index += 1 self.emit_event(EventType.START, qname, item=item, element=element)
[docs] def create_queue_item(self, var: ClassVar, parent_qname: QName): meta = None types = var.types if var.is_dataclass: meta = self.class_meta(var.clazz, parent_qname) types = [] return QueueItem( meta=meta, index=self.index, types=types, position=len(self.objects), default=var.default, )
[docs] def create_wildcard_queue_item(self, parent_qname: QName): return QueueItem( index=self.index, position=len(self.objects), qname=parent_qname, )
[docs] def end_node(self, element: Element) -> Optional[T]: """ Build an objects tree for the given element. Construct a dataclass instance with the attributes of the given element and any pending objects that belong to the model. Otherwise parse as a primitive type the element's text content. :returns object: A dataclass object or a python primitive value. """ item = self.queue.pop() if not item: # Assuming inside a wildcard return None qname = QName(element.tag) if item.meta: # dataclass attr_params = self.bind_element_attrs(item.meta, element) text_params = self.bind_element_text(item.meta, element) children = self.fetch_class_children(item, element) obj = item.meta.clazz(**attr_params, **text_params, **children) elif item.types: # primitive obj = self.parse_value(item.types, element.text, item.default) elif item.qname: # wildcard obj = self.parse_any_element(element) obj.children = self.fetch_any_children(item) qname = item.qname else: # unknown :) raise ValueError(f"Failed to create object from {element.tag}") self.objects.append((qname, obj)) self.emit_event(EventType.END, element.tag, obj=obj, element=element) return obj
[docs] def emit_event(self, event: str, name: str, **kwargs): """Call if exist the parser's hook for the given element and event.""" local_name = text.snake_case(QName(name).localname) method_name = f"{event}_{local_name}" if hasattr(self, method_name): getattr(self, method_name)(**kwargs)
[docs] def fetch_class_children(self, item: QueueItem, element: Element) -> Dict[str, Any]: """ Return a dictionary of qualified object names and their values for the given queue item. :raises ValueError: if queue item type is primitive. """ if not item.meta: raise ValueError("Queue item is not a dataclass!") params: Dict[str, Any] = defaultdict(list) while len(self.objects) > item.position: qname, value = self.objects.pop(item.position) arg = item.meta.vars[qname] if not arg.init: continue if value is None: value = "" if arg.is_list: params[arg.name].append(value) else: params[arg.name] = value if item.meta.mixed and item.meta.any_element: var = item.meta.any_element txt, tail = self.element_text_and_tail(element) if text: params[var.name].insert(0, txt) if tail: params[var.name].append(tail) return params
[docs] def fetch_any_children(self, item: QueueItem) -> List[object]: children = [] while len(self.objects) > item.position: _, value = self.objects.pop(item.position) children.append(value) return children
[docs] def bind_element_attrs(self, metadata: ClassMeta, element: Element) -> Dict: """Parse the given element's attributes and any text content and return a dictionary of field names and values based on the given class metadata.""" params = dict() any_attr = metadata.any_attribute for qname, value in element.attrib.items(): if qname in metadata.vars: var = metadata.vars[qname] if var.init: params[var.name] = self.parse_value(var.types, value, var.default) elif any_attr: if any_attr.name not in params: params[any_attr.name] = dict() params[any_attr.name][qname] = value return params
[docs] def bind_element_text(self, metadata: ClassMeta, element: Element): params = dict() var = metadata.any_text if var and element.text is not None and var.init: params[var.name] = self.parse_value(var.types, element.text, var.default) return params
[docs] @classmethod def parse_any_element(cls, element: Element) -> Optional[AnyElement]: if not isinstance(element.tag, str): return None else: txt, tail = cls.element_text_and_tail(element) return AnyElement( qname=element.tag, text=txt or None, tail=tail or None, attributes={k: v for k, v in element.attrib.items()}, )
[docs] @classmethod def element_text_and_tail(cls, element: Element) -> Tuple: return ( element.text.strip() if element.text else None, element.tail.strip() if element.tail else None, )