Source code for xsdata.codegen.mappers.definitions

from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple

from xsdata.codegen.models import Attr
from xsdata.codegen.models import AttrType
from xsdata.codegen.models import Class
from xsdata.codegen.models import Status
from xsdata.formats.dataclass.models.generics import AnyElement
from xsdata.logger import logger
from xsdata.models.enums import DataType
from xsdata.models.enums import Namespace
from xsdata.models.enums import Tag
from xsdata.models.wsdl import Binding
from xsdata.models.wsdl import BindingMessage
from xsdata.models.wsdl import BindingOperation
from xsdata.models.wsdl import Definitions
from xsdata.models.wsdl import Part
from xsdata.models.wsdl import PortType
from xsdata.models.wsdl import PortTypeMessage
from xsdata.models.wsdl import PortTypeOperation
from xsdata.models.wsdl import ServicePort
from xsdata.models.xsd import Element
from xsdata.utils import collections
from xsdata.utils import text
from xsdata.utils.collections import first
from xsdata.utils.namespaces import build_qname
from xsdata.utils.namespaces import split_qname


[docs]class DefinitionsMapper: """ Map a definitions instance to message and service classes. Currently only SOAP 1.1 bindings with rpc/document style is supported. """
[docs] @classmethod def map(cls, definitions: Definitions) -> List[Class]: """Step 1: Main mapper entry point.""" return [ obj for service in definitions.services for port in service.ports for obj in cls.map_port(definitions, port) ]
[docs] @classmethod def map_port(cls, definitions: Definitions, port: ServicePort) -> Iterator[Class]: """Step 2: Match a ServicePort to a Binding and PortType object and delegate the process to the next entry point.""" binding = definitions.find_binding(text.suffix(port.binding)) port_type = definitions.find_port_type(text.suffix(binding.type)) elements = collections.concat(binding.extended_elements, port.extended_elements) config = cls.attributes(elements) yield from cls.map_binding(definitions, binding, port_type, config)
[docs] @classmethod def map_binding( cls, definitions: Definitions, binding: Binding, port_type: PortType, config: Dict, ) -> Iterator[Class]: """Step 3: Match every BindingOperation to a PortTypeOperation and delegate the process for each operation to the next entry point.""" for operation in binding.unique_operations(): cfg = config.copy() cfg.update(cls.attributes(operation.extended_elements)) port_operation = port_type.find_operation(operation.name) yield from cls.map_binding_operation( definitions, operation, port_operation, cfg, port_type.name )
[docs] @classmethod def map_binding_operation( cls, definitions: Definitions, binding_operation: BindingOperation, port_type_operation: PortTypeOperation, config: Dict, name: str, ) -> Iterator[Class]: """Step 4: Convert a BindingOperation to a service class and delegate the process of all the message classes to the next entry point.""" attrs = [ cls.build_attr(key, DataType.STRING.qname, native=True, default=config[key]) for key in sorted(config.keys(), key=len) if config[key] ] style = config.get("style", "document") name = f"{name}_{binding_operation.name}" namespace = cls.operation_namespace(config) operation_messages = cls.map_binding_operation_messages( definitions, binding_operation, port_type_operation, name, style, namespace ) for message_class in operation_messages: yield message_class # Only Envelope classes need to be added in service input/output if message_class.meta_name: message_type = message_class.name.split("_")[-1] attrs.append(cls.build_attr(message_type, message_class.qname)) yield Class( qname=build_qname(definitions.target_namespace, name), status=Status.PROCESSED, type=type(binding_operation), module=definitions.module, ns_map=binding_operation.ns_map, attrs=attrs, )
[docs] @classmethod def map_binding_operation_messages( cls, definitions: Definitions, operation: BindingOperation, port_type_operation: PortTypeOperation, name: str, style: str, namespace: Optional[str], ) -> Iterator[Class]: """Step 5: Map the BindingOperation messages to classes.""" messages: List[Tuple[str, BindingMessage, PortTypeMessage]] = [] if operation.input: messages.append(("input", operation.input, port_type_operation.input)) if operation.output: messages.append(("output", operation.output, port_type_operation.output)) for suffix, binding_message, port_type_message in messages: if style == "rpc": yield cls.build_message_class(definitions, port_type_message) target = cls.build_envelope_class( definitions, binding_message, port_type_message, f"{name}_{suffix}", style, namespace, ) if suffix == "output": cls.build_envelope_fault(definitions, port_type_operation, target) yield target
[docs] @classmethod def build_envelope_fault( cls, definitions: Definitions, port_type_operation: PortTypeOperation, target: Class, ): """Build inner fault class with default fields.""" ns_map: Dict = {} body = next(inner for inner in target.inner if inner.name == "Body") fault_class = cls.build_inner_class(body, "Fault", target.namespace) detail_attrs: List[Attr] = [] for fault in port_type_operation.faults: message = definitions.find_message(text.suffix(fault.message)) detail_attrs.extend(cls.build_parts_attributes(message.parts, ns_map)) default_fields = ["faultcode", "faultstring", "faultactor"] if detail_attrs: detail = cls.build_inner_class(fault_class, "detail", namespace="") detail.attrs.extend(detail_attrs) else: default_fields.append("detail") collections.prepend( fault_class.attrs, *[ cls.build_attr(f, DataType.STRING.qname, native=True, namespace="") for f in default_fields ], )
[docs] @classmethod def build_envelope_class( cls, definitions: Definitions, binding_message: BindingMessage, port_type_message: PortTypeMessage, name: str, style: str, namespace: Optional[str], ) -> Class: """Step 6.1: Build Envelope class for the given binding message with attributes from the port type message.""" target = Class( qname=build_qname(definitions.target_namespace, name), meta_name="Envelope", type=type(binding_message), module=definitions.module, ns_map=binding_message.ns_map, namespace=namespace, ) message = port_type_message.message for ext in binding_message.extended_elements: assert ext.qname is not None local_name = split_qname(ext.qname)[1].title() inner = cls.build_inner_class(target, local_name) if style == "rpc" and local_name == "Body": namespace = ext.attributes.get("namespace") attrs = cls.map_port_type_message(port_type_message, namespace) else: attrs = cls.map_binding_message_parts( definitions, message, ext, inner.ns_map ) inner.attrs.extend(attrs) return target
[docs] @classmethod def build_message_class( cls, definitions: Definitions, port_type_message: PortTypeMessage ) -> Class: """Step 6.2: Build the input/output message class of an rpc style operation.""" message_name = text.suffix(port_type_message.message) definition_message = definitions.find_message(message_name) ns_map = definition_message.ns_map.copy() return Class( qname=build_qname(definitions.target_namespace, message_name), status=Status.PROCESSED, type=Element, module=definitions.module, ns_map=ns_map, attrs=list(cls.build_parts_attributes(definition_message.parts, ns_map)), )
[docs] @classmethod def build_inner_class( cls, target: Class, name: str, namespace: Optional[str] = None ) -> Class: """ Build or retrieve an inner class for the given target class by the given name. This helper will also create a forward reference attribute for the parent class. """ inner = first(inner for inner in target.inner if inner.name == name) if not inner: inner = Class( qname=build_qname(name), type=BindingMessage, module=target.module, ns_map=target.ns_map.copy(), ) attr = cls.build_attr(name, inner.qname, forward=True, namespace=namespace) target.inner.append(inner) target.attrs.append(attr) return inner
[docs] @classmethod def map_port_type_message( cls, message: PortTypeMessage, namespace: Optional[str] ) -> Iterator[Attr]: """Build an attribute for the given port type message.""" prefix, name = text.split(message.message) source_namespace = message.ns_map.get(prefix) yield cls.build_attr( name, qname=build_qname(source_namespace, name), namespace=namespace )
[docs] @classmethod def map_binding_message_parts( cls, definitions: Definitions, message: str, extended: AnyElement, ns_map: Dict ) -> Iterator[Attr]: """Find a Message instance and map its parts to attributes according to the the extensible element..""" parts = [] if "part" in extended.attributes: parts.append(extended.attributes["part"]) elif "parts" in extended.attributes: parts.extend(extended.attributes["parts"].split()) if "message" in extended.attributes: message_name = split_qname(extended.attributes["message"])[1] else: message_name = text.suffix(message) definition_message = definitions.find_message(message_name) message_parts = definition_message.parts if parts: message_parts = [part for part in message_parts if part.name in parts] yield from cls.build_parts_attributes(message_parts, ns_map)
[docs] @classmethod def build_parts_attributes(cls, parts: List[Part], ns_map: Dict) -> Iterator[Attr]: """ Build attributes for the given list of parts. :param parts: List of parts :param ns_map: A prefix-URI Namespace mapping """ for part in parts: if part.element: prefix, type_name = text.split(part.element) name = type_name elif part.type: prefix, type_name = text.split(part.type) name = part.name else: logger.warning("Skip untyped message part %s", part.name) continue ns_map.update(part.ns_map) namespace = part.ns_map.get(prefix) type_qname = build_qname(namespace, type_name) native = namespace == Namespace.XS.uri namespace = "" if part.type else namespace yield cls.build_attr(name, type_qname, namespace=namespace, native=native)
[docs] @classmethod def operation_namespace(cls, config: Dict) -> Optional[str]: transport = config.get("transport") namespace = None if transport == "http://schemas.xmlsoap.org/soap/http": namespace = "http://schemas.xmlsoap.org/soap/envelope/" return namespace
[docs] @classmethod def attributes(cls, elements: Iterator[AnyElement]) -> Dict: """Return all attributes from all extended elements as a dictionary.""" return { split_qname(qname)[1]: value for element in elements if isinstance(element, AnyElement) for qname, value in element.attributes.items() }
[docs] @classmethod def build_attr( cls, name: str, qname: str, native: bool = False, forward: bool = False, namespace: Optional[str] = None, default: Optional[str] = None, ) -> Attr: """Builder method for attributes.""" return Attr( tag=Tag.ELEMENT, name=name, local_name=name, namespace=namespace, default=default, types=[AttrType(qname=qname, forward=forward, native=native)], )