Source code for kgw.biomedicine._monarchkg

import csv
import os
import sqlite3

import luigi
import orjson

from .. import _shared
from .._shared.extract import is_informative_value


class MetadataFetcher:
    _cached_values = {}

    @classmethod
    def get_versions(cls):
        # Memorize
        key = "versions"
        if key not in cls._cached_values:
            cls._cached_values[key] = _shared.extract.get_versions_from_monarch()

        # Lookup
        versions = cls._cached_values[key]
        return versions

    @classmethod
    def get_metadata(cls, version):
        # Check
        versions = cls.get_versions()
        if version not in versions:
            raise ValueError(
                f'Version "{version}" is not valid.\nAvailable options: {versions}'
            )

        # Memorize
        key = f"metadata_{version}"
        if key not in cls._cached_values:
            cls._cached_values[key] = _shared.extract.get_metadata_from_monarch(version)

        # Lookup
        metadata = cls._cached_values[key]
        return metadata


class FetchKnowledgeGraphFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "downloads")
        self.metadata = MetadataFetcher.get_metadata(self.version)

    def requires(self):
        filename = "monarch-kg.tar.gz"
        url = self.metadata[filename]["url"]
        return _shared.tasks.DownloadFile(self.subdirpath, filename, url)

    def output(self):
        return self.input()


class DecompressKnowledgeGraphFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "downloads")

    def requires(self):
        return FetchKnowledgeGraphFile(self.dirpath, self.version)

    def output(self):
        return {
            "nodes_file": luigi.LocalTarget(
                os.path.join(self.subdirpath, "monarch-kg_nodes.tsv")
            ),
            "edges_file": luigi.LocalTarget(
                os.path.join(self.subdirpath, "monarch-kg_edges.tsv")
            ),
        }

    def run(self):
        _shared.extract.extract_tar_gz(self.input().path)


class CreateSqliteFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()
    batch_size = luigi.IntParameter(default=10_000, significant=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "results")
        self.filepath = os.path.join(self.subdirpath, "kg.sqlite")

    def requires(self):
        tasks = {
            "subdirpath": _shared.tasks.CreateDirectory(self.subdirpath),
            "csv_files": DecompressKnowledgeGraphFile(self.dirpath, self.version),
        }
        return tasks

    def output(self):
        return luigi.LocalTarget(self.filepath)

    def run(self):
        with _shared.tasks.temp_output_path(self.filepath) as sqlite_filepath:
            with sqlite3.connect(sqlite_filepath) as conn:
                cursor = conn.cursor()
                _shared.transform.create_sql_schema(cursor)
                self._insert_nodes(conn, cursor)
                self._insert_edges(conn, cursor)
                conn.commit()
        self._remove_decompressed_input_files()

    def _insert_nodes(self, conn, cursor):
        nodes_filepath = self.input()["csv_files"]["nodes_file"].path
        sql_cmd = """
            INSERT INTO nodes (id, type, properties)
            VALUES (?, ?, ?)
        """
        skipped_keys = ["id", "category"]
        with open(nodes_filepath, "r", newline="") as f:
            reader = csv.reader(f, delimiter="\t")
            columns = next(reader)
            id_idx = columns.index("id")
            type_idx = columns.index("category")
            properties_name_idx = [
                (col, columns.index(col)) for col in columns if col not in skipped_keys
            ]
            batch = []
            for row in reader:
                node_id = row[id_idx]
                node_type = row[type_idx]
                node_properties = {
                    name: row[idx]
                    for name, idx in properties_name_idx
                    if is_informative_value(row[idx])
                }
                node_properties_str = orjson.dumps(node_properties).decode("utf-8")
                node = (node_id, node_type, node_properties_str)
                batch.append(node)
                if len(batch) >= self.batch_size:
                    cursor.executemany(sql_cmd, batch)
                    batch = []
            if batch:
                cursor.executemany(sql_cmd, batch)
        conn.commit()

    def _insert_edges(self, conn, cursor):
        edges_filepath = self.input()["csv_files"]["edges_file"].path
        sql_cmd = """
            INSERT INTO edges (source_id, target_id, type, properties)
            VALUES (?, ?, ?, ?)
        """
        skipped_keys = ["subject", "object", "predicate"]
        with open(edges_filepath, "r", newline="") as f:
            reader = csv.reader(f, delimiter="\t")
            columns = next(reader)
            source_id_idx = columns.index("subject")
            target_id_idx = columns.index("object")
            type_idx = columns.index("predicate")
            properties_name_idx = [
                (col, columns.index(col)) for col in columns if col not in skipped_keys
            ]
            batch = []
            for row in reader:
                source_id = row[source_id_idx]
                target_id = row[target_id_idx]
                edge_type = row[type_idx]
                edge_properties = {
                    name: row[idx]
                    for name, idx in properties_name_idx
                    if is_informative_value(row[idx])
                }
                edge_properties_str = orjson.dumps(edge_properties).decode("utf-8")
                edge = (source_id, target_id, edge_type, edge_properties_str)
                batch.append(edge)
                if len(batch) >= self.batch_size:
                    cursor.executemany(sql_cmd, batch)
                    batch = []
            if batch:
                cursor.executemany(sql_cmd, batch)
        conn.commit()

    def _remove_decompressed_input_files(self):
        nodes_filepath = self.input()["csv_files"]["nodes_file"].path
        edges_filepath = self.input()["csv_files"]["edges_file"].path
        for filepath in (nodes_filepath, edges_filepath):
            try:
                os.remove(filepath)
            except Exception:
                pass


[docs] class MonarchKg(_shared.base.Project): """Monarch Knowledge Graph (MonarchKG). References ---------- - Publication: https://doi.org/10.1093/nar/gkad1082 - Website: https://monarchinitiative.org - Code: https://github.com/monarch-initiative/monarch-ingest - Data: https://data.monarchinitiative.org/monarch-kg/index.html """ _label = "monarchkg" _MetadataFetcher = MetadataFetcher _CreateSqliteFile = CreateSqliteFile _node_type_to_color = { "biolink:Drug": "green", "biolink:ChemicalEntity": "green", "biolink:MolecularEntity": "green", "biolink:SmallMolecule": "green", "biolink:Gene": "blue", "biolink:Protein": "blue", "biolink:Disease": "red", "biolink:Pathway": "red", "biolink:BiologicalProcessOrActivity": "red", }