diff --git a/kgforge/core/archetypes/model.py b/kgforge/core/archetypes/model.py index 8c6a51f7..37fb1430 100644 --- a/kgforge/core/archetypes/model.py +++ b/kgforge/core/archetypes/model.py @@ -177,7 +177,7 @@ def validate( # Replace None by self._validate_many to switch to optimized bulk validation. run( self._validate_one, - None, + self._validate_many, data, execute_actions=execute_actions_before, exception=ValidationError, diff --git a/kgforge/specializations/models/demo_model.py b/kgforge/specializations/models/demo_model.py index 7b0190cc..921573c5 100644 --- a/kgforge/specializations/models/demo_model.py +++ b/kgforge/specializations/models/demo_model.py @@ -99,6 +99,8 @@ def _validate_one(self, resource: Resource, type_: str, inference: str) -> None: if reason is not None: raise ValidationError(reason) + _validate_many = None + # Utils. @staticmethod @@ -124,11 +126,6 @@ def _generate_context(self) -> Dict: def schema_id(self, type: str) -> str: raise not_supported() - def _validate_many( - self, resources: List[Resource], type_: str, inference: str - ) -> None: - raise not_supported() - class ModelLibrary: """Simulate a third-party library handling interactions with the data used by the model.""" diff --git a/kgforge/specializations/models/rdf/service.py b/kgforge/specializations/models/rdf/service.py index e7353011..0bf660d4 100644 --- a/kgforge/specializations/models/rdf/service.py +++ b/kgforge/specializations/models/rdf/service.py @@ -63,53 +63,52 @@ ALL_COLLECTORS_MAP = {c.constraint(): c for c in ALL_COLLECTORS} -def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]: - """traverses the Shape SACL properties to collect constrained properties - - This function is injected to pyshacl Shape object in order to traverse the Shacl graph. - It will call a specific collector depending on the SHACL property present in the NodeShape - - Args: - predecessors: list of nodes that have being traversed, used to break circular - recursion - - Returns: - properties, attributes: Tuple(list,dict), the collected properties and attributes - respectively gathered from the collectors - """ - - parameters = self.parameters() - properties = [] - attributes = {} - done_collectors = set() - for param in iter(parameters): - if param in ALL_COLLECTORS_MAP: - constraint_collector = ALL_COLLECTORS_MAP[param] - if constraint_collector not in done_collectors: - c = constraint_collector(self) - predecessors.add(self.node) - props, attrs = c.collect(predecessors) - if attrs: - attributes.update(attrs) - if props: - properties.extend(props) - done_collectors.add(constraint_collector) - if predecessors: - predecessors.remove(self.node) - else: - # FIXME: there are some SHACL constrains that are not implemented - # raise IndexError(f"{param} not implemented!") - pass - - return properties, attributes - - class ShapeWrapper(Shape): __slots__ = ("__dict__",) def __init__(self, shape: Shape) -> None: super().__init__(shape.sg, shape.node, shape._p, shape._path, shape.logger) + def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]: + """traverses the Shape SHACL properties to collect constrained properties + + This function is injected to pyshacl Shape object in order to traverse the Shacl graph. + It will call a specific collector depending on the SHACL property present in the NodeShape + + Args: + predecessors: list of nodes that have being traversed, used to break circular + recursion + + Returns: + properties, attributes: Tuple(list,dict), the collected properties and attributes + respectively gathered from the collectors + """ + + parameters = self.parameters() + properties = [] + attributes = {} + done_collectors = set() + for param in iter(parameters): + if param in ALL_COLLECTORS_MAP: + constraint_collector = ALL_COLLECTORS_MAP[param] + if constraint_collector not in done_collectors: + c = constraint_collector(self) + predecessors.add(self.node) + props, attrs = c.collect(predecessors) + if attrs: + attributes.update(attrs) + if props: + properties.extend(props) + done_collectors.add(constraint_collector) + if predecessors: + predecessors.remove(self.node) + else: + # FIXME: there are some SHACL constrains that are not implemented + # raise IndexError(f"{param} not implemented!") + pass + + return properties, attributes + def parameters(self): return ( p @@ -125,7 +124,7 @@ def __init__(self, graph: RDFDataset) -> None: # the following line triggers the shape loading self._shapes = self.shapes - def lookup_shape_from_node(self, node: URIRef) -> Shape: + def lookup_shape_from_node(self, node: URIRef) -> Optional[ShapeWrapper]: """Overwrite function to inject the transverse function for only to requested nodes. Args: @@ -138,12 +137,19 @@ def lookup_shape_from_node(self, node: URIRef) -> Shape: shape = self._node_shape_cache[node] except KeyError as ke: raise ValueError(f"Unknown shape node id '{node}': {str(ke)}") from ke - if shape: - shape_wrapper = ShapeWrapper(self._node_shape_cache[node]) - if not hasattr(shape_wrapper, "traverse"): - shape_wrapper.traverse = types.MethodType(traverse, shape_wrapper) - return shape_wrapper - return shape + + return ShapeWrapper(shape) + + @property + def shapes(self): # pyshacl implementation returns dict_values (not list). This cannot be pickled. + """ + + :returns: [Shape] + :rtype: list(pyshacl.shape.Shape) + """ + if len(self._node_shape_cache) < 1: + self._build_node_shape_cache() + return list(self._node_shape_cache.values()) class RdfService: @@ -193,7 +199,8 @@ def materialize(self, iri: URIRef) -> NodeProperties: """ raise NotImplementedError() - def validate(self, resource: Resource, type_: str, inference: str): + @staticmethod + def type_to_validate_against(resource: Resource, type_: str) -> str: try: if not resource.get_type() and not type_: raise ValueError( @@ -209,12 +216,25 @@ def validate(self, resource: Resource, type_: str, inference: str): raise TypeError( f"A single type should be provided for validation: {str(exc)}" ) from exc - shape_iri = self.get_shape_uriref_from_class_fragment(type_to_validate) - data_graph = as_graph(resource, False, self.context, None, None) - shape, shacl_graph, ont_graph = self.get_shape_graph(shape_iri) - conforms, report_graph, report_text = self._validate( - shape_iri, data_graph, shape, shacl_graph, inference, ont_graph + + return type_to_validate + + @staticmethod + def validate( + resource: Resource, shape: ShapeWrapper, shacl_graph: Graph, ont_graph: Graph, type_to_validate: str, inference: str, context: Context + ) -> Tuple[bool, Graph, str]: + + data_graph = as_graph(resource, False, context, None, None) + + inplace = inference and inference != "none" + conforms, report_graph, report_text = validate( + data_graph=data_graph, + shacl_graph=shacl_graph, + ont_graph=ont_graph, + inference=inference, + inplace=inplace, ) + # when no schema target was found in the data (i.e no data was selected for validation) # conforms is currently set to True by pyShacl. Here it is set to False so that # the validation fails when type_to_validate is not present in the data @@ -245,24 +265,6 @@ def validate(self, resource: Resource, type_: str, inference: str): ) return conforms, report_graph, report_text - def _validate( - self, - iri: str, - data_graph: Graph, - shape: Shape, - shacl_graph: Graph, - inference: str, - ont_graph: Graph = None, - ) -> Tuple[bool, Graph, str]: - inplace = inference and inference != "none" - return validate( - data_graph=data_graph, - shacl_graph=shacl_graph, - ont_graph=ont_graph, - inference=inference, - inplace=inplace, - ) - @abstractmethod def resolve_context(self, iri: str) -> Dict: """For a given IRI return its resolved context recursively""" @@ -408,7 +410,7 @@ def _process_imported_resource( imported_resource_uriref, resource_to_named_graph_uriref, collect_imported_ontology: bool, - ): + ) -> Tuple[Graph, Graph]: imported_graph_id = resource_to_named_graph_uriref[imported_resource_uriref] if imported_resource_uriref not in self._imported: imported_resource_graph, imported_ont_graph = ( @@ -618,25 +620,28 @@ def _import_shape(self, node_shape_uriref: URIRef): f"Failed to import the shape '{node_shape_uriref}': {str(e)}" ) from e - def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[Shape, Graph, Graph]: + def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[ShapeWrapper, Graph, Graph]: if node_shape_uriref not in self.shape_to_defining_resource: raise ValueError(f"Unknown shape '{node_shape_uriref}'") + + if self.shape_to_defining_resource[node_shape_uriref] not in self._imported: + return self._import_shape(node_shape_uriref) + try: shape = self.get_shape_graph_wrapper().lookup_shape_from_node( node_shape_uriref ) - except ValueError: - return self._import_shape(node_shape_uriref) - if self.shape_to_defining_resource[node_shape_uriref] in self._imported: - defining_resource = self.shape_to_defining_resource[node_shape_uriref] shape_graph = self._dataset_graph.graph( self._get_named_graph_from_shape(node_shape_uriref) ) - ont_graph = self._build_imported_ontology_graph(defining_resource) - else: - return self._import_shape(node_shape_uriref) + ont_graph = self._build_imported_ontology_graph( + self.shape_to_defining_resource[node_shape_uriref] + ) + + return shape, shape_graph, ont_graph - return shape, shape_graph, ont_graph + except ValueError: + return self._import_shape(node_shape_uriref) def _build_imported_ontology_graph(self, resurce_uriref): ont_graph = Graph() diff --git a/kgforge/specializations/models/rdf/store_service.py b/kgforge/specializations/models/rdf/store_service.py index a9900f50..cbb37b9d 100644 --- a/kgforge/specializations/models/rdf/store_service.py +++ b/kgforge/specializations/models/rdf/store_service.py @@ -70,7 +70,7 @@ def resolve_context(self, iri: str) -> Dict: def generate_context(self) -> Dict: for shape_uriref, schema_uriref in self.shape_to_defining_resource.items(): if schema_uriref not in self._imported: - self._transitive_load_shape_graph( + self._transitive_load_resource_graph( self._get_named_graph_from_shape(shape_uriref), schema_uriref ) # reloads the shapes graph diff --git a/kgforge/specializations/models/rdf_model.py b/kgforge/specializations/models/rdf_model.py index 5cd8388a..4ad3f639 100644 --- a/kgforge/specializations/models/rdf_model.py +++ b/kgforge/specializations/models/rdf_model.py @@ -13,11 +13,12 @@ # along with Blue Brain Nexus Forge. If not, see . import datetime import re +from multiprocessing import Pool from pathlib import Path -from typing import Dict, List, Callable, Optional, Any, Union +from typing import Dict, List, Callable, Optional, Any, Union, Tuple from pyshacl.consts import SH -from rdflib import URIRef, Literal +from rdflib import URIRef, Literal, Graph from rdflib.namespace import XSD from kgforge.core.archetypes.mapping import Mapping @@ -30,7 +31,7 @@ from kgforge.core.commons.execution import run, not_supported from kgforge.specializations.models.rdf.collectors import NodeProperties from kgforge.specializations.models.rdf.directory_service import DirectoryService -from kgforge.specializations.models.rdf.service import RdfService +from kgforge.specializations.models.rdf.service import RdfService, ShapeWrapper from kgforge.specializations.models.rdf.store_service import StoreService from kgforge.specializations.models.rdf.utils import as_term @@ -63,6 +64,8 @@ DEFAULT_TYPE_ORDER = [str, float, int, bool, datetime.date, datetime.time] +VALIDATION_PARALLELISM = None + class RdfModel(Model): """Specialization of Model that follows SHACL shapes""" @@ -127,33 +130,72 @@ def validate( inference=inference, ) - def _validate_many( - self, resources: List[Resource], type_: str, inference: str - ) -> None: - for resource in resources: - conforms, graph, _ = self.service.validate( - resource, type_=type_, inference=inference - ) - if conforms: - resource._validated = True - action = Action(self._validate_many.__name__, conforms, None) + @staticmethod + def _validate( + resource: Resource, shape: ShapeWrapper, shacl_graph: Graph, ont_graph: Graph, type_to_validate: str, + inference: str, short_message: bool, raise_: bool, context: Context + ) -> Resource: + + conforms, graph, report = RdfService.validate(resource, shape, shacl_graph, ont_graph, type_to_validate, inference, context=context) + + if not conforms: + if not short_message: + message = report else: - resource._validated = False violations = set( " ".join(re.findall("[A-Z][^A-Z]*", as_term(o))) for o in graph.objects(None, SH.sourceConstraintComponent) ) message = f"violation(s) of type(s) {', '.join(sorted(violations))}" - action = Action( - self._validate_many.__name__, conforms, ValidationError(message) - ) - resource._last_action = action + err = ValidationError(message) + else: + err = None + + resource._validated = conforms + resource._last_action = Action(operation="_validate_many", succeeded=conforms, error=err) + + if not conforms and raise_: + raise err + + return resource + + def _prepare_shapes(self, r: Resource, type_: str) -> Tuple[str, ShapeWrapper, Graph, Graph, Resource]: + type_to_validate = RdfService.type_to_validate_against(r, type_) + shape, shacl_graph, ont_graph = self.service.get_shape_graph( + node_shape_uriref=self.service.get_shape_uriref_from_class_fragment(type_to_validate) + ) + return type_to_validate, shape, shacl_graph, ont_graph, r + + @staticmethod + def fc_call(type_to_validate, shape, shacl_graph, ont_graph, r, inference, context): + + return RdfModel._validate( + resource=r, type_to_validate=type_to_validate, inference=inference, shape=shape, shacl_graph=shacl_graph, + ont_graph=ont_graph, short_message=True, raise_=False, context=context + ) + + def _validate_many(self, resources: List[Resource], type_: str, inference: str) -> None: + + def validate_iterator(): + for r in resources: + type_to_validate, shape, shacl_graph, ont_graph, r = self._prepare_shapes(r, type_) + yield type_to_validate, shape, shacl_graph, ont_graph, r, inference, self.service.context + + resources_2 = Pool(processes=VALIDATION_PARALLELISM).starmap(RdfModel.fc_call, validate_iterator()) + + for r_1, r_2 in zip(resources, resources_2): + r_1._validated = r_2._validated + r_1._last_action = r_2._last_action def _validate_one(self, resource: Resource, type_: str, inference: str) -> None: - conforms, _, report = self.service.validate(resource, type_, inference) - if conforms is False: - raise ValidationError("\n" + report) + + type_to_validate, shape, shacl_graph, ont_graph, r = self._prepare_shapes(resource, type_) + + RdfModel._validate( + resource=r, type_to_validate=type_to_validate, inference=inference, shape=shape, shacl_graph=shacl_graph, + ont_graph=ont_graph, short_message=False, raise_=True, context=self.service.context + ) # Utils.