#!/usr/bin/env python
from collections import Counter
from random import random
from multiprocessing import Manager
from typing import Literal, Optional, Union
import numpy as np
from hypodisc.data.utils import write_query
from rdf.formats import NTriples
from rdf.namespaces import RDF, RDFS
from rdf.terms import IRIRef
from rdf.terms import Literal as rdfLiteral
from hypodisc.core.utils import predict_hash
from hypodisc.data.graph import KnowledgeGraph
from hypodisc.core.structures import (Assertion, GraphPattern,
ResourceWrapper,
DataTypeVariable,
Variable,
MultiModalNumericVariable,
MultiModalStringVariable,
ObjectTypeVariable)
from hypodisc.multimodal.clustering import (compute_clusters,
SUPPORTED_XSD_TYPES)
from hypodisc.multimodal.datatypes import (XSD_DATEFRAG, XSD_DATETIME,
XSD_NUMERIC, XSD_STRING)
XSD_TEMPORAL = set.union(XSD_DATETIME, XSD_DATEFRAG)
IGNORE_PREDICATES = {RDF + 'type', RDFS + 'label'}
def generate(root_patterns: dict[str, list],
depths: range, min_support: int,
p_explore: float, p_extend: float,
max_length: int, max_width: int,
out_writer: Optional[NTriples],
out_prefix_map: Optional[dict[str, str]],
out_ns: Optional[IRIRef],
strategy: Literal["BFS", "DFS"]) -> int:
""" Generate all patterns up to and including a maximum depth which
satisfy a minimal support.
:param depths:
:type depths: range
:param min_support:
:type min_support: int
:param p_explore:
:type p_explore: float
:param p_extend:
:type p_extend: float
:param max_length:
:type max_length: int
:param max_width:
:type max_width: int
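
    Example (a minimal sketch; assumes ``root_patterns`` was built
    beforehand with :func:`init_root_patterns`; setting ``p_explore``
    and ``p_extend`` to 1.0 disables the random pruning)::

        num_patterns = generate(root_patterns, depths=range(0, 3),
                                min_support=2, p_explore=1.0,
                                p_extend=1.0, max_length=5, max_width=3,
                                out_writer=None, out_prefix_map=None,
                                out_ns=None, strategy="BFS")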
"""
if strategy == "BFS":
return generate_bf(root_patterns=root_patterns,
depths=depths,
min_support=min_support,
p_explore=p_explore,
p_extend=p_extend,
max_length=max_length,
max_width=max_width,
out_writer=out_writer,
out_prefix_map=out_prefix_map,
out_ns=out_ns)
else: # DFS
return generate_df(root_patterns=root_patterns,
depths=depths,
min_support=min_support,
p_explore=p_explore,
p_extend=p_extend,
max_length=max_length,
max_width=max_width,
out_writer=out_writer,
out_prefix_map=out_prefix_map,
out_ns=out_ns)
def generate_df(root_patterns: dict[str, list],
depths: range, min_support: int,
p_explore: float, p_extend: float,
max_length: int, max_width: int,
out_writer: Optional[NTriples],
out_prefix_map: Optional[dict[str, str]],
out_ns: Optional[IRIRef]) -> int:
""" Generate all patterns up to and including a maximum depth which
satisfy a minimal support, using a depth first approach. This
approach uses less memory but does not have the anytime property.
:param depths:
:type depths: range
:param min_support:
:type min_support: int
:param p_explore:
:type p_explore: float
:param p_extend:
:type p_extend: float
:param max_length:
:type max_length: int
:param max_width:
:type max_width: int
"""
patterns = set()
num_patterns = 0
for name in sorted(list(root_patterns.keys())):
print(f"type {name}")
visited = set()
for depth in range(0, depths.stop):
print(" exploring depth {} / {}".format(depth+1, depths.stop),
end=" ")
if depth == 0:
patterns = root_patterns[name]
derivatives = set()
for pattern in patterns:
if len(pattern) >= max_length or pattern.width() >= max_width:
continue
# Gather candidate endpoints at this depth.
# Only consider unbound object type variables since we
# cannot connect to literals or data type variables, and
# bound entities already have a fixed context.
if depth <= 0:
endpoints = {pattern.root}
# add these as parents for next depth
if isinstance(pattern.assertion.rhs, ObjectTypeVariable):
derivatives.add(pattern)
else:
endpoints = {a.rhs for a in pattern.distances[depth-1]
if isinstance(a.rhs, ObjectTypeVariable)}
candidates = set()
for endpoint in endpoints:
if endpoint.value not in root_patterns.keys():
# no extension available
continue
if p_explore < random():
                        # explore this endpoint with probability p_explore;
                        # skip it otherwise
continue
# Gather all candidate extensions that can connect
# to an object type variable of the relevant type.
for base_pattern in root_patterns[endpoint.value]:
if p_extend < random():
                            # consider this extension with probability
                            # p_extend; skip it otherwise
continue
# get base assertion
extension = base_pattern.assertion
# prune
if pattern.contains_at_depth(extension, depth):
continue
pattern_hash = predict_hash(pattern,
endpoint,
extension)
if pattern_hash in visited:
continue
visited.add(pattern_hash)
candidates.add((endpoint, extension))
extensions = explore(pattern,
candidates=candidates,
max_length=max_length,
max_width=max_width,
min_support=min_support)
if out_writer is not None and out_prefix_map is not None:
for pattern in extensions:
num_patterns = write_query(out_writer, pattern,
num_patterns, out_ns,
out_prefix_map)
derivatives |= extensions
print("(+{} discovered)".format(len(derivatives)))
# omit exhausted classes from next iteration
if len(derivatives) > 0:
patterns = {v for v in derivatives}
else:
break
return num_patterns
def generate_bf(root_patterns: dict[str, list],
depths: range, min_support: int,
p_explore: float, p_extend: float,
max_length: int, max_width: int,
out_writer: Optional[NTriples],
out_prefix_map: Optional[dict[str, str]],
out_ns: Optional[IRIRef]) -> int:
""" Generate all patterns up to and including a maximum depth which
satisfy a minimal support, using a breadth first approach. This
approach has the anytime property yet uses more memory.
:param depths:
:type depths: range
:param min_support:
:type min_support: int
:param p_explore:
:type p_explore: float
:param p_extend:
:type p_extend: float
:param max_length:
:type max_length: int
:param max_width:
:type max_width: int
"""
parents = dict()
num_patterns = 0
for depth in range(0, depths.stop):
print("exploring depth {} / {}".format(depth+1, depths.stop))
if depth == 0:
parents = root_patterns
visited = set()
derivatives = dict()
for name in sorted(list(parents.keys())):
print(f" type {name}", end=" ")
patterns = parents[name]
if depth > 0:
patterns = parents.pop(name)
derivatives[name] = set()
for pattern in patterns:
if len(pattern) >= max_length or pattern.width() >= max_width:
continue
# Gather candidate endpoints at this depth.
# Only consider unbound object type variables since we
# cannot connect to literals or data type variables, and
# bound entities already have a fixed context.
if depth <= 0:
endpoints = {pattern.root}
# add these as parents for next depth
if isinstance(pattern.assertion.rhs, ObjectTypeVariable):
derivatives[name].add(pattern)
else:
endpoints = {a.rhs for a in pattern.distances[depth-1]
if isinstance(a.rhs, ObjectTypeVariable)}
candidates = set()
for endpoint in endpoints:
if endpoint.value not in root_patterns.keys():
# no extension available
continue
if p_explore < random():
                        # explore this endpoint with probability p_explore;
                        # skip it otherwise
continue
# Gather all candidate extensions that can connect
# to an object type variable of the relevant type.
for base_pattern in root_patterns[endpoint.value]:
if p_extend < random():
                            # consider this extension with probability
                            # p_extend; skip it otherwise
continue
# get base assertion
extension = base_pattern.assertion
# prune
if pattern.contains_at_depth(extension, depth):
continue
pattern_hash = predict_hash(pattern,
endpoint,
extension)
if pattern_hash in visited:
continue
visited.add(pattern_hash)
candidates.add((endpoint, extension))
extensions = explore(pattern,
candidates=candidates,
max_length=max_length,
max_width=max_width,
min_support=min_support)
if out_writer is not None and out_prefix_map is not None:
for pattern in extensions:
num_patterns = write_query(out_writer, pattern,
num_patterns, out_ns,
out_prefix_map)
derivatives[name] |= extensions
print("(+{} discovered)".format(len(derivatives[name])))
# omit exhausted classes from next iteration
parents = {k: v for k, v in derivatives.items()
if len(v) > 0}
return num_patterns
def explore(parent: GraphPattern, candidates: set,
max_length: int, max_width: int, min_support: int) -> set:
""" Explore all predicate-object pairs which where added by the previous
iteration as possible endpoints to expand from.
:param pattern:
:type pattern: GraphPattern
:param candidates:
:type candidates: set
:param max_length:
:type max_length: int
:param max_width:
:type max_width: int
:param min_support:
:type min_support: int
:rtype: set
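
    Example (a minimal sketch; assumes ``pattern`` is a root pattern
    whose root variable matches a class in ``root_patterns``, so that
    each candidate pairs that endpoint with the base assertion of
    another root pattern)::

        candidates = {(pattern.root, base.assertion)
                      for base in root_patterns[pattern.root.value]}
        derivatives = explore(pattern, candidates=candidates,
                              max_length=5, max_width=3, min_support=2)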
"""
derivatives = set() # valid patterns
skipped = set() # invalid patterns
with Manager() as manager:
qexplore = manager.Queue()
qexplore.put(parent)
while not qexplore.empty():
pattern = qexplore.get()
if len(pattern) >= max_length or pattern.width() >= max_width:
continue
for endpoint, extension in candidates:
pattern_hash = predict_hash(pattern, endpoint, extension)
if extension in pattern:
# extension is already part of pattern
continue
elif pattern_hash in derivatives:
# already seen and added this pattern
continue
elif pattern_hash in skipped:
# already seen but skipped this pattern
continue
pattern_new = extend(pattern, endpoint, extension)
                # add as new if it satisfies the support threshold and has
                # a more constrained domain than its parent, or when the
                # extension allows for possible future extensions.
if pattern_new.support >= min_support\
and (isinstance(extension.rhs, ObjectTypeVariable)
or pattern_new.parent is None
or pattern_new.support <
pattern_new.parent.support):
qexplore.put(pattern_new) # explore further extensions
derivatives.add(pattern_new)
else:
skipped.add(pattern_hash)
return derivatives
def extend(pattern: GraphPattern, endpoint: Variable, extension: Assertion)\
-> GraphPattern:
""" Extend a graph_pattern from a given endpoint variable by evaluating all
possible candidate extensions on whether they satisfy the minimal support
and confidence.
:param pattern:
:type pattern: GraphPattern
:param a_i:
:type a_i: Assertion
:param a_j:
:type a_j: Assertion
:rtype: GraphPattern
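
    Example (a minimal sketch; assumes ``endpoint`` is an unbound object
    type variable of ``pattern`` and ``extension`` is the base assertion
    of a matching root pattern)::

        pattern_new = extend(pattern, endpoint, extension)
        assert pattern_new.parent is pattern
        assert pattern_new.support == len(pattern_new.domain)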
"""
# create new graph_pattern by extending that of the parent
pattern_new = pattern.copy()
pattern_new.parent = pattern
pattern_new.extend(endpoint=endpoint, extension=extension)
# compute support
pattern_new.domain = pattern_new._update_domain()
pattern_new.support = len(pattern_new.domain)
return pattern_new
def init_root_patterns(rng: np.random.Generator, kg: KnowledgeGraph,
min_support: float, mode: Literal["A", "AT", "T"],
textual_support: bool, numerical_support: bool,
temporal_support: bool, exclude: list[str])\
-> dict[str, list]:
""" Creating all patterns of types which satisfy minimal support.
:param rng:
:type rng: np.random.Generator
:param kg:
:type kg: KnowledgeGraph
:param min_support:
:type min_support: float
:param mode:
:type mode: Literal["A", "AT", "T"]
:param multimodal:
:type multimodal: bool
:returns:
:rtype: dict[str,list]
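
    Example (a minimal sketch; assumes ``kg`` holds a loaded
    KnowledgeGraph)::

        rng = np.random.default_rng(seed=42)
        root_patterns = init_root_patterns(rng, kg, min_support=2,
                                           mode="AT",
                                           textual_support=True,
                                           numerical_support=True,
                                           temporal_support=True,
                                           exclude=[])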
"""
print("mapping root patterns")
root_patterns = dict()
# don't generate what we won't need
generate_Abox = False
generate_Tbox = False
if "A" in mode:
generate_Abox = True
if "T" in mode:
generate_Tbox = True
    multimodal = textual_support or numerical_support or temporal_support
rdf_type = RDF + "type"
rdf_type_idx = kg.r2i[rdf_type]
A_type = kg.A[rdf_type_idx]
    # list of predicate indices to ignore
exclude_idx = [kg.r2i[IRIRef(p)] for p in exclude]
class_idx_list = sorted(list(set(A_type.col)))
class_freq = A_type.sum(axis=0)[class_idx_list]
for class_i, class_idx in enumerate(class_idx_list):
        # if the number of type instances does not exceed the minimal
        # support then no pattern of this type will either
support = class_freq[class_i]
if support < min_support:
continue
class_name = kg.i2n[class_idx]
root_patterns[class_name] = list()
print(f" type {class_name}...", end='')
# find all members of this class,
class_members_idx = A_type.row[A_type.col == class_idx]
root_var = ObjectTypeVariable(class_name)
for p_idx in range(kg.num_relations):
if p_idx == rdf_type_idx or p_idx in exclude_idx:
continue
# mask for class members that have this relation outgoing
p_mask = np.isin(kg.A[p_idx].row, class_members_idx)
            if p_mask.sum() < min_support:
                # if the number of entities of type t that have this
                # predicate is less than the minimal support, then the
                # full pattern will fall below it as well
continue
p = kg.i2r[p_idx]
# list of global indices for class members with this property
# plus the matching tail nodes
s_idx_list = kg.A[p_idx].row[p_mask]
o_idx_list = kg.A[p_idx].col[p_mask]
            # infer the (data) type from a single tail node
o_type, is_literal = infer_type(kg, rdf_type_idx, o_idx_list[-1])
# assume that all objects linked by the same relation are of
# the same type. This may not always be true but it is usually
# the case in well-engineered graphs. See this as an
# optimization by approximation.
pattern = None
o_idx = o_idx_list[0]
inv_assertion_map = {o_idx: {s_idx_list[i]
for i, idx in
enumerate(o_idx_list)
if idx == o_idx}
for o_idx in o_idx_list}
if generate_Tbox and o_idx in kg.ni2ai.keys():
# object is literal
o_type = kg.i2a[kg.ni2ai[o_idx]]
var_o = DataTypeVariable(o_type)
pattern = new_var_graph_pattern(root_var, var_o,
set(s_idx_list), p,
inv_assertion_map)
else:
# object is entity (or bnode or literal without type)
idx = np.where(A_type.row == o_idx)
o_type_idx = A_type.col[idx]
if len(o_type_idx) > 0:
o_type = kg.i2n[o_type_idx[0]]
var_o = ObjectTypeVariable(o_type)
pattern = new_var_graph_pattern(root_var, var_o,
set(s_idx_list), p,
inv_assertion_map)
if pattern is not None and pattern.support >= min_support:
root_patterns[class_name].append(pattern)
            # create graph patterns for all predicate-object pairs;
            # treat both entities and literals as nodes
if generate_Abox:
if multimodal and is_literal:
o_freqs = Counter([kg.i2n[i].value for i in o_idx_list])
o_value_idx_map = {v:
{i for i, o_idx in enumerate(o_idx_list)
if kg.i2n[o_idx].value == v}
for v in o_freqs.keys()}
else: # if IRI
o_freqs = Counter(o_idx_list)
o_value_idx_map = {kg.i2n[k]:
{i for i, o_idx in enumerate(o_idx_list)
if k == o_idx} for k in o_freqs.keys()}
for o_value, o_freq in o_freqs.items():
if o_freq < min_support:
continue
if not is_literal:
o_value = kg.i2n[o_value]
domain = {s_idx_list[i] for i in o_value_idx_map[o_value]}
inv_assertion_map = {o_idx_list[i]: domain
for i in o_value_idx_map[o_value]}
# create new graph_pattern
pattern = new_graph_pattern(root_var, p, o_value,
o_type, domain,
inv_assertion_map)
if pattern is not None and pattern.support >= min_support:
root_patterns[class_name].append(pattern)
if multimodal:
# assume that all objects linked by the same relation are of
# the same type. This may not always be true but it is usually
# the case in well-engineered graphs. See this as an
# optimization by approximation.
o_idx = o_idx_list[-1]
if o_idx in kg.ni2ai.keys():
# object is literal
o_type = kg.i2a[kg.ni2ai[o_idx]]
if o_type not in SUPPORTED_XSD_TYPES:
continue
if not textual_support and o_type in XSD_STRING:
continue
if not numerical_support and o_type in XSD_NUMERIC:
continue
if not temporal_support and o_type in XSD_TEMPORAL:
continue
o_values = [kg.i2n[i].value for i in o_idx_list]
if len(o_values) < min_support:
                        # if the full set does not reach the threshold
                        # then neither will any subset thereof
continue
os_idx_map = dict(zip(o_idx_list, s_idx_list))
clusters = compute_clusters(rng, o_type,
o_values,
o_idx_list)
for members, cluster in clusters:
if o_type in set.union(XSD_NUMERIC,
XSD_DATETIME,
XSD_DATEFRAG):
var_o = MultiModalNumericVariable(o_type, cluster)
else: # treat as strings
var_o = MultiModalStringVariable(o_type, cluster)
domain = {os_idx_map[i] for i in members}
inv_assertion_map = {o_idx: domain
for o_idx in members}
pattern = new_mm_graph_pattern(root_var, var_o,
domain, p,
inv_assertion_map)
if pattern is not None\
and pattern.support >= min_support:
root_patterns[class_name].append(pattern)
tree_size = len(root_patterns[class_name])
print(" (+{} discovered)".format(tree_size))
if tree_size <= 0:
del root_patterns[class_name]
return root_patterns
def infer_type(kg: KnowledgeGraph, rdf_type_idx: int,
node_idx: int) -> tuple[Union[IRIRef, str], bool]:
""" Infer the (data) type or language tag of a resource. Defaults to
rdfs:Class if none can be inferred.
:param kg:
:type kg: KnowledgeGraph
:param rdf_type_idx:
:type rdf_type_idx: int
:param node_idx:
:type node_idx: int
:rtype: IRIRef
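
    Example (a minimal sketch; assumes ``kg`` holds a loaded
    KnowledgeGraph with at least one node)::

        rdf_type_idx = kg.r2i[RDF + 'type']
        o_type, is_literal = infer_type(kg, rdf_type_idx, node_idx=0)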
"""
if node_idx in kg.ni2ai.keys():
# this is a literal node
return kg.i2a[kg.ni2ai[node_idx]], True
try:
idx = np.where(kg.A[rdf_type_idx].row == node_idx)
class_idx = kg.A[rdf_type_idx].col[idx][0]
iri = kg.i2n[class_idx]
except IndexError:
iri = RDFS + "Class"
return iri, False
def new_graph_pattern(root_var, p: IRIRef, o_value: Union[IRIRef, rdfLiteral],
o_type: IRIRef, domain: set,
inv_assertion_map: dict[int, set[int]]) -> GraphPattern:
""" Create a new graph_pattern and compute members and metrics
:param kg:
:type kg: KnowledgeGraph
:param parent:
:type parent: Optional[GraphPattern]
:param var:
:type var: ObjectTypeVariable
:param p_idx:
:type p_idx: int
:param o_idx:
:type o_idx: int
:param class_members_idx:
:type class_members_idx: np.ndarray
:rtype: GraphPattern
:returns: an instance of type GraphPattern
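
    Example (a minimal sketch; the IRIs, indices, and maps below are
    hypothetical)::

        root_var = ObjectTypeVariable(IRIRef("http://example.org/Person"))
        pattern = new_graph_pattern(
            root_var, p=IRIRef("http://example.org/livesIn"),
            o_value=IRIRef("http://example.org/Amsterdam"),
            o_type=RDFS + "Class", domain={0, 3, 7},
            inv_assertion_map={5: {0, 3, 7}})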
"""
assertion = Assertion(root_var, p, ResourceWrapper(o_value, o_type))
assertion._inv_idx_map = inv_assertion_map
graph_pattern = GraphPattern({assertion})
graph_pattern.domain = domain
graph_pattern.support = len(graph_pattern.domain)
return graph_pattern
def new_var_graph_pattern(root_var,
var_o: Union[ObjectTypeVariable, DataTypeVariable],
domain: set, p: IRIRef,
inv_assertion_map: dict[int, set[int]])\
-> GraphPattern:
""" Create a new variable graph_pattern and compute members and metrics
:param kg:
:param parent:
:type parent: Optional[GraphPattern]
:param var:
:type var: ObjectTypeVariable
:param var_o:
:type var_o: Union[ObjectTypeVariable, DataTypeVariable]
:param class_members_idx:
:type class_members_idx: np.ndarray
:param s_idx_list:
:type s_idx_list: np.ndarray
:param p_idx:
:type p_idx: int
:rtype: GraphPattern
:returns: an instance of type GraphPattern
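
    Example (a minimal sketch; the IRIs, indices, and maps below are
    hypothetical)::

        root_var = ObjectTypeVariable(IRIRef("http://example.org/Person"))
        var_o = DataTypeVariable(
            IRIRef("http://www.w3.org/2001/XMLSchema#string"))
        pattern = new_var_graph_pattern(
            root_var, var_o, domain={0, 3},
            p=IRIRef("http://example.org/name"),
            inv_assertion_map={5: {0, 3}})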
"""
# assume that all objects linked by the same relation are of the same type
# this may not always be true but usually it is
assertion = Assertion(root_var, p, var_o)
assertion._inv_idx_map = inv_assertion_map
graph_pattern = GraphPattern({assertion})
graph_pattern.domain = domain
graph_pattern.support = len(graph_pattern.domain)
return graph_pattern
def new_mm_graph_pattern(root_var,
var_o: Union[ObjectTypeVariable, DataTypeVariable],
domain: set, p: IRIRef,
inv_assertion_map: dict[int, set[int]])\
-> GraphPattern:
""" Create a new multimodal graph_pattern and compute members and metrics
:param kg:
:param parent:
:type parent: Optional[GraphPattern]
:param var:
:type var: ObjectTypeVariable
:param var_o:
:type var_o: Union[ObjectTypeVariable, DataTypeVariable]
:param members:
:type members: set
:param class_members_idx:
:type class_members_idx: np.ndarray
:param p_idx:
:type p_idx: int
:rtype: GraphPattern
:returns: an instance of type GraphPattern
"""
# assume that all objects linked by the same relation are of the same type
# this may not always be true but usually it is
assertion = Assertion(root_var, p, var_o)
assertion._inv_idx_map = inv_assertion_map
graph_pattern = GraphPattern({assertion})
graph_pattern.domain = domain
graph_pattern.support = len(graph_pattern.domain)
return graph_pattern