Cluster setup

Distributed computing is a wide topic with inconsistent terminology. To reduce complexity, this package currently sticks to third-party libraries that essentially follow a master-slave paradigm, or in less problematic terms, a scheduler-worker paradigm. For more nuances, see the documentation of Dask and Apache Spark.

[Figure: cluster architecture visualization]

What steps are necessary to get going?

  1. Start a scheduler process (aka master, controller, manager, …), either on your local computer or on a node of the cluster.

    start_scheduler(...) is a Python function provided for this; see below. Alternatively, the libraries also provide a shell command for starting a scheduler.

  2. Start some worker processes (aka slaves, …) on nodes of the cluster. By giving them the scheduler address, they can connect themselves to the scheduler via the local network.

    start_worker(...) is a Python function provided for this; see below. Alternatively, the libraries also provide a shell command for starting a worker.

  3. Start a connection (aka client) from your computer to the scheduler via the local network. This allows you to send a computing task to the scheduler, which in turn distributes it intelligently (load balancing, fault tolerance, …) to the nodes of the cluster. Once the nodes have completed their tasks, the results are collected by the scheduler and sent back to your computer.

    connect_to_scheduler(...) is a Python function provided for this; see below.

  4. Check the system status.

    report_status(...) is a Python function provided for this; see below. A combined sketch of all four steps is shown after this list.
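
Putting the four steps together, here is a minimal sketch of the whole workflow, using the Dask variant documented below. The address 192.168.0.1:8791 is only a placeholder for an address that is reachable in your local network.

On a cluster node (or your local computer), start the scheduler; it occupies its interpreter until stopped:

>>> from unified_map import cluster_setup
>>> cluster_setup.dask.start_scheduler('192.168.0.1:8791')

On each worker node, start a worker and point it to the scheduler; it also occupies its interpreter until stopped:

>>> from unified_map import cluster_setup
>>> cluster_setup.dask.start_worker('192.168.0.1:8791')

On your local computer, connect to the scheduler and check the status:

>>> from unified_map import cluster_setup
>>> cluster_setup.dask.connect_to_scheduler('192.168.0.1:8791')
>>> cluster_setup.dask.report_status()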

How can a program (e.g. a worker process) be started on a cluster node?

An initial assumption is that every node has the same Python environment (= same interpreter and library versions) as your local computer. One way to start a program remotely on a cluster node is to connect via SSH and run it from the shell (best with a terminal multiplexer like GNU Screen). Usually, however, a cluster has a cluster manager (e.g. Sun Grid Engine, Apache Mesos, or others) that is in charge and provides users with computing resources on demand. This means one first needs to request resources (CPU cores, RAM) on the cluster and submit a job that is executed at some point. Since cluster managers differ substantially, please refer to your cluster manager's documentation or ask a cluster administrator for help.
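
As an illustration of the SSH route only, below is a hypothetical Python sketch that launches a Dask worker on a remote node. The hostname ‘node01’, the Screen session name and the scheduler address are placeholders; it assumes passwordless SSH, GNU Screen and the dask-worker command being available on that node.

# Hypothetical sketch: launch a Dask worker on a remote node via SSH.
# 'node01', the session name and the scheduler address are placeholders.
import subprocess

subprocess.run(
    ['ssh', 'node01', 'screen -dmS dask_worker dask-worker 192.168.0.1:8791'],
    check=True,  # raise CalledProcessError if the SSH command fails
)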

Dask cluster setup

The object unified_map.cluster_setup.dask provides methods to configure this package and Dask for distributed computing. It is an instance of the class outlined below.

Note

Do not try to instantiate the class yourself. The package will only recognize the object unified_map.cluster_setup.dask that was created during import.

class unified_map.cluster_setup._DaskSetup

Set up Dask for distributed computing on a cluster.

start_scheduler(scheduler_address)

Start a scheduler process. It occupies the Python interpreter until stopped externally.

Parameters: scheduler_address – Desired scheduler address of the form ‘ip-address:port’
Raises: ConnectionError – If the scheduler cannot be started with the given address.

Example

>>> start_scheduler('127.0.0.1:8791')
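
If the address cannot be used, the documented ConnectionError is raised; a minimal sketch of handling it (the message text is just an example):

>>> try:
...     start_scheduler('127.0.0.1:8791')
... except ConnectionError:
...     print('Scheduler could not be started at this address')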

Note

A scheduler process can also be started with Dask’s command line interface, for example:

$ dask-scheduler --host 127.0.0.1 --port 8791

start_worker(scheduler_address, max_memory='auto', num_cores=1)

Start a worker process. It occupies the Python interpreter until stopped externally.

Parameters:
  • scheduler_address – Address of running scheduler of the form ‘ip-address:port’
  • max_memory (optional) – Amount of memory that this worker may use. Can be a number of bytes (0 = no limit), ‘auto’ for 60% of available memory, or a string like ‘5GB’
  • num_cores (optional) – Number of cores that this worker will use

Example

>>> start_worker('127.0.0.1:8791')
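
A call that also sets the documented optional parameters could look like this (the values are placeholders):

>>> start_worker('127.0.0.1:8791', max_memory='4GB', num_cores=2)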

Note

A worker process can also be started with Dask’s command line interface, for example:

$ dask-worker '127.0.0.1:8791'

Warning

Prevent crashes: be careful with the memory limit when using a Nanny process (the CLI uses one automatically): “At 95% memory load a worker’s nanny process will terminate it. This is to avoid having our worker job being terminated by an external job scheduler (like YARN, Mesos, SGE, etc.).”

connect_to_scheduler(scheduler_address)

Start a connection to a running scheduler. The Python interpreter is not occupied.

Parameters: scheduler_address – Address of running scheduler of the form ‘ip-address:port’

Example

>>> connect_to_scheduler('127.0.0.1:8791')

report_status()

Print information about the current cluster setup and status of the system.
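
Example

>>> report_status()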

Spark cluster setup

The object unified_map.cluster_setup.spark provides methods to configure this package and Apache Spark for distributed computing. It is an instance of the class outlined below.

Note

Do not try to instantiate the class yourself. The package will only recognize the object unified_map.cluster_setup.spark that was created during import.

class unified_map.cluster_setup._SparkSetup

Set up Apache Spark (Python bindings via pyspark) for distributed computing on a cluster.

start_scheduler(scheduler_address)

Start a scheduler process. It occupies the Python interpreter until stopped externally.

Parameters: scheduler_address – Desired scheduler address of the form ‘ip-address:port’

Example

>>> start_scheduler('127.0.0.1:7077')

Note

A scheduler process can also be started with Apache Spark’s command line interface, for example:

$ $SPARK_HOME/sbin/start-master.sh --host 127.0.0.1 --port 7077

It needs to be stopped afterwards:

$ $SPARK_HOME/sbin/stop-master.sh

The status can be monitored in a web browser at http://localhost:8080 or at another reported address.

start_worker(scheduler_address)

Start a worker process. It occupies the Python interpreter until stopped externally.

Parameters: scheduler_address – Address of running scheduler of the form ‘ip-address:port’

Example

>>> start_worker('127.0.0.1:7077')

Note

A worker process can also be started with Apache Spark’s command line interface, for example:

$ $SPARK_HOME/sbin/start-slave.sh spark://127.0.0.1:7077 --work-dir $HOME/temp_spark_worker

It needs to be stopped afterwards:

$ $SPARK_HOME/sbin/stop-slave.sh

connect_to_scheduler(scheduler_address)

Start a connection to a running scheduler. The Python interpreter is not occupied.

Parameters: scheduler_address – Address of running scheduler of the form ‘ip-address:port’

Example

>>> connect_to_scheduler('127.0.0.1:7077')

report_status(depth=None)

Print information about the current cluster setup and status of the system.
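
Example

>>> report_status()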
