R/launchers.R

Defines functions find_dot print.miraiLaunchCmd local_url host_url cluster_config ssh_config remote_config launch_remote launch_local

Documented in cluster_config host_url launch_local launch_remote local_url remote_config ssh_config

# mirai ------------------------------------------------------------------------

#' Launch Daemon
#'
#' Launching a daemon is very much akin to launching a satellite. They are a way
#' to deploy a daemon (in our case) on the desired machine. Once it executes, it
#' connects back to the host process using its own communications.
#' \cr\cr `launch_local` deploys a daemon on the local machine in a new
#' background `Rscript` process.
#'
#' Daemons must already be set for launchers to work.
#'
#' These functions may be used to re-launch daemons that have exited after
#' reaching time or task limits.
#'
#' @inheritParams mirai
#' @param n integer number of daemons.
#'
#'   **or** for `launch_remote` only, a 'miraiCluster' or 'miraiNode'.
#' @param ... (optional) arguments passed through to [daemon()]. These include
#'   `asycdial`, `autoexit`, `cleanup`, `output`, `maxtasks`, `idletime`,
#'   `walltime` and `tlscert`. Only supply to override arguments originally
#'   provided to [daemons()], otherwise those will be used instead.
#' @param tls \[default NULL\] deprecated. Specify `tlscert` as a `...` argument
#'   to [daemons()] when setting up daemons instead.
#'
#' @return For **launch_local**: Integer number of daemons launched.
#'
#' @examplesIf interactive()
#' daemons(url = host_url(), dispatcher = FALSE)
#' status()
#' launch_local(1L, cleanup = FALSE)
#' launch_remote(1L, cleanup = FALSE)
#' Sys.sleep(1)
#' status()
#' daemons(0)
#'
#' daemons(url = host_url(tls = TRUE))
#' status()
#' launch_local(2L, output = TRUE)
#' Sys.sleep(1)
#' status()
#' daemons(0)
#'
#' @export
#'
launch_local <- function(n = 1L, ..., tls = NULL, .compute = NULL) {
  envir <- compute_env(.compute)
  is.null(envir) && stop(._[["daemons_unset"]])
  url <- envir[["url"]]
  write_args <- if (is.null(envir[["dispatcher"]])) args_daemon_direct else args_daemon_disp
  dots <- if (...length()) parse_dots(envir, ...) else envir[["dots"]]
  if (is.null(tls)) tls <- envir[["tls"]]
  for (i in seq_len(n)) {
    launch_daemon(write_args(url, dots, maybe_next_stream(envir), tls))
  }
  n
}

#' Launch Daemon
#'
#' `launch_remote` returns the shell command for deploying daemons as a
#' character vector. If an [ssh_config()], [cluster_config()] or
#' [remote_config()] configuration is supplied then this is used to launch the
#' daemon on the remote machine.
#'
#' For non-dispatcher daemons using the default seed strategy, the generated
#' command contains the argument `rs` specifying the length 7 L'Ecuyer-CMRG
#' random seed supplied to the daemon. The values will be different each time
#' the function is called.
#'
#' @param remote required only for launching remote daemons, a configuration
#'   generated by [ssh_config()], [cluster_config()], or [remote_config()]. An
#'   empty [remote_config()] does not perform any launches but returns the shell
#'   commands for deploying manually on remote machines.
#'
#' @return For **launch_remote**: A character vector of daemon launch commands,
#'   classed as 'miraiLaunchCmd'. The printed output may be copy / pasted
#'   directly to the remote machine.
#'
#' @rdname launch_local
#' @export
#'
launch_remote <- function(n = 1L, remote = remote_config(), ..., tls = NULL, .compute = NULL) {
  if (!is.numeric(n) && inherits(n, c("miraiCluster", "miraiNode"))) {
    .compute <- attr(n, "id")
    n <- max(length(n), 1L)
  }
  n <- as.integer(n)
  envir <- compute_env(.compute)
  is.null(envir) && stop(._[["daemons_unset"]])
  url <- envir[["url"]]
  write_args <- if (is.null(envir[["dispatcher"]])) args_daemon_direct else args_daemon_disp
  dots <- if (...length()) parse_dots(envir, ...) else envir[["dots"]]
  if (is.null(tls)) tls <- envir[["tls"]]

  command <- remote[["command"]]
  rscript <- remote[["rscript"]]
  quote <- remote[["quote"]]

  if (length(command)) {
    args <- remote[["args"]]

    if (is.list(args)) {
      tunnel <- remote[["tunnel"]]

      if (tunnel) {
        purl <- parse_url(url)
        purl[["hostname"]] == "127.0.0.1" || stop(._[["localhost"]])
        prefix <- sprintf("-R %s:127.0.0.1:%s", purl[["port"]], purl[["port"]])
        for (i in seq_along(args)) {
          args[[i]][1L] <- sprintf("%s %s", prefix, args[[i]][1L])
        }
      }

      if (length(args) == 1L) {
        args <- args[[1L]]
      } else if (n == 1L || n == length(args)) {
        cmds <- sprintf(
          "%s -e %s",
          rscript,
          lapply(seq_along(args), function(i) write_args(url, dots, maybe_next_stream(envir), tls))
        )

        for (i in seq_along(args)) {
          system2(
            command,
            args = `[<-`(args[[i]], find_dot(args[[i]]), if (quote) shQuote(cmds[i]) else cmds[i]),
            wait = FALSE
          )
        }

        return(`class<-`(cmds, "miraiLaunchCmd"))
      } else {
        stop(._[["arglen"]])
      }
    }
  }

  cmds <- sprintf(
    "%s -e %s",
    rscript,
    lapply(seq_len(n), function(i) write_args(url, dots, maybe_next_stream(envir), tls))
  )

  if (length(command)) {
    for (cmd in cmds) {
      system2(
        command,
        args = if (is.null(quote)) {
          arg <- `[<-`(args, find_dot(args), cmd)
          c("-c", shQuote(sprintf("%s%s%s", arg[1L], arg[2L], arg[3L])))
        } else {
          `[<-`(args, find_dot(args), if (quote) shQuote(cmd) else cmd)
        },
        wait = FALSE
      )
    }
  }

  `class<-`(cmds, "miraiLaunchCmd")
}

