Nothing
# mirai ------------------------------------------------------------------------
#' Launch Daemons
#'
#' `launch_local()` launches daemons on the local machine as background R
#' processes that connect back to the host.
#'
#' Daemons must already be set for launchers to work.
#'
#' These functions may be used to re-launch daemons that have exited after
#' reaching time or task limits.
#'
#' @inheritParams mirai
#' @param n (integer) number of daemons to launch. For `launch_remote()`, may
#' also be a 'miraiCluster' or 'miraiNode'.
#' @param ... (daemon arguments) passed to [daemon()], including `asyncdial`,
#' `autoexit`, `cleanup`, `output`, `maxtasks`, `idletime`, and `walltime`.
#' Overrides arguments from [daemons()] if supplied.
#'
#' @return For **launch_local**: Integer number of daemons launched.
#'
#' @examplesIf interactive()
#' daemons(url = host_url(), dispatcher = FALSE)
#' info()
#' launch_local(1L, cleanup = FALSE)
#' launch_remote(1L, cleanup = FALSE)
#' Sys.sleep(1)
#' info()
#' daemons(0)
#'
#' daemons(url = host_url(tls = TRUE))
#' info()
#' launch_local(2L, output = TRUE)
#' Sys.sleep(1)
#' info()
#' daemons(0)
#'
#' @export
#'
launch_local <- function(n = 1L, ..., .compute = NULL) {
  # Launches `n` local daemons for the compute profile `.compute` and returns
  # the (integer) number launched.
  #
  # Stops with the "daemons_unset" message when no environment exists for the
  # profile, and with the "synchronous" message when envir[["sync"]] is TRUE.
  envir <- compute_env(.compute)
  is.null(envir) && stop(._[["daemons_unset"]])
  envir[["sync"]] && stop(._[["synchronous"]])
  # Coerce once so that the loop count and the returned value always agree and
  # the documented integer return holds for double input such as `2` or `2.9`
  # (launch_remote() coerces `n` in the same way).
  n <- as.integer(n)
  url <- envir[["url"]]
  # The daemon argument writer depends on whether a dispatcher is recorded for
  # this profile.
  write_args <- if (is.null(envir[["dispatcher"]])) args_daemon_direct else args_daemon_disp
  # Daemon arguments supplied through `...` are parsed afresh; otherwise the
  # ones stored in the profile are reused.
  dots <- if (...length()) parse_dots(envir, ...) else envir[["dots"]]
  tls <- envir[["tls"]]
  for (i in seq_len(n)) {
    # maybe_next_stream() is called once per daemon (presumably to hand each
    # one its own RNG stream -- confirm against its definition).
    launch_daemon(write_args(url, dots, maybe_next_stream(envir), tls))
  }
  n
}
#' Launch Daemons
#'
#' `launch_remote()` returns the shell command for deploying daemons as a
#' character vector. If an [ssh_config()], [cluster_config()], [http_config()]
#' or [remote_config()] configuration is supplied then this is used to launch
#' the daemon on the remote machine.
#'
#' For non-dispatcher daemons using the default seed strategy, the generated
#' command contains the argument `rs` specifying the length 7 L'Ecuyer-CMRG
#' random seed supplied to the daemon. The values will be different each time
#' the function is called.
#'
#' @param remote (configuration) for launching daemons, generated by
#' [ssh_config()], [cluster_config()], [http_config()], or [remote_config()].
#' An empty [remote_config()] returns shell commands for manual deployment
#' without launching.
#'
#' @return For **launch_remote**: A character vector of daemon launch commands,
#' classed as 'miraiLaunchCmd'. The printed output may be copy / pasted
#' directly to the remote machine. For the [http_config()] launcher, a list
#' of server response data, returned invisibly.
#'
#' @rdname launch_local
#' @export
#'
launch_remote <- function(n = 1L, remote = remote_config(), ..., .compute = NULL) {
# Builds one daemon launch command per daemon and, depending on `remote`,
# dispatches it by HTTP request or by system2(). When `remote` carries no
# `command`, the commands are only returned (classed 'miraiLaunchCmd').
#
# A 'miraiCluster' / 'miraiNode' passed as `n` supplies both the compute
# profile (its "id" attribute) and the daemon count (its length, minimum 1).
if (!is.numeric(n) && inherits(n, c("miraiCluster", "miraiNode"))) {
.compute <- attr(n, "id")
n <- max(length(n), 1L)
}
n <- as.integer(n)
envir <- compute_env(.compute)
# Guards: stop with the "daemons_unset" message when no environment exists for
# the profile, and with the "synchronous" message when envir[["sync"]] is TRUE.
is.null(envir) && stop(._[["daemons_unset"]])
envir[["sync"]] && stop(._[["synchronous"]])
url <- envir[["url"]]
# The daemon argument writer depends on whether a dispatcher is recorded.
write_args <- if (is.null(envir[["dispatcher"]])) args_daemon_direct else args_daemon_disp
# Daemon arguments given in `...` are parsed afresh; otherwise the stored ones
# are reused.
dots <- if (...length()) parse_dots(envir, ...) else envir[["dots"]]
tls <- envir[["tls"]]
# --- HTTP launcher: one request per daemon, responses returned invisibly ------
if (is.character(remote[["type"]]) && remote[["type"]] == "http") {
# url, cookie, token and data may each be a function, in which case it is
# called here (at launch time) to obtain the value.
api_url <- if (is.function(remote[["url"]])) remote[["url"]]() else remote[["url"]]
method <- remote[["method"]]
cookie <- if (is.function(remote[["cookie"]])) remote[["cookie"]]() else remote[["cookie"]]
token <- if (is.function(remote[["token"]])) remote[["token"]]() else remote[["token"]]
data <- if (is.function(remote[["data"]])) remote[["data"]]() else remote[["data"]]
# sprintf() on a NULL token gives character(0) and c() drops a NULL cookie, so
# a header is simply absent when its value was not supplied.
headers <- c(Authorization = sprintf("Bearer %s", token), Cookie = cookie)
res <- lapply(seq_len(n), function(i) {
cmd <- write_args(url, dots, maybe_next_stream(envir), tls)
# Escape backslashes first, then double quotes, before the command is
# substituted for the "%s" placeholder of `data`.
cmd <- gsub("\\", "\\\\", cmd, fixed = TRUE)
cmd <- gsub("\"", "\\\"", cmd, fixed = TRUE)
ncurl(
url = api_url,
method = method,
headers = headers,
data = sprintf(data, cmd),
timeout = .limit_short
)
})
return(invisible(res))
}
command <- remote[["command"]]
rscript <- remote[["rscript"]]
quote <- remote[["quote"]]
if (length(command)) {
args <- remote[["args"]]
# A list of argument vectors means one set per launch (e.g. from ssh_config()).
if (is.list(args)) {
tunnel <- remote[["tunnel"]]
if (tunnel) {
# Tunnelling requires the URL hostname to be 127.0.0.1; a "-R" forward of the
# same port is prepended to the first argument of every argument set.
purl <- parse_url(url)
purl[["hostname"]] == "127.0.0.1" || stop(._[["localhost"]])
prefix <- sprintf("-R %s:127.0.0.1:%s", purl[["port"]], purl[["port"]])
for (i in seq_along(args)) {
args[[i]][1L] <- sprintf("%s %s", prefix, args[[i]][1L])
}
}
if (length(args) == 1L) {
# A single argument set is unwrapped and handled by the generic path below,
# which launches `n` daemons with it.
args <- args[[1L]]
} else if (n == 1L || n == length(args)) {
# One daemon per argument set; `n` of 1 is accepted and still launches one
# daemon for each set.
cmds <- sprintf(
"%s -e %s",
rscript,
lapply(seq_along(args), function(i) {
shQuote(write_args(url, dots, maybe_next_stream(envir), tls))
})
)
for (i in seq_along(args)) {
system2(
command,
# The launch command replaces the "." placeholder, shell-quoted if `quote`.
args = `[<-`(args[[i]], find_dot(args[[i]]), if (quote) shQuote(cmds[i]) else cmds[i]),
wait = FALSE
)
}
return(`class<-`(cmds, "miraiLaunchCmd"))
} else {
# `n` is neither 1 nor the number of argument sets.
stop(._[["arglen"]])
}
}
}
# Generic path: `n` commands of the form "<rscript> -e <quoted daemon call>".
cmds <- sprintf(
"%s -e %s",
rscript,
lapply(seq_len(n), function(i) shQuote(write_args(url, dots, maybe_next_stream(envir), tls)))
)
if (length(command)) {
for (cmd in cmds) {
system2(
command,
args = if (is.null(quote)) {
# quote = NULL (as set by cluster_config()): the three pieces of `args`, with
# the command in place of ".", are joined into one script passed via "-c".
arg <- `[<-`(args, find_dot(args), cmd)
c("-c", shQuote(sprintf("%s%s%s", arg[1L], arg[2L], arg[3L])))
} else {
`[<-`(args, find_dot(args), if (quote) shQuote(cmd) else cmd)
},
wait = FALSE
)
}
}
`class<-`(cmds, "miraiLaunchCmd")
}
#' Generic Remote Launch Configuration
#'
#' Provides a flexible generic framework for generating the shell commands to
#' deploy daemons remotely.
#'
#' @param command (character) shell command for launching daemons (e.g.
#' `"ssh"`). `NULL` returns shell commands for manual deployment without
#' launching.
#' @param args (character vector) arguments to `command`, must include `"."` as
#' placeholder for the daemon launch command. May be a list of vectors for
#' multiple launches.
#' @param rscript (character) Rscript executable. Use full path if needed, or
#' `"Rscript.exe"` on Windows.
#' @param quote (logical) whether to quote the daemon launch command. Required
#' for `"sbatch"` and `"ssh"`, not for `"srun"`.
#'
#' @return A list in the required format to be supplied to the `remote` argument
#' of [daemons()] or [launch_remote()].
#'
#' @seealso [ssh_config()], [cluster_config()] and [http_config()] for other
#' types of remote configuration.
#'
#' @examples
#' # Slurm srun example
#' remote_config(
#' command = "srun",
#' args = c("--mem 512", "-n 1", "."),
#' rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # SSH requires 'quote = TRUE'
#' remote_config(
#' command = "/usr/bin/ssh",
#' args = c("-fTp 22 10.75.32.90", "."),
#' quote = TRUE
#' )
#'
#' # can be used to start local daemons with special configurations
#' remote_config(
#' command = "Rscript",
#' rscript = "--default-packages=NULL --vanilla"
#' )
#'
#' @export
#'
remote_config <- function(command = NULL, args = c("", "."), rscript = "Rscript", quote = FALSE) {
  # Validation only: find_dot() raises an error for any argument vector that
  # lacks the "." placeholder; its return value is not needed here.
  arg_sets <- if (is.list(args)) args else list(args)
  for (arg_set in arg_sets) {
    find_dot(arg_set)
  }
  # `args` is stored exactly as supplied; generic configurations never tunnel.
  list(
    command = command,
    args = args,
    rscript = rscript,
    quote = quote,
    tunnel = FALSE
  )
}
#' SSH Remote Launch Configuration
#'
#' Generates a remote configuration for launching daemons over SSH, with the
#' option of SSH tunnelling.
#'
#' @param remotes (character) URL(s) to SSH into using scheme 'ssh://', e.g.
#' 'ssh://10.75.32.90:22' or 'ssh://nodename'. Port defaults to 22.
#' @param tunnel (logical) whether to use SSH tunnelling. Requires `url`
#' hostname '127.0.0.1' (use [local_url()] with `tcp = TRUE`). See SSH
#' Tunnelling section.
#' @param timeout (integer) maximum seconds for connection setup.
#' @inheritParams remote_config
#'
#' @inherit remote_config return
#'
#' @section SSH Direct Connections:
#'
#' The simplest use of SSH is to execute the daemon launch command on a remote
#' machine, for it to dial back to the host / dispatcher URL.
#'
#' SSH key-based authentication must already be in place. The relevant port on
#' the host must be open to inbound connections from the remote machine. This
#' approach is suited to trusted networks.
#'
#' @section SSH Tunnelling:
#'
#' SSH tunnelling launches remote daemons without requiring the remote machine
#' to access the host directly. Often firewall configurations or security
#' policies may prevent opening a port to accept outside connections.
#'
#' A tunnel is created once the initial SSH connection is made. For simplicity,
#' this SSH tunnelling implementation uses the same port on both host and
#' daemon. SSH key-based authentication must already be in place, but no other
#' configuration is required.
#'
#' To use tunnelling, set the hostname of the [daemons()] `url` argument to be
#' '127.0.0.1'. Using [local_url()] with `tcp = TRUE` also does this for you.
#' Specifying a specific port to use is optional, with a random ephemeral port
#' assigned otherwise. For example, specifying 'tcp://127.0.0.1:5555' uses the
#' local port '5555' to create the tunnel on each machine. The host listens
#' to '127.0.0.1:5555' on its machine and the remotes each dial into
#' '127.0.0.1:5555' on their own respective machines.
#'
#' Daemons can be launched on any machine accessible via SSH, whether on the
#' local network or in the cloud.
#'
#' @seealso [cluster_config()], [http_config()] and [remote_config()] for other
#' types of remote configuration.
#'
#' @examples
#' # direct SSH example
#' ssh_config(c("ssh://10.75.32.90:222", "ssh://nodename"), timeout = 5)
#'
#' # SSH tunnelling example
#' ssh_config(c("ssh://10.75.32.90:222", "ssh://nodename"), tunnel = TRUE)
#'
#' \dontrun{
#'
#' # launch daemons on the remote machines 10.75.32.90 and 10.75.32.91 using
#' # SSH, connecting back directly to the host URL over a TLS connection:
#' daemons(
#' n = 1,
#' url = host_url(tls = TRUE),
#' remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
#' )
#'
#' # launch 2 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#' n = 2,
#' url = local_url(tcp = TRUE),
#' remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
#' )
#' }
#'
#' @export
#'
ssh_config <- function(
  remotes,
  tunnel = FALSE,
  timeout = 10,
  command = "ssh",
  rscript = "Rscript"
) {
  # Split every remote URL into its components and keep hostname and port.
  parsed <- lapply(remotes, parse_url)
  hosts <- lapply(parsed, .subset2, "hostname")
  ports <- lapply(parsed, .subset2, "port")
  # One option string per remote: connection timeout plus "-fTp <port>".
  conn_flags <- sprintf("-o ConnectTimeout=%s -fTp %s", as.character(timeout), ports)
  # Argument vector for remote `i`: options, hostname, then the "." placeholder
  # that is later replaced by the daemon launch command.
  build_args <- function(i) c(conn_flags[i], hosts[[i]], ".")
  launch_args <- lapply(seq_along(remotes), build_args)
  list(
    command = command,
    args = launch_args,
    rscript = rscript,
    quote = TRUE,
    tunnel = isTRUE(tunnel)
  )
}
#' Cluster Remote Launch Configuration
#'
#' Generates a remote configuration for launching daemons using an HPC cluster
#' resource manager such as Slurm sbatch, SGE and Torque/PBS qsub or LSF bsub.
#'
#' @param command (character) cluster manager executable: `"sbatch"` (Slurm),
#' `"qsub"` (SGE/Torque/PBS), or `"bsub"` (LSF).
#' @param options (character) script options for `command` (e.g.
#' "#SBATCH --mem=16G"), newline-separated. May include shell commands such as
#' "module load R/4.5.0". Shebang line such as "#!/bin/bash" not required.
#' @inheritParams remote_config
#'
#' @inherit remote_config return
#'
#' @seealso [ssh_config()], [http_config()] and [remote_config()] for other
#' types of remote configuration.
#'
#' @examples
#' # Slurm Config:
#' cluster_config(
#' command = "sbatch",
#' options = "#SBATCH --job-name=mirai
#' #SBATCH --mem=16G
#' #SBATCH --output=job.out
#' module load R/4.5.0",
#' rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # SGE Config:
#' cluster_config(
#' command = "qsub",
#' options = "#$ -N mirai
#' #$ -l mem_free=16G
#' #$ -o job.out
#' module load R/4.5.0",
#' rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # Torque/PBS Config:
#' cluster_config(
#' command = "qsub",
#' options = "#PBS -N mirai
#' #PBS -l mem=16gb
#' #PBS -o job.out
#' module load R/4.5.0",
#' rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' # LSF Config:
#' cluster_config(
#' command = "bsub",
#' options = "#BSUB -J mirai
#' #BSUB -M 16000
#' #BSUB -o job.out
#' module load R/4.5.0",
#' rscript = file.path(R.home("bin"), "Rscript")
#' )
#'
#' \dontrun{
#'
#' # Launch 2 daemons using the Slurm sbatch defaults:
#' daemons(n = 2, url = host_url(), remote = cluster_config())
#' }
#'
#' @export
#'
cluster_config <- function(command = "sbatch", options = "", rscript = "Rscript") {
  # Only the first element of `command` is used as the scheduler executable.
  scheduler <- command[[1L]]
  # Remove spaces / tabs at the very start of `options` and directly after
  # every newline, so indented multi-line strings become flush-left.
  script_lines <- gsub("^[ \t]+|(?<=\n)[ \t]+", "", options, perl = TRUE)
  # Opening part of a here-document fed to the scheduler: shebang plus options.
  heredoc_head <- sprintf("%s<<'EOF'\n#!/bin/sh\n%s\n", scheduler, script_lines)
  # "." marks where the daemon launch command goes; quote = NULL distinguishes
  # this configuration from those carrying a logical `quote`.
  list(
    command = "/bin/sh",
    args = c(heredoc_head, ".", "\nEOF"),
    rscript = rscript,
    quote = NULL
  )
}
#' HTTP Remote Launch Configuration
#'
#' Generates a remote configuration for launching daemons via HTTP API. By
#' default, automatically configures for Posit Workbench using environment
#' variables.
#'
#' @param url (character or function) URL endpoint for the launch API. May be a
#' function returning the URL value.
#' @param method (character) HTTP method, typically `"POST"`.
#' @param cookie (character or function) session cookie value. May be a
#' function returning the cookie value. Set to `NULL` if not required for
#' authentication.
#' @param token (character or function) authentication bearer token. May be a
#' function returning the token value. Set to `NULL` if not required for
#' authentication.
#' @param data (character or function) JSON or formatted request body containing
#' the daemon launch command. May be a function returning the data value.
#' Should include a placeholder `"%s"` where the `mirai::daemon()` call
#' will be inserted at launch time.
#'
#' @inherit remote_config return
#'
#' @seealso [ssh_config()], [cluster_config()] and [remote_config()] for other
#' types of remote configuration.
#'
#' @examples
#' tryCatch(http_config(), error = identity)
#'
#' # Custom HTTP configuration example:
#' http_config(
#' url = "https://api.example.com/launch",
#' method = "POST",
#' cookie = function() Sys.getenv("MY_SESSION_COOKIE"),
#' token = function() Sys.getenv("MY_API_KEY"),
#' data = '{"command": "%s"}'
#' )
#'
#' \dontrun{
#' # Launch 2 daemons using http config default (for Posit Workbench):
#' daemons(n = 2, url = host_url(), remote = http_config())
#' }
#'
#' @export
#'
http_config <- function(
  url = posit_workbench_url,
  method = "POST",
  cookie = posit_workbench_cookie,
  token = NULL,
  data = posit_workbench_data
) {
  # Values are stored exactly as given: functions are kept as functions, not
  # called here. The "http" type tag identifies this kind of configuration.
  list(
    type = "http",
    url = url,
    method = method,
    cookie = cookie,
    token = token,
    data = data
  )
}
#' URL Constructors
#'
#' `host_url()` constructs a valid host URL (at which daemons may connect) based
#' on the computer's IP address. This may be supplied directly to the `url`
#' argument of [daemons()].
#'
#' `host_url()` will return a vector of URLs if multiple network adapters are in
#' use, and each will be named by the interface name (adapter friendly name on
#' Windows). If this entire vector is passed to the `url` argument of functions
#' such as `daemons()`, the first URL is used. If no suitable IP addresses are
#' detected, the computer's hostname will be used as a fallback.
#'
#' `local_url()` generates a random URL for the platform's default inter-process
#' communications transport: abstract Unix domain sockets on Linux, Unix domain
#' sockets on macOS, Solaris and other POSIX platforms, and named pipes on
#' Windows.
#'
#' @param tls (logical) whether to use TLS (scheme 'tls+tcp://').
#' @param port (integer) port number. `0` assigns a free ephemeral port. For
#' [host_url()], must be open to daemon connections. For [local_url()], only
#' used when `tcp = TRUE`.
#'
#' @return A character vector (comprising a valid URL or URLs), named for
#' `host_url()`.
#'
#' @examples
#' host_url()
#' host_url(tls = TRUE)
#' host_url(tls = TRUE, port = 5555)
#'
#' @export
#'
host_url <- function(tls = FALSE, port = 0) {
  ip <- ip_addr()
  scheme <- if (tls) "tls+tcp" else "tcp"
  # Use the detected address(es) when at least one is non-empty, otherwise fall
  # back to this machine's node name.
  host <- if (any(nzchar(ip))) ip else Sys.info()[["nodename"]]
  urls <- sprintf("%s://%s:%d", scheme, host, as.integer(port))
  # Carry the names of `ip` over to the result.
  names(urls) <- names(ip)
  urls
}
#' URL Constructors
#'
#' `local_url()` constructs a URL suitable for local daemons, or for use with
#' SSH tunnelling. This may be supplied directly to the `url` argument of
#' [daemons()].
#'
#' @param tcp (logical) whether to use TCP. Required for SSH tunnelling.
#'
#' @examples
#' local_url()
#' local_url(tcp = TRUE)
#' local_url(tcp = TRUE, port = 5555)
#'
#' @rdname host_url
#' @export
#'
local_url <- function(tcp = FALSE, port = 0) {
  # TCP requested: loopback address with the port coerced to integer.
  tcp && return(sprintf("tcp://127.0.0.1:%d", as.integer(port)))
  # Otherwise: the package URL scheme followed by a random suffix.
  suffix <- random(12L)
  sprintf("%s%s", .urlscheme, suffix)
}
#' @export
#'
print.miraiLaunchCmd <- function(x, ...) {
  # Each command is shown as "[i]" on one line, the command on the next, then a
  # blank line. All entries are formatted in one vectorised call and written
  # to stdout without a separator.
  entries <- sprintf("[%d]\n%s\n\n", seq_along(x), x)
  cat(entries, sep = "", file = stdout())
  invisible(x)
}
# internals --------------------------------------------------------------------
find_dot <- function(args) {
  # Logical mask marking the "." placeholder element(s) of `args`; errors when
  # no element is confirmed to be ".".
  is_dot <- args == "."
  if (!isTRUE(any(is_dot))) {
    stop(._[["dot_required"]], call. = FALSE)
  }
  is_dot
}
# Thin accessors over posit_workbench_get(), usable as function-valued
# defaults that are only evaluated when called.
posit_workbench_cookie <- function() {
  posit_workbench_get("cookie")
}
posit_workbench_url <- function() {
  posit_workbench_get("url")
}
posit_workbench_data <- function(rscript = "Rscript") {
  posit_workbench_get("data", rscript)
}
posit_workbench_get <- function(what, rscript = NULL) {
  # Returns one Posit Workbench launch setting selected by `what`:
  #   "cookie" - value of the RS_SESSION_RPC_COOKIE environment variable
  #   "url"    - <RS_SERVER_ADDRESS>/api/launch_job
  #   "data"   - JSON request body for a "launch_job" call, built from the
  #              first cluster reported by the server's get_compute_envs API
  switch(
    what,
    cookie = Sys.getenv("RS_SESSION_RPC_COOKIE"),
    url = file.path(Sys.getenv("RS_SERVER_ADDRESS"), "api", "launch_job"),
    data = {
      # JSON encoding / decoding relies on the 'secretbase' package.
      requireNamespace("secretbase", quietly = TRUE) || stop(._[["secretbase"]])
      server <- Sys.getenv("RS_SERVER_ADDRESS")
      session_cookie <- Sys.getenv("RS_SESSION_RPC_COOKIE")
      # Both environment variables must be non-empty.
      nzchar(server) && nzchar(session_cookie) || stop(._[["posit_api"]])
      response <- ncurl(
        file.path(server, "api", "get_compute_envs"),
        headers = c(Cookie = session_cookie),
        timeout = .limit_short
      )
      response[["status"]] == 200L || stop(._[["posit_api"]])
      # Only the first cluster, and its first resource profile, are used.
      clusters <- secretbase::jsondec(response[["data"]])[["result"]][["clusters"]]
      cluster <- clusters[[1L]]
      # R expression that sets the current library paths; it is prepended to
      # the daemon call inside the job's "-e" argument.
      quoted_paths <- paste(sprintf("\"%s\"", .libPaths()), collapse = ",")
      lib_paths_call <- sprintf(".libPaths(c(%s))", quoted_paths)
      # "%%s" leaves a literal "%s" placeholder in the arguments for the
      # daemon launch command to be inserted later.
      job_spec <- list(
        cluster = cluster[["name"]],
        container = list(image = cluster[["defaultImage"]]),
        resourceProfile = cluster[["resourceProfiles"]][[1L]][["name"]],
        name = "mirai_daemon",
        exe = rscript,
        args = c("-e", sprintf("{%s;%%s}", lib_paths_call))
      )
      secretbase::jsonenc(
        list(method = "launch_job", kwparams = list(job = job_spec))
      )
    }
  )
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.