Source code for semra.api

"""Semantic Mapping Reasoning Assembler."""

from __future__ import annotations

import itertools as itt
import logging
import typing
import typing as t
from collections import Counter, defaultdict
from collections.abc import Collection, Iterable
from typing import Literal, NamedTuple, TypeAlias, TypeVar, cast, overload

import bioregistry
import networkx as nx
import pandas as pd
import ssslm
from pydantic import BaseModel, Field
from ssslm import LiteralMapping
from tqdm.auto import tqdm

from semra.io.graph import _from_digraph_edge, to_digraph
from semra.rules import FLIP, SubsetConfiguration
from semra.struct import (
    Evidence,
    Mapping,
    MappingSet,
    ReasonedEvidence,
    Reference,
    SimpleEvidence,
    Triple,
)
from semra.utils import cleanup_prefixes, semra_tqdm
from semra.vocabulary import DB_XREF, EXACT_MATCH, INVERSION_MAPPING, KNOWLEDGE_MAPPING

# Public names exported by ``from semra.api import *``; one entry per name, sorted.
__all__ = [
    "TEST_MAPPING_SET",
    "IdentifierIndex",
    "Index",
    "M2MIndex",
    "Mutation",
    "PrefixIdentifierDict",
    "PrefixPairCounter",
    "apply_mutations",
    "assemble_evidences",
    "assert_projection",
    "count_component_sizes",
    "count_source_target",
    "deduplicate_evidence",
    "filter_many_to_many",
    "filter_mappings",
    "filter_minimum_confidence",
    "filter_prefixes",
    "filter_self_matches",
    "filter_subsets",
    "flip",
    "get_asymmetric_counter",
    "get_identifier_index",
    "get_index",
    "get_many_to_many",
    "get_observed_terms",
    "get_priority_reference",
    "get_symmetric_counter",
    "get_terms",
    "get_test_evidence",
    "get_test_reference",
    "hydrate_subsets",
    "keep_object_prefixes",
    "keep_prefixes",
    "keep_subject_prefixes",
    "print_source_target_counts",
    "prioritize",
    "prioritize_df",
    "project",
    "project_dict",
    "str_source_target_counts",
    "summarize_prefixes",
    "tabulate_index",
    "unindex",
    "update_literal_mappings",
    "validate_mappings",
]

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

#: An index allows for the aggregation of evidences for each core triple
Index = dict[Triple, list[Evidence]]

# Generic type variable; no use of it is visible in this part of the file
X = TypeVar("X")

#: A test mapping set that can be used in examples.
TEST_MAPPING_SET = MappingSet(name="Test Mapping Set", confidence=0.95)


# docstr-coverage: inherited
@typing.overload
def get_test_evidence(n: int) -> list[SimpleEvidence]: ...


# docstr-coverage: inherited
@typing.overload
def get_test_evidence(n: None = ...) -> SimpleEvidence: ...


def get_test_evidence(n: int | None = None) -> SimpleEvidence | list[SimpleEvidence]:
    """Get test evidence.

    :param n: The number of evidences to create. If not given, a single
        evidence (not wrapped in a list) is returned.
    :return: If ``n`` is an integer, a list of ``n`` simple evidences that all
        use :data:`TEST_MAPPING_SET` and that each have a different dummy
        ORCID author. Otherwise, a single simple evidence using
        :data:`TEST_MAPPING_SET` without an author.
    """
    if isinstance(n, int):
        return [
            SimpleEvidence(
                mapping_set=TEST_MAPPING_SET,
                # zero-pad the final group to four characters so that the dummy
                # identifier keeps the four-by-four ORCID shape for indexes >= 10
                author=Reference(prefix="orcid", identifier=f"0000-0000-0000-{index:04d}"),
            )
            for index in range(n)
        ]
    return SimpleEvidence(mapping_set=TEST_MAPPING_SET)
# docstr-coverage: inherited
@typing.overload
def get_test_reference(n: int, prefix: str = ...) -> list[Reference]: ...


# docstr-coverage: inherited
@typing.overload
def get_test_reference(n: None = ..., prefix: str = ...) -> Reference: ...


def get_test_reference(n: int | None = None, prefix: str = "go") -> Reference | list[Reference]:
    """Get test reference(s).

    :param n: The number of references to create. If not given, a single
        reference (not wrapped in a list) is returned.
    :param prefix: The prefix to use in all generated references.
    :return: If ``n`` is an integer, a list of ``n`` references whose identifiers
        count up from ``0000001``, zero-padded to seven characters. Otherwise,
        a single reference with the identifier ``0000001``.
    """
    if isinstance(n, int):
        return [
            Reference(prefix=prefix, identifier=f"{number:07d}") for number in range(1, n + 1)
        ]
    return Reference(prefix=prefix, identifier="0000001")
def count_source_target(mappings: Iterable[Mapping]) -> Counter[tuple[str, str]]:
    """Count how often each (source prefix, target prefix) pair occurs.

    The mappings are indexed with :func:`get_index` first, so each distinct
    subject-predicate-object triple contributes exactly once.

    :param mappings: An iterable of mappings
    :return: A counter keyed by pairs of subject prefix and object prefix

    >>> from semra import Mapping, Reference, EXACT_MATCH
    >>> from semra.api import get_test_reference
    >>> r1, r2 = get_test_reference(2)
    >>> m1 = Mapping(subject=r1, predicate=EXACT_MATCH, object=r2)
    >>> counter = count_source_target([m1])
    """
    prefix_pairs: Counter[tuple[str, str]] = Counter()
    for triple in get_index(mappings):
        prefix_pairs[triple.subject.prefix, triple.object.prefix] += 1
    return prefix_pairs
def str_source_target_counts(mappings: Iterable[Mapping], minimum: int = 0) -> str:
    """Render the source/target prefix pair counts as a table via :mod:`tabulate`.

    :param mappings: An iterable of mappings
    :param minimum: Only pairs whose count is strictly greater than this value
        are shown. Defaults to zero, which displays all source/target prefix pairs.
    :return: A GitHub-flavored table with one row per source/target prefix pair,
        ordered from most to least common.

    .. seealso:: The counts come from :func:`count_source_target`
    """
    from tabulate import tabulate

    rows = []
    for (source_prefix, target_prefix), count in count_source_target(mappings).most_common():
        if count > minimum:
            rows.append((source_prefix, target_prefix, count))
    return tabulate(
        rows,
        headers=["source prefix", "target prefix", "count"],
        tablefmt="github",
    )
def get_index(mappings: Iterable[Mapping], *, progress: bool = True, leave: bool = False) -> Index:
    """Group evidences by core triple and deduplicate them.

    :param mappings: An iterable of mappings
    :param progress: Passed through to the progress bar wrapper
    :param leave: Passed through to the progress bar wrapper
    :return: A dictionary from each triple to the deduplicated list of all
        evidences found on mappings with that triple
    """
    grouped: dict[Triple, list[Evidence]] = {}
    iterator = semra_tqdm(mappings, desc="Indexing mappings", progress=progress, leave=leave)
    for mapping in iterator:
        grouped.setdefault(mapping.triple, []).extend(mapping.evidence)

    index: Index = {}
    for triple, evidences in grouped.items():
        index[triple] = deduplicate_evidence(triple, evidences)
    return index
