Source code for sdv.validators.xml_schema

# Copyright (c) 2015, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

# builtin
import os
import collections

# external
from lxml import etree

# internal
from sdv import errors, utils, xmlconst

# relative
from . import base


[docs]class XmlSchemaError(base.ValidationError): """Represents an XML Schema validation error. Args: error: An error returned from ``etree`` XML Schema validation error log. Attributes: message: The XML validation error message. """ def __init__(self, error): super(XmlSchemaError, self).__init__() if error: self.message = unicode(error) else: self.message = None @property
[docs] def line(self): """Returns the line number associated with the error.""" if not self.message: return None try: # libxml2 schema validation errors are tokenized by colons tokenized = self.message.split(":") return int(tokenized[1]) except (IndexError, TypeError, ValueError): return None
[docs] def as_dict(self): """Returns a dictionary representation. Keys: * ``'message'``: The error message * ``'line'``: The line number associated with the error """ return {'message':self.message, 'line': self.line}
def __unicode__(self): return unicode(self.message) def __str__(self): return unicode(self).encode("utf-8")
[docs]class XmlValidationResults(base.ValidationResults): """Results of XML schema validation. Returned from :meth:`XmlSchemaValidator.validate`. Args: is_valid: The validation result. errors: A list of strings reported from the XML validation engine. Attributes: is_valid: ``True`` if the validation was successful and ``False`` otherwise. """ def __init__(self, is_valid, errors=None): super(XmlValidationResults, self).__init__(is_valid) self.errors = errors @property def errors(self): """"A list of :class:`XmlSchemaError` validation errors.""" return self._errors @errors.setter
[docs] def errors(self, value): if not value: self._errors = [] elif utils.is_iterable(value): self._errors = [XmlSchemaError(x) for x in value] else: self._errors = [XmlSchemaError(value)]
[docs] def as_dict(self): """A dictionary representation of the :class:`.XmlValidationResults` instance. Keys: * ``'result'``: The validation results (``True`` or ``False``) * ``'errors'``: A list of validation errors. Returns: A dictionary representation of an instance of this class. """ d = super(XmlValidationResults, self).as_dict() if self.errors: d['errors'] = [x.as_dict() for x in self.errors] return d
[docs]class XmlSchemaValidator(object): """Validates XML instance documents. Note: If validating against a single XML schema document, use ``lxml.etree.XMLSchema`` instead. Args: schema_dir: A directory of schema files used to validate XML instance documents. Attributes: OVERRIDE_SCHEMALOC: Overrides the schemalocation for a given namespace that may be discovered when walking `schema_dir`. This does not alter the schemalocation of namespaces declared by ``xsi:schemalLocation`` attributes if validating via ``xsi:schemaLocation``. """ OVERRIDE_SCHEMALOC = {} def __init__(self, schema_dir=None): self._schemalocs = self._map_schemalocs(schema_dir) def _get_includes(self, fp, root): """Returns a list of ``xs:include`` targets found within `root`. The returned list contains paths to the ``xs:include`` targets. The file paths are absolute. Note: This assumes all includes point to local schemas. Remote schema locations will not be parsed correctly! Args: fp: The file path to `root`. This is used to determine the path to the included schema if the include path is relative. root: An etree._Element representation of the schema. Returns: A list of file paths to included schemas. """ xs_includes = root.findall(xmlconst.TAG_XS_INCLUDE) dir_ = os.path.dirname(fp) includes = [] for include in xs_includes: loc = include.attrib['schemaLocation'] # NOT xsi:schemaLocation! # If the path is relative, get the absolute path if os.path.isabs(loc): locpath = loc else: locpath = os.path.abspath(os.path.join(dir_, loc)) includes.append(locpath) return includes def _build_include_graph(self, schema_paths): """Builds a graph of ``xs:include`` directive sources and targets for the schemas contained by the `schema_paths` list. Args: schema_paths: A list of schema file paths Returns: A graph representing ``xs:include`` statements found within the schemas in `schema_paths`. """ graph = collections.defaultdict(list) for fp in schema_paths: root = utils.get_etree_root(fp) includes = self._get_includes(fp, root) graph[fp].extend(includes) return graph def _is_included(self, graph, fp): """Returns ``True`` if the schema at `fp` was included by any other schemas in `graph`. """ return any(fp in includes for includes in graph.itervalues()) def _get_include_root(self, ns, list_schemas): """Attempts to determine the "root" schema for a targetNamespace. This builds a graph of ``xs:include`` directive sources and targets and attempts to find a common base for all includes. Note: If no schemas in `list_schemas` ``xs:include`` another schema, then ``list_schemas[0]`` is returned. This occurs when duplicate schemas (or different versions of the same schema that define the same namespace) were encountered in the initialization schema directory. Args: ns: The target namespace list_schemas: A list of schemas which exist or define the target namespace. Returns: A path to the root schema for the input `ns`. """ graph = self._build_include_graph(list_schemas) if all(not(x) for x in graph.itervalues()): return list_schemas[0] for fp in graph: has_ancestors = self._is_included(graph, fp) has_children = len(graph[fp]) > 0 if has_children and not has_ancestors: return fp msg = "Unable to determine base schema for %s" % ns raise errors.XMLSchemaIncludeError(msg) def _process_includes(self, imports): """Attempts to resolve cases where multiple schemas declare the same ``targetNamespace`` value. This is due to the use of the ``xs:include`` directive, which can be found in OASIS CIQ schemas along with others. This is done by building an ``xs:include`` graph, and returning the root of that graph. Note: This method is flawed! This assumes that the ``xs:include`` graph is really a tree, and has a root which can be imported and used to validate all instance data which belongs to its namespace. A better way may be to programatically combine all "split" schemas within a single schema document and map the targetNamespace to that combined schema document. Args: imports: A dictionary of namespaces to a list of schema file paths. Most often, this list will have only one file path in it. Returns: A dictionary of schema targetNamespaces to a single schema file path. """ processed = {} for ns, schemas in imports.iteritems(): if len(schemas) > 1: base_schema = self._get_include_root(ns, schemas) processed[ns] = base_schema else: processed[ns] = schemas[0] return processed def _walk_schemas(self, schema_dir): """Walks the `schema_dir` directory and builds a dictionary of schema ``targetNamespace`` values to a list of schema file paths. Because multiple schemas can declare the same ``targetNamespace`` value, the ``value`` portion of the returned dictionary is a ``list``. Note: This method attempts to resolve issues where the same schema exists in two or more locations under `schema_dir` by keeping a record of visited target namespaces and filenames. If the same filename:targetNS (not file path) pair has been visited already, the file is not added to the schemalocation dictionary. Returns: A dictionary of schema ``targetNamespace`` values to a list of schema file paths. """ seen = [] schemalocs = collections.defaultdict(list) for top, _, files in os.walk(schema_dir): for fn in files: if not fn.endswith('.xsd'): continue fp = os.path.abspath(os.path.join(top, fn)) target_ns = utils.get_target_ns(fp) if (target_ns, fn) in seen: continue schemalocs[target_ns].append(fp) seen.append((target_ns, fn)) for ns, loc in self.OVERRIDE_SCHEMALOC.iteritems(): schemalocs[ns] = [loc] return schemalocs def _map_schemalocs(self, schema_dir): """Walks the `schema_dir` directory and builds a dictionary which maps schema targetNamespace values to schema file paths. If `schema_dir` is ``None``, this function returns immediately. Returns: A dictionary mapping schema ``targetNamespace`` values to the schema file path. Raises: .XMlSchemaIncludeError: If an error occurs while processing ``xs:include`` directives. """ if not schema_dir: return schemalocs = self._walk_schemas(schema_dir) schemalocs = self._process_includes(schemalocs) return schemalocs def _parse_schemaloc(self, root): """Parses the ``xsi:schemaLocation`` attribute found on `root`. Returns: A dictionary of namespaces to schema locations. Raises: .XMLSchemaImportError: If `root` did not contain an ``xsi:schemaLocation`` attribute. """ if xmlconst.TAG_SCHEMALOCATION in root.attrib: imports = utils.get_schemaloc_pairs(root) return dict(imports) msg = ("Cannot validate using xsi:schemaLocation. The " "xsi:schemaLocation attribute was not found on the input " "document") raise errors.XMLSchemaImportError(msg) def _get_required_schemas(self, root): """Retrieve all the namespaces and schemalocations needed to validate `root`. Args: root: An etree._Element XML document. Returns: A dictionary mapping namespaces to schemalocations. """ def _get_schemalocs(node): schemalocs = {} for ns in node.nsmap.itervalues(): if ns not in self._schemalocs: continue schemalocs[ns] = self._schemalocs[ns] return schemalocs imports = {} for elem in root.iter(): schemalocs = _get_schemalocs(elem) imports.update(schemalocs) return imports def _build_required_imports(self, doc, schemaloc=False): root = utils.get_etree_root(doc) if schemaloc: return self._parse_schemaloc(root) return self._get_required_schemas(root) def _build_uber_schema(self, doc, schemaloc=False): """Builds a schema which is made up of ``xs:import`` directives for each schema required to validate `doc`. If schemaloc is ``True``, the ``xsi:schemaLocation`` attribute values are used to create the ``xs:import`` directives. If ``False``, the initialization schema directory is used. Returns: An ``etree.XMLSchema`` instance used to validate `doc`. Raise: .XMLSchemaImportError: If an error occurred while building the dictionary of namespace to schemalocation mappings used to drive the uber schema creation. """ root = utils.get_etree_root(doc) imports = self._build_required_imports(root, schemaloc) if not imports: raise errors.XMLSchemaImportError( "Cannot validate document. Error occurred while determining " "schemas required for validation." ) xsd = etree.fromstring( """ <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="http://stix.mitre.org/tools/validator" elementFormDefault="qualified" attributeFormDefault="qualified"/> """ ) for ns, loc in imports.iteritems(): loc = loc.replace("\\", "/") attrib = {'namespace': ns, 'schemaLocation':loc} import_ = etree.Element(xmlconst.TAG_XS_IMPORT, attrib=attrib) xsd.append(import_) return etree.XMLSchema(xsd)
[docs] def validate(self, doc, schemaloc=False): """Validates an XML instance document. Args: doc: An XML instance document. This can be a filename, file-like object, ``etree._Element``, or ``etree._ElementTree``. schemaloc: If ``True``, the document will be validated using the ``xsi:schemaLocation`` attribute found on the instance document root. Returns: An instance of :class:`.XmlValidationResults`. Raises: .ValidationError: If the class was not initialized with a schema directory and `schemaloc` is ``False`` or if there are any issues parsing `doc`. .XMLSchemaIncludeError: If an error occurs while processing the schemas required for validation. .XMLSchemaIncludeError: If an error occurs while processing ``xs:include`` directives. """ if not (schemaloc or self._schemalocs): raise errors.ValidationError( "No schemas to validate against! Try instantiating " "XmlValidator with use_schemaloc=True or setting the " "schema_dir param in __init__" ) root = utils.get_etree_root(doc) xsd = self._build_uber_schema(root, schemaloc) is_valid = xsd.validate(root) return XmlValidationResults(is_valid, xsd.error_log)
__all__ = [ 'XmlSchemaValidator', 'XmlValidationResults', 'XmlSchemaError' ]