"""
Utilities for interacting with SHACL Shapes Graphs more easily.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Union
from rdflib import BNode, Graph, Literal, URIRef, paths
from rdflib.collection import Collection
from rdflib.namespace import RDF, SH
from rdflib.paths import Path
from rdflib.term import Node
if TYPE_CHECKING:
from rdflib.term import IdentifiedNode
[docs]
class SHACLPathError(Exception):
pass
# Map the variable length path operators to the corresponding SHACL path predicates
_PATH_MOD_TO_PRED = {
paths.ZeroOrMore: SH.zeroOrMorePath,
paths.OneOrMore: SH.oneOrMorePath,
paths.ZeroOrOne: SH.zeroOrOnePath,
}
# This implementation is roughly based on
# pyshacl.helper.sparql_query_helper::SPARQLQueryHelper._shacl_path_to_sparql_path
[docs]
def parse_shacl_path(
shapes_graph: Graph,
path_identifier: Node,
) -> Union[URIRef, Path]:
"""
Parse a valid SHACL path (e.g. the object of a triple with predicate sh:path)
from a :class:`~rdflib.graph.Graph` as a :class:`~rdflib.term.URIRef` if the path
is simply a predicate or a :class:`~rdflib.paths.Path` otherwise.
:param shapes_graph: A :class:`~rdflib.graph.Graph` containing the path to be parsed
:param path_identifier: A :class:`~rdflib.term.Node` of the path
:return: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
"""
path: Optional[Union[URIRef, Path]] = None
# Literals are not allowed.
if isinstance(path_identifier, Literal):
raise TypeError("Literals are not a valid SHACL path.")
# If a path is a URI, that's the whole path.
elif isinstance(path_identifier, URIRef):
if path_identifier == RDF.nil:
raise SHACLPathError(
"A list of SHACL Paths must contain at least two path items."
)
path = path_identifier
# Handle Sequence Paths
elif shapes_graph.value(path_identifier, RDF.first) is not None:
sequence = list(shapes_graph.items(path_identifier))
if len(sequence) < 2:
raise SHACLPathError(
"A list of SHACL Sequence Paths must contain at least two path items."
)
path = paths.SequencePath(
*(parse_shacl_path(shapes_graph, path) for path in sequence)
)
# Handle sh:inversePath
elif inverse_path := shapes_graph.value(path_identifier, SH.inversePath):
path = paths.InvPath(parse_shacl_path(shapes_graph, inverse_path))
# Handle sh:alternativePath
elif alternative_path := shapes_graph.value(path_identifier, SH.alternativePath):
alternatives = list(shapes_graph.items(alternative_path))
if len(alternatives) < 2:
raise SHACLPathError(
"List of SHACL alternate paths must have at least two path items."
)
path = paths.AlternativePath(
*(
parse_shacl_path(shapes_graph, alternative)
for alternative in alternatives
)
)
# Handle sh:zeroOrMorePath
elif zero_or_more_path := shapes_graph.value(path_identifier, SH.zeroOrMorePath):
path = paths.MulPath(parse_shacl_path(shapes_graph, zero_or_more_path), "*")
# Handle sh:oneOrMorePath
elif one_or_more_path := shapes_graph.value(path_identifier, SH.oneOrMorePath):
path = paths.MulPath(parse_shacl_path(shapes_graph, one_or_more_path), "+")
# Handle sh:zeroOrOnePath
elif zero_or_one_path := shapes_graph.value(path_identifier, SH.zeroOrOnePath):
path = paths.MulPath(parse_shacl_path(shapes_graph, zero_or_one_path), "?")
# Raise error if none of the above options were found
elif path is None:
raise SHACLPathError(f"Cannot parse {repr(path_identifier)} as a SHACL Path.")
return path
def _build_path_component(
graph: Graph, path_component: URIRef | Path
) -> IdentifiedNode:
"""
Helper method that implements the recursive component of SHACL path
triple construction.
:param graph: A :class:`~rdflib.graph.Graph` into which to insert triples
:param graph_component: A :class:`~rdflib.term.URIRef` or
:class:`~rdflib.paths.Path` that is part of a path expression
:return: The :class:`~rdflib.term.IdentifiedNode of the resource in the
graph that corresponds to the provided path_component
"""
# Literals or other types are not allowed
if not isinstance(path_component, (URIRef, Path)):
raise TypeError(
f"Objects of type {type(path_component)} are not valid "
+ "components of a SHACL path."
)
# If the path component is a URI, return it
elif isinstance(path_component, URIRef):
return path_component
# Otherwise, the path component is represented as a blank node
bnode = BNode()
# Handle Sequence Paths
if isinstance(path_component, paths.SequencePath):
# Sequence paths are a Collection directly with at least two items
if len(path_component.args) < 2:
raise SHACLPathError(
"A list of SHACL Sequence Paths must contain at least two path items."
)
Collection(
graph,
bnode,
[_build_path_component(graph, arg) for arg in path_component.args],
)
# Handle Inverse Paths
elif isinstance(path_component, paths.InvPath):
graph.add(
(bnode, SH.inversePath, _build_path_component(graph, path_component.arg))
)
# Handle Alternative Paths
elif isinstance(path_component, paths.AlternativePath):
# Alternative paths are a Collection but referenced by sh:alternativePath
# with at least two items
if len(path_component.args) < 2:
raise SHACLPathError(
"List of SHACL alternate paths must have at least two path items."
)
coll = Collection(
graph,
BNode(),
[_build_path_component(graph, arg) for arg in path_component.args],
)
graph.add((bnode, SH.alternativePath, coll.uri))
# Handle Variable Length Paths
elif isinstance(path_component, paths.MulPath):
# Get the predicate corresponding to the path modifiier
pred = _PATH_MOD_TO_PRED.get(path_component.mod)
if pred is None:
raise SHACLPathError(f"Unknown path modifier {path_component.mod}")
graph.add((bnode, pred, _build_path_component(graph, path_component.path)))
# Return the blank node created for the provided path_component
return bnode
[docs]
def build_shacl_path(
path: URIRef | Path, target_graph: Graph | None = None
) -> tuple[IdentifiedNode, Graph | None]:
"""
Build the SHACL Path triples for a path given by a :class:`~rdflib.term.URIRef` for
simple paths or a :class:`~rdflib.paths.Path` for complex paths.
Returns an :class:`~rdflib.term.IdentifiedNode` for the path (which should be
the object of a triple with predicate sh:path) and the graph into which any
new triples were added.
:param path: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
:param target_graph: Optionally, a :class:`~rdflib.graph.Graph` into which to put
constructed triples. If not provided, a new graph will be created
:return: A (path_identifier, graph) tuple where:
- path_identifier: If path is a :class:`~rdflib.term.URIRef`, this is simply
the provided path. If path is a :class:`~rdflib.paths.Path`, this is
the :class:`~rdflib.term.BNode` corresponding to the root of the SHACL
path expression added to the graph.
- graph: None if path is a :class:`~rdflib.term.URIRef` (as no new triples
are constructed). If path is a :class:`~rdflib.paths.Path`, this is either the
target_graph provided or a new graph into which the path triples were added.
"""
# If a path is a URI, that's the whole path. No graph needs to be constructed.
if isinstance(path, URIRef):
return path, None
# Create a graph if one was not provided
if target_graph is None:
target_graph = Graph()
# Recurse through the path to build the graph representation
return _build_path_component(target_graph, path), target_graph