def assemble_evidences(mappings: list[Mapping], *, progress: bool = True) -> list[Mapping]:
    """Merge mappings that share a subject-predicate-object triple.

    The mappings are indexed with :func:`get_index` and then turned back into
    a list with :func:`unindex`.

    :param mappings: A list of mappings
    :param progress: Should a progress bar be shown? Defaults to true.
    :returns: A list with exactly one :class:`semra.Mapping` per triple, carrying
        the combined evidences. Mappings that only differ in their predicate are
        not merged.

    >>> from semra import Mapping, Reference, EXACT_MATCH
    >>> from semra.api import get_test_evidence, get_test_reference
    >>> r1, r2 = get_test_reference(2)
    >>> e1, e2 = get_test_evidence(2)
    >>> m1 = Mapping(subject=r1, predicate=EXACT_MATCH, object=r2, evidence=[e1])
    >>> m2 = Mapping(subject=r1, predicate=EXACT_MATCH, object=r2, evidence=[e2])
    >>> m = assemble_evidences([m1, m2])
    >>> assert m == [Mapping(subject=r1, predicate=EXACT_MATCH, object=r2, evidence=[e1, e2])]
    """
    return unindex(get_index(mappings, progress=progress), progress=progress)
# TODO infer negative mappings for exact match from narrow/broad match


# docstr-coverage:excused `overload`
@overload
def flip(mapping: Mapping, *, strict: Literal[True]) -> Mapping: ...


# docstr-coverage:excused `overload`
@overload
def flip(mapping: Mapping, *, strict: Literal[False] = ...) -> Mapping | None: ...


def flip(mapping: Mapping, *, strict: bool = False) -> Mapping | None:
    """Flip a mapping, if the relation is configured with an inversion.

    :param mapping: An input mapping
    :param strict: If true, raise an exception instead of returning None when
        the mapping's predicate has no entry in ``FLIP``. Defaults to false.
    :return: If the input mapping's predicate has an entry in ``FLIP``, a new
        mapping is returned with the subject and object swapped, with the
        inverted predicate, and with a single reasoned evidence that points back
        to the original mapping to track provenance. Otherwise, None is returned
        (unless ``strict`` is set).
    :raises ValueError: If ``strict`` is true and the predicate has no inversion
    """
    inverse_predicate = FLIP.get(mapping.predicate)
    if inverse_predicate is not None:
        return Mapping(
            subject=mapping.object,
            predicate=inverse_predicate,
            object=mapping.subject,
            evidence=[ReasonedEvidence(justification=INVERSION_MAPPING, mappings=[mapping])],
        )
    if strict:
        raise ValueError(f"no inversion is configured for predicate: {mapping.predicate}")
    return None
def iter_components(mappings: t.Iterable[Mapping]) -> t.Iterable[set[Reference]]:
    """Iterate over the weakly connected components of the mappings' directed graph.

    :param mappings: An iterable of mappings, converted with ``to_digraph``
    :return: An iterable of sets of references, one set per component
    """
    components = nx.weakly_connected_components(to_digraph(mappings))
    return cast(t.Iterable[set[Reference]], components)
def tabulate_index(index: Index) -> str:
    """Render every triple of an index, with its evidences, as a text table.

    :param index: An index of mappings - a dictionary whose keys are
        subject-predicate-object triples and values are lists of associated evidence
    :return: A GitHub-flavored table with the columns ``s``, ``p``, ``o``, and ``ev``.
        Triples are listed in sorted order. The first evidence shares a row with
        the triple's CURIEs; each further evidence gets its own row in which the
        first three cells are blank. A triple without evidence gets one row with
        a blank evidence cell.
    """
    from tabulate import tabulate

    blank_cells = ("", "", "")
    table: list[tuple[str, str, str, str]] = []
    for triple, evidences in sorted(index.items()):
        curie_cells = (triple.subject.curie, triple.predicate.curie, triple.object.curie)
        if not evidences:
            table.append((*curie_cells, ""))
            continue
        for position, evidence in enumerate(evidences):
            leading_cells = curie_cells if position == 0 else blank_cells
            table.append((*leading_cells, str(evidence)))
    return tabulate(table, headers=["s", "p", "o", "ev"], tablefmt="github")
def keep_prefixes(
    mappings: Iterable[Mapping], prefixes: str | Iterable[str], *, progress: bool = True
) -> list[Mapping]:
    """Keep only mappings whose subject and object both use one of the given prefixes.

    :param mappings: An iterable of mappings
    :param prefixes: A prefix or iterable of prefixes, passed through
        ``cleanup_prefixes`` before use
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, for which both the subject
        prefix and the object prefix are in the cleaned-up prefixes

    >>> from semra import DB_XREF, EXACT_MATCH, Mapping, Reference
    >>> curies = "DOID:0050577", "mesh:C562966", "umls:C4551571"
    >>> r1, r2, r3 = (Reference.from_curie(c) for c in curies)
    >>> m1 = Mapping.from_triple((r1, DB_XREF, r2))
    >>> m2 = Mapping.from_triple((r2, DB_XREF, r3))
    >>> m3 = Mapping.from_triple((r1, DB_XREF, r3))
    >>> assert keep_prefixes([m1, m2, m3], {"DOID", "mesh"}) == [m1]
    """
    prefixes = cleanup_prefixes(prefixes)
    kept: list[Mapping] = []
    for mapping in semra_tqdm(
        mappings, desc=f"Keeping from {len(prefixes)} prefixes", progress=progress
    ):
        if mapping.subject.prefix not in prefixes:
            continue
        if mapping.object.prefix not in prefixes:
            continue
        kept.append(mapping)
    return kept
def keep_subject_prefixes(
    mappings: Iterable[Mapping], prefixes: str | Iterable[str], *, progress: bool = True
) -> list[Mapping]:
    """Keep only mappings whose subject uses one of the given prefixes.

    :param mappings: An iterable of mappings
    :param prefixes: A prefix or iterable of prefixes, passed through
        ``cleanup_prefixes`` before use
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, whose subject prefix is in
        the cleaned-up prefixes

    >>> from semra import DB_XREF, EXACT_MATCH, Mapping, Reference
    >>> curies = "DOID:0050577", "mesh:C562966", "umls:C4551571"
    >>> r1, r2, r3 = (Reference.from_curie(c) for c in curies)
    >>> m1 = Mapping.from_triple((r1, DB_XREF, r2))
    >>> m2 = Mapping.from_triple((r2, DB_XREF, r3))
    >>> m3 = Mapping.from_triple((r1, DB_XREF, r3))
    >>> assert keep_subject_prefixes([m1, m2, m3], {"DOID"}) == [m1, m3]
    """
    prefixes = cleanup_prefixes(prefixes)
    kept: list[Mapping] = []
    for mapping in semra_tqdm(mappings, desc="Filtering subject prefixes", progress=progress):
        if mapping.subject.prefix in prefixes:
            kept.append(mapping)
    return kept
