Source code for kgw._shared.base

import logging
import multiprocessing
import os
import sys
import warnings

import luigi

from . import tasks


class Project:
    """Collect conversion tasks for one version of a knowledge graph project.

    Instances only record which output files are requested. The actual work
    is performed later, when the object is passed to :func:`~kgw.run`.

    Notes
    -----
    The attributes `_label`, `_MetadataFetcher`, `_CreateSqliteFile` and
    `_node_type_to_color` are read by this class but not defined in it.
    Presumably they are provided by project-specific subclasses.

    """

    def __init__(self, version, workdir):
        """Initialize a project instance so that tasks can be defined on it.

        Parameters
        ----------
        version : `str`
            Version of the dataset that will be downloaded and processed.
            The method :meth:`get_versions` returns all currently available
            versions. The case-insensitive value `"latest"` selects the last
            entry of that list.
        workdir : `str`
            Path of the working directory in which a unique subdirectory will
            be created to hold all downloaded and generated files for
            this project and version.

        Raises
        ------
        ValueError
            Raised if `version` is invalid or unavailable. This includes
            the case that `"latest"` is requested but no versions are
            available at all.
        TypeError
            Raised if `workdir` is not a string.

        Notes
        -----
        This class does not automatically download or process any data.
        Such tasks first need to be specified by calling the relevant methods
        on the project object and then passing it to the function :func:`~kgw.run`
        that builds and executes a corresponding workflow.

        """
        # Argument processing
        version = str(version)
        _available_versions = self.get_versions()
        if version.lower() == "latest":
            # Guard against an empty list so that the documented ValueError
            # is raised instead of an IndexError from the [-1] lookup.
            if not _available_versions:
                msg = 'Version "latest" can not be resolved because no versions are available.'
                raise ValueError(msg)
            version = _available_versions[-1]
        if version not in _available_versions:
            msg = f'Version "{version}" is not valid.\nAvailable options: {_available_versions}'
            raise ValueError(msg)

        if not isinstance(workdir, str):
            msg = 'Argument "workdir" needs to be a string.'
            raise TypeError(msg)

        # Set instance variables
        self.version = version
        self.dirpath = os.path.join(workdir, f"{self._label}_v{version}")
        self._tasks = []

    @classmethod
    def get_versions(cls):
        """Fetch all currently available versions from the data repository of the project."""
        return cls._MetadataFetcher.get_versions()

    def to_sqlite(self):
        """Convert the knowledge graph to a file-based SQLite database.

        Generates the output file `kg.sqlite`. This database contains
        a unified representation for each knowledge graph, with the
        same schema being used for all projects. From this intermediate
        format it is possible to generate all other files, using just
        one method per output format rather than writing a custom
        converter for each project.

        References
        ----------
        - `SQLite <https://www.sqlite.org>`__
        - `Wikipedia: SQLite <https://en.wikipedia.org/wiki/SQLite>`__

        """
        # Named like in the sibling methods to avoid shadowing the imported
        # module `tasks`.
        required_tasks = [self._CreateSqliteFile(self.dirpath, self.version)]
        self._tasks.extend(required_tasks)

    def to_statistics(self):
        """Determine some statistical properties of the knowledge graph.

        Generates the output file `statistics.json`. This is a JSON
        file that contains data about basic statistics of the elements
        in the knowledge graph, such as node, edge and type counts.

        """
        required_tasks = [
            tasks.CreateStatisticsFile(
                self.dirpath, self.version, self._CreateSqliteFile
            ),
        ]
        self._tasks.extend(required_tasks)

    def to_schema(self):
        """Determine the schema of the knowledge graph.

        Generates the output file `schema.html`. This is a standalone
        HTML file with an interactive graph visualization of all
        entity types in the knowledge graph and the relationship types
        by which they are connected.

        References
        ----------
        - `Neo4j: Graph modeling guidelines
          <https://neo4j.com/docs/getting-started/data-modeling/guide-data-modeling/>`__
        - `Memgraph: Graph modeling
          <https://memgraph.com/docs/fundamentals/graph-modeling>`__

        """
        required_tasks = [
            tasks.CreateSchemaFile(
                self.dirpath,
                self.version,
                self._node_type_to_color,
                self._CreateSqliteFile,
            ),
        ]
        self._tasks.extend(required_tasks)

    def to_sql(self):
        """Convert the knowledge graph to a SQL text file.

        Generates the output file `kg.sql`. This is a text file with
        SQL commands that can be used to import the structure and content
        of the knowledge graph into relational database systems such as
        MySQL or PostgreSQL.

        References
        ----------
        - `StackOverflow: How to import an SQL file using the command line in MySQL?
          <https://stackoverflow.com/questions/17666249/how-to-import-an-sql-file-using-the-command-line-in-mysql>`__
        - `StackOverflow: Import SQL dump into PostgreSQL database
          <https://stackoverflow.com/questions/6842393/import-sql-dump-into-postgresql-database>`__

        """
        required_tasks = [
            tasks.CreateSqlFile(self.dirpath, self.version, self._CreateSqliteFile),
        ]
        self._tasks.extend(required_tasks)

    def to_csv(self):
        """Convert the knowledge graph to two CSV text files.

        Generates the output files `kg_nodes.csv` and `kg_edges.csv`.

        References
        ----------
        - `Wikipedia: CSV <https://en.wikipedia.org/wiki/Comma-separated_values>`__

        """
        required_tasks = [
            tasks.CreateCsvNodesFile(
                self.dirpath, self.version, self._CreateSqliteFile
            ),
            tasks.CreateCsvEdgesFile(
                self.dirpath, self.version, self._CreateSqliteFile
            ),
        ]
        self._tasks.extend(required_tasks)

    def to_jsonl(self):
        """Convert the knowledge graph to two JSON Lines text files.

        Generates the output files `kg_nodes.jsonl` and `kg_edges.jsonl`.

        References
        ----------
        - `JSONL <https://jsonlines.org>`__

        """
        required_tasks = [
            tasks.CreateJsonlNodesFile(
                self.dirpath, self.version, self._CreateSqliteFile
            ),
            tasks.CreateJsonlEdgesFile(
                self.dirpath, self.version, self._CreateSqliteFile
            ),
        ]
        self._tasks.extend(required_tasks)

    def to_metta(self, representation="spo"):
        """Convert the knowledge graph to a MeTTa text file.

        Generates the output file `kg_spo.metta`, `kg_properties_aggregated.metta` or
        `kg_properties_expanded.metta`, depending on the chosen representation.

        Caution: These representations are still subject to experimentation
        and testing. They might change in future versions of this package.

        Parameters
        ----------
        representation : str
            The format used to represent the knowledge graph in the MeTTa language.

            Available options:

            - `"spo"`: Semantic triples of the form `("subject", "predicate", "object")`.
              If properties are present in the original knowledge graph, they are
              ignored in this representation.
            - `"properties_aggregated"`: Properties (=key-value pairs) are represented
              by putting each key on a separate line, but each value is ensured to be a
              single number or string. This means values that hold a compound data type
              like a list or dict are aggregated into one string in JSON string format.
              Text identifiers of nodes are reused to create the association with their
              properties, while text identifiers of the form "e{cnt}" are introduced for
              edges to serve the same purpose.
            - `"properties_expanded"`: Properties (=key-value pairs) are represented
              by fully expanding their keys and values onto as many lines as required.
              Numerical identifiers for nodes and edges are introduced to create the
              association between these elements and their properties.

        References
        ----------
        - `MeTTa language <https://metta-lang.dev>`__
        - `OpenCog Hyperon framework <https://hyperon.opencog.org>`__

        """
        # NOTE(review): `representation` is passed on unchecked; presumably the
        # task itself rejects unknown values - confirm in `tasks.CreateMettaFile`.
        required_tasks = [
            tasks.CreateMettaFile(
                self.dirpath, self.version, representation, self._CreateSqliteFile
            ),
        ]
        self._tasks.extend(required_tasks)

    def to_graphml(self):
        """Convert the knowledge graph to a GraphML text file.

        Generates the output file `kg.graphml`.

        References
        ----------
        - `GraphML <http://graphml.graphdrawing.org>`__
        - `Wikipedia: GraphML <https://en.wikipedia.org/wiki/GraphML>`__

        """
        required_tasks = [
            tasks.CreateGraphMlFile(self.dirpath, self.version, self._CreateSqliteFile),
        ]
        self._tasks.extend(required_tasks)


