Nothing
# mirai ------------------------------------------------------------------------
#' Daemons (Set Persistent Processes)
#'
#' Set daemons, or persistent background processes, to receive [mirai()]
#' requests. Specify `n` to create daemons on the local machine. Specify `url`
#' to receive connections from remote daemons (for distributed computing across
#' the network). Specify `remote` to optionally launch remote daemons via a
#' remote configuration. Dispatcher (enabled by default) ensures optimal
#' scheduling.
#'
#' Use `daemons(0)` to reset daemon connections:
#' \itemize{
#' \item All connected daemons and/or dispatchers exit automatically.
#' \item Any as yet unresolved 'mirai' will return an 'errorValue' 19
#' (Connection reset).
#' \item [mirai()] reverts to the default behaviour of creating a new
#' background process for each request.
#' }
#'
#' If the host session ends, all connected dispatcher and daemon processes
#' automatically exit as soon as their connections are dropped.
#'
#' Calling [daemons()] implicitly resets any existing daemons for the compute
#' profile with `daemons(0)`. Instead, [launch_local()] or [launch_remote()] may
#' be used to add daemons at any time without resetting daemons.
#'
#' @inheritParams mirai
#' @param n integer number of daemons to launch.
#' @param url \[default NULL\] if specified, a character string comprising a URL
#' at which to listen for remote daemons, including a port accepting incoming
#' connections, e.g. 'tcp://hostname:5555' or 'tcp://10.75.32.70:5555'.
#' Specify a URL with scheme 'tls+tcp://' to use secure TLS connections (for
#' details see Distributed Computing section below). Auxiliary function
#' [host_url()] may be used to construct a valid host URL.
#' @param remote \[default NULL\] required only for launching remote daemons, a
#' configuration generated by [ssh_config()], [cluster_config()], or
#' [remote_config()].
#' @param dispatcher \[default TRUE\] logical value, whether to use dispatcher.
#' Dispatcher runs in a separate process to ensure optimal scheduling,
#' and should normally be kept on (for details see Dispatcher section below).
#' @param ... (optional) additional arguments passed through to [daemon()] if
#' launching daemons. These include `asyncdial`, `autoexit`, `cleanup`,
#' `output`, `maxtasks`, `idletime`, `walltime` and `tlscert`.
#' @param seed \[default NULL\] (optional) The default of `NULL` initializes
#' L'Ecuyer-CMRG RNG streams for each daemon, the same as base R's parallel
#' package. Results are statistically-sound, although generally
#' non-reproducible, as which tasks are sent to which daemons may be
#' non-deterministic, and also depends on the number of daemons.
#' \cr (experimental) supply an integer value to instead initialize a
#' L'Ecuyer-CMRG RNG stream for the compute profile. This is advanced for each
#' mirai evaluation, hence allowing for reproducible results, as the random
#' seed is always associated with a given mirai, independently of where it is
#' evaluated.
#' @param serial \[default NULL\] (optional, requires dispatcher) a
#' configuration created by [serial_config()] to register serialization and
#' unserialization functions for normally non-exportable reference objects,
#' such as Arrow Tables or torch tensors. If `NULL`, configurations registered
#' with [register_serial()] are automatically applied.
#' @param tls \[default NULL\] (optional for secure TLS connections) if not
#' supplied, zero-configuration single-use keys and certificates are
#' automatically generated. If supplied, **either** the character path to
#' a file containing the PEM-encoded TLS certificate and associated private
#' key (may contain additional certificates leading to a validation chain,
#' with the TLS certificate first), **or** a length 2 character vector
#' comprising \[i\] the TLS certificate (optionally certificate chain) and
#' \[ii\] the associated private key.
#' @param pass \[default NULL\] (required only if the private key supplied to
#' `tls` is encrypted with a password) For security, should be provided
#' through a function that returns this value, rather than directly.
#'
#' @return Invisibly, logical `TRUE` when creating daemons and `FALSE` when
#' resetting.
#'
#' @section Local Daemons:
#'
#' Setting daemons, or persistent background processes, is typically more
#' efficient as it removes the need for, and overhead of, creating new processes
#' for each mirai evaluation. It also provides control over the total number of
#' processes at any one time.
#'
#' Supply the argument `n` to set the number of daemons. New background
#' [daemon()] processes are automatically launched on the local machine
#' connecting back to the host process, either directly or via dispatcher.
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#'
#' Specifying `dispatcher = FALSE`, daemons connect directly to the host and
#' tasks are distributed in a round-robin fashion, with tasks queued at each
#' daemon. Optimal scheduling is not guaranteed as, depending on the duration of
#' tasks, they can be queued at one daemon while others remain idle. However,
#' this solution is the most resource-light, and suited to similar-length tasks,
#' or where concurrent tasks typically do not exceed available daemons.
#'
#' @section Distributed Computing:
#'
#' Specify `url` as a character string to allow tasks to be distributed across
#' the network (`n` is only required in this case if also providing a launch
#' configuration to `remote`).
#'
#' The host / dispatcher listens at this URL, utilising a single port, and
#' [daemon()] processes dial in to this URL. Host / dispatcher automatically
#' adjusts to the number of daemons actually connected, allowing dynamic
#' upscaling / downscaling.
#'
#' The URL should have a 'tcp://' scheme, such as 'tcp://10.75.32.70:5555'.
#' Switching the URL scheme to 'tls+tcp://' automatically upgrades the
#' connection to use TLS. The auxiliary function [host_url()] may be used to
#' construct a valid host URL based on the computer's IP address.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' `[]` to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified as
#' 'tcp://\[::1\]:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://\[::1\]:0'
#' will automatically assign a free ephemeral port. Use [status()] to inspect
#' the actual assigned port at any time.
#'
#' Specify `remote` with a call to [ssh_config()], [cluster_config()] or
#' [remote_config()] to launch (programmatically deploy) daemons on remote
#' machines, from where they dial back to `url`. If not launching daemons,
#' [launch_remote()] may be used to generate the shell commands for manual
#' deployment.
#'
#' @section Compute Profiles:
#'
#' If `.compute` is `NULL`, the `"default"` compute profile is used. Providing
#' a character value for `.compute` creates a new compute profile with the name
#' specified. Each compute profile retains its own daemons settings, and may be
#' operated independently of each other. Some usage examples follow:
#'
#' **local / remote** daemons may be set with a host URL and specifying
#' `.compute` as `"remote"`, which creates a new compute profile. Subsequent
#' [mirai()] calls may then be sent for local computation by not specifying the
#' `.compute` argument, or for remote computation to connected daemons by
#' specifying the `.compute` argument as `"remote"`.
#'
#' **cpu / gpu** some tasks may require access to different types of daemon,
#' such as those with GPUs. In this case, `daemons()` may be called to set up
#' host URLs for CPU-only daemons and for those with GPUs, specifying the
#' `.compute` argument as `"cpu"` and `"gpu"` respectively. By supplying the
#' `.compute` argument to subsequent [mirai()] calls, tasks may be sent to
#' either `cpu` or `gpu` daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via `daemons(0)` should
#' be carried out with the desired `.compute` argument specified.
#'
#' @seealso [with_daemons()] and [local_daemons()] for managing the compute
#' profile used locally.
#'
#' @examplesIf interactive()
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set up dispatcher accepting TLS over TCP connections
#' daemons(url = host_url(tls = TRUE))
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Use with() to evaluate with daemons for the duration of the expression
#' with(
#' daemons(2),
#' {
#' m1 <- mirai(Sys.getpid())
#' m2 <- mirai(Sys.getpid())
#' cat(m1[], m2[], "\n")
#' }
#' )
#'
#' \dontrun{
#'
#' # Launch daemons on remotes 'nodeone' and 'nodetwo' using SSH
#' # connecting back directly to the host URL over a TLS connection:
#' daemons(
#' url = host_url(tls = TRUE),
#' remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo'))
#' )
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#' n = 4,
#' url = local_url(tcp = TRUE),
#' remote = ssh_config('ssh://10.75.32.90', tunnel = TRUE)
#' )
#'
#' }
#'
#' @export
#'
daemons <- function(
n,
url = NULL,
remote = NULL,
dispatcher = TRUE,
...,
seed = NULL,
serial = NULL,
tls = NULL,
pass = NULL,
.compute = NULL
) {
# Resolve the compute profile: when `.compute` is NULL use the profile name
# stored under "cp" in `.`, then look up any existing profile environment
# registered under that name in `..`.
if (is.null(.compute)) .compute <- .[["cp"]]
envir <- ..[[.compute]]
if (is.character(url)) {
# Listening at a URL. `res` stays NULL when a profile already exists (this
# `if` has no `else`), which triggers the reset-and-retry step further below.
res <- if (is.null(envir)) {
# Only the first element of `url` is used.
url <- url[1L]
envir <- init_envir_stream(seed)
dots <- parse_dots(envir, ...)
cfg <- configure_tls(url, tls, pass, envir)
# The character value "none" is accepted as a synonym for FALSE.
if (is.character(dispatcher) && dispatcher == "none") dispatcher <- FALSE
if (dispatcher) {
launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass)
} else {
create_sock(envir, url, cfg[[2L]])
}
# Register the profile with a daemon count of zero.
create_profile(envir, .compute, 0L, dots)
if (length(remote)) {
# If launch_remote() errors, the handler resets this profile; the handler
# is cleared again once the launch has returned normally.
on.exit(daemons(0L, .compute = .compute))
launch_remote(n = n, remote = remote, .compute = .compute)
on.exit()
}
url
}
} else {
# No URL given: `n` drives the call. A NULL `n` is treated as zero and
# additionally causes send_signal() to be called before the reset.
signal <- is.null(n)
if (signal) n <- 0L
is.numeric(n) || stop(._[["numeric_n"]])
n <- as.integer(n)
n == 0L && {
# Reset request: nothing to do when no profile is set.
is.null(envir) && return(invisible(FALSE))
if (signal) send_signal(envir)
reap(envir[["sock"]])
if (otel_tracing) otel::end_span(envir[["otel_span"]])
# `->` binds tighter than `<-`: the local `envir` is set to NULL and that
# NULL is then stored as the profile entry in `..`.
..[[.compute]] <- NULL -> envir
return(invisible(FALSE))
}
# As above, `res` stays NULL when a profile already exists.
res <- if (is.null(envir)) {
n > 0L || stop(._[["n_zero"]])
# Errors when `.mirai_within_map` is found (and is TRUE) in a calling frame.
dynGet(".mirai_within_map", ifnotfound = FALSE) && stop(._[["within_map"]])
envir <- init_envir_stream(seed)
dots <- parse_dots(envir, ...)
if (is.character(dispatcher) && dispatcher == "none") dispatcher <- FALSE
if (dispatcher) {
launch_dispatcher(n, dots, envir, serial)
} else {
launch_daemons(seq_len(n), dots, envir)
}
create_profile(envir, .compute, n, dots)
}
}
# A profile already existed: reset it, then repeat this call with the same
# arguments and return that call's result.
is.null(res) && return({
daemons(0L, .compute = .compute)
daemons(
n = n,
url = url,
remote = remote,
dispatcher = dispatcher,
...,
seed = seed,
serial = serial,
tls = tls,
pass = pass,
.compute = .compute
)
})
if (otel_tracing) {
# Store a tracing span on the profile; it is ended in the reset branch above.
`[[<-`(
envir,
"otel_span",
otel::start_span(
"mirai::daemons",
attributes = otel::as_attributes(list(
url = envir[["url"]],
n = envir[["n"]],
dispatcher = if (is.null(envir[["dispatcher"]])) "false" else "true",
compute_profile = .compute
))
)
)
}
# Invisible TRUE whose class vector carries the profile name as its second
# element.
invisible(`class<-`(TRUE, c("miraiDaemons", .compute)))
}
#' @export
#'
print.miraiDaemons <- function(x, ...) {
  # Strip the class attribute so the underlying value prints as a plain object.
  bare <- unclass(x)
  print(bare)
}
#' With Mirai Daemons
#'
#' Evaluate an expression with daemons that last for the duration of the
#' expression. Ensure each mirai within the statement is explicitly called (or
#' their values collected) so that daemons are not reset before they have all
#' completed.
#'
#' This function is an S3 method for the generic [with()] for class
#' 'miraiDaemons'.
#'
#' @param data a call to [daemons()].
#' @param expr an expression to evaluate.
#' @param ... not used.
#'
#' @return The return value of `expr`.
#'
#' @examplesIf interactive()
#' with(
#' daemons(2, dispatcher = FALSE),
#' {
#' m1 <- mirai(Sys.getpid())
#' m2 <- mirai(Sys.getpid())
#' cat(m1[], m2[], "\n")
#' }
#' )
#'
#' status()
#'
#' @export
#'
with.miraiDaemons <- function(data, expr, ...) {
  # Remember the profile that was active so it can be reinstated on exit.
  saved_cp <- .[["cp"]]
  # Make the second class element of `data` the active profile name.
  .[["cp"]] <- class(data)[2L]
  on.exit(
    {
      # Reset the daemons of the active profile, then restore the previous one.
      daemons(0L)
      .[["cp"]] <- saved_cp
    }
  )
  expr
}
#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @param .compute \[default NULL\] character value for the compute profile to
#' query, or NULL to query the 'default' profile.
#'
#' **or** a 'miraiCluster' to obtain its status.
#'
#' @return A named list comprising:
#' \itemize{
#' \item **connections** - integer number of active daemon connections.
#' \item **daemons** - character URL at which host / dispatcher is
#' listening, or else `0L` if daemons have not yet been set.
#' \item **mirai** (present only if using dispatcher) - a named integer
#' vector comprising: **awaiting** - number of tasks queued for execution at
#' dispatcher, **executing** - number of tasks sent to a daemon for
#' execution, and **completed** - number of tasks for which the result has
#' been received (either completed or cancelled).
#' }
#'
#' @section Events:
#'
#' If dispatcher is used combined with daemon IDs, an additional element
#' **events** will report the positive integer ID when the daemon connects and
#' the negative value when it disconnects. Only the events since the previous
#' status query are returned.
#'
#' @seealso [info()] for more succinct information statistics.
#'
#' @examplesIf interactive()
#' status()
#' daemons(url = "tcp://[::1]:0")
#' status()
#' daemons(0)
#'
#' @export
#'
status <- function(.compute = NULL) {
  # A list argument is queried through its "id" attribute instead.
  if (is.list(.compute)) {
    return(status(attr(.compute, "id")))
  }
  profile <- compute_env(.compute)
  # No profile set: report zero connections and zero daemons.
  if (is.null(profile)) {
    return(list(connections = 0L, daemons = 0L))
  }
  # With a dispatcher, the status comes from dispatcher_status().
  if (!is.null(profile[["dispatcher"]])) {
    return(dispatcher_status(profile))
  }
  n_pipes <- as.integer(stat(profile[["sock"]], "pipes"))
  list(connections = n_pipes, daemons = profile[["url"]])
}
#' Information Statistics
#'
#' Retrieve statistics for the specified compute profile.
#'
#' The returned statistics are:
#'
#' - Connections: active daemon connections.
#' - Cumulative: total daemons that have ever connected.
#' - Awaiting: mirai tasks currently queued for execution at dispatcher.
#' - Executing: mirai tasks currently being evaluated on a daemon.
#' - Completed: mirai tasks that have been completed or cancelled.
#'
#' For non-dispatcher daemons: only 'connections' will be available and the
#' other values will be `NA`.
#'
#' @inheritParams mirai
#'
#' @return Named integer vector or else `NULL` if the compute profile is yet to
#' be set up.
#'
#' @seealso [status()] for more verbose status information.
#'
#' @examplesIf interactive()
#' info()
#' daemons(1)
#' info()
#' daemons(0)
#'
#' @export
#'
info <- function(.compute = NULL) {
  profile <- compute_env(.compute)
  if (is.null(profile)) {
    return()
  }
  stats <- if (is.null(profile[["dispatcher"]])) {
    # Without dispatcher only the connection count is known.
    c(as.integer(stat(profile[["sock"]], "pipes")), NA, NA, NA, NA)
  } else {
    query_dispatcher(profile[["sock"]], c(0L, 0L))
  }
  # A classed result from the query is not labelled; NULL is returned instead.
  if (is.object(stats)) {
    return()
  }
  names(stats) <- c("connections", "cumulative", "awaiting", "executing", "completed")
  stats
}
#' Query if Daemons are Set
#'
#' Returns a logical value, whether or not daemons have been set for a given
#' compute profile.
#'
#' @inheritParams mirai
#'
#' @return Logical `TRUE` or `FALSE`.
#'
#' @examplesIf interactive()
#' daemons_set()
#' daemons(1)
#' daemons_set()
#' daemons(0)
#'
#' @export
#'
daemons_set <- function(.compute = NULL) {
  # Daemons count as set when a profile environment exists for `.compute`.
  profile <- compute_env(.compute)
  !is.null(profile)
}
#' Require Daemons
#'
#' Returns TRUE only if daemons are set, otherwise produces an informative
#' error for the user to set daemons, with a clickable function link if the
#' \CRANpkg{cli} package is available.
#'
#' @inheritParams mirai
#' @param call (only used if the \CRANpkg{cli} package is installed) the
#' execution environment of a currently running function, e.g.
#' `environment()`. The function will be mentioned in error messages as the
#' source of the error.
#'
#' @return Logical `TRUE`, or else errors.
#'
#' @note
#' Previously the arguments were reversed with `call` coming before `.compute`.
#' Specifying an environment to the first argument works for the time being,
#' although is deprecated and will be defunct in a future version.
#'
#' @examplesIf interactive()
#' daemons(1)
#' require_daemons()
#' daemons(0)
#'
#' @export
#'
require_daemons <- function(.compute = NULL, call = environment()) {
  ensure_cli_initialized()
  if (is.environment(.compute)) {
    # Arguments supplied in the previous (reversed) order: swap them, keeping
    # `call` as the profile only when it is a character value.
    env_arg <- .compute
    .compute <- if (is.character(call)) call else NULL
    call <- env_arg
  }
  # TRUE when daemons are set; otherwise hand over to the function stored
  # under "require_daemons" in `.`.
  daemons_set(.compute = .compute) || .[["require_daemons"]](.compute, call)
}
#' With Daemons
#'
#' Evaluate an expression using a specific compute profile.
#'
#' Will error if the specified compute profile is not yet set up.
#'
#' @inheritParams require_daemons
#' @param expr an expression to evaluate.
#'
#' @return For **with_daemons**: the return value of `expr`. \cr
#' For **local_daemons**: invisible NULL.
#'
#' @examplesIf interactive()
#' daemons(1, dispatcher = FALSE, .compute = "cpu")
#' daemons(1, dispatcher = FALSE, .compute = "gpu")
#'
#' with_daemons("cpu", {
#' s1 <- status()
#' m1 <- mirai(Sys.getpid())
#' })
#'
#' with_daemons("gpu", {
#' s2 <- status()
#' m2 <- mirai(Sys.getpid())
#' m3 <- mirai(Sys.getpid(), .compute = "cpu")
#' local_daemons("cpu")
#' m4 <- mirai(Sys.getpid())
#' })
#'
#' s1$daemons
#' m1[]
#'
#' s2$daemons
#' m2[] # different to m1
#'
#' m3[] # same as m1
#' m4[] # same as m1
#'
#' with_daemons("cpu", daemons(0))
#' with_daemons("gpu", daemons(0))
#'
#' @export
#'
with_daemons <- function(.compute, expr) {
  require_daemons(.compute = .compute, call = environment())
  # Switch the active profile for the duration of `expr`, restoring on exit.
  saved_cp <- .[["cp"]]
  .[["cp"]] <- .compute
  on.exit(.[["cp"]] <- saved_cp)
  expr
}
#' @param frame \[default parent.frame()\] the frame (environment) to which the
#' daemons compute profile is scoped.
#'
#' @rdname with_daemons
#' @export
#'
local_daemons <- function(.compute, frame = parent.frame()) {
  require_daemons(.compute = .compute, call = frame)
  saved_cp <- .[["cp"]]
  .[["cp"]] <- .compute
  # Build a call to a closure that reinstates the previous profile, and
  # register it as an exit handler of `frame` (add = TRUE, after = FALSE).
  restore <- function() .[["cp"]] <- saved_cp
  restore_call <- as.call(list(restore))
  do.call(on.exit, list(restore_call, TRUE, FALSE), envir = frame)
}
#' Create Serialization Configuration
#'
#' Returns a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once set by passing to the `serial` argument of [daemons()], the functions
#' apply to all mirai requests for that compute profile.
#'
#' This feature utilises the 'refhook' system of R native serialization.
#'
#' @param class a character string (or vector) of the class of object custom
#' serialization functions are applied to, e.g. `'ArrowTabular'` or
#' `c('torch_tensor', 'ArrowTabular')`.
#' @param sfunc a function (or list of functions) that accepts a reference
#' object inheriting from `class` and returns a raw vector.
#' @param ufunc a function (or list of functions) that accepts a raw vector and
#' returns a reference object.
#'
#' @return A list comprising the configuration. This should be passed to the
#' `serial` argument of [daemons()].
#'
#' @examples
#' cfg <- serial_config("test_cls", function(x) serialize(x, NULL), unserialize)
#' cfg
#'
#' cfg2 <- serial_config(
#' c("class_one", "class_two"),
#' list(function(x) serialize(x, NULL), function(x) serialize(x, NULL)),
#' list(unserialize, unserialize)
#' )
#' cfg2
#'
#' @export
#'
serial_config <- serial_config # NOTE(review): presumably re-exports an imported function of the same name; its definition is not in this file - confirm
#' Register Serialization Configuration
#'
#' Registers a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once registered, the functions apply to all [daemons()] calls where the
#' `serial` argument is `NULL`.
#'
#' @inheritParams serial_config
#'
#' @return Invisible NULL.
#'
#' @export
#'
register_serial <- function(class, sfunc, ufunc) {
  incoming <- serial_config(class, sfunc, ufunc)
  existing <- .[["serial"]]
  # Append each component of the new configuration to the matching component
  # of whatever is already registered.
  merged <- vector("list", length(incoming))
  for (k in seq_along(incoming)) {
    merged[k] <- list(c(existing[[k]], incoming[[k]]))
  }
  .[["serial"]] <- merged
  invisible()
}
# internals --------------------------------------------------------------------
# Look up the profile environment for `x` in `..`, using the profile name
# stored under "cp" in `.` when `x` is NULL.
compute_env <- function(x) {
  profile <- if (is.null(x)) .[["cp"]] else x
  ..[[profile]]
}
# Work out the TLS material for `url`. Returns a two-element list: the
# (possibly generated) `tls` value, and a TLS configuration built from it
# (NULL when `tls` is empty). `config` is accepted but not used here.
configure_tls <- function(url, tls, pass, envir, config = TRUE) {
  parts <- parse_url(url)
  scheme <- parts[["scheme"]]
  secure <- startsWith(scheme, "wss") || startsWith(scheme, "tls")
  if (secure && is.null(tls)) {
    # Secure scheme without user credentials: generate a certificate, keep
    # its "client" part on the profile and use its "server" part here.
    generated <- write_cert(cn = parts[["hostname"]])
    envir[["tls"]] <- generated[["client"]]
    tls <- generated[["server"]]
  }
  tls_cfg <- if (length(tls)) tls_config(server = tls, pass = pass)
  list(tls, tls_cfg)
}
# Record `n` and `dots` on the profile environment, register it in `..` under
# the name `.compute`, and return `..`.
create_profile <- function(envir, .compute, n, dots) {
  envir[["n"]] <- n
  envir[["dots"]] <- dots
  ..[[.compute]] <- envir
  ..
}
# Create a new profile environment (child of `..`) holding an L'Ecuyer-CMRG
# RNG state under "stream" and the supplied `seed` under "seed". The global
# `.Random.seed` captured beforehand is written back afterwards.
init_envir_stream <- function(seed) {
  .advance()
  saved_seed <- .GlobalEnv[[".Random.seed"]]
  RNGkind("L'Ecuyer-CMRG")
  if (length(seed)) {
    set.seed(seed)
  }
  profile <- new.env(hash = FALSE, parent = ..)
  profile[["stream"]] <- .GlobalEnv[[".Random.seed"]]
  # Put back the RNG state captured before switching generator.
  assign(".Random.seed", saved_seed, envir = .GlobalEnv)
  profile[["seed"]] <- seed
  profile
}
# Open a "req" socket listening at `url` (optionally with a TLS
# configuration) and set its "req:resend-time" option to 0.
req_socket <- function(url, tls = NULL) {
  listening <- socket("req", listen = url, tls = tls)
  `opt<-`(listening, "req:resend-time", 0L)
}
# Turn the `...` arguments of daemons() into the text appended to the
# `mirai::daemon(...)` call used to launch daemons.
#
# envir: profile environment; a `tlscert` argument, if present, is stored on
#   it under "tls" and is not included in the returned text.
# ...: arguments to forward. Only named logical or numeric values are kept.
#
# Returns "" when nothing is forwarded, otherwise a string of the form
# ",name=value,name=value".
parse_dots <- function(envir, ...) {
  ...length() || return("")
  dots <- list(...)
  arg_names <- names(dots)
  # names() is NULL when no argument at all was named.
  if (is.null(arg_names)) arg_names <- character(length(dots))
  if (any(arg_names == "tlscert")) {
    envir[["tls"]] <- dots[["tlscert"]]
  }
  is_simple <- vapply(
    dots,
    function(x) is.logical(x) || is.numeric(x),
    logical(1L)
  )
  # Unnamed values are dropped: they would otherwise be rendered as ",=value",
  # which is not a valid argument inside the generated call text.
  is_named <- !is.na(arg_names) & nzchar(arg_names)
  dots <- dots[is_simple & is_named]
  length(dots) || return("")
  sprintf(",%s", paste(names(dots), dots, sep = "=", collapse = ","))
}
# Render `tls` as a ",tlscert=..." argument fragment: empty string for no
# value, a quoted string for one element, a quoted c() pair for two.
# Longer inputs yield NULL invisibly.
parse_tls <- function(tls) {
  n_parts <- length(tls)
  if (n_parts == 0L) {
    ""
  } else if (n_parts == 1L) {
    sprintf(",tlscert=\"%s\"", tls)
  } else if (n_parts == 2L) {
    sprintf(",tlscert=c(\"%s\",\"%s\")", tls[1L], tls[2L])
  }
}
# First entry of `lp` that contains a "mirai" entry on disk (NA if none).
libp <- function(lp = .libPaths()) {
  has_pkg <- file.exists(file.path(lp, "mirai"))
  candidates <- lp[has_pkg]
  candidates[1L]
}
# Shell-quoted R expression calling mirai::daemon() for a daemon that
# connects without dispatcher; `rs` is collapsed into the `rs=c(...)` argument.
args_daemon_direct <- function(url, dots, rs, tls = NULL) {
  tls_arg <- parse_tls(tls)
  rs_csv <- paste0(rs, collapse = ",")
  call_text <- sprintf(
    "mirai::daemon(\"%s\",dispatcher=FALSE%s%s,rs=c(%s))",
    url,
    dots,
    tls_arg,
    rs_csv
  )
  shQuote(call_text)
}
# Shell-quoted R expression calling mirai::daemon() with `url`, the forwarded
# `dots` text and any TLS fragment. `rs` is accepted but not used.
args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
  tls_arg <- parse_tls(tls)
  call_text <- sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, tls_arg)
  shQuote(call_text)
}
# Shell-quoted R code that prepends the library path returned by libp() to
# .libPaths() and then calls mirai::dispatcher() with the two URLs and `n`.
args_dispatcher <- function(urld, url, n) {
  template <- ".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\",n=%d)"
  code <- sprintf(template, libp(), urld, url, n)
  shQuote(code)
}
# Start `.command` with "-e" followed by `args`, without waiting for it.
launch_daemon <- function(args) {
  cli_args <- c("-e", args)
  system2(.command, args = cli_args, wait = FALSE)
}
# Send `command` over `sock` and return the reply. A truthy (non-zero) result
# from send() is returned as-is without attempting to receive.
query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
  send_result <- send(sock, command, mode = send_mode, block = block)
  if (send_result) {
    return(send_result)
  }
  recv(sock, mode = recv_mode, block = block)
}
# Start a dispatcher process and connect the host to it.
#
# url: either a numeric count of local daemons to launch, or the URL passed
#   on to the dispatcher process.
# dots: argument text (from parse_dots()) appended to local daemon launches.
# envir: profile environment; receives "cv", "sock", "dispatcher", "pid" and
#   "url".
# serial: serialization configuration; when NULL the value stored under
#   "serial" in `.` is used.
# tls, pass: forwarded to the dispatcher in the initial data message.
#
# Returns `envir` (the result of the final `[[<-` call).
launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
cv <- cv()
# Socket for the host <-> dispatcher link, listening at a fresh local URL.
urld <- local_url()
sock <- req_socket(urld)
# NOTE(review): presumably makes pipe events on `sock` signal `cv` - confirm
# against the nanonext documentation.
pipe_notify(sock, cv, add = TRUE)
# A numeric `url` is the number of local daemons; in that case a fresh local
# URL is generated for the dispatcher to use with its daemons.
local <- is.numeric(url)
n <- if (local) url else 0L
if (local) url <- local_url()
system2(
.command,
args = c("--default-packages=NULL", "--vanilla", "-e", args_dispatcher(urld, url, n)),
wait = FALSE
)
if (is.null(serial)) serial <- .[["serial"]]
if (is.list(serial)) `opt<-`(sock, "serial", serial)
`[[<-`(envir, "cv", cv)
`[[<-`(envir, "sock", sock)
`[[<-`(envir, "dispatcher", urld)
# Initial payload for the dispatcher: default packages setting, TLS
# credentials, password, serialization configuration and RNG stream.
data <- list(Sys.getenv("R_DEFAULT_PACKAGES"), tls, pass, serial, envir[["stream"]])
sync <- 0L
# Wait on `cv`, emitting a progress message each time until() returns FALSE.
while(!until(cv, .limit_long))
message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
# NOTE(review): presumably removes the pipe notification set above - confirm.
pipe_notify(sock, NULL, add = TRUE)
send(sock, data, mode = 1L, block = TRUE)
if (local) {
# All local daemons are launched with the same arguments.
launch_args <- args_daemon_disp(url, dots)
for (i in seq_len(n)) {
launch_daemon(launch_args)
}
}
# Receive the dispatcher's reply asynchronously, waiting on `cv` as before.
req <- recv_aio(sock, mode = 2L, cv = cv)
while(!until(cv, .limit_long))
message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
res <- collect_aio(req)
# The reply's first element is stored as the integer "pid", its second as "url".
`[[<-`(envir, "pid", as.integer(res[1L]))
`[[<-`(envir, "url", res[2L])
}
# Launch one local daemon per element of `seq`, connecting directly to a
# socket listening at a fresh local URL, and wait on the condition variable
# once per launched daemon. The socket and URL are stored on `envir`.
launch_daemons <- function(seq, dots, envir) {
  notifier <- cv()
  listen_url <- local_url()
  sock <- req_socket(listen_url)
  pipe_notify(sock, notifier, add = TRUE)
  for (i in seq) {
    stream <- maybe_next_stream(envir)
    launch_daemon(args_daemon_direct(listen_url, dots, stream))
  }
  envir[["sock"]] <- sock
  envir[["url"]] <- listen_url
  waited <- 0L
  for (i in seq) {
    # Emit a progress message each time until() returns FALSE.
    while (!until(notifier, .limit_long)) {
      waited <- waited + .limit_long_secs
      message(sprintf(._[["sync_daemons"]], waited))
    }
  }
  pipe_notify(sock, NULL, add = TRUE)
}
# Replace a wildcard port "0" in `url` (a "0" directly after a colon and
# followed by "/" or the end of the string) with `port`.
sub_real_port <- function(port, url) {
  wildcard_port <- "(?<=:)0(?![^/])"
  sub(wildcard_port, port, url, perl = TRUE)
}
# Open the host socket at `url` and store it on `envir` together with the
# URL reported by its first listener; a port of "0" is replaced by the
# listener's "tcp-bound-port" option. Returns `envir`.
create_sock <- function(envir, url, tls) {
  sock <- req_socket(url, tls = tls)
  first_listener <- attr(sock, "listener")[[1L]]
  bound_url <- opt(first_listener, "url")
  if (parse_url(bound_url)[["port"]] == "0") {
    real_port <- opt(first_listener, "tcp-bound-port")
    bound_url <- sub_real_port(real_port, bound_url)
  }
  envir[["sock"]] <- sock
  envir[["url"]] <- bound_url
  envir
}
# Send the `._scm_.` message over the profile's socket once per connection,
# pausing via msleep(10L) after each send. The count comes from the socket's
# "pipes" statistic, or from the first element of the dispatcher's reply.
send_signal <- function(envir) {
  n_signals <- if (is.null(envir[["dispatcher"]])) {
    stat(envir[["sock"]], "pipes")
  } else {
    reply <- query_dispatcher(envir[["sock"]], c(0L, 0L))
    reply[1L]
  }
  for (k in seq_len(n_signals)) {
    send(envir[["sock"]], ._scm_., mode = 2L)
    msleep(10L)
  }
}
# Query the dispatcher and arrange its reply as a status list. A classed
# reply is returned unchanged; elements beyond the fifth are reported as
# "events".
dispatcher_status <- function(envir) {
  counts <- query_dispatcher(envir[["sock"]], c(0L, 0L))
  if (is.object(counts)) {
    return(counts)
  }
  task_counts <- c(
    awaiting = counts[3L],
    executing = counts[4L],
    completed = counts[5L]
  )
  result <- list(
    connections = counts[1L],
    daemons = envir[["url"]],
    mirai = task_counts
  )
  if (length(counts) > 5L) {
    result[["events"]] <- counts[-seq_len(5L)]
  }
  result
}
# Signal the "no daemons set" error through cli::cli_abort(), naming the
# compute profile when `.compute` is a character value.
stop_d_cli <- function(.compute, call) {
  bullets <- if (is.character(.compute)) {
    c(
      sprintf("No daemons set for the '%s' compute profile.", .compute),
      sprintf("Use e.g. {.run mirai::daemons(6, .compute = \"%s\")} to set 6 local daemons.", .compute)
    )
  } else {
    c(
      "No daemons set.",
      "Use e.g. {.run mirai::daemons(6)} to set 6 local daemons."
    )
  }
  cli::cli_abort(bullets, call = call)
}
# Signal the "no daemons set" error with base stop(), naming the compute
# profile when `.compute` is a character value. `call` is not used.
stop_d <- function(.compute, call) {
  msg <- if (is.character(.compute)) {
    sprintf("No daemons set for the '%1$s' compute profile.\nUse e.g. mirai::daemons(6, .compute = \"%1$s\") to set 6 local daemons.", .compute)
  } else {
    "No daemons set.\nUse e.g. mirai::daemons(6) to set 6 local daemons."
  }
  stop(msg, call. = FALSE)
}
# Raw message sent to daemons by send_signal().
# NOTE(review): the byte layout looks like an R binary serialization stream
# (format marker, three version words, encoding name, one payload word) -
# confirm before relying on the field labels below.
._scm_. <- c(
  as.raw(c(0x42, 0x0a)),             # leading marker bytes
  as.raw(c(0x03, 0x00, 0x00, 0x00)), # 4-byte word
  as.raw(c(0x02, 0x03, 0x04, 0x00)), # 4-byte word
  as.raw(c(0x00, 0x05, 0x03, 0x00)), # 4-byte word
  as.raw(c(0x05, 0x00, 0x00, 0x00)), # 4-byte word (5, the length of the text below)
  charToRaw("UTF-8"),                # bytes 55 54 46 2d 38
  as.raw(c(0xfc, 0x00, 0x00, 0x00))  # trailing 4-byte word
)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.