def keep_object_prefixes(
    mappings: Iterable[Mapping], prefixes: str | Iterable[str], *, progress: bool = True
) -> list[Mapping]:
    """Keep only mappings whose object uses one of the given prefixes.

    :param mappings: An iterable of mappings
    :param prefixes: A prefix or iterable of prefixes, passed through
        ``cleanup_prefixes`` before use
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, whose object prefix is in
        the cleaned-up prefixes

    >>> from semra import DB_XREF, EXACT_MATCH, Mapping, Reference
    >>> curies = "DOID:0050577", "mesh:C562966", "umls:C4551571"
    >>> r1, r2, r3 = (Reference.from_curie(c) for c in curies)
    >>> m1 = Mapping.from_triple((r1, DB_XREF, r2))
    >>> m2 = Mapping.from_triple((r2, DB_XREF, r3))
    >>> m3 = Mapping.from_triple((r1, DB_XREF, r3))
    >>> assert keep_object_prefixes([m1, m2, m3], {"mesh"}) == [m1]
    """
    prefixes = cleanup_prefixes(prefixes)
    kept: list[Mapping] = []
    for mapping in semra_tqdm(mappings, desc="Filtering object prefixes", progress=progress):
        if mapping.object.prefix in prefixes:
            kept.append(mapping)
    return kept
def filter_prefixes(
    mappings: Iterable[Mapping], prefixes: str | Iterable[str], *, progress: bool = True
) -> list[Mapping]:
    """Drop mappings whose subject or object uses one of the given prefixes.

    :param mappings: An iterable of mappings
    :param prefixes: A prefix or iterable of prefixes, passed through
        ``cleanup_prefixes`` before use
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, for which neither the subject
        prefix nor the object prefix is in the cleaned-up prefixes
    """
    prefixes = cleanup_prefixes(prefixes)
    kept: list[Mapping] = []
    for mapping in semra_tqdm(
        mappings, desc=f"Filtering out {len(prefixes)} prefixes", progress=progress
    ):
        if mapping.subject.prefix in prefixes or mapping.object.prefix in prefixes:
            continue
        kept.append(mapping)
    return kept
def filter_self_matches(mappings: Iterable[Mapping], *, progress: bool = True) -> list[Mapping]:
    """Drop mappings whose subject and object share the same prefix.

    :param mappings: An iterable of mappings
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, whose subject prefix differs
        from their object prefix
    """
    kept: list[Mapping] = []
    for mapping in semra_tqdm(mappings, desc="Filtering out self-matches", progress=progress):
        if mapping.subject.prefix == mapping.object.prefix:
            continue
        kept.append(mapping)
    return kept
def filter_mappings(
    mappings: list[Mapping], skip_mappings: list[Mapping], *, progress: bool = True
) -> list[Mapping]:
    """Drop every mapping whose triple also appears in a second list of mappings.

    :param mappings: The mappings to filter
    :param skip_mappings: The mappings whose triples should be removed. Only the
        triples are compared, evidences are not considered.
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings, in their original order, whose triple does not appear
        in ``skip_mappings``
    """
    skip_triples = set()
    for skip_mapping in skip_mappings:
        skip_triples.add(skip_mapping.triple)

    kept: list[Mapping] = []
    for mapping in semra_tqdm(mappings, desc="Filtering mappings", progress=progress):
        if mapping.triple not in skip_triples:
            kept.append(mapping)
    return kept
#: A multi-leveled nested dictionary that represents many-to-many mappings. #: The first key is subject/object pairs, the second key is either a subject identifier or object identifier, #: the last key is the opposite object or subject identifier, and the values are a list of mappings. #: #: This data structure can be used to index either forward or backwards mappings, #: as done inside :func:`get_many_to_many` M2MIndex = defaultdict[tuple[str, str], defaultdict[str, defaultdict[str, list[Mapping]]]]
def get_many_to_many(mappings: list[Mapping]) -> list[Mapping]:
    """Get the mappings that take part in a many-to-many relationship.

    Predicates are disregarded. For each pair of subject prefix and object prefix,
    a mapping is selected if its subject identifier is mapped to more than one
    object identifier, or if its object identifier is mapped to more than one
    subject identifier.

    :param mappings: A list of mappings
    :return: One mapping per selected triple, with the deduplicated evidences of
        all selected mappings for that triple
    """

    def _new_index() -> M2MIndex:
        return defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

    by_subject = _new_index()
    by_object = _new_index()
    for mapping in mappings:
        subject, obj = mapping.subject, mapping.object
        prefix_pair = subject.prefix, obj.prefix
        by_subject[prefix_pair][subject.identifier][obj.identifier].append(mapping)
        by_object[prefix_pair][obj.identifier][subject.identifier].append(mapping)

    evidence_index: defaultdict[Triple, list[Evidence]] = defaultdict(list)
    for directional_index in (by_subject, by_object):
        for identifier_to_partners in directional_index.values():
            for partners in identifier_to_partners.values():
                if len(partners) <= 1:
                    # a single partner identifier means this is not many-to-many
                    continue
                for mapping in itt.chain.from_iterable(partners.values()):
                    evidence_index[mapping.triple].extend(mapping.evidence)

    # this does the same as :func:`unindex`, except that the evidences
    # are deduplicated explicitly along the way
    return [
        Mapping.from_triple(triple, deduplicate_evidence(triple, evidences))
        for triple, evidences in evidence_index.items()
    ]
def filter_many_to_many(mappings: list[Mapping], *, progress: bool = True) -> list[Mapping]:
    """Drop the mappings identified by :func:`get_many_to_many`.

    :param mappings: A list of mappings
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: The mappings whose triple is not among the many-to-many mappings
    """
    return filter_mappings(mappings, get_many_to_many(mappings), progress=progress)
# docstr-coverage:excused `overload`
@overload
def project(
    mappings: Iterable[Mapping],
    source_prefix: str,
    target_prefix: str,
    *,
    return_sus: typing.Literal[True],
    progress: bool = False,
) -> tuple[list[Mapping], list[Mapping]]: ...


# docstr-coverage:excused `overload`
@overload
def project(
    mappings: Iterable[Mapping],
    source_prefix: str,
    target_prefix: str,
    *,
    return_sus: typing.Literal[False] = ...,
    progress: bool = False,
) -> list[Mapping]: ...


def project(
    mappings: Iterable[Mapping],
    source_prefix: str,
    target_prefix: str,
    *,
    return_sus: bool = False,
    progress: bool = False,
) -> list[Mapping] | tuple[list[Mapping], list[Mapping]]:
    """Project mappings onto a single source prefix and a single target prefix.

    This works in the following way:

    1. Keep the mappings whose subject has the source prefix
    2. Of those, keep the mappings whose object has the target prefix
    3. Remove the many-to-many mappings found by :func:`get_many_to_many`
    4. Assemble evidences with :func:`assemble_evidences`

    :param mappings: An iterable of mappings
    :param source_prefix: The prefix that the subjects must have
    :param target_prefix: The prefix that the objects must have
    :param return_sus: If true, also return the many-to-many mappings that
        were removed in step 3
    :param progress: Should progress bars be shown? Defaults to false.
    :return: The projected mappings. If ``return_sus`` is true, a pair of the
        projected mappings and the removed many-to-many mappings.
    """
    subject_filtered = keep_subject_prefixes(mappings, source_prefix, progress=progress)
    candidates = keep_object_prefixes(subject_filtered, target_prefix, progress=progress)
    m2m_mappings = get_many_to_many(candidates)
    projected = filter_mappings(candidates, m2m_mappings, progress=progress)
    projected = assemble_evidences(projected, progress=progress)
    if return_sus:
        return projected, m2m_mappings
    return projected
