Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize validate #418

Merged
merged 10 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kgforge/core/archetypes/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def validate(
# Replace None by self._validate_many to switch to optimized bulk validation.
run(
self._validate_one,
None,
self._validate_many,
data,
execute_actions=execute_actions_before,
exception=ValidationError,
Expand Down
7 changes: 2 additions & 5 deletions kgforge/specializations/models/demo_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def _validate_one(self, resource: Resource, type_: str, inference: str) -> None:
if reason is not None:
raise ValidationError(reason)

_validate_many = None

# Utils.

@staticmethod
Expand All @@ -124,11 +126,6 @@ def _generate_context(self) -> Dict:
def schema_id(self, type: str) -> str:
raise not_supported()

def _validate_many(
self, resources: List[Resource], type_: str, inference: str
) -> None:
raise not_supported()


class ModelLibrary:
"""Simulate a third-party library handling interactions with the data used by the model."""
Expand Down
169 changes: 87 additions & 82 deletions kgforge/specializations/models/rdf/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,53 +63,52 @@
ALL_COLLECTORS_MAP = {c.constraint(): c for c in ALL_COLLECTORS}


def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]:
"""traverses the Shape SACL properties to collect constrained properties

This function is injected to pyshacl Shape object in order to traverse the Shacl graph.
It will call a specific collector depending on the SHACL property present in the NodeShape

Args:
predecessors: list of nodes that have being traversed, used to break circular
recursion

Returns:
properties, attributes: Tuple(list,dict), the collected properties and attributes
respectively gathered from the collectors
"""

parameters = self.parameters()
properties = []
attributes = {}
done_collectors = set()
for param in iter(parameters):
if param in ALL_COLLECTORS_MAP:
constraint_collector = ALL_COLLECTORS_MAP[param]
if constraint_collector not in done_collectors:
c = constraint_collector(self)
predecessors.add(self.node)
props, attrs = c.collect(predecessors)
if attrs:
attributes.update(attrs)
if props:
properties.extend(props)
done_collectors.add(constraint_collector)
if predecessors:
predecessors.remove(self.node)
else:
# FIXME: there are some SHACL constrains that are not implemented
# raise IndexError(f"{param} not implemented!")
pass

return properties, attributes


class ShapeWrapper(Shape):
__slots__ = ("__dict__",)

def __init__(self, shape: Shape) -> None:
super().__init__(shape.sg, shape.node, shape._p, shape._path, shape.logger)

def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]:
"""traverses the Shape SHACL properties to collect constrained properties

This function is injected to pyshacl Shape object in order to traverse the Shacl graph.
It will call a specific collector depending on the SHACL property present in the NodeShape

Args:
predecessors: list of nodes that have being traversed, used to break circular
recursion

Returns:
properties, attributes: Tuple(list,dict), the collected properties and attributes
respectively gathered from the collectors
"""

parameters = self.parameters()
properties = []
attributes = {}
done_collectors = set()
for param in iter(parameters):
if param in ALL_COLLECTORS_MAP:
constraint_collector = ALL_COLLECTORS_MAP[param]
if constraint_collector not in done_collectors:
c = constraint_collector(self)
predecessors.add(self.node)
props, attrs = c.collect(predecessors)
if attrs:
attributes.update(attrs)
if props:
properties.extend(props)
done_collectors.add(constraint_collector)
if predecessors:
predecessors.remove(self.node)
else:
# FIXME: there are some SHACL constrains that are not implemented
# raise IndexError(f"{param} not implemented!")
pass

return properties, attributes

