Unified Map
- Unified: Many implementations, one way of access.
- Map: Apply a function to a list of arguments and collect the results, whether serial, parallel or distributed.
Purpose
This package provides reasonably simple syntax for a frequent programming task which is implemented in various places (built-in, standard library, external libraries). Here are three descriptions of this task:
- Map a list of inputs to a list of results via a user-provided function.
- Apply a given function to a list of arguments to get a list of return values in the same order.
- A black-box function is evaluated with different inputs and the outputs are collected.
In general, this is a so-called “embarrassingly parallel problem” (aka “pleasingly parallel”) because it is straightforward to separate it into independent subtasks. For this reason, and due to its frequent occurrence, it is recognized as a programming idiom (“parallel map”, “parallel for loop”) in parallel computing that can be applied just as simply in distributed computing. This package lets you do so with a focus on simplicity of use.
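As a minimal sketch of the task itself, using only built-in Python and not this package, the three descriptions above come down to the same result. The names `square` and `inputs` are illustrative assumptions.

```python
def square(x):
    """A stand-in for the user-provided, black-box function."""
    return x ** 2


inputs = [1, 2, 3, 4]

# Spelling 1: explicit for loop that collects the return values in order.
loop_results = []
for x in inputs:
    loop_results.append(square(x))

# Spelling 2: list comprehension.
comprehension_results = [square(x) for x in inputs]

# Spelling 3: the built-in map, materialized into a list.
map_results = list(map(square, inputs))

# All three spellings produce the same list, in input order.
assert loop_results == comprehension_results == map_results
print(map_results)
```

Because each call to `square` depends only on its own input, the calls can be handed to different cores or machines without changing the result, which is what makes the problem easy to parallelize.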
Use cases
The function can be univariate (=takes one argument) or multivariate (=takes several arguments).
The evaluation can be executed in a serial (=on one core), parallel (=on several cores) or distributed (=on several machines) way.
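The two distinctions can be sketched with the standard library alone. This is an illustration under stated assumptions, not this package's implementation: `square` and `add` are example functions, and a thread pool stands in for a pool of worker processes so that the snippet runs anywhere (threads do not give multi-core speedup for CPU-bound Python code).

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import starmap


def square(x):
    """Univariate: takes one argument."""
    return x ** 2


def add(x, y, z):
    """Multivariate: takes several arguments."""
    return x + y + z


# Univariate case: each element of the input list is a single value.
univariate_results = list(map(square, [1, 2, 3]))

# Multivariate case: each element is a tuple that is unpacked into arguments.
multivariate_results = list(starmap(add, [(1, 2, 3), (10, 20, 30)]))

# Concurrent evaluation: Executor.map returns results in input order,
# regardless of which worker finishes first.
with ThreadPoolExecutor(max_workers=2) as pool:
    concurrent_results = list(pool.map(square, [1, 2, 3]))

assert concurrent_results == univariate_results
```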
Note
The arrows indicate that parallel and distributed calculations send objects to other processes or machines, namely the elements of the input list, the function and the elements of the result list. This implies that all of these objects need to be serializable, which is not always the case:
- If a library uses Python’s pickle module for object serialization, then only certain objects can be “pickled”, i.e. transformed into a binary serialization format. For example, a lambda expression will not work as the function, since it cannot be pickled.
- Some libraries use improved serialization functionality from elsewhere, e.g. dill or cloudpickle.
- Some libraries implement their own advanced serialization routines, e.g. joblib or dask.
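The pickle limitation can be checked before handing a function to a parallel or distributed backend. The helper `is_picklable` below is a hypothetical name introduced for illustration; it relies only on the standard library's `pickle.dumps`.

```python
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def square(x):
    return x ** 2


# Functions are pickled by reference (module plus name), so a function
# defined with def at the top level of an importable module normally works.
print(is_picklable(square))

# A built-in function is also pickled by reference.
print(is_picklable(len))

# A lambda has no importable name, so pickling it fails.
print(is_picklable(lambda x: x ** 2))
```

If a function fails this check, defining it with `def` at module level is usually the simplest fix when the backend uses plain pickle.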
Installation
Complete installation = with all optional dependencies
$ pip install "unified-map[complete]"
Note
The installation of Apache Spark currently cannot be automated reliably. If you want to use it, please install it manually as described below.
Minimal installation = with no optional dependencies
$ pip install unified-map
Functions that rely on third-party libraries will not work. You can, however, install any optional dependency you want to use yourself, see below.
Optional dependencies
- joblib
$ pip install joblib
- dask
$ pip install "dask[complete]"
- Apache Spark
Apache Spark is written in Scala and provides APIs for several languages (incl. Python).
There is a “Python packaged version of Spark” available on PyPI that could be installed with pip. It does not, however, include the necessary scripts (e.g. for starting workers), so manual installation is required.
The “pre-packaged Apache Spark” needs to be downloaded from the Apache Spark website. Extract it to a desired place in your file system (e.g. $HOME/bin/spark). Then certain environment variables need to be set (e.g. in .bashrc) so that Apache Spark can be found in the shell and in Python. Most importantly, SPARK_HOME needs to point at the installation directory:
export SPARK_HOME="$HOME/bin/spark/spark-2.3.0-bin-hadoop2.7/"
export PATH="$SPARK_HOME/bin/:$PATH"
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
Apache Spark uses Py4J to access Java objects from a Python interpreter:
$ pip install py4j
Apache Spark is written in Scala, which runs on a Java Virtual Machine, so it may be necessary to install or update Java on your machine. The documentation indicates that Java 8+ is required.
- On a Linux system you can check your Java version on the command line with java -version.
- An open-source implementation like OpenJDK may work (no guarantee!).
- If necessary, install the official Java Runtime Environment (JRE) from Oracle.
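A manual Spark setup can be sanity-checked from Python before trying to connect to a cluster. The following is a minimal sketch, not part of this package: the function name `spark_setup_problems` and its messages are assumptions made for illustration, and it only checks that SPARK_HOME points at a directory and that a `java` executable is on the PATH.

```python
import os
import shutil


def spark_setup_problems(environ=None):
    """Return a list of human-readable problems with a manual Spark setup."""
    if environ is None:
        environ = os.environ
    problems = []

    # SPARK_HOME must be set and point at the extracted installation directory.
    spark_home = environ.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not os.path.isdir(spark_home):
        problems.append("SPARK_HOME does not point at a directory: " + spark_home)

    # Spark runs on a Java Virtual Machine, so a java executable is needed.
    if shutil.which("java", path=environ.get("PATH")) is None:
        problems.append("no 'java' executable found on PATH")

    return problems


for problem in spark_setup_problems():
    print("Problem:", problem)
```

An empty list only means these two basic checks passed; it does not verify the Java version or that Spark itself starts.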
Quickstart
Univariate function applied to a list of arguments where each element is a single value
import unified_map as umap
def square(x):
    return x**2
data = [1, 2, 3, 4, 5, 6, 7, 8]
# Serial on one core
results = umap.univariate.serial.for_loop(square, data)
results = umap.univariate.serial.generator_expression(square, data)
results = umap.univariate.serial.generator_function(square, data)
results = umap.univariate.serial.list_comprehension(square, data)
results = umap.univariate.serial.map(square, data)
results = umap.univariate.serial.starmap(square, data)
# Parallel on several cores
results = umap.univariate.parallel.dask(square, data)
results = umap.univariate.parallel.futures(square, data)
results = umap.univariate.parallel.joblib(square, data)
results = umap.univariate.parallel.multiprocessing(square, data)
# Distributed on several machines
umap.cluster_setup.spark.connect_to_scheduler('10.0.0.5:7077')
results = umap.univariate.distributed.spark(square, data)
umap.cluster_setup.dask.connect_to_scheduler('10.0.0.5:8789')
results = umap.univariate.distributed.dask(square, data)
Multivariate function applied to a list of arguments where each element is a tuple of values
import unified_map as umap
def add(x, y, z):
    return x+y+z
data = [(1, 2, 3), (10, 20, 30), (1, 20, 300), (42, 15, 27)]
# Serial on one core
results = umap.multivariate.serial.for_loop(add, data)
results = umap.multivariate.serial.generator_expression(add, data)
results = umap.multivariate.serial.generator_function(add, data)
results = umap.multivariate.serial.list_comprehension(add, data)
results = umap.multivariate.serial.map(add, data)
results = umap.multivariate.serial.starmap(add, data)
# Parallel on several cores
results = umap.multivariate.parallel.dask(add, data)
results = umap.multivariate.parallel.futures(add, data)
results = umap.multivariate.parallel.joblib(add, data)
results = umap.multivariate.parallel.multiprocessing(add, data)
# Distributed on several machines
umap.cluster_setup.dask.connect_to_scheduler('10.0.0.5:8789')
results = umap.multivariate.distributed.dask(add, data)
umap.cluster_setup.spark.connect_to_scheduler('10.0.0.5:7077')
results = umap.multivariate.distributed.spark(add, data)
Note
connect_to_scheduler(<address>) requires that a scheduler and workers were started in other processes, possibly running on remote machines; see Cluster setup.
Start a scheduler in one terminal:
$ python3
>>> import unified_map as umap
>>> umap.cluster_setup.dask.start_scheduler('10.0.0.5:8789')
Start workers in other terminals:
$ python3
>>> import unified_map as umap
>>> umap.cluster_setup.dask.start_worker('10.0.0.5:8789')