def project_dict(mappings: list[Mapping], source_prefix: str, target_prefix: str) -> dict[str, str]:
    """Get a dictionary from source identifiers to target identifiers.

    :param mappings: A list of mappings, projected with :func:`project`
    :param source_prefix: The prefix that the subjects must have
    :param target_prefix: The prefix that the objects must have
    :return: A dictionary from the subject identifier to the object identifier
        of each projected mapping
    """
    projected = cast(list[Mapping], project(mappings, source_prefix, target_prefix))
    identifier_map: dict[str, str] = {}
    for mapping in projected:
        identifier_map[mapping.subject.identifier] = mapping.object.identifier
    return identifier_map
def assert_projection(mappings: list[Mapping]) -> None:
    """Raise an exception if any entities appear as the subject in multiple mappings.

    :param mappings: A list of mappings
    :raises ValueError: If at least one subject appears in more than one mapping.
        The message lists the most frequently repeated subjects with their counts.
    """
    # number of repeated subjects to show, kept in sync with the message below
    top_n = 5
    subject_counts = Counter(mapping.subject for mapping in mappings)
    repeated = Counter({subject: count for subject, count in subject_counts.items() if count > 1})
    if not repeated:
        return
    raise ValueError(
        f"Some subjects appear in multiple mappings, therefore this is not a "
        f"valid projection. Showing top {top_n}: {repeated.most_common(top_n)}"
    )
def prioritize(
    mappings: list[Mapping], priority: list[str], *, progress: bool = True
) -> list[Mapping]:
    """Get a priority star graph.

    :param mappings: A list of mappings.

        .. warning::

            This assumes that inference and inversion have already been run.
            This means that if there exists any exact match mapping path
            between ``A`` and ``B``, then there exists an edge ``A, exact, B``.
            Further, if there exists a mapping ``A, exact, B``, there must be
            a ``B, exact, A``.
    :param priority: A priority list of prefixes, where earlier in the list
        means the priority is higher.
    :param progress: Should a progress bar be shown? Defaults to true.
    :return: A list of mappings in which every subject points to the priority
        reference of its component. The list is sorted by the position of the
        object prefix in the priority list, then by object identifier, subject
        prefix, and subject identifier.

    This algorithm works in the following way:

    1. Get the subset of exact matches from the input mapping list
    2. Convert the exact matches to an undirected mapping graph
    3. Extract connected components
    4. For each component

       1. Get the "priority" reference using :func:`get_priority_reference`.
          Components without one are skipped.
       2. For each other reference in the component that has an edge to the
          priority reference, add the mappings for that edge. References
          without such an edge are skipped and a debug message is logged.

    Here's an example usage, where inference is run ahead of prioritization.

    >>> from semra import DB_XREF, EXACT_MATCH, Mapping, Reference
    >>> from semra.inference import infer_reversible, infer_chains
    >>> curies = "doid:0050577", "mesh:C562966", "umls:C4551571"
    >>> r1, r2, r3 = (Reference.from_curie(c) for c in curies)
    >>> m1 = Mapping.from_triple((r1, EXACT_MATCH, r2))
    >>> m2 = Mapping.from_triple((r2, EXACT_MATCH, r3))
    >>> m3 = Mapping.from_triple((r1, EXACT_MATCH, r3))
    >>> mappings = [m1, m2, m3]
    >>> mappings = infer_reversible(mappings)
    >>> mappings = infer_chains(mappings)
    >>> priority_mappings = prioritize(mappings, ["mesh", "doid", "umls"])
    """
    n_original = len(mappings)
    exact_matches = [mapping for mapping in mappings if mapping.predicate == EXACT_MATCH]
    n_exact = len(exact_matches)
    priority = _clean_priority_prefixes(priority)

    graph = to_digraph(exact_matches).to_undirected()
    prioritized: list[Mapping] = []
    components = tqdm(
        nx.connected_components(graph), unit="component", unit_scale=True, disable=not progress
    )
    for component in components:
        target = get_priority_reference(component, priority)
        if target is None:
            continue
        for source in component:
            if source == target:
                # don't add self-edges
                continue
            if graph.has_edge(source, target):
                prioritized.extend(_from_digraph_edge(graph, source, target))
                continue
            # TODO should this work even if s-o edge not exists?
            #  can also do "inference" here, but also might be
            #  because of negative edge filtering
            logger.debug(
                "prioritize() should only be called on fully inferred graphs, meaning "
                "that in a given component, it is a full clique (i.e., there are edges "
                "in both directions between all nodes). Component: %s, s: %s, object: %s",
                ", ".join(reference.curie for reference in component),
                source,
                target,
            )

    # sort such that the mappings are ordered by object by priority order
    # then identifier of object, then subject prefix in alphabetical order
    rank = {prefix: position for position, prefix in enumerate(priority)}

    def _sort_key(mapping: Mapping) -> tuple[int, str, str, str]:
        return (
            rank[mapping.object.prefix],
            mapping.object.identifier,
            mapping.subject.prefix,
            mapping.subject.identifier,
        )

    prioritized.sort(key=_sort_key)

    n_prioritized = len(prioritized)
    logger.info(
        f"Prioritized from {n_original:,} original ({n_exact:,} exact) to {n_prioritized:,}"
    )
    return prioritized
def _clean_priority_prefixes(priority: list[str]) -> list[str]:
    """Normalize each prefix in a priority list with the Bioregistry, in strict mode."""
    normalized: list[str] = []
    for prefix in priority:
        normalized.append(bioregistry.normalize_prefix(prefix, strict=True))
    return normalized
def get_priority_reference(
    component: t.Iterable[Reference], priority: list[str]
) -> Reference | None:
    """Get the priority reference from a component.

    :param component: A set of references with the pre-condition that they're
        all "equivalent"
    :param priority: A priority list of prefixes, where earlier in the list
        means the priority is higher. The prefixes are normalized before use.
    :returns: The reference whose prefix comes first in the priority list.
        If several references share that prefix, the first one encountered is
        returned and a debug message is logged. If no reference has a prefix
        from the priority list, None is returned.

    >>> from semra import Reference
    >>> curies = ["DOID:0050577", "mesh:C562966", "umls:C4551571"]
    >>> references = [Reference.from_curie(curie) for curie in curies]
    >>> get_priority_reference(references, ["mesh", "umls"]).curie
    'mesh:C562966'
    >>> get_priority_reference(references, ["DOID", "mesh", "umls"]).curie
    'doid:0050577'
    >>> get_priority_reference(references, ["hpo", "ordo", "symp"])
    """
    grouped: dict[str, list[Reference]] = {}
    for reference in component:
        grouped.setdefault(reference.prefix, []).append(reference)

    for prefix in _clean_priority_prefixes(priority):
        candidates = grouped.get(prefix)
        if not candidates:
            continue
        if len(candidates) > 1:
            # TODO multiple - I guess let's just return the first
            logger.debug("multiple references for %s", prefix)
        return candidates[0]

    # nothing found in priority, don't return at all.
    return None