#' Generic Remote Launch Configuration
#'
#' Provides a flexible generic framework for generating the shell commands to
#' deploy daemons remotely.
#'
#' @param command the command used to effect the daemon launch on the remote
#'   machine as a character string (e.g. `"ssh"`). Defaults to `"ssh"` for
#'   `ssh_config`, although may be substituted for the full path to a specific
#'   SSH application. The default NULL for `remote_config` does not carry out
#'   any launches, but causes [launch_remote()] to return the shell commands for
#'   manual deployment on remote machines.
#' @param args (optional) arguments passed to `command`, as a character vector
#'   that must include `"."` as an element, which will be substituted for the
#'   daemon launch command. Alternatively, a list of such character vectors to
#'   effect multiple launches (one for each list element).
#' @param rscript \[default "Rscript"\] assumes the R executable is on the
#'   search path. Replace with the full path of the Rscript executable on the
#'   remote machine if necessary. If launching on Windows, `"Rscript"` should be
#'   replaced with `"Rscript.exe"`.
#' @param quote \[default FALSE\] logical value whether or not to quote the
#'   daemon launch command (not required for Slurm `"srun"` for example, but
#'   required for Slurm `"sbatch"` or `"ssh"`).
#'
#' @return A list in the required format to be supplied to the `remote` argument
#'   of [daemons()] or [launch_remote()].
#'
#' @seealso [ssh_config()] for SSH launch configurations, or [cluster_config()]
#'   for cluster resource manager launch configurations.
#'
#' @examples
#' # Slurm srun example
#' remote_config(
#'   command = "srun",
#'   args = c("--mem 512", "-n 1", "."),
#'   rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # SSH requires 'quote = TRUE'
#' remote_config(
#'   command = "/usr/bin/ssh",
#'   args = c("-fTp 22 10.75.32.90", "."),
#'   quote = TRUE
#' )
#'
#' # can be used to start local dameons with special configurations
#' remote_config(
#'   command = "Rscript",
#'   rscript = "--default-packages=NULL --vanilla"
#' )
#'
#' @export
#'
remote_config <- function(command = NULL, args = c("", "."), rscript = "Rscript", quote = FALSE) {
  if (is.list(args)) lapply(args, find_dot) else find_dot(args)
  list(command = command, args = args, rscript = rscript, quote = quote, tunnel = FALSE)
}

