Source code for hypodisc.core.structures

#! /usr/bin/env python

from __future__ import annotations
from typing import Dict, Iterator, List, Optional, Set, Union
from uuid import uuid4

from rdf.terms import IRIRef, Resource

from hypodisc.data.graph import ns2pf


[docs]class GraphPattern(): """ GraphPattern class Holds all assertions of a graph pattern and keeps track of the connections and distances (from the root) of these assertions. """ def __init__(self, assertions:set[Assertion] = set(), domain:set = set(), parent:Optional[GraphPattern] = None) -> None: """__init__. :param identity: :type identity: IdentityAssertion :rtype: None """ self.domain = domain # type:set self.support = len(self.domain) self.parent = parent self.connections = dict() # type: Dict[Assertion, set] self.distances = dict() # type: Dict[int, set] if len(assertions) > 0: self.root = self._infer_root(assertions) self.distances = self._compute_distances({self.root}, assertions) self.connections = self._compute_connections(self.distances) if len(assertions) == 1: self.assertion = list(self.distances[0])[0] def _infer_root(self, assertions:set[Assertion]) -> Variable: """ Infer the root node of the graph :param assertions: :type assertions: set[Assertion] :rtype: Variable """ # infer root variable vars_head = set() vars_tail = set() for lhs, _, rhs in assertions: if isinstance(lhs, Variable): vars_head.add(lhs) if isinstance(rhs, Variable): vars_tail.add(rhs) return (vars_head - vars_tail).pop() def _compute_distances(self, targets:set[Variable], assertions:set[Assertion], depth:int = 0)\ -> dict[int, set[Assertion]]: """ Compute the number of hops from the root node :param targets: :type targets: set[Variable] :param assertions: :type assertions: set[Assertion] :param depth: :type depth: int :rtype: dict[int, set[Assertion]] """ if len(assertions) <= 0: return dict() distances = {depth: set()} remainder = set() endpoints = set() for a in assertions: if a.lhs in targets: distances[depth].add(a) if isinstance(a.rhs, Variable): endpoints.add(a.rhs) continue remainder.add(a) return distances | self._compute_distances(endpoints, remainder, depth + 1) def _compute_connections(self, distances:dict[int, set[Assertion]])\ -> dict[Assertion, set[Assertion]]: """ Compute the connections between statements :param distances: :type distances: dict[int, set[Assertion]] :rtype: dict[Assertion, set[Assertion]] """ connections = {a: set() for a_set in distances.values() for a in a_set} varmap = {a.rhs: a for a in connections.keys()} for d, a_set in distances.items(): if d <= 0: continue for a in a_set: a_key = varmap[a.lhs] connections[a_key].add(a) return connections def _update_domain(self, assertion:Optional[Assertion] = None) -> set: domains = list() # type: list[set[int]] if assertion is None: for rooted_assertion in self.distances[0]: domains.append(self._update_domain(rooted_assertion)) return set.intersection(*domains) if len(self.connections[assertion]) <= 0: # if there are no further connections then we only # care about the domain for dict_values in assertion._inv_idx_map.values(): domains.append(set(dict_values)) return set.union(*domains) arange = list() for connection in self.connections[assertion]: arange.append(self._update_domain(connection)) arange = set.intersection(*arange) for obj, subs in assertion._inv_idx_map.items(): if obj in arange: domains.append(subs) return set.union(*domains) if len(domains) > 0 else set()
[docs] def extend(self, endpoint:Variable, extension:Assertion) -> None: """ Extend the body by appending a new assertion to an existing endpoint. :param endpoint: :type endpoint: Assertion :param extension: :type extension: Assertion :rtype: None """ distance = 0 if type(endpoint) is not ObjectTypeVariable: raise Exception('Cannot extend from this endpoint') if endpoint.value != extension.lhs.value: raise Exception('Cannot connect different classes') if endpoint == self.root: assert extension.lhs == endpoint distance = 0 else: assertion = None for a in self.connections.keys(): if a.rhs == endpoint: assertion = a break if assertion is None: raise Exception('No possible connection found') # connect nodes inv_idx_map = extension._inv_idx_map extension = Assertion(assertion.rhs, extension.predicate, extension.rhs) extension._inv_idx_map = inv_idx_map self.connections[assertion].add(extension) for d, assertions in self.distances.items(): if assertion in assertions: distance = d + 1 break self.connections[extension] = set() if distance not in self.distances.keys(): self.distances[distance] = set() self.distances[distance].add(extension)
[docs] def copy(self) -> GraphPattern: """ Create a deep copy, except for the assertions, which remain as pointers. :rtype: GraphPattern """ g = GraphPattern(domain = {v for v in self.domain}) g.root = self.root g.assertion = self.assertion g.domain = {e for e in self.domain} g.support = self.support g.connections = {k: {v for v in self.connections[k]} for k in self.connections.keys()} g.distances = {k: {v for v in self.distances[k]} for k in self.distances.keys()} return g
def __len__(self) -> int: """ Return the number of assertions :rtype: int """ return len(self.connections.keys())
[docs] def width(self, depth:Optional[int]=None) -> int: """Return the maximum number of assertions on a certain depth, or the most overall if depth is None. :rtype: int """ if depth is None: return max([len(s) for s in self.distances.values()]) else: return len(self.distances[depth])
def __contains__(self, assertion:Assertion) -> bool: for a in self.connections.keys(): if a.equiv(assertion): return True return False
[docs] def contains_at_depth(self, assertion:Assertion, depth:int) -> bool: if depth in self.distances.keys(): for a in self.distances[depth]: if a.equiv(assertion): return True return False
[docs] def depth(self) -> int: """ Return the length of the longest non-cyclic path :rtype: int """ return max(self.distances.keys())
def __lt__(self, other) -> bool: """ Return true if self has less assertions or less depth :param other: :rtype: bool """ if len(self) < len(other): return True if max(self.distances.keys()) < max(other.distances.keys()): return True return False
[docs] def as_query(self, prefix_map:dict[str, str]) -> str: """ Return a SPARQL query representation of this pattern. :param prefix_map: :type prefix_map: dict[str, str] :rtype: str """ q = '' for ns in sorted(prefix_map.keys()): q += fr"PREFIX {prefix_map[ns]}: <{ns}>\n" if len(prefix_map) > 0: q += r'\n' var_i = 97 # start with 'a' q += r"SELECT ?s\n" q += r"WHERE {\n" bindings = dict() postpone = dict() filter = set() for d in self.distances.keys(): for a in self.distances[d]: if isinstance(a.rhs, ObjectTypeVariable): if a.rhs not in bindings.keys(): bindings[a.rhs] = "_" + chr(var_i) # blank node postpone[a.rhs] = True var_i += 1 q += r'\t' if d == 0: q += "?s" else: if a.lhs in bindings.keys(): # ObjectTypeVariable if a.lhs in postpone.keys(): pfe = ns2pf(prefix_map, a.lhs.value) q += ( f"?{bindings[a.lhs]} rdf:type " fr"{pfe} .\n\t" ) postpone[a.lhs] = False q += f"?{bindings[a.lhs]}" else: q += f"{a.lhs}" pfe = ns2pf(prefix_map, a.predicate) q += f" {pfe} " if a.rhs in bindings.keys(): # ObjectTypeVariable q += f"?{bindings[a.rhs]}" elif type(a.rhs) is DataTypeVariable: var = '_' + chr(var_i) if type(a.rhs.value) is IRIRef: # data type pfe = ns2pf(prefix_map, a.rhs.value) filter.add(f"DATATYPE(?{var}) == {pfe}") else: # language tag filter.add(fr"LANG(?{var}) == \"{a.rhs.value}\"") q += f"?{var}" var_i += 1 elif type(a.rhs) is MultiModalStringVariable: var = '_' + chr(var_i) filter.add(fr"REGEX(STR(?{var}), \"{a.rhs.regex}\")") q += f"?{var}" var_i += 1 elif type(a.rhs) is MultiModalNumericVariable: pfe = ns2pf(prefix_map, a.rhs.value) var = '_' + chr(var_i) if a.rhs.lower_bound == a.rhs.upper_bound: f = ( fr"\"{a.rhs.lower_bound}\"^^{pfe} == " f"?{var}" ) else: f = ( fr"\"{a.rhs.lower_bound}\"^^{pfe} <= " fr"?{var} && ?{var} <= \"{a.rhs.upper_bound}\"" f"^^{pfe}") filter.add(f) q += f"?{var}" var_i += 1 elif type(a.rhs) is ResourceWrapper: if type(a.rhs.value) is IRIRef: pfe = ns2pf(prefix_map, a.rhs.value) q += f"{pfe}" else: q += fr"\"{a.rhs.value}\"" if type(a.rhs.type) is IRIRef: # data type pfe = ns2pf(prefix_map, a.rhs.type) q += f"^^{pfe}" else: # language tag q += f"@{a.rhs.type}" else: q += f"{a.rhs}" q += r' .\n' if d < self.depth(): q += r'\n' add_whitespace = True for otype in postpone.keys(): if postpone[otype]: if add_whitespace: q += r'\n' add_whitespace = False pfe = ns2pf(prefix_map, otype.value) q += ( fr"\t?{bindings[otype]} rdf:type " fr"{pfe} .\n" ) postpone[otype] = False if len(filter) > 0: q += r"\n\tFILTER (\n\t\t" + r" &&\n\t\t".join(filter) + r"\n\t)\n" q += "}" return q
[docs] def as_dot(self, prefix_map:dict[str, str]) -> str: """ Return a dot representation of this pattern. :param prefix_map: :type prefix_map: dict[str, str] :rtype: str """ labels = dict() annotations = set() postpone = set() entities = dict() classes = dict() literals = dict() i = 1 var_i = 97 # start with 'a' dot = r"strict digraph { " dot += r"subgraph glabel { glabel [shape = box, label = \"" for k,v in prefix_map.items(): dot += fr"{v}: <{k}>\\n" dot += r"\"] } " dot += r"edge [ minlen = 2 ];" dot += r"\"root\" [shape = oval, label = \"?s\", style = bold ]; " for d in self.distances.keys(): for a in self.distances[d]: edge = '' if a.lhs == self.root: edge += r"\"root\"" else: # ObjectTypeVariable pfe = ns2pf(prefix_map, a.lhs.value) class_lab = fr"\"{pfe}\"" if a.lhs.value not in entities.keys(): entities[a.lhs.value] = f"n{i}" i += 1 node_id = entities[a.lhs.value] edge += node_id if node_id not in labels.keys(): labels[node_id] = fr"\"?_{chr(var_i)}\"" var_i += 1 if class_lab not in classes.keys(): classes[class_lab] = f"n{i}" i += 1 class_id = classes[class_lab] if class_id not in labels.keys(): labels[class_id] = class_lab postpone.add(fr"{node_id} -> {class_id} " r"[label = \"rdf:type\"]; ") edge += " -> " if isinstance(a.rhs, ObjectTypeVariable): pfe = ns2pf(prefix_map, a.rhs.value) class_lab = fr"\"{pfe}\"" if a.rhs.value not in entities.keys(): entities[a.rhs.value] = f"n{i}" i += 1 node_id = entities[a.rhs.value] edge += node_id if node_id not in labels.keys(): labels[node_id] = fr"\"?_{chr(var_i)}\"" var_i += 1 if class_lab not in classes.keys(): classes[class_lab] = f"n{i}" i += 1 class_id = classes[class_lab] if class_id not in labels.keys(): labels[class_id] = class_lab postpone.add(fr"{node_id} -> {class_id} " r"[label = \"rdf:type\"]; ") elif type(a.rhs) is DataTypeVariable: literals[a.rhs] = f"n{i}" i += 1 node_id = literals[a.rhs] edge += node_id if node_id not in labels.keys(): labels[node_id] = fr"\"?_{chr(var_i)}\"" var_i += 1 ann_id = f"n{i}" i += 1 annotations.add(ann_id) if type(a.rhs.value) is IRIRef: # data type pfe = ns2pf(prefix_map, a.rhs.value) labels[ann_id] = fr"\"{pfe}\"" postpone.add(fr"{node_id} -> {ann_id} " "[label = datatype, style = dashed, " "arrowhead = none]; ") else: # language tag labels[ann_id] = fr"\"{a.rhs.value}\"" postpone.add(fr"{node_id} -> {ann_id} " "[label = language, style = dashed, " "arrowhead = none]; ") elif type(a.rhs) is MultiModalStringVariable: literals[a.rhs] = f"n{i}" i += 1 node_id = literals[a.rhs] edge += node_id if node_id not in labels.keys(): labels[node_id] = fr"\"?_{chr(var_i)}\"" var_i += 1 ann_id = f"n{i}" i += 1 annotations.add(ann_id) labels[ann_id] = fr"\"\\\"{a.rhs.regex}\\\"\"" postpone.add(fr"{node_id} -> {ann_id} " "[label = regex, style = dashed, " "arrowhead = none]; ") elif type(a.rhs) is MultiModalNumericVariable: literals[a.rhs] = f"n{i}" i += 1 node_id = literals[a.rhs] edge += node_id if node_id not in labels.keys(): labels[node_id] = fr"\"?_{chr(var_i)}\"" var_i += 1 ann_id = f"n{i}" i += 1 annotations.add(ann_id) pfe = ns2pf(prefix_map, a.rhs.value) labels[ann_id] = ( fr"\"{pfe}\"") postpone.add(fr"{node_id} -> {ann_id} " "[label = datatype, style = dashed, " "arrowhead = none]; ") ann_id = f"n{i}" i += 1 annotations.add(ann_id) labels[ann_id] = ( fr"\"[{a.rhs.lower_bound}, " fr"{a.rhs.upper_bound}]\"") postpone.add(fr"{node_id} -> {ann_id} " "[label = range, style = dashed, " "arrowhead = none]; ") elif type(a.rhs) is ResourceWrapper: if type(a.rhs.value) is IRIRef: if a.rhs.value not in entities.keys(): entities[a.rhs.value] = f"n{i}" i += 1 node_id = entities[a.rhs.value] edge += node_id pfe = ns2pf(prefix_map, a.rhs.value) labels[node_id] = fr"\"{pfe}\"" else: literals[a.rhs] = f"n{i}" i += 1 node_id = literals[a.rhs] edge += node_id ann_id = f"n{i}" i += 1 annotations.add(ann_id) if type(a.rhs.type) is IRIRef: # data type pfe = ns2pf(prefix_map, a.rhs.type) labels[node_id] = fr"\"{a.rhs.value}\"" labels[ann_id] = fr"\"{pfe}\"" postpone.add(fr"{node_id} -> {ann_id} " "[label = datatype, style = dashed, " "arrowhead = none]; ") else: # language tag labels[node_id] = fr"\"{a.rhs.value}\"" labels[ann_id] = fr"\"{a.rhs.type}\"" postpone.add(fr"{node_id} -> {ann_id} " "[label = language, style = dashed, " "arrowhead = none]; ") pfe = ns2pf(prefix_map, a.predicate) edge += fr"[label = \"{pfe}\"]; " dot += edge for t in postpone: dot += t for node_id in entities.values(): lab = labels[node_id] dot += fr"{node_id} [shape = oval, label = {lab}]; " for node_id in classes.values(): lab = labels[node_id] dot += fr"{node_id} [shape = box, label = {lab}]; " for node_id in literals.values(): lab = labels[node_id] dot += fr"{node_id} [shape = plaintext, label = {lab}]; " for node_id in annotations: lab = labels[node_id] dot += fr"{node_id} [shape = plain, label = {lab}]; " return dot + '}'
def __repr__(self) -> str: """ Return an internal string representation :rtype: str """ return "GraphPattern [{}]".format(str(self)) def __str__(self) -> str: """ Return a string representation :rtype: str """ return "{" + "; ".join([str(assertion) for assertion in sorted(self.connections.keys())]) + "}" def __hash__(self) -> int: return hash(str(self))
[docs]class ResourceWrapper(IRIRef): """ Resource Wrapper class A wrapper which can take on any value of a certain resource, but which stores additional information """ def __init__(self, resource:Resource, type:Optional[IRIRef]=None) -> None: """ Initialize and instance of this class :params resource: the IRI or Literal value :returns: None """ super().__init__(resource) self.type = type def __eq__(self, other:ResourceWrapper) -> bool: if type(other) is not ResourceWrapper: return False return self.type == other.type\ and self.value == other.value def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "RESOURCE [{}]".format(str(self.value)) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "Resource {} [{}]".format(str(id(self)), str(self)) def __hash__(self) -> int: return hash(str(self))
[docs]class Variable(IRIRef): """ Variable class An unbound variable which can take on any value of a certain object or data type resource. Each instance has an unique ID to allow this object to be used as variable shared between assertions. """ def __init__(self, resource:IRIRef) -> None: """ Initialize and instance of this class :params resource: the IRI of the class or datatype :returns: None """ super().__init__(resource) self._uuid = uuid4().hex def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "VAR {}".format(self._uuid) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "Variable {}".format(self._uuid) def __lt__(self, other:Variable) -> bool: return str(self) < str(other) def __eq__(self, other:Variable) -> bool: return type(self) is type(other)\ and self._uuid == other._uuid
[docs] def equiv(self, other:Variable) -> bool: return type(self) is type(other)\ and self.value == other.value
def __hash__(self) -> int: return hash(str(self))
[docs]class TypeVariable(Variable): """ Type Variable class An unbound variable which can take on any value of a certain object or data type resource. Each instance has an unique ID to allow this object to be used as variable shared between assertions. """ def __init__(self, resource:IRIRef) -> None: """ Initialize and instance of this class :params resource: the IRI of the class or datatype :returns: None """ super().__init__(resource)
[docs] def equiv(self, other:TypeVariable) -> bool: return type(self) is type(other)\ and self.value == other.value
def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "TYPE [{}]".format(str(self.value)) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "TypeVariable {} [{}]".format(self._uuid, str(self))
[docs]class ObjectTypeVariable(TypeVariable): """ Object Type Variable class An unbound variable which can be any member of an object type class (entity) """ def __init__(self, resource:IRIRef): """ Initialize and instance of this class :params resource: the IRI of the class :returns: None """ super().__init__(resource) def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "OBJECT TYPE [{}]".format(str(self.value)) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "ObjectTypeVariable {} [{}]".format(self._uuid, str(self))
[docs]class DataTypeVariable(TypeVariable): """ Data Type Variable class An unbound variable which can take on any value of a data type class (literal) """ def __init__(self, resource:IRIRef): """ Initialize and instance of this class :params resource: the IRI of the datatype :returns: None """ super().__init__(resource) def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "DATA TYPE ({})".format(str(self.value)) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "DataTypeVariable [{}]".format(str(self.value))
[docs]class MultiModalVariable(DataTypeVariable): """ Multimodal Variable class An unbound variable which conveys a description of a cluster of values for node features. """ def __init__(self, resource:IRIRef): """ Initialize and instance of this class :params resource: the IRI of the datatype :returns: None """ super().__init__(resource) self.dtype = resource.value.split('/')[-1] if '#' in self.dtype: self.dtype = self.dtype.split('#')[-1]
[docs] def equiv(self, other:MultiModalVariable) -> bool: return type(self) is type(other)\ and self.value == other.value\ and self.dtype == other.dtype
def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return "MULTIMODAL [{}]".format(str(self.value)) def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "MultiModalVariable [{} - {}]".format(str(self.dtype), str(self.value))
[docs]class MultiModalNumericVariable(MultiModalVariable): """ Numeric Variable class """ _leq = chr(0x2264) # <= sign def __init__(self, resource:IRIRef, crange:tuple[float, float]) -> None: """ Initialize and instance of this class :returns: None """ super().__init__(resource) self.lower_bound, self.upper_bound = crange
[docs] def equiv(self, other:MultiModalNumericVariable) -> bool: """ Return true if self and other represent the same datatype, and have the same mean and variance. :param other: :type other: MultiModalNumericVariable :rtype: bool """ return type(self) is type(other)\ and self.dtype == other.dtype\ and self.value == other.value\ and self.lower_bound == other.lower_bound\ and self.upper_bound == other.upper_bound
def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return (f"{self.dtype}: {self.lower_bound:.2E} " f"{self._leq} x {self._leq} {self.upper_bound:.2E}") def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "MultiModalVariable {}".format(str(self))
[docs]class MultiModalStringVariable(MultiModalVariable): """ String Variable class """ def __init__(self, resource, regex) -> None: """ Initialize and instance of this class :returns: None """ super().__init__(resource) self.regex = regex
[docs] def equiv(self, other:MultiModalStringVariable) -> bool: """ Return true if self and other represent the same datatype, and have the same regular expression. :param other: :type other: MultiModalStringVariable :rtype: bool """ # does not account for equivalent regex patterns return type(self) is type(other)\ and self.value == other.value\ and self.regex == other.regex
def __str__(self) -> str: """ Return a description of this variable :returns: a description of this variable :rtype: str """ return f"{self.dtype}: {self.regex}" def __repr__(self) -> str: """ Return a technical description of this variable :returns: a technical description of this variable :rtype: str """ return "MultiModalVariable {}".format(str(self))
[docs]class Assertion(tuple): """ Assertion class Wrapper around tuple (an assertion) that gives each instantiation an unique uuid which allows for comparisons between assertions with the same values. This is needed when either lhs or rhs use TypeVariables. """ def __new__(cls, subject:Union[ResourceWrapper, Variable], predicate:Resource, object:Union[ResourceWrapper, Variable]) -> Assertion: """ Create a new instance of Assertion :param subject: the subject of an assertion :param predicate: the predicate of an assertion :param object: the object of an assertion :returns: a new instance of Assertion """ return super().__new__(cls, (subject, predicate, object)) def __init__(self, subject:Union[ResourceWrapper, Variable], predicate:Resource, object:Union[ResourceWrapper, Variable], _uuid:Optional[str] = None) -> None: """ Initialize a new instance of Assertion :param subject: the subject of an assertion :param predicate: the predicate of an assertion :param object: the object of an assertion :param _uuid: identifier of this instance :returns: None """ self.lhs = subject self.predicate = predicate self.rhs = object self.uuid = _uuid if _uuid is not None else 'A' + uuid4().hex self._inv_idx_map = dict() # type: dict[int,set[int]]
[docs] def copy(self, deep:bool = False) -> Assertion: """ Return a copy of this object :param deep: Copy the UUID and HASH as well :returns: a copy of type Assertion :rtype: Assertion """ copy = Assertion(self.lhs, self.predicate, self.rhs) if deep: copy.uuid = self.uuid return copy
def __getnewargs__(self) -> Assertion: """ Pickle elements correctly :returns: an instance of Assertion :rtype: Assertion """ return Assertion(self.lhs, self.predicate, self.rhs) def __hash__(self) -> int: """ Return unique hash for each assertion, regardless of content. :rtype: int """ return hash(str(self)) def __str__(self) -> str: """ Return the string description of this assertion :returns: a description of this assertion :rtype: str """ return "(" + ', '.join([str(self.lhs), str(self.predicate), str(self.rhs)]) + ")" def __lt__(self, other:Assertion) -> bool: """ Compare assertions per element :returns: true if self < other else false :rtype: str """ if self.predicate < other.predicate: return True if self.predicate == other.predicate: if self.lhs < other.lhs: return True if self.lhs == other.lhs and self.rhs < other.rhs: return True return False
[docs] def equal(self, other:Assertion) -> bool: """ Return true if assertions are equal content wise :param other: :type other: Assertion :rtype: bool """ if self.predicate == other.predicate\ and self.lhs == self.lhs\ and self.rhs == self.rhs: return True return False
[docs] def equiv(self, other:Assertion) -> bool: """ Return true is assertions are equivalent, which implies that they state the same or have an entity/attribute that is an instance of the type specified by the other. :param other: :type other: Assertion :rtype: bool """ if self.equal(other): return True if self.predicate != other.predicate\ or not self.lhs.equiv(other.lhs): return False # one is an instance of the other's type for a_rhs, b_rhs in [(self.rhs, other.rhs), (other.rhs, self.rhs)]: if isinstance(a_rhs, TypeVariable)\ and isinstance(b_rhs, ResourceWrapper)\ and a_rhs.value == b_rhs.type: return True return False