def unindex(index: Index, *, progress: bool = True) -> list[Mapping]:
    """Turn a mapping index back into a list of mapping objects.

    :param index: A dictionary from subject-predicate-object triples to lists of
        evidence objects
    :param progress: Should a progress bar be shown? Defaults to true.
    :returns: A list with one mapping per entry of the index, in the index's order

    In the following example, a very simple index for a single mapping is used
    to reconstruct a mapping list.

    >>> from semra.api import get_test_reference, get_test_evidence, unindex
    >>> s, p, o = get_test_reference(3)
    >>> e1 = get_test_evidence()
    >>> index = {(s, p, o): [e1]}
    >>> assert unindex(index) == [Mapping(subject=s, predicate=p, object=o, evidence=[e1])]
    """
    rebuilt: list[Mapping] = []
    for triple, evidence in semra_tqdm(
        index.items(), desc="Unindexing mappings", progress=progress
    ):
        rebuilt.append(Mapping.from_triple(triple, evidence=evidence))
    return rebuilt
def deduplicate_evidence(triple: Triple | Mapping, evidence: list[Evidence]) -> list[Evidence]:
    """Deduplicate a list of evidences based on their "key" function.

    :param triple: The triple or mapping that is passed to each evidence's ``key()``
    :param evidence: A list of evidences
    :return: A list with one evidence per distinct key. Keys keep the position
        at which they were first seen; if several evidences share a key, the
        last one of them is the one that is kept.
    """
    unique: dict = {}
    for candidate in evidence:
        unique[candidate.key(triple)] = candidate
    return [*unique.values()]
def validate_mappings(mappings: list[Mapping], *, progress: bool = True) -> None:
    """Validate mappings against the Bioregistry and raise an error on the first invalid.

    For each mapping, the checks run in this order: the subject prefix is already
    normalized, the object prefix is already normalized, the subject identifier is
    valid for its prefix and contains no colon, and then the same two identifier
    checks for the object.

    :param mappings: A list of mappings
    :param progress: Should a progress bar be shown? Defaults to true.
    :raises ValueError: On the first check that fails
    """
    import bioregistry

    iterator = tqdm(
        mappings, desc="Validating mappings", unit_scale=True, unit="mapping", disable=not progress
    )
    for mapping in iterator:
        subject, obj = mapping.subject, mapping.object

        if subject.prefix != bioregistry.normalize_prefix(subject.prefix):
            raise ValueError(
                f"invalid subject prefix.\n\nMapping: {mapping}\n\nSubject:{subject}."
            )
        if obj.prefix != bioregistry.normalize_prefix(obj.prefix):
            raise ValueError(f"invalid object prefix: {mapping}.")

        subject_is_valid = bioregistry.is_valid_identifier(subject.prefix, subject.identifier)
        if not subject_is_valid:
            raise ValueError(
                f"Invalid mapping subject."
                f"\n\nMapping:{mapping}."
                f"\n\nSubject: {subject}"
                f"\n\nUse regex {bioregistry.get_pattern(subject.prefix)}"
            )
        if ":" in subject.identifier:
            raise ValueError(f"banana in mapping subject: {mapping}")

        object_is_valid = bioregistry.is_valid_identifier(obj.prefix, obj.identifier)
        if not object_is_valid:
            raise ValueError(
                f"Invalid mapping object."
                f"\n\nMapping:{mapping}."
                f"\n\nObject: {obj}"
                f"\n\nUse regex {bioregistry.get_pattern(obj.prefix)}"
            )
        if ":" in obj.identifier:
            raise ValueError(f"banana in mapping object: {mapping}")
def summarize_prefixes(mappings: list[Mapping]) -> pd.DataFrame:
    """Get a dataframe summarizing the prefixes appearing in the mappings.

    :param mappings: A list of mappings
    :return: A dataframe indexed by prefix, with one row for each prefix that
        appears as a subject or object prefix (in sorted order), and with the
        columns ``name``, ``homepage``, and ``description`` looked up from the
        Bioregistry
    """
    import bioregistry

    prefixes: set[str] = set()
    for mapping in mappings:
        prefixes.update((mapping.object.prefix, mapping.subject.prefix))

    rows = [
        (
            prefix,
            bioregistry.get_name(prefix),
            bioregistry.get_homepage(prefix),
            bioregistry.get_description(prefix),
        )
        for prefix in sorted(prefixes)
    ]
    summary = pd.DataFrame(rows, columns=["prefix", "name", "homepage", "description"])
    return summary.set_index("prefix")
def filter_minimum_confidence(
    mappings: Iterable[Mapping], cutoff: float = 0.7
) -> Iterable[Mapping]:
    """Lazily yield the mappings whose confidence reaches a cutoff.

    :param mappings: An iterable of mappings
    :param cutoff: The minimum confidence (inclusive) that a mapping needs to be kept
    :yields: Each mapping whose ``get_confidence()`` is at least the cutoff.
        Mappings for which ``get_confidence()`` raises a :class:`ValueError`
        are skipped.
    """
    for mapping in mappings:
        try:
            confidence = mapping.get_confidence()
        except ValueError:
            pass
        else:
            if confidence >= cutoff:
                yield mapping
def hydrate_subsets(
    subset_configuration: SubsetConfiguration,
    *,
    show_progress: bool = True,
) -> SubsetConfiguration:
    """Convert a subset configuration dictionary into a subset artifact.

    :param subset_configuration: A dictionary of prefixes to collections of parent terms
    :param show_progress: Should progress bars be shown?
    :return: A dictionary with the same prefixes, each mapped to a sorted list of
        terms obtained from the is-a hierarchy of the resource. If getting the
        hierarchy raises a :class:`RuntimeError`, the list for that prefix is empty.
    :raises ValueError: If a prefix can't be looked up with PyOBO

    To get all the cells from MeSH:

    .. code-block:: python

        from semra.api import hydrate_subsets, filter_subsets

        configuration = {"mesh": ["mesh:D002477"], ...}
        prefix_to_references = hydrate_subsets(configuration)

    It's also possible to use parents outside the vocabulary, such as when
    searching for an entity type in UMLS:

    .. code-block:: python

        from semra import Reference
        from semra.api import hydrate_subsets, filter_subsets

        configuration = {
            "umls": [
                # all children of https://uts.nlm.nih.gov/uts/umls/semantic-network/Pathologic%20Function
                Reference.from_curie("sty:T049"),  # cell or molecular dysfunction
                Reference.from_curie("sty:T047"),  # disease or syndrome
                Reference.from_curie("sty:T191"),  # neoplastic process
                Reference.from_curie("sty:T050"),  # experimental model of disease
                Reference.from_curie("sty:T048"),  # mental or behavioral dysfunction
            ],
            ...
        }
        prefix_to_references = hydrate_subsets(configuration)
    """
    import pyobo

    rv: dict[str, set[Reference]] = {}
    # do lookup of the hierarchy and lookup of ancestors in 2 steps to allow for
    # querying parents inside a resource that aren't defined by it (e.g., sty terms in umls)
    for prefix, parents in subset_configuration.items():
        try:
            hierarchy = pyobo.get_hierarchy(
                prefix, include_part_of=False, include_has_member=False, use_tqdm=show_progress
            )
        except RuntimeError:  # e.g., no build
            rv[prefix] = set()
        except Exception as e:
            raise ValueError(f"Failed on {prefix}") from e
        else:
            # collect the graph ancestors of each parent, keeping only the
            # ones that belong to the resource that is being hydrated
            rv[prefix] = {
                descendant
                for parent in parents
                for descendant in nx.ancestors(hierarchy, parent) or []
                if descendant.prefix == prefix
            }
            # NOTE(review): the flattened source does not show whether this loop
            # belongs to the ``else`` branch or runs for every prefix; it is placed
            # inside ``else`` here - confirm against the original file.
            # a parent from the same resource is itself part of the subset
            for parent in parents:
                if parent.prefix == prefix:
                    rv[prefix].add(parent)
    return {k: sorted(v) for k, v in rv.items()}
