# Source code for hypodisc.data.graph

#! /usr/bin/env python

from __future__ import annotations
import gzip
from typing import Optional, Set, Tuple, Union
from typing_extensions import Self
from uuid import uuid4

import numpy as np
import scipy.sparse as sp

from rdf import NTriples, NQuads
from rdf.terms import IRIRef, Literal
from rdf.namespaces import RDF, RDFS, SKOS, OWL, XSD
from rdf.formats import RDF_Serialization_Format


# IRI of the xsd:string datatype; used as the default annotation index
# for literals that carry neither a language tag nor an explicit datatype
XSD_STRING = XSD + "string"

# conventional prefixes for well-known namespaces, keyed on the full
# namespace IRI string; consulted before generating ns1, ns2, ... names
DEFAULT_PREFIX_MAP = {OWL.value: 'owl',
                      RDF.value: 'rdf',
                      RDFS.value: 'rdfs',
                      SKOS.value: 'skos',
                      XSD.value: 'xsd',
                      "http://purl.org/dc/terms/": 'dct',
                      "http://www.w3.org/ns/prov#": 'prov'}


def ns2pf(prefix_map: dict[str, str], iri: IRIRef) -> str:
    """ Shorten an IRI to 'prefix:name' form using the given prefix map.

    :param prefix_map: mapping from namespace IRI string to prefix
    :type prefix_map: dict[str, str]
    :param iri: the IRI to abbreviate
    :type iri: IRIRef
    :returns: 'prefix:name' if the namespace is known, else the IRI
        unchanged (NOTE(review): the fallback returns the IRIRef object
        itself, not a str, despite the annotation — confirm callers accept
        this)
    """
    # nothing can match an empty map; skip the split entirely
    if not prefix_map:
        return iri

    ns, name = irisplit(iri)
    if ns not in prefix_map:
        return iri

    return f"{prefix_map[ns]}:{name}"
def mkprefixes(namespaces: Set[str],
               custom_prefix_map: Optional[dict[str, str]] = None
               ) -> dict[str, str]:
    """ Derive a prefix for every namespace.

    Precedence: user-provided prefixes, then well-known prefixes from
    DEFAULT_PREFIX_MAP, then generated names 'ns1', 'ns2', ... .
    Namespaces are processed in sorted order so the generated numbering
    is deterministic across runs (set iteration order is not).

    :param namespaces: namespace IRI strings to assign prefixes to
    :type namespaces: Set[str]
    :param custom_prefix_map: optional user-provided namespace-to-prefix map
    :type custom_prefix_map: Optional[dict[str, str]]
    :returns: mapping from namespace to prefix
    :rtype: dict[str, str]
    """
    prefix_map = dict()

    # add user-provided set (if any)
    if custom_prefix_map is not None:
        prefix_map.update(custom_prefix_map)

    # add remaining namespaces (sorted for deterministic numbering)
    i = 1
    for ns in sorted(namespaces):
        if ns in prefix_map:
            continue

        if ns in DEFAULT_PREFIX_MAP:
            # use common name
            prefix_map[ns] = DEFAULT_PREFIX_MAP[ns]
            continue

        prefix_map[ns] = f'ns{i}'
        i += 1

    return prefix_map
def irisplit(e: IRIRef) -> Tuple[str, str]:
    """ Split an IRI into a (namespace, local name) pair.

    Scans backwards for the last '/' or '#' (never considering index 0);
    everything up to and including that character is the namespace, the
    remainder is the local name.

    :param e: the IRI to split
    :type e: IRIRef
    :rtype: Tuple[str, str]
    """
    value = e.value

    split_at = -1
    for pos in range(len(value) - 1, 0, -1):
        split_at = pos
        if value[pos] == '/' or value[pos] == '#':
            break

    return value[:split_at + 1], value[split_at + 1:]
