[docs]@dataclassclassJsonParser(AbstractParser):""" Json parser for dataclasses. :param config: Parser configuration :param context: Model context provider :param load_factory: Replace the default json.load call with another implementation """config:ParserConfig=field(default_factory=ParserConfig)context:XmlContext=field(default_factory=XmlContext)load_factory:Callable=field(default=json.load)
[docs]defparse(self,source:Any,clazz:Optional[Type[T]]=None)->T:"""Parse the input stream or filename and return the resulting object tree."""data=self.load_json(source)tp=self.verify_type(clazz,data)withwarnings.catch_warnings():ifself.config.fail_on_converter_warnings:warnings.filterwarnings("error",category=ConverterWarning)try:ifnotisinstance(data,list):returnself.bind_dataclass(data,tp)return[self.bind_dataclass(obj,tp)forobjindata]# type: ignoreexceptConverterWarningase:raiseParserError(e)
defload_json(self,source:Any)->Union[Dict,List]:ifnothasattr(source,"read"):withopen(source,"rb")asfp:returnself.load_factory(fp)returnself.load_factory(source)defverify_type(self,clazz:Optional[Type[T]],data:Union[Dict,List])->Type[T]:ifclazzisNone:returnself.detect_type(data)try:origin=get_origin(clazz)list_type=FalseiforiginisList:list_type=Trueargs=get_args(clazz)iflen(args)!=1ornotself.context.class_type.is_model(args[0]):raiseTypeError()clazz=args[0]eliforiginisnotNone:raiseTypeError()exceptTypeError:raiseParserError(f"Invalid clazz argument: {clazz}")iflist_type!=isinstance(data,list):iflist_type:raiseParserError("Document is object, expected array")raiseParserError("Document is array, expected object")returnclazz# type: ignoredefdetect_type(self,data:Union[Dict,List])->Type[T]:ifnotdata:raiseParserError("Document is empty, can not detect type")keys=data[0].keys()ifisinstance(data,list)elsedata.keys()clazz:Optional[Type[T]]=self.context.find_type_by_fields(set(keys))ifclazz:returnclazzraiseParserError(f"Unable to locate model with properties({list(keys)})")
[docs]defbind_dataclass(self,data:Dict,clazz:Type[T])->T:"""Recursively build the given model from the input dict data."""ifset(data.keys())==self.context.class_type.derived_keys:returnself.bind_derived_dataclass(data,clazz)meta=self.context.build(clazz)xml_vars=meta.get_all_vars()params={}forkey,valueindata.items():is_array=collections.is_array(value)var=self.find_var(xml_vars,key,is_array)ifvarisNoneandself.config.fail_on_unknown_properties:raiseParserError(f"Unknown property {clazz.__qualname__}.{key}")ifvarandvar.init:params[var.name]=self.bind_value(meta,var,value)try:returnself.config.class_factory(clazz,params)exceptTypeErrorase:raiseParserError(e)
defbind_derived_dataclass(self,data:Dict,clazz:Type[T])->Any:qname=data["qname"]xsi_type=data["type"]params=data["value"]generic=self.context.class_type.derived_elementifclazzisgeneric:real_clazz:Optional[Type[T]]=Noneifxsi_type:real_clazz=self.context.find_type(xsi_type)ifreal_clazzisNone:raiseParserError(f"Unable to locate derived model "f"with properties({list(params.keys())})")value=self.bind_dataclass(params,real_clazz)else:value=self.bind_dataclass(params,clazz)returngeneric(qname=qname,type=xsi_type,value=value)
[docs]defbind_best_dataclass(self,data:Dict,classes:Iterable[Type[T]])->T:"""Attempt to bind the given data to one possible models, if more than one is successful return the object with the highest score."""obj=Nonekeys=set(data.keys())max_score=-1.0forclazzinclasses:ifnotself.context.class_type.is_model(clazz):continueifself.context.local_names_match(keys,clazz):candidate=self.bind_optional_dataclass(data,clazz)score=self.context.class_type.score_object(candidate)ifscore>max_score:max_score=scoreobj=candidateifobj:returnobjraiseParserError(f"Failed to bind object with properties({list(data.keys())}) "f"to any of the {[cls.__qualname__forclsinclasses]}")
[docs]defbind_optional_dataclass(self,data:Dict,clazz:Type[T])->Optional[T]:"""Recursively build the given model from the input dict data but fail on any converter warnings."""try:withwarnings.catch_warnings():warnings.filterwarnings("error",category=ConverterWarning)returnself.bind_dataclass(data,clazz)exceptException:returnNone
[docs]defbind_value(self,meta:XmlMeta,var:XmlVar,value:Any,recursive:bool=False)->Any:"""Main entry point for binding values."""# xs:anyAttributes get it out of the way, it's the mapping exception!ifvar.is_attributes:returndict(value)# Repeating element, recursively bind the valuesifnotrecursiveandvar.list_elementandisinstance(value,list):assertvar.factoryisnotNonereturnvar.factory(self.bind_value(meta,var,val,True)forvalinvalue)# If not dict this is an text or tokens value.ifnotisinstance(value,dict):returnself.bind_text(meta,var,value)keys=value.keys()ifkeys==self.context.class_type.any_keys:# Bind data to AnyElement dataclassreturnself.bind_dataclass(value,self.context.class_type.any_element)ifkeys==self.context.class_type.derived_keys:# Bind data to AnyElement dataclassreturnself.bind_derived_value(meta,var,value)# Bind data to a user defined dataclassreturnself.bind_complex_type(meta,var,value)
[docs]defbind_text(self,meta:XmlMeta,var:XmlVar,value:Any)->Any:"""Bind text/tokens value entrypoint."""ifvar.is_elements:# Compound field we need to match the value to one of the choice elementscheck_subclass=self.context.class_type.is_model(value)choice=var.find_value_choice(value,check_subclass)ifchoice:returnself.bind_text(meta,choice,value)raiseParserError(f"Failed to bind '{value}' "f"to {meta.clazz.__qualname__}.{var.name} field")ifvar.any_typeorvar.is_wildcard:# field can support any object return the value as it isreturnvalue# Convert value according to the field typesreturnParserUtils.parse_value(value=value,types=var.types,default=var.default,ns_map=EMPTY_MAP,tokens_factory=var.tokens_factory,format=var.format,)
[docs]defbind_complex_type(self,meta:XmlMeta,var:XmlVar,data:Dict)->Any:"""Bind data to a user defined dataclass."""ifvar.is_clazz_union:# Union of dataclassesreturnself.bind_best_dataclass(data,var.types)ifvar.elements:# Compound field with multiple choicesreturnself.bind_best_dataclass(data,var.element_types)ifvar.any_typeorvar.is_wildcard:# xs:anyType element, check all meta classesreturnself.bind_best_dataclass(data,meta.element_types)assertvar.clazzisnotNonesubclasses=set(self.context.get_subclasses(var.clazz))ifsubclasses:# field annotation is an abstract/base typesubclasses.add(var.clazz)returnself.bind_best_dataclass(data,subclasses)returnself.bind_dataclass(data,var.clazz)
[docs]defbind_derived_value(self,meta:XmlMeta,var:XmlVar,data:Dict)->T:"""Bind derived element entry point."""qname=data["qname"]xsi_type=data["type"]params=data["value"]ifvar.elements:choice=var.find_choice(qname)ifchoiceisNone:raiseParserError(f"Unable to locate compound element"f" {meta.clazz.__qualname__}.{var.name}[{qname}]")returnself.bind_derived_value(meta,choice,data)ifnotisinstance(params,dict):value=self.bind_text(meta,var,params)elifvar.clazz:value=self.bind_complex_type(meta,var,params)elifxsi_type:clazz:Optional[Type]=self.context.find_type(xsi_type)ifclazzisNone:raiseParserError(f"Unable to locate xsi:type `{xsi_type}`")value=self.bind_dataclass(params,clazz)else:value=self.bind_best_dataclass(params,meta.element_types)generic=self.context.class_type.derived_elementreturngeneric(qname=qname,value=value,type=xsi_type)