def filter_subsets(
    mappings: t.Iterable[Mapping], prefix_to_references: SubsetConfiguration
) -> list[Mapping]:
    """Drop mappings whose subject or object falls outside the given subsets.

    :param mappings: An iterable of semantic mappings
    :param prefix_to_references: A dictionary whose keys are prefixes and whose
        values are collections of references for a subset of terms in the
        resource to keep. If the prefix of a mapping's subject or object does
        not appear in this dictionary, or its collection is empty, that side of
        the mapping is not checked.
    :return: The mappings, in their original order, that pass the check on both
        their subject and their object

    If you have a simple configuration dictionary that contains the parent
    terms, like ``{"mesh": [Reference.from_curie("mesh:D002477")]}``, you'll
    want to do the following first:

    .. code-block:: python

        from semra import Reference
        from semra.api import hydrate_subsets, filter_subsets

        mappings = [...]
        configuration = {"mesh": [Reference.from_curie("mesh:D002477")]}
        prefix_to_identifiers = hydrate_subsets(configuration)
        filter_subsets(mappings, prefix_to_identifiers)
    """
    allowed = _clean_subset_configuration(prefix_to_references)

    def _is_excluded(reference: Reference) -> bool:
        subset = allowed.get(reference.prefix)
        return subset is not None and reference not in subset

    return [
        mapping
        for mapping in mappings
        if not _is_excluded(mapping.subject) and not _is_excluded(mapping.object)
    ]
def _clean_subset_configuration(
    prefix_to_references: SubsetConfiguration,
) -> dict[str, set[Reference]]:
    """Normalize the prefixes of a subset configuration and drop its empty entries."""
    return {
        bioregistry.normalize_prefix(prefix, strict=True): set(references)
        for prefix, references in prefix_to_references.items()
        if references  # skip empty lists
    }


def aggregate_components(
    mappings: t.Iterable[Mapping],
    prefix_allowlist: str | t.Collection[str] | None = None,
) -> t.Mapping[frozenset[str], set[frozenset[Reference]]]:
    """Group the weakly connected components by the set of prefixes they contain.

    :param mappings: Mappings to aggregate
    :param prefix_allowlist: An optional prefix filter. If given, each component
        is reduced to the references whose prefix is in this list before it is
        grouped.
    :returns: A dictionary from a frozenset of prefixes to the set of
        components (each a frozenset of references) that have exactly those prefixes
    """
    grouped: defaultdict[frozenset[str], set[frozenset[Reference]]] = defaultdict(set)
    components = iter_components(mappings)
    allowed = None if prefix_allowlist is None else cleanup_prefixes(prefix_allowlist)
    for component in components:
        members: frozenset[Reference]
        if allowed is None:
            members = frozenset(component)
        else:
            # subset to the priority prefixes
            members = frozenset(
                reference for reference in component if reference.prefix in allowed
            )
        grouped[frozenset(reference.prefix for reference in members)].add(members)
    return dict(grouped)
def count_component_sizes(
    mappings: t.Iterable[Mapping], prefix_allowlist: str | t.Collection[str] | None = None
) -> t.Counter[frozenset[str]]:
    """Count the weakly connected components for each set of prefixes.

    :param mappings: Mappings to aggregate
    :param prefix_allowlist: An optional prefix filter, passed to
        :func:`aggregate_components`
    :return: A counter from a frozenset of prefixes to the number of components
        that have exactly those prefixes
    """
    sizes: t.Counter[frozenset[str]] = Counter()
    for prefixes, components in aggregate_components(mappings, prefix_allowlist).items():
        sizes[prefixes] = len(components)
    return sizes
def count_coverage_sizes(
    mappings: t.Iterable[Mapping], prefix_allowlist: str | t.Collection[str] | None = None
) -> t.Counter[int]:
    """Get a counter of the number of prefixes in which each entity appears based on the mappings.

    :param mappings: Mappings to aggregate
    :param prefix_allowlist: An optional prefix filter, passed to
        :func:`count_component_sizes`
    :return: A counter from a number of prefixes to the number of components
        that span that many prefixes. Sizes between one and the largest
        observed size that were not observed are filled in with zero. If there
        are no components, the counter is empty.
    """
    component_counts = count_component_sizes(mappings, prefix_allowlist=prefix_allowlist)
    counter: t.Counter[int] = Counter()
    for prefixes, count in component_counts.items():
        counter[len(prefixes)] += count
    # Back-fill any intermediate counts with zero. The default keeps
    # this from failing when there are no components at all.
    for size in range(1, max(counter, default=0)):
        counter.setdefault(size, 0)
    return counter
def update_literal_mappings(
    literal_mappings: list[LiteralMapping], mappings: list[Mapping]
) -> list[LiteralMapping]:
    """Use a priority mapping to re-write terms with priority groundings.

    :param literal_mappings: A list of literal mappings
    :param mappings: A list of SeMRA mapping objects, constituting a priority
        mapping. This means that each mapping has a unique subject, which is
        checked with :func:`assert_projection`.
    :return: The result of :func:`ssslm.remap_literal_mappings`, called with the
        subject/object pair of each mapping

    .. code-block:: python

        from itertools import chain
        from pyobo import get_literal_mappings
        from ssslm.ner import make_grounder
        from semra import Configuration, Input
        from semra.api import update_literal_mappings

        prefixes = ["doid", "mondo", "efo"]

        # 1. Get terms
        literal_mappings = chain.from_iterable(get_literal_mappings(p) for p in prefixes)

        # 2. Get mappings
        configuration = Configuration.from_prefixes(name="Diseases", prefixes=prefixes)
        mappings = configuration.get_mappings()

        # 3. Update terms and use them (i.e., to construct a grounder)
        new_literal_mappings = update_literal_mappings(literal_mappings, mappings)
        grounder = make_grounder(new_literal_mappings)
    """
    assert_projection(mappings)
    reference_pairs = [(mapping.subject, mapping.object) for mapping in mappings]
    return ssslm.remap_literal_mappings(
        literal_mappings=literal_mappings,
        mappings=reference_pairs,
    )