#' SSH Remote Launch Configuration
#'
#' Generates a remote configuration for launching daemons over SSH, with the
#' option of SSH tunnelling.
#'
#' @param remotes the character URL or vector of URLs to SSH into, using the
#'   'ssh://' scheme and including the port open for SSH connections (defaults
#'   to 22 if not specified), e.g. 'ssh://10.75.32.90:22' or 'ssh://nodename'.
#' @param tunnel \[default FALSE\] logical value, whether to use SSH tunnelling.
#'   If TRUE, requires the [daemons()] `url` hostname to be '127.0.0.1'. See
#'   the 'SSH Tunnelling' section below for further details.
#' @param timeout \[default 10\] maximum time allowed for connection setup in
#'   seconds.
#' @inheritParams remote_config
#'
#' @inherit remote_config return
#'
#' @section SSH Direct Connections:
#'
#' The simplest use of SSH is to execute the daemon launch command on a remote
#' machine, for it to dial back to the host / dispatcher URL.
#'
#' It is assumed that SSH key-based authentication is already in place. The
#' relevant port on the host must also be open to inbound connections from the
#' remote machine, and is hence suitable for use within trusted networks.
#'
#' @section SSH Tunnelling:
#'
#' Use of SSH tunnelling provides a convenient way to launch remote daemons
#' without requiring the remote machine to be able to access the host. Often
#' firewall configurations or security policies may prevent opening a port to
#' accept outside connections.
#'
#' In these cases SSH tunnelling offers a solution by creating a tunnel once the
#' initial SSH connection is made. For simplicity, this SSH tunnelling
#' implementation uses the same port on both host and daemon. SSH key-based
#' authentication must already be in place, but no other configuration is
#' required.
#'
#' To use tunnelling, set the hostname of the [daemons()] `url` argument to be
#' '127.0.0.1'. Using [local_url()] with `tcp = TRUE` also does this for you.
#' Specifying a specific port to use is optional, with a random ephemeral port
#' assigned otherwise. For example, specifying 'tcp://127.0.0.1:5555' uses the
#' local port '5555' to create the tunnel on each machine. The host listens
#' to '127.0.0.1:5555' on its machine and the remotes each dial into
#' '127.0.0.1:5555' on their own respective machines.
#'
#' This provides a means of launching daemons on any machine you are able to
#' access via SSH, be it on the local network or the cloud.
#'
#' @seealso [cluster_config()] for cluster resource manager launch
#'   configurations, or [remote_config()] for generic configurations.
#'
#' @examples
#' # direct SSH example
#' ssh_config(c("ssh://10.75.32.90:222", "ssh://nodename"), timeout = 5)
#'
#' # SSH tunnelling example
#' ssh_config(c("ssh://10.75.32.90:222", "ssh://nodename"), tunnel = TRUE)
#'
#' \dontrun{
#'
#' # launch 2 daemons on the remote machines 10.75.32.90 and 10.75.32.91 using
#' # SSH, connecting back directly to the host URL over a TLS connection:
#' daemons(
#'   url = host_url(tls = TRUE),
#'   remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
#' )
#'
#' # launch 2 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#'   n = 2,
#'   url = local_url(tcp = TRUE),
#'   remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
#' )
#' }
#'
#' @export
#'
ssh_config <- function(remotes, tunnel = FALSE, timeout = 10, command = "ssh", rscript = "Rscript") {
  premotes <- lapply(remotes, parse_url)
  hostnames <- lapply(premotes, .subset2, "hostname")
  ports <- lapply(premotes, .subset2, "port")

  ssh_args <- sprintf("-o ConnectTimeout=%s -fTp %s", as.character(timeout), ports)
  args <- lapply(seq_along(remotes), function(i) c(ssh_args[i], hostnames[[i]], "."))

  list(command = command, args = args, rscript = rscript, quote = TRUE, tunnel = isTRUE(tunnel))
}

