Unified Map

Python 3 | Release v1.0.0 | License Apache 2.0
Unified
Many implementations, one way of access.
Map
Apply a function to a list of arguments and collect the results – serial, parallel or distributed.

Purpose

This package provides reasonably simple syntax for a frequent programming task which is implemented in various places (built-in, standard library, external libraries). Here are three descriptions of this task:

  • Map a list of inputs to a list of results via a user-provided function.
  • Apply a given function to a list of arguments to get a list of return values in the same order.
  • A black-box function is evaluated with different inputs and the outputs are collected.

In general, this is a so-called “pleasingly parallel problem” (aka “embarrassingly parallel” or “pleasingly parallel”) because it is straightforward to separate it into independend subtasks. For this reason, and due to its frequent occurrence, it is recognized as a programming idiom (“parallel map”, “parallel for loop”) in parallel computing that can equally simple be applied in distributed computing. This package allows to do so with a focus on simplicity of use.

Use cases

The function can be univariate (=takes one argument) or multivariate (=takes several arguments).

Functions visualization: Image not found

The evaluation can be executed in a serial (=on one core), parallel (=on several cores) or distributed (=on several machines) way.

Execution modes visualization: Image not found

Note

The arrows indicate that parallel and distributed calculations send objects to other processes or machines, namely the elements of the input list, the function and the elements of the result list. This implies that all of these objects need to be serializable, which is not always the case:

  • If a library uses Python’s pickle module for object serialization, then only certain objects can be “pickled”, i.e. be transformed in a binary serialization format. For example, a lambda expression will not work as function, since it can not be pickled.
  • Some libraries use improved serialization functionality from elsewhere, e.g. dill or cloudpickle.
  • Some libraries implement their own advanced serialization routines, e.g. joblib or dask.

Installation

  • Complete installation = with all optional dependencies

    $ pip install "unified-map[complete]"
    

    Note

    Apache Spark installation currently can not be automated correctly. If you want to use it, please install it manually as described below.

  • Minimal installation = with no optional dependencies

    $ pip install unified-map
    
    Functions that rely on third-party libraries will not work. You can, however, install any optional dependency you want to use by yourself, see below.
  • Optional dependencies

    • Joblib installation

      $ pip install joblib
      
    • Dask installation

      $ pip install "dask[complete]"
      
    • Apache Spark installation

      Apache Spark is written in Scala and provides APIs for several languages (incl. Python).

      • There is a “Python packaged version of Spark” available on PyPI that could be installed with pip. This does, however, not include the necessary scripts (e.g. for starting workers), therefore manual installation is required.

      • The “pre-packaged Apache Spark” needs to be downloaded from here. Extract it in a desired place in your file system (e.g. $HOME/bin/spark). Then certain environment variables need to be set (e.g. in .bashrc) so that Apache Spark can be found in the shell and in Python. Most importantly, SPARK_HOME needs to point at the installation directory:

        export SPARK_HOME="$HOME/bin/spark/spark-2.3.0-bin-hadoop2.7/"
        export PATH="$SPARK_HOME/bin/:$PATH"
        export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
        
      • Apache Spark uses Py4J to access Java objects from a Python interpreter:

        $ pip install py4j
        
      • Apache Spark is written in Scala which runs on a Java Virtual Machine, hence it may be necessary to install or update Java on your machine. The documentation indicates that Java 8+ is required.

Quickstart

Univariate function applied to a list of arguments where each element is a single value

import unified_map as umap

def square(x):
    return x**2

data = [1, 2, 3, 4, 5, 6, 7, 8]

# Serial on one core
results = umap.univariate.serial.for_loop(square, data)
results = umap.univariate.serial.generator_expression(square, data)
results = umap.univariate.serial.generator_function(square, data)
results = umap.univariate.serial.list_comprehension(square, data)
results = umap.univariate.serial.map(square, data)
results = umap.univariate.serial.starmap(square, data)

# Parallel on several cores
results = umap.univariate.parallel.dask(square, data)
results = umap.univariate.parallel.futures(square, data)
results = umap.univariate.parallel.joblib(square, data)
results = umap.univariate.parallel.multiprocessing(square, data)

# Distributed on several machines
umap.cluster_setup.spark.connect_to_scheduler('10.0.0.5:7077')
results = umap.univariate.distributed.spark(square, data)

umap.cluster_setup.dask.connect_to_scheduler('10.0.0.5:8789')
results = umap.univariate.distributed.dask(square, data)

Multivariate function applied ot a list of arguments where each element is a tuple of values

import unified_map as umap

def add(x, y, z):
    return x+y+z

data = [(1, 2, 3), (10, 20, 30), (1, 20, 300), (42, 15, 27)]

# Serial on one core
results = umap.multivariate.serial.for_loop(add, data)
results = umap.multivariate.serial.generator_expression(add, data)
results = umap.multivariate.serial.generator_function(add, data)
results = umap.multivariate.serial.list_comprehension(add, data)
results = umap.multivariate.serial.map(add, data)
results = umap.multivariate.serial.starmap(add, data)

# Parallel on several cores
results = umap.multivariate.parallel.dask(add, data)
results = umap.multivariate.parallel.futures(add, data)
results = umap.multivariate.parallel.joblib(add, data)
results = umap.multivariate.parallel.multiprocessing(add, data)

# Distributed on several machines
umap.cluster_setup.dask.connect_to_scheduler('10.0.0.5:8789')
results = umap.multivariate.distributed.dask(add, data)

umap.cluster_setup.spark.connect_to_scheduler('10.0.0.5:7077')
results = umap.multivariate.distributed.spark(add, data)

Note

connect_to_scheduler(<address>) requires that a scheduler and workers were started in other processes, possibly running on remote machines, see Cluster setup.

Start a scheduler in one terminal:

$ python3
>>> import unified_map as umap
>>> umap.cluster_setup.dask.start_scheduler('10.0.0.5:8789')

Start workers in other terminals:

$ python3
>>> import unified_map as umap
>>> umap.cluster_setup.dask.start_worker('10.0.0.5:8789')

More detailed information