Source code for kgw.biomedicine._hald

import os
import sqlite3

import luigi
import orjson

from .. import _shared
from .._shared.extract import is_informative_value


class MetadataFetcher:
    dataset_id = 22828196
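    # Class-level cache shared by all callers, so version and metadata
    # lookups hit the Figshare API at most once per process.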
    _cached_values = {}

    @classmethod
    def get_versions(cls):
        # Memoize
        key = "versions"
        if key not in cls._cached_values:
            cls._cached_values[key] = _shared.extract.get_versions_from_figshare(
                cls.dataset_id
            )

        # Lookup
        versions = cls._cached_values[key]
        return versions

    @classmethod
    def get_metadata(cls, version):
        # Check
        versions = cls.get_versions()
        if version not in versions:
            raise ValueError(
                f'Version "{version}" is not valid.\nAvailable options: {versions}'
            )

        # Memoize
        key = f"metadata_{version}"
        if key not in cls._cached_values:
            cls._cached_values[key] = _shared.extract.get_metadata_from_figshare(
                cls.dataset_id, version
            )

        # Lookup
        metadata = cls._cached_values[key]
        return metadata


class CreateSqliteFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()
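    # Insignificant parameter: batch_size is excluded from the task id, so
    # changing it does not invalidate previously completed targets.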
    batch_size = luigi.IntParameter(default=10_000, significant=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "results")
        self.filepath = os.path.join(self.subdirpath, "kg.sqlite")

    def requires(self):
        tasks = {
            "subdirpath": _shared.tasks.CreateDirectory(self.subdirpath),
            "nodes_file": FetchNodesFile(self.dirpath, self.version),
            "edges_file": FetchEdgesFile(self.dirpath, self.version),
        }
        return tasks

    def output(self):
        return luigi.LocalTarget(self.filepath)

    def run(self):
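        # temp_output_path presumably stages the database at a temporary
        # location and moves it into place on success, so a half-built file
        # never appears as a completed target (an assumption about _shared).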
        with _shared.tasks.temp_output_path(self.filepath) as sqlite_filepath:
            with sqlite3.connect(sqlite_filepath) as conn:
                cursor = conn.cursor()
                _shared.transform.create_sql_schema(cursor)
                self._insert_nodes(conn, cursor)
                self._insert_edges(conn, cursor)
                conn.commit()

    def _insert_nodes(self, conn, cursor):
        # Special aspects in handling the raw data
        # 1) The dict keys in the raw data can be ignored, because node ids and
        #    node types are encoded in a cleaner way in the dict values.
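        # Assumed raw shape, inferred from the access pattern below:
        #   {"<name>": [{"entity": "<id>", "type": "<type>", ...}], ...}
        # Only the first element of each list is used.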
        nodes_filepath = self.input()["nodes_file"].path
        data_nodes = _shared.transform.read_json_file(nodes_filepath)
        sql_cmd = """
            INSERT INTO nodes (id, type, properties)
            VALUES (?, ?, ?)
        """
        skipped_keys = ["entity", "type"]
        batch = []
        for entry in data_nodes.values():
            entry = entry[0]
            node_id = entry["entity"]
            node_type = entry["type"]
            node_properties = {
                key: val
                for key, val in entry.items()
                if key not in skipped_keys and is_informative_value(val)
            }
            node_properties_str = orjson.dumps(node_properties).decode("utf-8")
            node = (node_id, node_type, node_properties_str)
            batch.append(node)
            if len(batch) >= self.batch_size:
                cursor.executemany(sql_cmd, batch)
                batch = []
        if batch:
            cursor.executemany(sql_cmd, batch)
        conn.commit()

    def _insert_edges(self, conn, cursor):
        # Special aspects in handling the raw data
        # 1) The dict keys in the raw data can be ignored, because source ids, edge types,
        #    and target ids are encoded in a cleaner way in the dict values.
        # 2) Some dict values are mostly redundant and are therefore skipped,
        #    e.g. source and source type are part of the node data.
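        # Assumed raw shape, inferred from the access pattern below:
        #   {"<key>": {"source entity": "<id>", "target entity": "<id>",
        #              "relationship": "<type>", ...}, ...}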
        edges_filepath = self.input()["edges_file"].path
        data_edges = _shared.transform.read_json_file(edges_filepath)
        sql_cmd = """
            INSERT INTO edges (source_id, target_id, type, properties)
            VALUES (?, ?, ?, ?)
        """
        skipped_keys = [
            "source entity",
            "target entity",
            "relationship",
            "source",
            "target",
            "source type",
            "target type",
        ]
        batch = []
        for entry in data_edges.values():
            source_id = entry["source entity"]
            target_id = entry["target entity"]
            edge_type = entry["relationship"]
            edge_properties = {
                key: val
                for key, val in entry.items()
                if key not in skipped_keys and is_informative_value(val)
            }
            edge_properties_str = orjson.dumps(edge_properties).decode("utf-8")
            edge = (source_id, target_id, edge_type, edge_properties_str)
            batch.append(edge)
            if len(batch) >= self.batch_size:
                cursor.executemany(sql_cmd, batch)
                batch = []
        if batch:
            cursor.executemany(sql_cmd, batch)
        conn.commit()


class FetchNodesFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "downloads")
        self.metadata = MetadataFetcher.get_metadata(self.version)

    def requires(self):
        filename = "Entity_Info.json"
        url = self.metadata[filename]["url"]
        md5 = self.metadata[filename]["md5"]
        return _shared.tasks.DownloadFile(self.subdirpath, filename, url, md5=md5)

    def output(self):
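        # Pass-through: the file downloaded by DownloadFile is this task's output.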
        return self.input()


class FetchEdgesFile(_shared.tasks.ReportingTask):
    dirpath = luigi.Parameter()
    version = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.subdirpath = os.path.join(self.dirpath, "downloads")
        self.metadata = MetadataFetcher.get_metadata(self.version)

    def requires(self):
        filename = "Relation_Info.json"
        url = self.metadata[filename]["url"]
        md5 = self.metadata[filename]["md5"]
        return _shared.tasks.DownloadFile(self.subdirpath, filename, url, md5=md5)

    def output(self):
        return self.input()


class Hald(_shared.base.Project):
    """Human Aging and Longevity Dataset (HALD).

    References
    ----------
    - Publication: https://doi.org/10.1038/s41597-023-02781-0
    - Website: https://bis.zju.edu.cn/hald
    - Code: https://github.com/zexuwu/hald
    - Data: https://doi.org/10.6084/m9.figshare.22828196
    """

    _label = "hald"
    _MetadataFetcher = MetadataFetcher
    _CreateSqliteFile = CreateSqliteFile
    _node_type_to_color = {
        "Pharmaceutical Preparations": "green",
        "Toxin": "green",
        "Gene": "blue",
        "Peptide": "blue",
        "Protein": "blue",
        "RNA": "blue",
        "Disease": "red",
    }

    def to_schema(self):
        tasks = [
            _shared.tasks.CreateCompactSchemaFile(
                self.dirpath,
                self.version,
                self._node_type_to_color,
                self._CreateSqliteFile,
            ),
        ]
        self._tasks.extend(tasks)