def parameters(self):
return (
p
Expand All @@ -125,7 +124,7 @@ def __init__(self, graph: RDFDataset) -> None:
# the following line triggers the shape loading
self._shapes = self.shapes

def lookup_shape_from_node(self, node: URIRef) -> Shape:
def lookup_shape_from_node(self, node: URIRef) -> Optional[ShapeWrapper]:
"""Overwrite function to inject the transverse function for only to requested nodes.

Args:
Expand All @@ -138,12 +137,19 @@ def lookup_shape_from_node(self, node: URIRef) -> Shape:
shape = self._node_shape_cache[node]
except KeyError as ke:
raise ValueError(f"Unknown shape node id '{node}': {str(ke)}") from ke
if shape:
shape_wrapper = ShapeWrapper(self._node_shape_cache[node])
if not hasattr(shape_wrapper, "traverse"):
shape_wrapper.traverse = types.MethodType(traverse, shape_wrapper)
return shape_wrapper
return shape

return ShapeWrapper(shape)

@property
def shapes(self): # pyshacl implementation returns dict_values (not list). This cannot be pickled.
crisely09 marked this conversation as resolved.
Show resolved Hide resolved
"""

:returns: [Shape]
:rtype: list(pyshacl.shape.Shape)
"""
if len(self._node_shape_cache) < 1:
self._build_node_shape_cache()
return list(self._node_shape_cache.values())


class RdfService:
Expand Down Expand Up @@ -193,7 +199,8 @@ def materialize(self, iri: URIRef) -> NodeProperties:
"""
raise NotImplementedError()

def validate(self, resource: Resource, type_: str, inference: str):
@staticmethod
def type_to_validate_against(resource: Resource, type_: str) -> str:
try:
if not resource.get_type() and not type_:
raise ValueError(
Expand All @@ -209,12 +216,25 @@ def validate(self, resource: Resource, type_: str, inference: str):
raise TypeError(
f"A single type should be provided for validation: {str(exc)}"
) from exc
shape_iri = self.get_shape_uriref_from_class_fragment(type_to_validate)
data_graph = as_graph(resource, False, self.context, None, None)
shape, shacl_graph, ont_graph = self.get_shape_graph(shape_iri)
conforms, report_graph, report_text = self._validate(
shape_iri, data_graph, shape, shacl_graph, inference, ont_graph

return type_to_validate

@staticmethod
def validate(
resource: Resource, shape: ShapeWrapper, shacl_graph: Graph, ont_graph: Graph, type_to_validate: str, inference: str, context: Context
) -> Tuple[bool, Graph, str]:

data_graph = as_graph(resource, False, context, None, None)

inplace = inference and inference != "none"
conforms, report_graph, report_text = validate(
data_graph=data_graph,
shacl_graph=shacl_graph,
ont_graph=ont_graph,
inference=inference,
inplace=inplace,
)

# when no schema target was found in the data (i.e no data was selected for validation)
# conforms is currently set to True by pyShacl. Here it is set to False so that
# the validation fails when type_to_validate is not present in the data
Expand Down Expand Up @@ -245,24 +265,6 @@ def validate(self, resource: Resource, type_: str, inference: str):
)
return conforms, report_graph, report_text

def _validate(
self,
iri: str,
data_graph: Graph,
shape: Shape,
shacl_graph: Graph,
inference: str,
ont_graph: Graph = None,
) -> Tuple[bool, Graph, str]:
inplace = inference and inference != "none"
return validate(
data_graph=data_graph,
shacl_graph=shacl_graph,
ont_graph=ont_graph,
inference=inference,
inplace=inplace,
)

@abstractmethod
def resolve_context(self, iri: str) -> Dict:
"""For a given IRI return its resolved context recursively"""
Expand Down Expand Up @@ -408,7 +410,7 @@ def _process_imported_resource(
imported_resource_uriref,
resource_to_named_graph_uriref,
collect_imported_ontology: bool,
):
) -> Tuple[Graph, Graph]:
imported_graph_id = resource_to_named_graph_uriref[imported_resource_uriref]
if imported_resource_uriref not in self._imported:
imported_resource_graph, imported_ont_graph = (
Expand Down Expand Up @@ -618,25 +620,28 @@ def _import_shape(self, node_shape_uriref: URIRef):
f"Failed to import the shape '{node_shape_uriref}': {str(e)}"
) from e

def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[Shape, Graph, Graph]:
def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[ShapeWrapper, Graph, Graph]:
if node_shape_uriref not in self.shape_to_defining_resource:
raise ValueError(f"Unknown shape '{node_shape_uriref}'")

if self.shape_to_defining_resource[node_shape_uriref] not in self._imported:
return self._import_shape(node_shape_uriref)

try:
shape = self.get_shape_graph_wrapper().lookup_shape_from_node(
node_shape_uriref
)
except ValueError:
return self._import_shape(node_shape_uriref)
if self.shape_to_defining_resource[node_shape_uriref] in self._imported:
defining_resource = self.shape_to_defining_resource[node_shape_uriref]
shape_graph = self._dataset_graph.graph(
self._get_named_graph_from_shape(node_shape_uriref)
)
ont_graph = self._build_imported_ontology_graph(defining_resource)
else:
return self._import_shape(node_shape_uriref)
ont_graph = self._build_imported_ontology_graph(
self.shape_to_defining_resource[node_shape_uriref]
)

return shape, shape_graph, ont_graph

return shape, shape_graph, ont_graph
except ValueError:
return self._import_shape(node_shape_uriref)

def _build_imported_ontology_graph(self, resurce_uriref):
ont_graph = Graph()
Expand Down
2 changes: 1 addition & 1 deletion kgforge/specializations/models/rdf/store_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def resolve_context(self, iri: str) -> Dict:
def generate_context(self) -> Dict:
for shape_uriref, schema_uriref in self.shape_to_defining_resource.items():
if schema_uriref not in self._imported:
self._transitive_load_shape_graph(
self._transitive_load_resource_graph(
self._get_named_graph_from_shape(shape_uriref), schema_uriref
)
# reloads the shapes graph
Expand Down
Loading
Loading