def _prioritization_to_curie_dict(mappings: Iterable[Mapping]) -> dict[str, str]:
    """Build a dictionary from each mapping's subject CURIE to its object CURIE.

    If a subject CURIE appears more than once, the last mapping wins.
    """
    curie_remapping: dict[str, str] = {}
    for mapping in mappings:
        curie_remapping[mapping.subject.curie] = mapping.object.curie
    return curie_remapping
def prioritize_df(
    mappings: list[Mapping], df: pd.DataFrame, *, column: str, target_column: str | None = None
) -> None:
    """Remap a column of CURIEs in a dataframe based on priority mappings.

    The dataframe is modified in place: the remapped values are written
    to ``target_column``.

    :param mappings: Mappings constituting a priority mapping, validated
        with :func:`assert_projection`
    :param df: The dataframe to modify
    :param column: The name of the column containing the CURIEs to remap
    :param target_column: The name of the column to write the results to.
        If not given, defaults to ``{column}_prioritized``.
    """
    assert_projection(mappings)
    subject_to_object = _prioritization_to_curie_dict(mappings)
    output_column = f"{column}_prioritized" if target_column is None else target_column

    def _remap(curie: str) -> str:
        normalized = bioregistry.normalize_curie(curie)
        if normalized is not None:
            # fall back to the normalized CURIE if there is no priority mapping for it
            return subject_to_object.get(normalized, normalized)
        # values that can't be normalized are passed through untouched
        return curie

    df[output_column] = df[column].map(_remap)
#: An index from (source prefix, target prefix) to identifiers #: in the source vocabulary that have been mappped to the target #: vocabulary IdentifierIndex: TypeAlias = dict[tuple[str, str], set[str]]
def get_identifier_index(
    mappings: t.Iterable[Mapping],
    *,
    show_progress: bool = True,
    predicates: Collection[Reference] | None = None,
    directed: bool = False,
) -> IdentifierIndex:
    """Index which entities in each vocabulary have been mapped.

    :param mappings: An iterable of mappings to be indexed
    :param show_progress: Passed as ``progress`` to :func:`get_index`
    :param predicates: If given, filter to mappings with these predicates
    :param directed: If false (the default), each triple is also indexed in the
        reverse direction, i.e., the object's identifier is added under
        ``(object prefix, subject prefix)``. If true, only the subject's
        identifier is added under ``(subject prefix, object prefix)``.
    :return: An index from pairs of prefixes to sets of identifiers

    For example, if we have the triples ``P1:1 skos:exactMatch P2:A`` and ``P1:1
    skos:exactMatch P3:X``, we would have the following index (with ``directed=False``):

    .. code-block:: python

        {
            ("P1", "P2"): {"1"},
            ("P2", "P1"): {"A"},
            ("P1", "P3"): {"1"},
            ("P3", "P1"): {"X"},
        }
    """
    triples = get_index(mappings, progress=show_progress, leave=False)
    allowed_predicates = None if predicates is None else set(predicates)

    identifiers_by_pair: defaultdict[tuple[str, str], set[str]] = defaultdict(set)
    for triple in triples:
        if allowed_predicates is not None and triple.predicate not in allowed_predicates:
            continue
        subject, obj = triple.subject, triple.object
        identifiers_by_pair[subject.prefix, obj.prefix].add(subject.identifier)
        if not directed:
            identifiers_by_pair[obj.prefix, subject.prefix].add(obj.identifier)
    return dict(identifiers_by_pair)
#: A dictionary from prefixes appearing in subjects/objects #: of mappings to the set local unique identifiers appearing #: in mappings PrefixIdentifierDict: TypeAlias = t.Mapping[str, Collection[str]]
def get_observed_terms(mappings: t.Iterable[Mapping]) -> PrefixIdentifierDict:
    """Collect the local unique identifiers observed for each prefix.

    Both the subject and the object of every mapping are taken into account.

    :param mappings: An iterable of mappings
    :return: A dictionary from prefixes appearing in subjects/objects of mappings to
        the set of local unique identifiers appearing in mappings

    >>> m1 = Mapping(
    ...     subject=Reference.from_curie("chebi:10084"),
    ...     predicate=EXACT_MATCH,
    ...     object=Reference.from_curie("mesh:C453820"),
    ... )
    >>> m2 = Mapping(
    ...     subject=Reference.from_curie("chebi:10100"),
    ...     predicate=EXACT_MATCH,
    ...     object=Reference.from_curie("mesh:C062735"),
    ... )
    >>> {k: sorted(v) for k, v in get_observed_terms([m1, m2]).items()}
    {'chebi': ['10084', '10100'], 'mesh': ['C062735', 'C453820']}
    """
    observed: defaultdict[str, set[str]] = defaultdict(set)
    progress_bar = tqdm(mappings, unit_scale=True, unit="mapping", desc="Indexing observed terms")
    for mapping in progress_bar:
        subject, obj = mapping.subject, mapping.object
        observed[subject.prefix].add(subject.identifier)
        observed[obj.prefix].add(obj.identifier)
    return dict(observed)
def get_terms(
    prefixes: list[str],
    subset_configuration: SubsetConfiguration | None = None,
    *,
    show_progress: bool = True,
) -> PrefixIdentifierDict:
    """Get the set of identifiers for each of the resources.

    :param prefixes: The prefixes of the resources to look up with :mod:`pyobo`
    :param subset_configuration: An optional subset configuration. If given, it is
        hydrated with :func:`hydrate_subsets`, and the identifiers of each prefix
        that has a non-empty subset are filtered to that subset.
    :param show_progress: Passed to :func:`hydrate_subsets` and to :func:`pyobo.get_ids`
    :return: A dictionary from prefix to identifiers. A prefix without a subset for
        which no identifiers were returned is left out (a message is written instead).
    """
    import pyobo

    results: dict[str, set[str]] = {}
    hydrated: SubsetConfiguration = (
        {}
        if subset_configuration is None
        else hydrate_subsets(subset_configuration, show_progress=show_progress)
    )

    for prefix in tqdm(prefixes, desc="Getting terms"):
        tqdm.write(f"[{prefix}] getting terms")
        identifiers = pyobo.get_ids(prefix, use_tqdm=show_progress)
        subset: set[Reference] = set(hydrated.get(prefix) or [])
        if subset:
            # a subset is configured for this prefix, so keep only its members
            results[prefix] = {
                identifier
                for identifier in identifiers
                if _keep_in_subset(prefix=prefix, identifier=identifier, subset=subset)
            }
            continue
        if identifiers:
            results[prefix] = identifiers
        else:
            tqdm.write(f"[{prefix}] PyOBO did not return any IDs")
    return results
