# User Guide

```{css echo=FALSE}
img {
    border: 0px !important;
    margin: 2em 2em 2em 2em !important;
}
code {
    border: 0px !important;
}
```

```r
knitr::opts_chunk$set(
    cache = FALSE,
    echo = TRUE,
    collapse = TRUE,
    comment = "#>"
)
options(clustermq.scheduler = "local")
library(clustermq)
```

## Installation

Install the clustermq package in R from CRAN. This will automatically detect if ZeroMQ is installed and, if not, use the bundled library (compiled with `-DZMQ_BUILD_DRAFT_API=1` to detect worker crashes):

```r
# If your system has `libzmq` installed but you want to enable the worker crash
#   monitor, set the following environment variable to enable compilation of
#   the bundled `libzmq` library that has the required feature enabled:
# Sys.setenv(CLUSTERMQ_USE_SYSTEM_LIBZMQ=0)
install.packages('clustermq')
```

Alternatively, you can use the remotes package to install directly from GitHub. Note that this version needs autoconf/automake for compilation:

```r
# install.packages('remotes')
remotes::install_github('mschubert/clustermq')
```

In the develop branch, we will introduce code changes and new features. These may contain bugs, poor documentation, or other inconveniences. This branch may not install at times. However, feedback is very welcome.

```r
# install.packages('remotes')
remotes::install_github('mschubert/clustermq', ref="develop")
```

## Configuration {#configuration}

Choose your preferred parallelism using:

```r
options(clustermq.scheduler = "your scheduler here")
```

There are three kinds of schedulers:

* LOCAL: sequential processing of calls (the default if no HPC scheduler is found)
* multiprocess/multicore: parallel processing on the local machine
* HPC schedulers: LSF, SGE, SLURM, PBS, and Torque

Parallel and HPC schedulers can also be used via SSH.

### Local parallelization

While this is not the main focus of the package, you can use it to parallelize function calls locally on multiple cores or processes. This can also be useful to test your code before submitting it to a scheduler.
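
For example, a minimal sketch that runs the calls on two local R processes (the multicore backend uses forked processes instead):

```r
options(clustermq.scheduler = "multiprocess") # or "multicore"

fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=2) # runs on 2 local processes instead of scheduler jobs
```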

### Setting up the scheduler

An HPC cluster's scheduler ensures that computing jobs are distributed to available worker nodes. Hence, this is what clustermq interfaces with in order to do computations.

By default, we will take whichever scheduler we find and fall back on local processing. This will work in most, but not all cases.

To set up a scheduler explicitly, see the following links:

* [LSF](#LSF)
* [SGE](#SGE)
* [SLURM](#SLURM)
* [PBS](#PBS)
* [Torque](#Torque)

Default submission templates are provided and can be customized, e.g. to activate compute environments or containers.

### SSH connector {#ssh-connector}

There are reasons why you might prefer not to work on the computing cluster directly, but rather on your local machine instead. RStudio is an excellent local IDE; it's more responsive and feature-rich than browser-based solutions (RStudio Server, Project Jupyter), and it avoids X forwarding issues when you want to look at plots you just made.

Using this setup, however, you would lose access to the computing cluster. Instead, you would have to copy your data there, submit individual scripts as jobs, and aggregate the data again at the end. clustermq tries to solve this by providing a transparent SSH interface.

In order to use clustermq from your local machine, the package needs to be installed both there and on the computing cluster. On the computing cluster, set up your scheduler and make sure clustermq runs there without problems. Note that the remote scheduler cannot be LOCAL (the default if no HPC scheduler is found) or SSH for this to work.

```r
# If this is set to 'LOCAL' or 'SSH' you will get the following error:
#   Expected PROXY_READY, received 'PROXY_ERROR: Remote SSH QSys is not allowed'
options(
    clustermq.scheduler = "multiprocess" # or multicore, LSF, SGE, Slurm etc.
)
```

On your local machine, add the following options in your `~/.Rprofile`:

```r
options(
    clustermq.scheduler = "ssh",
    clustermq.ssh.host = "user@host", # use your user and host, obviously
    clustermq.ssh.log = "~/cmq_ssh.log" # log for easier debugging
)
```

We recommend that you set up SSH keys for password-less login.
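
If you have not done this before, a minimal sketch using the standard OpenSSH tools looks like this (replace `user@host` with your own login):

```{sh eval=FALSE}
# create a key pair if you do not have one yet
ssh-keygen -t ed25519

# copy the public key to the cluster (asks for your password one last time)
ssh-copy-id user@host

# this should now log you in without a password prompt
ssh user@host
```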

## Usage

### The Q function

The following arguments are supported by `Q`:

* `fun` - the function to call
* `...` - the arguments to iterate over, e.g. `x=1:3`
* `const` - a named list of non-iterated arguments
* `export` - a named list of objects to export to the worker environment
* `pkgs` - a character vector of packages to load on the workers

Behavior can further be fine-tuned using the options below:

* `n_jobs` - the number of jobs to submit
* `memory` - the amount of memory to request per job (a template field)
* `template` - a named list of additional template fields, e.g. `log_file`
* `log_worker` - whether each worker should write a log file

The full documentation is available by typing `?Q`.
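
For instance, a sketch combining several of these arguments (assuming a configured scheduler):

```r
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1, memory=1024, log_worker=TRUE)
```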

### Examples

The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe, as the R session running on a worker does not share your local memory.

The simplest example is a function call that is completely self-sufficient, with one argument (`x`) that we iterate through:

```r
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
```

Non-iterated arguments are supported by the `const` argument:

```r
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
```

If a function relies on objects in its environment that are not passed as arguments (including other functions), they can be exported using the `export` argument:

```r
fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
```

If we want to use a package function, we need to load it on the worker using the `pkgs` argument, or reference it with `package_name::`:

```r
fx = function(x) {
    x %>%
        mutate(area = Sepal.Length * Sepal.Width) %>%
        head()
}
Q(fx, x=list(iris), pkgs="dplyr", n_jobs=1)
```

### As parallel foreach backend

The foreach package provides an interface to perform repeated tasks on different backends. While it can replace simple loops using `%do%`:

```r
library(foreach)
x = foreach(i=1:3) %do% sqrt(i)
```

it can also perform these operations in parallel using `%dopar%`:

```r
x = foreach(i=1:3) %dopar% sqrt(i)
```

The latter allows registering different handlers for parallel execution, where we can use clustermq:

```r
# set up the scheduler first, otherwise this will run sequentially
clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # accepts the same arguments as `Q`
x = foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs
```

As BiocParallel supports foreach too, we can also run all packages that use BiocParallel on the cluster via `DoparParam`.

```r
library(BiocParallel)
register(DoparParam()) # after register_dopar_cmq(...)
bplapply(1:3, sqrt)
```

### With drake

The drake package enables users to define a dependency structure of different function calls, and only evaluate them if the underlying data has changed.

drake — or, Data Frames in R for Make — is a general-purpose workflow manager for data-driven tasks. It rebuilds intermediate data objects when their dependencies change, and it skips work when the results are already up to date. Not every runthrough starts from scratch, and completed workflows have tangible evidence of reproducibility. drake also supports scalability, parallel computing, and a smooth user experience when it comes to setting up, deploying, and maintaining data science projects.

It can use clustermq to perform calculations as jobs:

```r
library(drake)
load_mtcars_example()
# clean(destroy = TRUE)
# options(clustermq.scheduler = "multicore")
make(my_plan, parallelism = "clustermq", jobs = 2, verbose = 4)
```

## Options

The various configurable options are mentioned throughout the documentation where applicable; however, we list all of them here for reference.

Options can be set by including a call to `options(<key> = <value>)` in your `.Rprofile`, or by calling `options(<key> = <value>)` in a script or interactively during a session.

* `clustermq.scheduler` - which scheduler to use, e.g. `lsf`, `sge`, `slurm`, `pbs`, `torque`, `multiprocess`, `multicore`, `local`, or `ssh`
* `clustermq.template` - path to a custom scheduler template file
* `clustermq.ssh.host` - the `user@host` to connect to when using the SSH connector
* `clustermq.ssh.log` - path of a log file on the SSH host, for debugging
* `clustermq.ssh.timeout` - how long to wait for the remote R process before giving up (seconds)
* `clustermq.defaults` - a named list of default values for template fields
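
For example, to always use SLURM with your own template file (the path is a placeholder):

```r
# e.g. in your ~/.Rprofile on the computing cluster
options(
    clustermq.scheduler = "slurm",
    clustermq.template = "~/slurm.tmpl"
)
```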

## Troubleshooting

### Debugging workers

Function calls evaluated by workers are wrapped in event handlers, which means that even if a call evaluation throws an error, this should be reported back to the main R session.
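
For example, an error thrown on a worker is caught and re-raised in your local session (a sketch; the exact message format may differ):

```r
fx = function(x) stop("this fails on the worker")
Q(fx, x=1, n_jobs=1)
# the error is reported back and raised locally instead of failing silently
```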

However, there are reasons why workers might crash, in which case they cannot report back. These include:

* a segfault in low-level code
* the scheduler killing the worker job, e.g. for exceeding its memory or walltime limits

In this case, it is useful to have the worker(s) create a log file that will also include events that are not reported back. It can be requested using:

```r
Q(..., log_worker=TRUE)
```

This will create a log file for each worker in your current working directory, irrespective of which scheduler you use.

You can customize the file name using:

```r
Q(..., template=list(log_file = <yourlog>))
```

Note that in this case `log_file` is a template field of your scheduler script, and hence needs to be present there in order for this to work. The default templates all have this field included.

In order to log each worker separately, some schedulers support wildcards in their log file names. For instance:

* LSF: `log_file="myjob_%I.log"` (`%I` is the array index)
* SLURM: `log_file="myjob_%a.log"` (`%a` is the array index)

Your scheduler documentation will have more details about the available options.

When reporting a bug that includes worker crashes, please always include a log file.

### SSH

Before trying remote schedulers via SSH, make sure that the scheduler works when you first connect to the cluster and run a job from there.

If the terminal is stuck at

```
Connecting <user@host> via SSH ...
```

make sure that each step of your SSH connection works by typing the following commands in your local terminal, and check that you don't get errors or warnings at any step:

```{sh eval=FALSE}
# test your ssh login that you set up in ~/.ssh/config
# if this fails you have not set up SSH correctly
ssh user@host

# test port forwarding from 54709 remote to 6687 local (ports are random)
# if this fails you will not be able to use clustermq via SSH
ssh -R 54709:localhost:6687 user@host R --vanilla
```

If you get a `Command not found: R` error, make sure your `$PATH` is set up
correctly in your `~/.bash_profile` and/or your `~/.bashrc` (depending on your
cluster config you might need either). You may also need to modify your [SSH
template](#ssh-template) to load R as a module or conda environment.

If you get a SSH warning or error try again with `ssh -v` to enable verbose
output. If the forward itself works, run the following in your local R session
(ideally also in command-line R, [not only in
RStudio](https://github.com/mschubert/clustermq/issues/206)):

```r
options(clustermq.scheduler = "ssh",
        clustermq.ssh.log = "~/ssh_proxy.log")
Q(identity, x=1, n_jobs=1)
```

This will create a log file on the remote server that will contain any errors that might have occurred during ssh_proxy startup.

If the ssh_proxy startup fails on your local machine with the error

```
Remote R process did not respond after 5 seconds. Check your SSH server log.
```

but the server log does not show any errors, then you can try increasing the timeout:

```r
options(clustermq.ssh.timeout = 30) # in seconds
```

This can happen when your SSH startup template includes additional steps before starting R, such as activating a module or conda environment, or having to confirm the connection via two-factor authentication.

## Environments

### Environments for workers

In some cases, it may be necessary to activate a specific computing environment on the scheduler jobs prior to starting up the worker. This can be, for instance, because R was only installed in a specific environment or container.

Examples for such environments or containers are:

* Bash module environments
* Conda environments
* Docker/Singularity containers

It should be possible to activate them in the job submission script (i.e., the template file). This is largely untested, but would look like the following for the LSF scheduler (analogous for others):

```{sh eval=FALSE}
#BSUB-J {{ job_name }}[1-{{ n_jobs }}]  # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }}      # stdout + stderr
#BSUB-M {{ memory | 4096 }}             # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default                        # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }}          # walltime (uncomment)

module load {{ bashenv | default_bash_env }}
# or: source activate {{ conda | default_conda_env_name }}
# or: your environment activation command

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

This template still needs to be filled, so in the above example you need to
pass either

```r
Q(..., template=list(bashenv="my environment name"))
```

or set it via an `.Rprofile` option:

```r
options(
    clustermq.defaults = list(bashenv="my default env")
)
```

### Running master inside containers

If your master process is inside a container, accessing the HPC scheduler is more difficult. Containers, including singularity and docker, isolate the processes inside the container from the host. The R process will not be able to submit a job because the scheduler cannot be found.

Note that the HPC node running the master process must be allowed to submit jobs. Not all HPC systems allow compute nodes to submit jobs. If that is the case, you may need to run the master process on the login node, and discuss the issue with your system administrator.

If your container is binary compatible with the host, you may be able to bind in the scheduler executable to the container.

For example, PBS might look something like:

```{sh eval=FALSE}
# PBS directives ...

module load singularity

SINGULARITYENV_APPEND_PATH=/opt/pbs/bin singularity exec --bind /opt/pbs/bin r_image.sif Rscript master_script.R
```

A working example of binding SLURM into a CentOS 7 container image from a
CentOS 7 host is available at
https://groups.google.com/a/lbl.gov/d/msg/singularity/syLcsIWWzdo/NZvF2Ud2AAAJ

Alternatively, you can create a script that uses SSH to execute the scheduler
on the login node. For this, you will need an SSH client in the container,
[keys set up for password-less login](https://www.digitalocean.com/community/tutorials/how-to-configure-ssh-key-based-authentication-on-a-linux-server),
and create a script to call the scheduler on the login node via ssh (e.g.
`~/bin/qsub` for SGE/PBS/Torque, `bsub` for LSF and `sbatch` for Slurm):

```{sh eval=FALSE}
#!/bin/bash
ssh -i ~/.ssh/<your key file> ${PBS_O_HOST:-"no_host_not_in_a_pbs_job"} qsub "$@"
```

Make sure the script is executable, and bind/copy it into the container somewhere on `$PATH`. Home directories are bound in by default in singularity.

```{sh eval=FALSE}
chmod u+x ~/bin/qsub
SINGULARITYENV_APPEND_PATH=~/bin
```

## Scheduler templates {#scheduler-templates}

### LSF {#LSF}

In your `~/.Rprofile` on your computing cluster, set the following options:

```r
options(
    clustermq.scheduler = "lsf",
    clustermq.template = "/path/to/file/below" # if using your own template
)
```

The option `clustermq.template` should point to an LSF template file like the one below (only needed if you want to supply your own template rather than using the default).

```{sh eval=FALSE}
#BSUB-J {{ job_name }}[1-{{ n_jobs }}]  # name of the job / array jobs
#BSUB-n {{ cores | 1 }}                 # number of cores to use per job
#BSUB-o {{ log_file | /dev/null }}      # stdout + stderr; %I for array index
#BSUB-M {{ memory | 4096 }}             # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default                        # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }}          # walltime (uncomment)

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

In this file, `#BSUB-*` defines command-line arguments to the `bsub` program.

* Memory: defined by `BSUB-M` and `BSUB-R`. Check whether your local setup
  interprets the supplied memory values as MiB or KiB; the default is `4096`
  if no memory is requested when calling `Q()`
* Queue: `BSUB-q default`. Use the queue with name *default*. This will most
  likely not exist on your system, so choose the right name (or comment out
  this line with an additional `#`)
* Walltime: `BSUB-W {{ walltime }}`. Set the maximum time a job is allowed to
  run before being killed. The default here is to disable this line. If you
  enable it, enter a fixed value or pass the `walltime` argument to each
  function call (see the sketch after this list). The way it is written, it
  will use 6 hours if no argument is given.
* For other options, see [the LSF
  documentation](https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=bsub-options)
  and add them via `#BSUB-*` (where `*` represents the argument)
* Do not change the identifiers in curly braces (`{{ ... }}`), as they are used
  to fill in the right variables
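
For instance, a sketch passing a two-hour walltime for a single set of calls (assuming you have uncommented the `BSUB-W` line in your template):

```r
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1, template=list(walltime="2:00"))
```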

Once this is done, the package will use your settings and no longer warn you of
the missing options.

### SGE {#SGE}

In your `~/.Rprofile` on your computing cluster, set the following options:

```r
options(
    clustermq.scheduler = "sge",
    clustermq.template = "/path/to/file/below" # if using your own template
)
```

The option `clustermq.template` should point to an SGE template file like the one below (only needed if you want to supply your own template rather than using the default).

```{sh eval=FALSE}
#$ -N {{ job_name }}             # job name
#$ -q default                    # submit to queue named "default"
#$ -j y                          # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }} # output file
#$ -cwd                          # use pwd as work dir
#$ -V                            # use environment variables
#$ -t 1-{{ n_jobs }}             # submit jobs as array
#$ -pe {{ cores | 1 }}           # number of cores to use per job

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

In this file, `#$-*` defines command-line arguments to the `qsub` program.

* Queue: `#$ -q default`. Use the queue with name *default*. This will most
  likely not exist on your system, so choose the right name (or comment out
  this line with an additional `#`)
* For other options, see [the SGE
  documentation](https://gridscheduler.sourceforge.net/htmlman/manuals.html). Do
  not change the identifiers in curly braces (`{{ ... }}`), as they are used to
  fill in the right variables.

Once this is done, the package will use your settings and no longer warn you of
the missing options.

### SLURM {#SLURM}

In your `~/.Rprofile` on your computing cluster, set the following options:

```r
options(
    clustermq.scheduler = "slurm",
    clustermq.template = "/path/to/file/below" # if using your own template
)
```

The option clustermq.template should point to a SLURM template file like the one below (only needed if you want to supply your own template rather than using the default).

```{sh eval=FALSE}
#!/bin/sh
#SBATCH --job-name={{ job_name }}
#SBATCH --partition=default
#SBATCH --output={{ log_file | /dev/null }} # you can add .%a for array index
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 4096 }}
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

In this file, `#SBATCH` defines command-line arguments to the `sbatch` program.

* Queue: `#SBATCH --partition=default`. Use the queue with name *default*. This
  will most likely not exist on your system, so choose the right name (or
  comment out this line with an additional `#`)
* For other options, see [the SLURM
  documentation](https://slurm.schedmd.com/sbatch.html). Do not change the
  identifiers in curly braces (`{{ ... }}`), as they are used to fill in the
  right variables.

Once this is done, the package will use your settings and no longer warn you of
the missing options.

### PBS {#PBS}

In your `~/.Rprofile` on your computing cluster, set the following options:

```r
options(
    clustermq.scheduler = "pbs",
    clustermq.template = "/path/to/file/below" # if using your own template
)
```

The option clustermq.template should point to a PBS template file like the one below (only needed if you want to supply your own template rather than using the default).

```{sh eval=FALSE}
#PBS -N {{ job_name }}
#PBS -J 1-{{ n_jobs }}
#PBS -l select=1:ncpus={{ cores | 1 }}:mpiprocs={{ cores | 1 }}:mem={{ memory | 4096 }}MB
#PBS -l walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
#PBS -q default

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

In this file, `#PBS-*` defines command-line arguments to the `qsub` program.

* Queue: `#PBS -q default`. Use the queue with name *default*. This will most
  likely not exist on your system, so choose the right name (or comment out
  this line with an additional `#`)
* For other options, see the PBS documentation. Do not change the identifiers
  in curly braces (`{{ ... }}`), as they are used to fill in the right
  variables.

Once this is done, the package will use your settings and no longer warn you of
the missing options.

### Torque {#Torque}

In your `~/.Rprofile` on your computing cluster, set the following options:

```r
options(
    clustermq.scheduler = "Torque",
    clustermq.template = "/path/to/file/below" # if using your own template
)
```

The option clustermq.template should point to a Torque template file like the one below (only needed if you want to supply your own template rather than using the default).

```{sh eval=FALSE}
#PBS -N {{ job_name }}
#PBS -l nodes={{ n_jobs }}:ppn={{ cores | 1 }},walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -q default
#PBS -j oe

ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```

In this file, `#PBS-*` defines command-line arguments to the `qsub` program.

* Queue: `#PBS -q default`. Use the queue with name *default*. This will most
  likely not exist on your system, so choose the right name (or comment out
  this line with an additional `#`)
* For other options, see the Torque documentation. Do not change the
  identifiers in curly braces (`{{ ... }}`), as they are used to fill in the
  right variables.

Once this is done, the package will use your settings and no longer warn you of
the missing options.

### SSH {#ssh-template}

While SSH is not a scheduler, we can access remote schedulers via SSH. If you
want to use it, first make sure that your real scheduler is running when
manually connecting to the HPC environment.

```r
options(clustermq.scheduler = "ssh",
        clustermq.ssh.host = "myhost", # set this up in your local ~/.ssh/config
        clustermq.ssh.log = "~/ssh_proxy.log", # log file on your HPC
        clustermq.ssh.timeout = 30, # if changing the default connection timeout
        clustermq.template = "/path/to/file/below" # if using your own template
)
```

The default template is shown below. If R is not in your HPC `$PATH`, you may need to specify its path or load the required bash modules/conda environments.

```{sh eval=FALSE}
ssh -o "ExitOnForwardFailure yes" -f \
    -R {{ ctl_port }}:localhost:{{ local_port }} \
    -R {{ job_port }}:localhost:{{ fwd_port }} \
    {{ ssh_host }} \
    "R --no-save --no-restore -e \
        'clustermq:::ssh_proxy(ctl={{ ctl_port }}, job={{ job_port }})' \
        > {{ ssh_log | /dev/null }} 2>&1"
```


