Source code for xsdata.formats.dataclass.parsers.json

import json
import warnings
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Type
from typing import Union

from xsdata.exceptions import ConverterWarning
from xsdata.exceptions import ParserError
from xsdata.formats.bindings import AbstractParser
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.models.elements import XmlMeta
from xsdata.formats.dataclass.models.elements import XmlVar
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.parsers.utils import ParserUtils
from xsdata.formats.dataclass.typing import get_args
from xsdata.formats.dataclass.typing import get_origin
from xsdata.utils import collections
from xsdata.utils.constants import EMPTY_MAP


[docs]@dataclass class JsonParser(AbstractParser): """ Json parser for dataclasses. :param config: Parser configuration :param context: Model context provider :param load_factory: Replace the default json.load call with another implementation """ config: ParserConfig = field(default_factory=ParserConfig) context: XmlContext = field(default_factory=XmlContext) load_factory: Callable = field(default=json.load)
[docs] def parse(self, source: Any, clazz: Optional[Type[T]] = None) -> T: """Parse the input stream or filename and return the resulting object tree.""" data = self.load_json(source) tp = self.verify_type(clazz, data) with warnings.catch_warnings(): if self.config.fail_on_converter_warnings: warnings.filterwarnings("error", category=ConverterWarning) try: if not isinstance(data, list): return self.bind_dataclass(data, tp) return [self.bind_dataclass(obj, tp) for obj in data] # type: ignore except ConverterWarning as e: raise ParserError(e)
def load_json(self, source: Any) -> Union[Dict, List]: if not hasattr(source, "read"): with open(source, "rb") as fp: return self.load_factory(fp) return self.load_factory(source) def verify_type(self, clazz: Optional[Type[T]], data: Union[Dict, List]) -> Type[T]: if clazz is None: return self.detect_type(data) try: origin = get_origin(clazz) list_type = False if origin is List: list_type = True args = get_args(clazz) if len(args) != 1 or not self.context.class_type.is_model(args[0]): raise TypeError() clazz = args[0] elif origin is not None: raise TypeError() except TypeError: raise ParserError(f"Invalid clazz argument: {clazz}") if list_type != isinstance(data, list): if list_type: raise ParserError("Document is object, expected array") raise ParserError("Document is array, expected object") return clazz # type: ignore def detect_type(self, data: Union[Dict, List]) -> Type[T]: if not data: raise ParserError("Document is empty, can not detect type") keys = data[0].keys() if isinstance(data, list) else data.keys() clazz: Optional[Type[T]] = self.context.find_type_by_fields(set(keys)) if clazz: return clazz raise ParserError(f"Unable to locate model with properties({list(keys)})")
[docs] def bind_dataclass(self, data: Dict, clazz: Type[T]) -> T: """Recursively build the given model from the input dict data.""" if set(data.keys()) == self.context.class_type.derived_keys: return self.bind_derived_dataclass(data, clazz) meta = self.context.build(clazz) xml_vars = meta.get_all_vars() params = {} for key, value in data.items(): is_array = collections.is_array(value) var = self.find_var(xml_vars, key, is_array) if var is None and self.config.fail_on_unknown_properties: raise ParserError(f"Unknown property {clazz.__qualname__}.{key}") if var and var.init: params[var.name] = self.bind_value(meta, var, value) try: return self.config.class_factory(clazz, params) except TypeError as e: raise ParserError(e)
def bind_derived_dataclass(self, data: Dict, clazz: Type[T]) -> Any: qname = data["qname"] xsi_type = data["type"] params = data["value"] generic = self.context.class_type.derived_element if clazz is generic: real_clazz: Optional[Type[T]] = None if xsi_type: real_clazz = self.context.find_type(xsi_type) if real_clazz is None: raise ParserError( f"Unable to locate derived model " f"with properties({list(params.keys())})" ) value = self.bind_dataclass(params, real_clazz) else: value = self.bind_dataclass(params, clazz) return generic(qname=qname, type=xsi_type, value=value)
[docs] def bind_best_dataclass(self, data: Dict, classes: Iterable[Type[T]]) -> T: """Attempt to bind the given data to one possible models, if more than one is successful return the object with the highest score.""" obj = None keys = set(data.keys()) max_score = -1.0 for clazz in classes: if not self.context.class_type.is_model(clazz): continue if self.context.local_names_match(keys, clazz): candidate = self.bind_optional_dataclass(data, clazz) score = self.context.class_type.score_object(candidate) if score > max_score: max_score = score obj = candidate if obj: return obj raise ParserError( f"Failed to bind object with properties({list(data.keys())}) " f"to any of the {[cls.__qualname__ for cls in classes]}" )
[docs] def bind_optional_dataclass(self, data: Dict, clazz: Type[T]) -> Optional[T]: """Recursively build the given model from the input dict data but fail on any converter warnings.""" try: with warnings.catch_warnings(): warnings.filterwarnings("error", category=ConverterWarning) return self.bind_dataclass(data, clazz) except Exception: return None
[docs] def bind_value( self, meta: XmlMeta, var: XmlVar, value: Any, recursive: bool = False ) -> Any: """Main entry point for binding values.""" # xs:anyAttributes get it out of the way, it's the mapping exception! if var.is_attributes: return dict(value) # Repeating element, recursively bind the values if not recursive and var.list_element and isinstance(value, list): assert var.factory is not None return var.factory(self.bind_value(meta, var, val, True) for val in value) # If not dict this is an text or tokens value. if not isinstance(value, dict): return self.bind_text(meta, var, value) keys = value.keys() if keys == self.context.class_type.any_keys: # Bind data to AnyElement dataclass return self.bind_dataclass(value, self.context.class_type.any_element) if keys == self.context.class_type.derived_keys: # Bind data to AnyElement dataclass return self.bind_derived_value(meta, var, value) # Bind data to a user defined dataclass return self.bind_complex_type(meta, var, value)
[docs] def bind_text(self, meta: XmlMeta, var: XmlVar, value: Any) -> Any: """Bind text/tokens value entrypoint.""" if var.is_elements: # Compound field we need to match the value to one of the choice elements check_subclass = self.context.class_type.is_model(value) choice = var.find_value_choice(value, check_subclass) if choice: return self.bind_text(meta, choice, value) raise ParserError( f"Failed to bind '{value}' " f"to {meta.clazz.__qualname__}.{var.name} field" ) if var.any_type or var.is_wildcard: # field can support any object return the value as it is return value # Convert value according to the field types return ParserUtils.parse_value( value=value, types=var.types, default=var.default, ns_map=EMPTY_MAP, tokens_factory=var.tokens_factory, format=var.format, )
[docs] def bind_complex_type(self, meta: XmlMeta, var: XmlVar, data: Dict) -> Any: """Bind data to a user defined dataclass.""" if var.is_clazz_union: # Union of dataclasses return self.bind_best_dataclass(data, var.types) if var.elements: # Compound field with multiple choices return self.bind_best_dataclass(data, var.element_types) if var.any_type or var.is_wildcard: # xs:anyType element, check all meta classes return self.bind_best_dataclass(data, meta.element_types) assert var.clazz is not None subclasses = set(self.context.get_subclasses(var.clazz)) if subclasses: # field annotation is an abstract/base type subclasses.add(var.clazz) return self.bind_best_dataclass(data, subclasses) return self.bind_dataclass(data, var.clazz)
[docs] def bind_derived_value(self, meta: XmlMeta, var: XmlVar, data: Dict) -> T: """Bind derived element entry point.""" qname = data["qname"] xsi_type = data["type"] params = data["value"] if var.elements: choice = var.find_choice(qname) if choice is None: raise ParserError( f"Unable to locate compound element" f" {meta.clazz.__qualname__}.{var.name}[{qname}]" ) return self.bind_derived_value(meta, choice, data) if not isinstance(params, dict): value = self.bind_text(meta, var, params) elif var.clazz: value = self.bind_complex_type(meta, var, params) elif xsi_type: clazz: Optional[Type] = self.context.find_type(xsi_type) if clazz is None: raise ParserError(f"Unable to locate xsi:type `{xsi_type}`") value = self.bind_dataclass(params, clazz) else: value = self.bind_best_dataclass(params, meta.element_types) generic = self.context.class_type.derived_element return generic(qname=qname, value=value, type=xsi_type)
@classmethod def find_var( cls, xml_vars: Sequence[XmlVar], local_name: str, is_list: bool = False ) -> Optional[XmlVar]: for var in xml_vars: if var.local_name == local_name: var_is_list = var.list_element or var.tokens if is_list == var_is_list or var.clazz is None: return var return None
@dataclass class DictConverter(JsonParser): def convert(self, data: Dict, clazz: Type[T]) -> T: return self.bind_dataclass(data, clazz)