Source code for hypodisc.multimodal.clustering

#! /usr/bin/env python

from sys import maxsize
from typing import Any

import numpy as np
from rdf.terms import IRIRef, Literal
from rdf.namespaces import XSD
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler

from hypodisc.multimodal.datatypes import (XSD_DATEFRAG, XSD_DATETIME,
                                           XSD_NUMERIC, XSD_STRING)
from hypodisc.multimodal.langutil import (generalize_patterns, generate_regex,
                                          RegexPattern)
from hypodisc.multimodal.timeutils import (cast_datefrag_delta,
                                           cast_datefrag_rev, cast_datefrag,
                                           cast_datetime, cast_datetime_delta,
                                           cast_datetime_rev)


SUPPORTED_XSD_TYPES = set.union(XSD_DATEFRAG,
                                XSD_DATETIME,
                                XSD_NUMERIC,
                                XSD_STRING)

CLUSTERS_MIN = 1
CLUSTERS_MAX = 10
SEED_MIN = 0
SEED_MAX = 2**32 - 1


[docs]def cast_values(dtype:IRIRef, values:list) -> tuple[np.ndarray, np.ndarray]: """ Cast raw values to a datatype suitable for clustering. Default to string. :param dtype: :type dtype: IRIRef :param values: :type values: list :rtype: np.ndarray """ X = np.empty(len(values), dtype=object) X_idx = list() if dtype in XSD_NUMERIC: func = lambda _, v : float(v) elif dtype in XSD_DATETIME: # cluster on POSIX timestamps func = lambda dtype, v : cast_datetime(dtype, v) elif dtype in XSD_DATEFRAG: # cluster on days func = lambda dtype, v : cast_datefrag(dtype, v) else: # default to XSD_STRING: # nothing changes func = lambda _, v : str(v) for i, v in enumerate(values): try: X[i] = func(dtype, v) except: continue X_idx.append(i) return X[X_idx], np.array(X_idx, dtype=int)
[docs]def cast_values_rev_dist(dtype:IRIRef, clusters:list[tuple])\ -> list[tuple[set, Any]]: """ Cast clusters to relevant datatypes distributions :param dtype: :type dtype: IRIRef :param clusters: :type clusters: list[tuple] :rtype: list[tuple[set, Any]] """ values = list() if dtype in set.union(XSD_NUMERIC, XSD_DATETIME, XSD_DATEFRAG): for mu, sigma, members in clusters: try: if dtype in XSD_NUMERIC: if 'integer' in dtype.value.lower(): mu = int(mu) sigma = int(sigma) values.append((members, (mu, sigma))) elif dtype in XSD_DATETIME: # POSIX timestamps mu = cast_datetime_rev(dtype, mu) # returns dtype sigma = cast_datetime_delta(sigma) # returns duration values.append((members, (mu, sigma))) elif dtype in XSD_DATEFRAG: # days mu = cast_datefrag_rev(dtype, mu) # returns dtype sigma = cast_datefrag_delta(sigma) # return dayTimeDuration values.append((members, (mu, sigma))) except: continue else: # default to string for pattern, members in clusters: values.append((members, pattern.exact())) return values
[docs]def cast_values_rev(dtype:IRIRef, clusters:list[tuple])\ -> list[tuple[set, Any]]: """ Cast clusters to relevant datatypes ranges :param dtype: :type dtype: IRIRef :param clusters: :type clusters: list[tuple] :rtype: list[tuple[set, Any]] """ values = list() if dtype in set.union(XSD_NUMERIC, XSD_DATETIME, XSD_DATEFRAG): for mu, sigma, members in clusters: try: lower_bound = mu - 3 * sigma upper_bound = mu + 3 * sigma if dtype in XSD_NUMERIC: if 'integer' in dtype.value.lower(): lower_bound = int(lower_bound) upper_bound = int(upper_bound) else: lower_bound = float(lower_bound) upper_bound = float(upper_bound) values.append((members, (lower_bound, upper_bound))) elif dtype in XSD_DATETIME: # POSIX timestamps lower_bound = cast_datetime_rev(dtype, lower_bound) # returns dtype upper_bound = cast_datetime_rev(dtype, upper_bound) # returns dtype values.append((members, (lower_bound, upper_bound))) elif dtype in XSD_DATEFRAG: # days lower_bound = cast_datefrag_rev(dtype, lower_bound) # returns dtype upper_bound = cast_datefrag_rev(dtype, upper_bound) # returns dtype values.append((members, (lower_bound, upper_bound))) except: continue else: # default to string for pattern, members in clusters: values.append((members, pattern.exact())) return values
[docs]def compute_clusters(rng:np.random.Generator, dtype:IRIRef, values:list, values_gidx:np.ndarray)\ -> list[tuple[set, Any]]: """Compute clusters from list of values. :param rng: :type rng: np.random.Generator :param dtype: :type dtype: IRIRef :param values: :type values: list[Literal] :param values_gidx: :type values_gidx: np.ndarray :rtype: list[tuple[set, Any]] """ X, X_idx = cast_values(dtype, values) X_gidx = values_gidx[X_idx] # global indices of nodes in order of X if dtype in set.union(XSD_NUMERIC, XSD_DATETIME, XSD_DATEFRAG): X = X.astype(np.float32) num_components = range(CLUSTERS_MIN, CLUSTERS_MAX) means, stdevs, assignments = compute_numeric_clusters(rng, X, num_components) clusters = [(means[i], stdevs[i], set(X_gidx[assignments == i])) for i in range(len(means))] values = cast_values_rev(dtype, clusters) else: # default to string X = X.astype(str) clusters = string_clusters(X, X_gidx) values = cast_values_rev(dtype, clusters) return values
[docs]def compute_numeric_clusters(rng:np.random.Generator, X:np.ndarray, num_components:range, num_tries:int = 3, eps:float = 1e-3, standardize:bool = True, shuffle:bool = True)\ -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute numerical cluster means and stdevs for a range of possible number of components. Also return the cluster assignments. :param rng: :type rng: np.random.Generator :param X: :type X: np.ndarray :param num_components: :type num_components: range :param num_tries: :type num_tries: int :param eps: :type eps: float :param standardize: :type standardize: bool :param shuffle: :type shuffle: bool :rtype: dict """ if X.ndim == 1: # convert to array of shape (n_samples, n_features) X = X.reshape(-1, 1) scaler = StandardScaler() if standardize: # standardize scaler.fit(X) X = scaler.transform(X) # compensate for small datasets # let stdev of noise scale with data sample = X stdev = np.sqrt(X.var()) while sample.shape[0] < 1024: sample = np.vstack([sample, X + rng.normal(0, stdev/(stdev+1))]) if shuffle: # shuffle order rng.shuffle(sample) bic_min = None # best score mu = np.empty(0) covar = np.empty(0) assignments = np.empty(0) for nc in num_components: bic, means, covars, y = compute_GMM(rng, X, sample, nc, num_tries, eps) if bic_min is None or bic + eps < bic_min: bic_min = bic mu = means covar = covars assignments = y if standardize: # revert standardization mu = scaler.inverse_transform(mu) sigma = np.einsum('s,pqr->psq', np.sqrt(scaler.var_), covar) else: # compute standard deviations num_components = mu.shape[0] sigma = np.array([[np.sqrt(np.trace(covar[i])/num_components)] for i in range(0, num_components)]) return mu, sigma, assignments
[docs]def compute_GMM(rng:np.random.Generator, X:np.ndarray, sample:np.ndarray, num_components:int, num_tries:int, eps:float) -> tuple[float, np.ndarray, np.ndarray, np.ndarray]: """ Compute a GMM from different random states and return the best results. Train the model on the sample but returns the predictions on X :param rng: :type rng: np.random.Generator :param X: :type X: np.ndarray :param num_components: :type num_components: int :param num_tries: :type num_tries: int :param eps: :type eps: float :rtype: list """ bic_min = float(maxsize) # best score mu = np.empty(0) covar = np.empty(0) assignments = np.empty(0) for _ in range(num_tries): seed = rng.integers(SEED_MIN, SEED_MAX) gmm = GaussianMixture(n_components = num_components, random_state = seed) gmm.fit(sample) bic = gmm.bic(sample) if bic_min is None or bic + eps < bic_min: bic_min = float(bic) mu = gmm.means_ covar = gmm.covariances_ assignments = gmm.predict(X) return bic_min, mu, covar, assignments
[docs]def string_clusters(X:np.ndarray, X_gidx:np.ndarray, merge_charsets:bool = True, omit_empty:bool = True)\ -> list[tuple[RegexPattern,set[int]]]: """ Generate clusters of string by infering regex patterns and by generalizing these on similarity. :param object_list: :type object_list: np.ndarray :param merge_charsets: :type merge_charsets: bool :param omit_empty: :type omit_empty: bool :rtype: dict[RegexPattern,int] """ patterns = dict() #type: dict[RegexPattern, set[int]] for i, s in enumerate(X): try: pattern = generate_regex(s) except: continue if not (omit_empty and len(pattern) <= 0): # map patterns to global node indices whose value they match idx_set = { X_gidx[i] } if pattern not in patterns.keys(): patterns[pattern] = set() patterns[pattern] = patterns[pattern].union(idx_set) # merge character sets on word level and generalize these as well if merge_charsets: merged_patterns = dict() #type: dict[RegexPattern, set[int]] for p, members in patterns.items(): q = p.generalize() if p == q: # no further generalization possible continue if q not in merged_patterns.keys(): merged_patterns[q] = set() merged_patterns[q] = merged_patterns[q].union(members) # update pattern dictionary for p, members in merged_patterns.items(): if p not in patterns.keys(): patterns[p] = set() patterns[p] = patterns[p].union(members) # generalize found patterns for p, members in generalize_patterns(patterns).items(): if p not in patterns.keys(): patterns[p] = set() patterns[p] = patterns[p].union(members) return list(patterns.items())