Source code for xsdata.formats.dataclass.parsers.handlers.lxml

from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional

from lxml import etree

from xsdata.exceptions import XmlHandlerError
from xsdata.formats.dataclass.parsers.mixins import XmlHandler
from xsdata.models.enums import EventType

# Event kinds requested from lxml's ``iterparse``/``iterwalk`` by
# LxmlEventHandler; these are the events its dispatch loop handles.
EVENTS = (EventType.START, EventType.END, EventType.START_NS)


@dataclass
class LxmlEventHandler(XmlHandler):
    """
    Event handler built on top of the :class:`lxml.etree.iterparse` api.

    :param parser: The parser instance to feed with events
    :param clazz: The target binding model. If None the parser will
        auto locate it from the active xml context instance
    :param queue: The XmlNode queue
    :param objects: The list of intermediate parsed objects,
        eg [(qname, object)]
    """

    def parse(self, source: Any) -> Any:
        """
        Parse an XML document and return the bound object.

        Without xinclude processing the source is streamed through
        ``etree.iterparse`` with ``recover`` and ``remove_comments``
        enabled. With xinclude processing the whole document is parsed
        first, its xinclude elements are expanded and the resulting tree
        is walked with ``etree.iterwalk``.

        NOTE(review): the xinclude branch calls ``etree.parse`` without a
        custom parser, so the recover/remove_comments options are not
        passed there -- confirm this is intended.

        :param source: The document source handed over to lxml
        :return: The result of :meth:`process_context`
        """
        config = self.parser.config

        if not config.process_xinclude:
            events = etree.iterparse(
                source, EVENTS, recover=True, remove_comments=True
            )
            return self.process_context(events)

        document = etree.parse(source, base_url=config.base_url)  # nosec
        document.xinclude()
        return self.process_context(etree.iterwalk(document, EVENTS))

    def process_context(self, context: Iterable) -> Any:
        """
        Forward every ``(event, element)`` pair of the context to the
        main parser.

        :param context: An iterable of ``(event, element)`` pairs
        :return: The value returned by the last ``parser.end`` call, or
            None if no end event was seen
        :raises XmlHandlerError: If an event is not one of start, end or
            start-ns
        """
        result = None

        for kind, node in context:
            if kind == EventType.START:
                self.parser.start(
                    self.clazz,
                    self.queue,
                    self.objects,
                    node.tag,
                    node.attrib,
                    node.nsmap,
                )
                continue

            if kind == EventType.END:
                result = self.parser.end(
                    self.queue,
                    self.objects,
                    node.tag,
                    node.text,
                    node.tail,
                )
                # The element has been handed to the parser; clear it.
                node.clear()
                continue

            if kind != EventType.START_NS:
                raise XmlHandlerError(f"Unhandled event: `{kind}`.")

            # For start-ns events the payload is a (prefix, uri) pair.
            prefix, uri = node
            self.parser.start_prefix_mapping(prefix, uri)

        return result
@dataclass
class LxmlSaxHandler(XmlHandler):
    """
    Sax content handler built on top of the :class:`lxml.etree.XMLParser`
    target api.

    :param parser: The parser instance to feed with events
    :param clazz: The target binding model. If None the parser will
        auto locate it from the active xml context instance
    :param queue: The XmlNode queue
    :param objects: The list of intermediate parsed objects,
        eg [(qname, object)]
    """

    # Scope vars
    # data_frames: stack of (text_parts, tail_parts) pairs, one per
    #   started element that has not been flushed yet.
    # flush_next: qualified name of the element that ended last and is
    #   waiting to be flushed, or None.
    data_frames: List = field(init=False, default_factory=list)
    flush_next: Optional[str] = field(init=False, default=None)

    def parse(self, source: Any) -> Any:
        """
        Parse an XML document using this handler as the parser target.

        The lxml parser is configured to recover from errors, remove
        comments, clean redundant namespace declarations, leave entities
        unresolved and forbid network access.

        :param source: The document source handed over to lxml
        :return: The result of ``etree.parse``; with a parser target
            lxml returns the value produced by the target's ``close``
        :raises XmlHandlerError: If xinclude processing is enabled in
            the parser config
        """
        if self.parser.config.process_xinclude:
            message = f"{type(self).__name__} doesn't support xinclude elements."
            raise XmlHandlerError(message)

        target_parser = etree.XMLParser(
            target=self,
            recover=True,
            remove_comments=True,
            ns_clean=True,
            resolve_entities=False,
            no_network=True,
        )
        return etree.parse(source, parser=target_parser)  # nosec

    def start(self, qname: str, attrs: Dict, ns_map: Dict):
        """
        Start element notification receiver.

        Flushes the pending ended element if there is one, pushes a
        fresh data frame for the new element and notifies the main
        parser that the element has started.

        :param qname: Qualified name
        :param attrs: Attribute key-value map
        :param ns_map: Namespace prefix-URI map
        """
        self.flush()

        # Position 0 collects text chunks, position 1 collects tail chunks.
        frame = ([], [])
        self.data_frames.append(frame)

        self.parser.start(
            self.clazz,
            self.queue,
            self.objects,
            qname,
            attrs,
            self.start_ns_bulk(ns_map),
        )

    def end(self, qname: str):
        """
        End element notification receiver.

        Flushes the pending ended element if there is one and marks the
        given element as the next one to flush. The flush is deferred so
        that data received afterwards can be stored as its tail.

        :param qname: Qualified name
        """
        self.flush()
        self.flush_next = qname

    def close(self) -> Any:
        """
        Close document notification receiver.

        Flushes the pending ended element if there is one and returns
        the object of the first entry in the objects list.

        :return: ``self.objects[0][1]``, or None when an IndexError is
            raised; the guard also covers the flush call itself
        """
        try:
            self.flush()
            first_entry = self.objects[0]
            return first_entry[1]
        except IndexError:
            return None

    def flush(self):
        """
        Flush element notification receiver.

        Does nothing unless an ended element is pending. Otherwise pops
        the latest data frame, joins its text and tail chunks (None when
        no chunk was collected), notifies the main parser that the
        element has ended and resets the pending marker.
        """
        if not self.flush_next:
            return

        frame = self.data_frames.pop()
        text_parts = frame[0]
        tail_parts = frame[1]

        # An empty chunk list is reported as None, not as "".
        text = None
        if text_parts:
            text = "".join(text_parts)

        tail = None
        if tail_parts:
            tail = "".join(tail_parts)

        self.parser.end(self.queue, self.objects, self.flush_next, text, tail)
        self.flush_next = None

    def data(self, data: str):
        """
        Data notification receiver.

        Appends the content to the latest data frame: to the text list
        (position 0) while no ended element is pending, otherwise to the
        tail list (position 1).

        :param data: Text or tail content
        """
        slot = 1 if self.flush_next is not None else 0
        self.data_frames[-1][slot].append(data)