def _keep_in_subset(prefix: str, identifier: str, subset: set[Reference]) -> bool: # check if the identifier is a "default" reference if identifier.startswith(f"{prefix}#"): return False return Reference(prefix=prefix, identifier=identifier) in subset #: A counter from pairs of prefixes to the maximum number #: of observed terms of one or the other. Note that this #: estimate is only an upper bound. PrefixPairCounter: TypeAlias = t.Counter[tuple[str, str]] class TermCount(NamedTuple): """A count that's annotated as being exact or not.""" exact: bool count: int # type:ignore def _count_terms( prefix: str, prefix_to_identifier_exact: PrefixIdentifierDict, prefix_to_identifier_observed: PrefixIdentifierDict, ) -> TermCount: if prefix in prefix_to_identifier_exact: count = len(prefix_to_identifier_exact[prefix]) # there is a situation where there might be a zero- # returned here because of impedance between pyobo # and bioregistry return TermCount(bool(count) > 0, count) elif prefix in prefix_to_identifier_observed: return TermCount(False, len(prefix_to_identifier_observed[prefix])) else: # TODO this might need to be a raise exception, since something is wrong msg = ( f"The prefix {prefix} was neither indexed in the exact term list nor" f"the observed term list.\n\n\texact: {sorted(prefix_to_identifier_exact)}" f"\n\n\tobserved: {sorted(prefix_to_identifier_observed)}" ) tqdm.write(msg) # raise KeyError(msg) return TermCount(False, 0)
def get_symmetric_counter(
    index: IdentifierIndex,
    priority: list[str],
    *,
    terms_exact: PrefixIdentifierDict,
    terms_observed: PrefixIdentifierDict,
    include_diag: bool = True,
) -> PrefixPairCounter:
    """Create a symmetric mapping counts counter from a directed index.

    :param index: An index from (source prefix, target prefix) to the identifiers
        in the source vocabulary that were mapped. It is only read, never modified.
        The reverse entry of a pair may be missing (e.g., in a strictly directed
        index), in which case it counts as empty.
    :param priority: The prefixes for which a ``(prefix, prefix)`` entry
        is written, using :func:`_count_terms`
    :param terms_exact: A dictionary from prefixes to their exact term lists. If a
        prefix has a non-empty term list, only the mapped identifiers that
        are in it are counted.
    :param terms_observed: A dictionary from prefixes to their observed term
        lists, passed to :func:`_count_terms`
    :param include_diag: If true, write the per-pair counts (see the note in the code)
    :return: A counter from pairs of prefixes to the larger of the number of
        mapped identifiers in either direction
    """
    counter: PrefixPairCounter = Counter()

    def _count_mapped(source_prefix: str, target_prefix: str) -> int:
        """Count the identifiers mapped from the source to the target prefix."""
        mapped = index.get((source_prefix, target_prefix), set())
        known: t.Collection[str] = terms_exact.get(source_prefix, [])
        if known:
            # use a non-mutating intersection so the caller's index stays intact
            return len(mapped.intersection(known))
        return len(mapped)

    # NOTE(review): despite its name, ``include_diag`` gates the per-pair counts,
    # while the (prefix, prefix) entries below are always written. This is kept
    # as-is to preserve the existing results - confirm that it is intended.
    if include_diag:
        for left_prefix, right_prefix in index:
            counter[left_prefix, right_prefix] = max(
                _count_mapped(left_prefix, right_prefix),
                _count_mapped(right_prefix, left_prefix),
            )

    for prefix in priority:
        counter[prefix, prefix] = _count_terms(prefix, terms_exact, terms_observed).count
    return counter
def get_asymmetric_counter(
    index: IdentifierIndex,
    priority: list[str],
    *,
    terms_exact: PrefixIdentifierDict,
    terms_observed: PrefixIdentifierDict,
) -> PrefixPairCounter:
    """Create an asymmetric mapping counts counter from a directed index.

    :param index: An index from (source prefix, target prefix) to the identifiers
        in the source vocabulary that were mapped
    :param priority: Not used by this function
    :param terms_exact: Not used by this function
    :param terms_observed: Not used by this function
    :return: A counter from each pair of prefixes in the index to the
        number of identifiers indexed for that pair
    """
    counter: PrefixPairCounter = Counter()
    for prefix_pair, identifiers in index.items():
        counter[prefix_pair] = len(identifiers)
    return counter
class Mutation(BaseModel):
    """A rule for upgrading the predicate of mappings coming from a given source prefix."""

    # the prefix that a mapping's subject must have
    source: str = Field(..., description="The source type")
    # optionally, the prefix(es) that a mapping's object must have
    target: str | list[str] | None = Field(None, description="limit mutation to these")
    # the confidence factor attached to the evidence of mutated mappings
    confidence: float = 1.0
    # the predicate that a mapping must have for the mutation to apply
    old: Reference = Field(default=DB_XREF)
    # the predicate that replaces the old one
    new: Reference = Field(default=EXACT_MATCH)

    def should_apply_to(self, mapping: Mapping) -> bool:
        """Check if the mutation should be applied.

        :param mapping: The mapping to check
        :return: True if the mapping's subject prefix equals the source, its predicate
            equals the old predicate, and (if a target is set) its object prefix
            matches the target
        :raises NotImplementedError: If the target is neither none, a string, nor a list
        """
        if mapping.subject.prefix != self.source or mapping.predicate != self.old:
            return False
        target = self.target
        if target is None:
            return True
        if isinstance(target, str):
            return mapping.object.prefix == target
        if isinstance(target, list):
            return mapping.object.prefix in target
        raise NotImplementedError
#: A data structure for fast access to mutations. MutationIndex: TypeAlias = dict[str, Mutation]
def apply_mutations(
    mappings: Iterable[Mapping], mutations: Iterable[Mutation], *, progress: bool = True
) -> Iterable[Mapping]:
    """Lazily apply mutations to mappings.

    This is a generator, so nothing happens (including the indexing
    of the mutations) until it is iterated.

    :param mappings: The mappings to process
    :param mutations: The mutations to apply, indexed with :func:`_index_mutations`
    :param progress: If false, the progress bar is disabled
    :return: For each input mapping, the result of :func:`_handle_mutation`
    """
    source_to_mutation = _index_mutations(mutations)
    progress_bar = tqdm(
        mappings,
        disable=not progress,
        desc="Applying mutations",
        unit_scale=True,
        unit="mapping",
    )
    for mapping in progress_bar:
        yield _handle_mutation(mapping, source_to_mutation)
def _index_mutations(mutations: Iterable[Mutation]) -> MutationIndex:
    """Index mutations by their source prefix.

    :raises KeyError: If more than one mutation has the same source
    """
    by_source: MutationIndex = {}
    for mutation in mutations:
        source = mutation.source
        if source in by_source:
            raise KeyError(f"got multiple configured mutations for source: {mutation.source}")
        by_source[source] = mutation
    return by_source


def _handle_mutation(mapping: Mapping, mutation_index: MutationIndex) -> Mapping:
    """Apply the mutation indexed for the mapping's subject prefix, if there is one and it applies.

    :return: The original mapping if there is no applicable mutation. Otherwise, a new
        mapping with the mutation's new predicate, whose only evidence is a reasoned
        evidence that points back to the original mapping.
    """
    mutation = mutation_index.get(mapping.subject.prefix)
    if mutation and mutation.should_apply_to(mapping):
        derived_evidence = ReasonedEvidence(
            justification=KNOWLEDGE_MAPPING,
            mappings=[mapping],
            confidence_factor=mutation.confidence,
        )
        return Mapping(
            subject=mapping.subject,
            predicate=mutation.new,
            object=mapping.object,
            evidence=[derived_evidence],
        )
    return mapping