Source code for xsdata.formats.dataclass.parsers.handlers.native

from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Tuple
from xml import sax
from xml.etree.ElementTree import iterparse

from xsdata.exceptions import XmlHandlerError
from xsdata.formats.dataclass.parsers.handlers import LxmlSaxHandler
from xsdata.formats.dataclass.parsers.mixins import XmlHandler
from xsdata.models.enums import EventType
from xsdata.utils.namespaces import build_qname

EVENTS = (EventType.START, EventType.END, EventType.START_NS)


[docs]@dataclass class XmlEventHandler(XmlHandler): """Content handler based on xml.ElementTree iterparse api.""" ns_map: Dict = field(default_factory=dict, init=False)
[docs] def parse(self, source: Any) -> Any: """ Parse an XML document from a system identifier or an InputSource. :raises XmlHandlerError: If process xinclude config is enabled. """ if self.parser.config.process_xinclude: raise XmlHandlerError( f"{type(self).__name__} doesn't support xinclude elements." ) return self.process_context(iterparse(source, EVENTS)) # nosec
[docs] def process_context(self, context: Iterable) -> Any: """Iterate context and push the events to main parser.""" obj = None self.ns_map = {} for event, element in context: if event == EventType.START: self.parser.start( self.clazz, self.queue, self.objects, element.tag, element.attrib, self.start_ns_bulk(self.ns_map), ) self.ns_map = {} elif event == EventType.END: obj = self.parser.end( self.queue, self.objects, element.tag, element.text, element.tail, ) element.clear() elif event == EventType.START_NS: prefix, uri = element self.ns_map[prefix] = uri else: raise XmlHandlerError(f"Unhandled event: `{event}`.") return obj
[docs]@dataclass class XmlSaxHandler(LxmlSaxHandler, sax.handler.ContentHandler): """ Xml sax content handler. :param ns_map: Active element prefix-URI Namespace mapping """ ns_map: Dict = field(default_factory=dict)
[docs] def parse(self, source: Any) -> Any: """ Parse an XML document from a system identifier or an InputSource. :raises XmlHandlerError: If process xinclude config is enabled. """ if self.parser.config.process_xinclude: raise XmlHandlerError( f"{type(self).__name__} doesn't support xinclude elements." ) parser = sax.make_parser() # nosec parser.setFeature(sax.handler.feature_namespaces, True) parser.setContentHandler(self) parser.parse(source) return self.close()
[docs] def startElementNS(self, name: Tuple[Optional[str], str], qname: Any, attrs: Dict): """ Start element notification receiver. The receiver will flush any previous active element, append a new data frame to collect data content for the next active element and notify the main parser to prepare for next binding instruction. Converts name and attribute keys to fully qualified tags to respect the main parser api, eg (foo, bar) -> {foo}bar """ tag = build_qname(name[0], name[1]) attrs = {build_qname(key[0], key[1]): value for key, value in attrs.items()} self.start(tag, attrs, self.ns_map) self.ns_map = {}
[docs] def endElementNS(self, name: Tuple, qname: Any): """ End element notification receiver. The receiver will flush any previous active element and set the next element to be flushed. Converts name and attribute keys to fully qualified tags to respect the main parser api, eg (foo, bar) -> {foo}bar """ self.end(build_qname(name[0], name[1]))
[docs] def characters(self, content: str): """Proxy for the data notification receiver.""" self.data(content)
[docs] def startPrefixMapping(self, prefix: str, uri: str): """Start element prefix-URI namespace mapping.""" self.ns_map[prefix] = uri