class UniqueLiteral(Literal):
    """ A Literal with per-occurrence identity.

    Two UniqueLiterals compare equal only if they are the very same
    occurrence (same UUID), so identical literal values attached to
    different triples remain distinct graph nodes.
    """

    def __init__(self, value: str, datatype: Union[IRIRef, None] = None,
                 language: Union[str, None] = None) -> None:
        """
        :param value: lexical value of the literal
        :param datatype: optional datatype IRI
        :param language: optional language tag
        :rtype: None
        """
        super().__init__(value=value, datatype=datatype, language=language)
        # fresh per-instance identity; drives __eq__ and __hash__
        self._uuid = uuid4().hex

    def __eq__(self, other: object) -> bool:
        # The original compared self._uuid to other._uuid unconditionally,
        # raising AttributeError when compared with a plain Literal, None,
        # or anything else. Return NotImplemented instead so Python falls
        # back to the other operand's __eq__ (or identity comparison).
        if not isinstance(other, UniqueLiteral):
            return NotImplemented
        return self._uuid == other._uuid

    def __hash__(self) -> int:
        return hash(self._uuid)
class KnowledgeGraph():
    """ Knowledge Graph stored in vector representation plus query
    functions """

    def __init__(self, rng: np.random.Generator, paths: list[str]) -> None:
        """ Knowledge Graph stored in vector representation plus query
        functions

        :param rng: shared random number generator
        :type rng: np.random.Generator
        :param paths: locations of the graph files to parse
        :type paths: list[str]
        :rtype: None
        """
        self._rng = rng
        self.paths = paths
        self.namespaces = set()  # populated by parse()

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        return

    def parse(self) -> None:
        """ Parse graphs on file level. Supports plain or gzipped NTriple
        or NQuad files.

        Reads every file in self.paths, assigns integer indices to nodes,
        relations, and annotations, and builds the vectorized
        representation (see _parse_vectorize).

        :raises ValueError: on an unsupported file extension
        :rtype: None
        """
        nodes = dict()          # type: dict[Union[IRIRef, Literal], int]
        relations = dict()      # type: dict[IRIRef, int]
        annotations = dict()    # type: dict[Union[IRIRef, Literal], int]
        literals_attr = dict()  # type: dict[int, int]
        facts = list()          # type: list[list[list[int]]]
        namespaces = set()      # type: set[str]

        n_idx, r_idx, d_idx = 0, 0, 0

        # reserve annotation index 0 for the default (xsd:string) datatype
        annotations[XSD_STRING] = 0
        d_idx += 1

        for path in self.paths:
            parts = path.split('.')
            is_gzipped = parts[-1] == "gz"
            suffix = parts[-2] if is_gzipped else parts[-1]

            if suffix == "nt":
                parser = NTriples
            elif suffix == "nq":
                parser = NQuads
            else:
                # ValueError narrows the original bare Exception; callers
                # catching Exception still catch this
                raise ValueError("Supports graphs in NTriples or NQuads"
                                 f" format. Unsupported format: {suffix}")

            counters = (n_idx, r_idx, d_idx)
            data = (nodes, relations, annotations, literals_attr, facts)
            if is_gzipped:
                # gzip.open in mode 'r' yields bytes; presumably the rdf
                # parser accepts bytes input — TODO confirm
                with gzip.open(path, mode='r') as gf:
                    with parser(data=gf.read(), mode='r') as g:
                        n_idx, r_idx, d_idx, ns_seen =\
                            self._parse(g, counters, data)
            else:
                with parser(path=path, mode='r') as g:
                    n_idx, r_idx, d_idx, ns_seen =\
                        self._parse(g, counters, data)

            # accumulate: the original plain assignment discarded the
            # namespaces of all but the last parsed file
            namespaces |= ns_seen

        self.namespaces = namespaces
        self._parse_vectorize(facts, nodes, relations,
                              annotations, literals_attr)

    def _parse(self, g: RDF_Serialization_Format,
               counters: tuple[int, int, int],
               data: tuple[dict, dict, dict, dict, list[list[list[int]]]])\
            -> tuple[int, int, int, set[str]]:
        """ Parse the content of one graph.

        Generates indices for nodes, relations, and facts, mutating the
        accumulators in `data` in place. Optimizes memory use by
        streaming the source graph.

        :param g: open graph reader to stream triples from
        :type g: RDF_Serialization_Format
        :param counters: next free node, relation, and annotation index
        :type counters: tuple[int, int, int]
        :param data: accumulators (nodes, relations, annotations,
            literals_attr, facts); updated in place
        :type data: tuple[dict, dict, dict, dict, list[list[list[int]]]]
        :returns: updated counters plus the namespaces seen in this graph
        :rtype: tuple[int, int, int, set[str]]
        """
        n_idx, r_idx, d_idx = counters
        nodes, relations, annotations, literals_attr, facts = data

        namespaces = set()
        for s, p, o in g.parse():
            # collect namespaces of IRIs (exact type match as in the
            # original: subclasses of IRIRef are deliberately skipped)
            for e in [s, p, o]:
                if type(e) is not IRIRef:
                    continue
                ns, _ = irisplit(e)
                namespaces.add(ns)

            # assign indices to subject and predicate
            if s not in nodes:
                nodes[s] = n_idx
                n_idx += 1
            s_idx = nodes[s]

            if p not in relations:
                relations[p] = r_idx
                # one (rows, cols) pair of index lists per relation
                facts.append([[], []])
                r_idx += 1
            p_idx = relations[p]

            if isinstance(o, Literal):
                # wrap in UniqueLiteral so identical literal values in
                # different triples become distinct nodes
                o = UniqueLiteral(o.value, o.datatype, o.language)

                # register new language tag or datatype as annotation
                if o.language is not None\
                        and o.language not in annotations:
                    annotations[o.language] = d_idx
                    d_idx += 1
                elif o.datatype is not None\
                        and o.datatype not in annotations:
                    annotations[o.datatype] = d_idx
                    d_idx += 1

            if o not in nodes:
                nodes[o] = n_idx
                n_idx += 1
            o_idx = nodes[o]

            # store s,p,o as indices
            facts[p_idx][0].append(s_idx)
            facts[p_idx][1].append(o_idx)

            # save datatype or language tag; default to xsd:string
            if isinstance(o, UniqueLiteral):
                if o.language is not None\
                        and o.language in annotations:
                    literals_attr[o_idx] = annotations[o.language]
                elif o.datatype is not None\
                        and o.datatype in annotations:
                    literals_attr[o_idx] = annotations[o.datatype]
                else:
                    literals_attr[o_idx] = annotations[XSD_STRING]

        # datatype IRIs contribute namespaces as well
        for dt in set(annotations.keys()):
            if type(dt) is not IRIRef:
                continue
            ns, _ = irisplit(dt)
            namespaces.add(ns)

        return n_idx, r_idx, d_idx, namespaces

    def _parse_vectorize(self, facts: list[list[list[int]]],
                         nodes: dict[Union[IRIRef, Literal], int],
                         relations: dict[IRIRef, int],
                         annotations: dict[Union[IRIRef, Literal], int],
                         literals_attr: dict[int, int]) -> None:
        """ Vectorize the graph representation.

        Builds one boolean sparse adjacency matrix per relation plus
        lookup and reverse-lookup tables.

        :param facts: per-relation [rows, cols] node-index lists
        :type facts: list[list[list[int]]]
        :param nodes: node to index map
        :type nodes: dict[Union[IRIRef, Literal], int]
        :param relations: relation to index map
        :type relations: dict[IRIRef, int]
        :param annotations: datatype/language tag to index map
        :type annotations: dict[Union[IRIRef, Literal], int]
        :param literals_attr: literal node index to annotation index map
        :type literals_attr: dict[int, int]
        :rtype: None
        """
        # statistics
        self.num_facts = 0
        self.num_nodes = len(nodes)
        self.num_relations = len(relations)

        # one num_nodes x num_nodes boolean COO matrix per relation
        self.A = list()
        for p_idx in sorted(relations.values()):
            n = len(facts[p_idx][0])
            data = np.ones(n, dtype=bool)
            self.A.append(sp.coo_array((data, (facts[p_idx][0],
                                               facts[p_idx][1])),
                                       shape=(self.num_nodes,
                                              self.num_nodes),
                                       dtype=bool))
            self.num_facts += n

        # lookup and reverse lookup tables
        self.n2i = nodes
        self.i2n = np.array(list(self.n2i.keys()))
        self.r2i = relations
        self.i2r = np.array(list(self.r2i.keys()))
        self.a2i = annotations
        self.i2a = np.array(list(self.a2i.keys()))
        self.ni2ai = literals_attr