def set_logging_behavior(verbose):
    """Silence the loggers of Luigi and configure log output of this package.

    Parameters
    ----------
    verbose : `bool`
        If true, the logger of this module is enabled and the root logger
        is reconfigured to write timestamped messages of level INFO and
        above to stdout. Otherwise the logger of this module is disabled.

    Notes
    -----
    The loggers named "luigi" and "luigi-interface" are always stripped of
    their handlers and disabled, and warnings originating from modules with
    these names are ignored.

    """
    # Luigi's own loggers are silenced in either mode
    for luigi_name in ("luigi", "luigi-interface"):
        luigi_logger = logging.getLogger(luigi_name)
        luigi_logger.handlers.clear()
        luigi_logger.disabled = True
        warnings.filterwarnings("ignore", module=luigi_name)

    # Logger of this package: start from a clean state, then switch on or off
    package_logger = logging.getLogger(__name__)
    package_logger.handlers.clear()
    package_logger.disabled = not verbose
    if not verbose:
        return

    # force=True replaces any handlers already attached to the root logger
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s  %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[stdout_handler],
        force=True,
    )


def run(workflow, num_workers=None, verbose=True):
    """Execute all tasks in the provided workflow according to their dependencies.

    This function uses the package Luigi [1]_ to build a dependency graph of
    all tasks defined in the workflow and execute them in parallel with
    multiple worker processes.

    Parameters
    ----------
    workflow : `~kgw._shared.base.Project`, or list of `~kgw._shared.base.Project`
        Specification of a workflow in form of a single or multiple project
        objects. A project object provides several methods that can be called
        in order to add specific tasks. For example, calling the method
        `to_csv()` will store a task in the object that represents the
        conversion of the project's knowledge graph into CSV format.
        The workflow engine automatically detects these tasks, inspects their
        dependencies, and schedules all necessary steps in the correct order.
    num_workers : `int`, optional, default=4*cpu_count
        The number of worker processes to run tasks in parallel.
        If not specified, it defaults to 4 times the number of CPU cores
        available on the machine. If the number of cores can not be
        determined, a single core is assumed.
    verbose : `bool`, optional, default=True
        If `True`, a log of tasks and a summary of results is written to
        stdout. If `False`, no text is printed.

    Returns
    -------
    success : `bool`
        Returns `True` if all tasks were successfully scheduled and executed,
        otherwise `False`. A failed run can be resumed from an intermediate
        state without re-running previously completed tasks.

    Raises
    ------
    TypeError
        Raised if `workflow` is not a project object or list of such objects,
        or if `num_workers` is not an integer (booleans are rejected too),
        or if `verbose` is not a boolean.
    ValueError
        Raised if `workflow` is an empty list.

    References
    ----------
    .. [1] Luigi Documentation, https://luigi.readthedocs.io

    """
    # Argument processing
    if isinstance(workflow, Project):
        projects = [workflow]
    elif isinstance(workflow, (list, tuple, set)) and all(
        isinstance(w, Project) for w in workflow
    ):
        projects = list(workflow)
    else:
        msg = 'Argument "workflow" needs to be a project or a list of projects.'
        raise TypeError(msg)
    if not projects:
        msg = "Got an empty list of workflows."
        raise ValueError(msg)

    if num_workers is None:
        # cpu_count() raises NotImplementedError if the number of cores
        # can not be determined; fall back to one core in that case.
        try:
            num_cores = multiprocessing.cpu_count()
        except NotImplementedError:
            num_cores = 1
        num_workers = num_cores * 4
    elif isinstance(num_workers, bool) or not isinstance(num_workers, int):
        # bool is a subclass of int, hence it needs to be excluded explicitly
        msg = 'Argument "num_workers" needs to be an integer or None.'
        raise TypeError(msg)

    if not isinstance(verbose, bool):
        msg = 'Argument "verbose" needs to be True or False.'
        raise TypeError(msg)

    # Set logging behavior
    set_logging_behavior(verbose)

    # Collect tasks (local names chosen to not shadow the parameter
    # `workflow` or the imported module `tasks`)
    scheduled_tasks = []
    for project in projects:
        scheduled_tasks.extend(project._tasks)

    # Run tasks
    if verbose:
        msg = "Log of performed tasks"
        print(msg)
        print("=" * len(msg))
        print()
    result = luigi.build(
        scheduled_tasks,
        local_scheduler=True,
        detailed_summary=True,
        workers=num_workers,
    )

    # Report results
    def crop_results(text):
        # Strip the banner lines that Luigi puts around its summary text
        prefix = "\n===== Luigi Execution Summary =====\n\n"
        postfix = "\n\n===== Luigi Execution Summary =====\n"
        if text.startswith(prefix):
            text = text[len(prefix) :]
        if text.endswith(postfix):
            text = text[: -len(postfix)]
        return text

    if verbose:
        print()
        print()
        msg = "Summary of workflow results"
        print(msg)
        print("=" * len(msg))
        print()
        print(crop_results(result.summary_text))
    return result.scheduling_succeeded