#' Cluster Remote Launch Configuration
#'
#' Generates a remote configuration for launching daemons using an HPC cluster
#' resource manager such as Slurm sbatch, SGE and Torque/PBS qsub or LSF bsub.
#'
#' @param command \[default "sbatch"\] for Slurm. Replace with "qsub" for
#'   SGE / Torque / PBS, or "bsub" for LSF. See examples below.
#' @param options \[default ""\] options as would be supplied inside a script
#'   file passed to `command`, e.g. "#SBATCH --mem=10G", each separated by a new
#'   line. See examples below.
#'   \cr Other shell commands e.g. to change working directory may also be
#'   included.
#'   \cr For certain setups, "module load R" as a final line is required, or
#'   for example "module load R/4.5.0" for a specific R version.
#'   \cr For the avoidance of doubt, the initial shebang line such as
#'   "#!/bin/bash" is not required.
#' @inheritParams remote_config
#'
#' @inherit remote_config return
#'
#' @seealso [ssh_config()] for SSH launch configurations, or [remote_config()]
#'   for generic configurations.
#'
#' @examples
#' # Slurm Config:
#' cluster_config(
#'   command = "sbatch",
#'   options = "#SBATCH --job-name=mirai
#'              #SBATCH --mem=10G
#'              #SBATCH --output=job.out
#'              module load R/4.5.0",
#'   rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # SGE Config:
#' cluster_config(
#'   command = "qsub",
#'   options = "#$ -N mirai
#'              #$ -l mem_free=10G
#'              #$ -o job.out
#'              module load R/4.5.0",
#'   rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # Torque/PBS Config:
#' cluster_config(
#'   command = "qsub",
#'   options = "#PBS -N mirai
#'              #PBS -l mem=10gb
#'              #PBS -o job.out
#'              module load R/4.5.0",
#'   rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # LSF Config:
#' cluster_config(
#'   command = "bsub",
#'   options = "#BSUB -J mirai
#'              #BSUB -M 10000
#'              #BSUB -o job.out
#'              module load R/4.5.0",
#'   rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' \dontrun{
#'
#' # Launch 2 daemons using the Slurm sbatch defaults:
#' daemons(n = 2, url = host_url(), remote = cluster_config())
#' }
#'
#' @export
#'
cluster_config <- function(command = "sbatch", options = "", rscript = "Rscript") {
  command <- command[[1L]]
  options <- sub("^[ \t]+", "", options, perl = TRUE)
  options <- gsub("\n[ \t]+", "\n", options, perl = TRUE)
  args <- c(
    sprintf("%s<<'EOF'\n#!/bin/sh\n%s\n", command, options),
    ".",
    "\nEOF"
  )
  list(command = "/bin/sh", args = args, rscript = rscript, quote = NULL)
}

#' URL Constructors
#'
#' `host_url()` constructs a valid host URL (at which daemons may connect) based
#' on the computer's IP address. This may be supplied directly to the `url`
#' argument of [daemons()].
#'
#' `host_url()` will return a vector of URLs if multiple network adapters are in
#' use, and each will be named by the interface name (adapter friendly name on
#' Windows). If this entire vector is passed to the `url` argument of functions
#' such as `daemons()`, the first URL is used. If no suitable IP addresses are
#' detected, the computer's hostname will be used as a fallback.
#'
#' `local_url()` generates a random URL for the platform's default inter-process
#' communications transport: abstract Unix domain sockets on Linux, Unix domain
#' sockets on MacOS, Solaris and other POSIX platforms, and named pipes on
#' Windows.
#'
#' @param tls \[default FALSE\] logical value whether to use TLS in which case
#'   the scheme used will be 'tls+tcp://'.
#' @param port \[default 0\] numeric port to use. `0` is a wildcard value that
#'   automatically assigns a free ephemeral port. For `host_url`, this port
#'   should be open to connections from the network addresses the daemons are
#'   connecting from. For `local_url`, is only taken into account if
#'   `tcp = TRUE`.
#'
#' @return A character vector (comprising a valid URL or URLs), named for
#'   `host_url()`.
#'
#' @examples
#' host_url()
#' host_url(tls = TRUE)
#' host_url(tls = TRUE, port = 5555)
#'
#' @export
#'
host_url <- function(tls = FALSE, port = 0) {
  ip <- ip_addr()
  `names<-`(
    sprintf(
      "%s://%s:%d",
      if (tls) "tls+tcp" else "tcp",
      if (any(nzchar(ip))) ip else Sys.info()[["nodename"]],
      as.integer(port)
    ),
    names(ip)
  )
}

#' URL Constructors
#'
#' `local_url()` constructs a URL suitable for local daemons, or for use with
#' SSH tunnelling. This may be supplied directly to the `url` argument of
#' [daemons()].
#'
#' @param tcp \[default FALSE\] logical value whether to use a TCP connection.
#'   This must be used for SSH tunnelling.
#'
#' @examples
#' local_url()
#' local_url(tcp = TRUE)
#' local_url(tcp = TRUE, port = 5555)
#'
#' @rdname host_url
#' @export
#'
local_url <- function(tcp = FALSE, port = 0) {
  tcp && return(sprintf("tcp://127.0.0.1:%d", as.integer(port)))
  sprintf("%s%s", .urlscheme, random(12L))
}

#' @export
#'
print.miraiLaunchCmd <- function(x, ...) {
  for (i in seq_along(x)) cat(sprintf("[%d]\n%s\n\n", i, x[i]), file = stdout())
  invisible(x)
}

# internals --------------------------------------------------------------------

find_dot <- function(args) {
  sel <- args == "."
  any(sel) || stop(._[["dot_required"]], call. = FALSE)
  sel
}

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on Sept. 9, 2025, 5:51 p.m.