Nothing
# mirai ------------------------------------------------------------------------
#' Daemons (Set Persistent Processes)
#'
#' Set daemons, or persistent background processes, to receive [mirai()]
#' requests. Specify `n` to create daemons on the local machine. Specify `url`
#' to receive connections from remote daemons (for distributed computing across
#' the network). Specify `remote` to optionally launch remote daemons via a
#' remote configuration. Dispatcher (enabled by default) ensures optimal
#' scheduling.
#'
#' Use `daemons(0)` to reset daemon connections:
#' \itemize{
#' \item All connected daemons and/or dispatchers exit automatically.
#' \item Any as yet unresolved 'mirai' will return an 'errorValue' 19
#' (Connection reset).
#' \item [mirai()] reverts to the default behaviour of creating a new
#' background process for each request.
#' }
#'
#' If the host session ends, all connected dispatcher and daemon processes
#' automatically exit as soon as their connections are dropped.
#'
#' Calling [daemons()] implicitly resets any existing daemons for the compute
#' profile with `daemons(0)`. Instead, [launch_local()] or [launch_remote()] may
#' be used to add daemons at any time without resetting daemons.
#'
#' @inheritParams mirai
#' @param n (integer) number of daemons to launch.
#' @param url (character) URL at which to listen for daemon connections, e.g.
#' 'tcp://hostname:5555'. Use scheme 'tls+tcp://' for secure TLS connections
#' (see Distributed Computing section). [host_url()] may be used to construct
#' a valid URL.
#' @param remote (configuration) for launching remote daemons, generated by
#' [ssh_config()], [cluster_config()], or [remote_config()].
#' @param dispatcher (logical) whether to use dispatcher for optimal FIFO
#' scheduling. See Dispatcher section below.
#' @param ... (daemon arguments) passed to [daemon()] when launching daemons.
#' Includes `asyncdial`, `autoexit`, `cleanup`, `output`, `maxtasks`,
#' `idletime`, `walltime`, and `tlscert`.
#' @param sync (logical) whether to evaluate mirai synchronously in the current
#' process for testing and debugging. When `TRUE`, other arguments except
#' `seed` and `.compute` are disregarded.
#' @param seed (integer) for reproducible random number generation. `NULL`
#' (default) initializes L'Ecuyer-CMRG RNG streams per daemon (statistically
#' sound, non-reproducible). An integer value instead initializes a stream per
#' mirai (experimental), allowing reproducible results independent of which
#' daemon evaluates it.
#' @param serial (configuration) for custom serialization of reference objects
#' (e.g. Arrow Tables, torch tensors), created by [serial_config()]. Requires
#' dispatcher. `NULL` applies any configurations from [register_serial()].
#' @param tls (character) for secure TLS connections. Either a file path to
#' PEM-encoded TLS certificate (possibly followed by other certificates in a
#' validation chain) and private key, or a length-2 vector of (certificate,
#' private key). `NULL` auto-generates single-use credentials.
#' @param pass (function) returning the password for an encrypted `tls` private
#' key. Use a function rather than a direct value for security.
#'
#' @return Invisibly, logical `TRUE` when creating daemons and `FALSE` when
#' resetting.
#'
#' @section Local Daemons:
#'
#' Setting daemons, or persistent background processes, is typically more
#' efficient as it removes the need for, and overhead of, creating new processes
#' for each mirai evaluation. It also provides control over the total number of
#' processes at any one time.
#'
#' Supply the argument `n` to set the number of daemons. New background
#' [daemon()] processes are automatically launched on the local machine
#' connecting back to the host process, either directly or via dispatcher.
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#'
#' With `dispatcher = FALSE`, daemons connect directly to the host and tasks
#' are distributed round-robin, with tasks queued at each daemon. Optimal
#' scheduling is not guaranteed, as tasks can queue at one daemon while others
#' remain idle. However, this is the most lightweight option, suited to
#' similar-length tasks or when concurrent tasks do not exceed available
#' daemons.
#'
#' @section Distributed Computing:
#'
#' Specify `url` as a character string to allow tasks to be distributed across
#' the network (`n` is only required in this case if also providing a launch
#' configuration to `remote`).
#'
#' The host / dispatcher listens at this URL, utilising a single port, and
#' [daemon()] processes dial in to this URL. Host / dispatcher automatically
#' adjusts to the number of daemons actually connected, allowing dynamic
#' upscaling / downscaling.
#'
#' The URL should have a 'tcp://' scheme, such as 'tcp://10.75.32.70:5555'.
#' Switching the URL scheme to 'tls+tcp://' automatically upgrades the
#' connection to use TLS. The auxiliary function [host_url()] may be used to
#' construct a valid host URL based on the computer's IP address.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' `[]` to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified as
#' 'tcp://\[::1\]:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://\[::1\]:0'
#' will automatically assign a free ephemeral port. Use [status()] to inspect
#' the actual assigned port at any time.
#'
#' Specify `remote` with a call to [ssh_config()], [cluster_config()] or
#' [remote_config()] to launch (programatically deploy) daemons on remote
#' machines, from where they dial back to `url`. If not launching daemons,
#' [launch_remote()] may be used to generate the shell commands for manual
#' deployment.
#'
#' @section Compute Profiles:
#'
#' If `NULL`, the `"default"` compute profile is used. Providing a character
#' value for `.compute` creates a new compute profile with the name specified.
#' Each compute profile retains its own daemons settings and operates
#' independently. Some usage examples follow:
#'
#' **local / remote** daemons may be set by specifying a host URL and
#' `.compute` as `"remote"`, creating a new compute profile. Subsequent
#' [mirai()] calls may then be sent for local computation by not specifying the
#' `.compute` argument, or for remote computation to connected daemons by
#' specifying the `.compute` argument as `"remote"`.
#'
#' **cpu / gpu** some tasks may require access to different types of daemon,
#' such as those with GPUs. In this case, `daemons()` may be called to set up
#' host URLs for CPU-only daemons and for those with GPUs, specifying the
#' `.compute` argument as `"cpu"` and `"gpu"` respectively. By supplying the
#' `.compute` argument to subsequent [mirai()] calls, tasks may be sent to
#' either `cpu` or `gpu` daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via `daemons(0)` should
#' be carried out with the desired `.compute` argument specified.
#'
#' @seealso [with_daemons()] and [local_daemons()] for managing the compute
#' profile used locally.
#'
#' @examplesIf interactive()
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Set up dispatcher accepting TLS over TCP connections
#' daemons(url = host_url(tls = TRUE))
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Use with() to evaluate with daemons for the duration of the expression
#' with(
#' daemons(2),
#' {
#' m1 <- mirai(Sys.getpid())
#' m2 <- mirai(Sys.getpid())
#' cat(m1[], m2[], "\n")
#' }
#' )
#'
#' \dontrun{
#'
#' # Launch daemons on remotes 'nodeone' and 'nodetwo' using SSH
#' # connecting back directly to the host URL over a TLS connection:
#' daemons(
#' url = host_url(tls = TRUE),
#' remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo'))
#' )
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#' n = 4,
#' url = local_url(tcp = TRUE),
#' remote = ssh_config('ssh://10.75.32.90', tunnel = TRUE)
#' )
#'
#' }
#'
#' @examples
#' # Synchronous mode
#' # mirai are run in the current process - useful for testing and debugging
#' daemons(sync = TRUE)
#' m <- mirai(Sys.getpid())
#' daemons(0)
#' m[]
#'
#' # Synchronous mode restricted to a specific compute profile
#' daemons(sync = TRUE, .compute = "sync")
#' with_daemons("sync", {
#' m <- mirai(Sys.getpid())
#' })
#' daemons(0, .compute = "sync")
#' m[]
#'
#' @export
#'
daemons <- function(
n,
url = NULL,
remote = NULL,
dispatcher = TRUE,
...,
sync = FALSE,
seed = NULL,
serial = NULL,
tls = NULL,
pass = NULL,
.compute = NULL
) {
if (is.null(.compute)) {
.compute <- .[["cp"]]
}
envir <- ..[[.compute]]
if (sync) {
url <- local_url()
dispatcher <- FALSE
remote <- serial <- tls <- pass <- NULL
}
if (length(url)) {
is.character(url) || stop(sprintf(._[["character_url"]], typeof(url)))
res <- if (is.null(envir)) {
url <- url[1L]
envir <- init_envir_stream(seed)
dots <- parse_dots(envir, ...)
cfg <- configure_tls(url, tls, pass, envir)
if (dispatcher) {
launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass)
} else {
create_sock(envir, url, cfg[[2L]])
}
create_profile(envir, .compute, 1L, dots, sync)
if (length(remote)) {
withCallingHandlers(
launch_remote(n = n, remote = remote, .compute = .compute),
error = function(cnd) daemons(0, .compute = .compute)
)
}
url
}
} else {
signal <- is.null(n)
if (signal) {
n <- 0L
}
is.numeric(n) || stop(._[["numeric_n"]])
n <- as.integer(n)
n == 0L &&
{
is.null(envir) && return(invisible(FALSE))
if (signal) {
send_signal(envir)
}
reap(envir[["sock"]])
otel_span("daemons reset", envir, links = list(envir[["otel_span"]]))
..[[.compute]] <- NULL -> envir
msleep(.sleep_daemons)
return(invisible(FALSE))
}
res <- if (is.null(envir)) {
n > 0L || stop(._[["n_zero"]])
dynGet(".mirai_within_map", ifnotfound = FALSE) && stop(._[["within_map"]])
envir <- init_envir_stream(seed)
dots <- parse_dots(envir, ...)
if (dispatcher) {
launch_dispatcher(n, dots, envir, serial)
} else {
launch_daemons(seq_len(n), dots, envir)
}
create_profile(envir, .compute, n, dots, sync)
}
}
is.null(res) &&
return({
daemons(0L, .compute = .compute)
daemons(
n = n,
url = url,
remote = remote,
dispatcher = dispatcher,
...,
sync = sync,
seed = seed,
serial = serial,
tls = tls,
pass = pass,
.compute = .compute
)
})
`[[<-`(envir, "otel_span", otel_span("daemons set", envir))
invisible(`class<-`(TRUE, c("miraiDaemons", .compute)))
}
#' @export
#'
print.miraiDaemons <- function(x, ...) print(unclass(x))
#' With Mirai Daemons
#'
#' Evaluate an expression with daemons that last for the duration of the
#' expression. Ensure each mirai within the statement is explicitly called (or
#' their values collected) so that daemons are not reset before they have all
#' completed.
#'
#' This function is an S3 method for the generic [with()] for class
#' 'miraiDaemons'.
#'
#' @param data (miraiDaemons) return value from [daemons()].
#' @param expr (expression) to evaluate with daemons active.
#' @param ... unused.
#'
#' @return The return value of `expr`.
#'
#' @examplesIf interactive()
#' with(
#' daemons(2, dispatcher = FALSE),
#' {
#' m1 <- mirai(Sys.getpid())
#' m2 <- mirai(Sys.getpid())
#' cat(m1[], m2[], "\n")
#' }
#' )
#'
#' Sys.getpid()
#'
#' @export
#'
with.miraiDaemons <- function(data, expr, ...) {
prev_profile <- .[["cp"]]
`[[<-`(., "cp", class(data)[2L])
on.exit({
daemons(0L)
`[[<-`(., "cp", prev_profile)
})
expr
}
#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @param .compute (character | miraiCluster) compute profile name, or `NULL`
#' for 'default'. Also accepts a 'miraiCluster'.
#'
#' @return A named list comprising:
#' \itemize{
#' \item **connections** - integer number of active daemon connections.
#' \item **daemons** - character URL at which host / dispatcher is
#' listening, or else `0L` if daemons have not yet been set.
#' \item **mirai** (present only if using dispatcher) - a named integer
#' vector comprising: **awaiting** - number of tasks queued for execution at
#' dispatcher, **executing** - number of tasks sent to a daemon for
#' execution, and **completed** - number of tasks for which the result has
#' been received (either completed or cancelled).
#' }
#'
#' @seealso [info()] for more succinct information statistics.
#'
#' @keywords internal
#' @export
#'
status <- function(.compute = NULL) {
is.list(.compute) && return(status(attr(.compute, "id")))
envir <- compute_env(.compute)
is.null(envir) && return(list(connections = 0L, daemons = 0L))
is.null(envir[["dispatcher"]]) || return(dispatcher_status(envir))
list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = envir[["url"]])
}
#' Information Statistics
#'
#' Retrieve statistics for the specified compute profile.
#'
#' The returned statistics are:
#'
#' - Connections: active daemon connections.
#' - Cumulative: total daemons that have ever connected.
#' - Awaiting: mirai tasks currently queued for execution at dispatcher.
#' - Executing: mirai tasks currently being evaluated on a daemon.
#' - Completed: mirai tasks that have been completed or cancelled.
#'
#' For non-dispatcher daemons: only 'connections' will be available and the
#' other values will be `NA`.
#'
#' @inheritParams mirai
#'
#' @return Named integer vector or else `NULL` if the compute profile is yet to
#' be set up.
#'
#' @examples
#' info()
#' daemons(sync = TRUE)
#' info()
#' daemons(0)
#'
#' @export
#'
info <- function(.compute = NULL) {
envir <- compute_env(.compute)
is.null(envir) && return()
if (is.null(envir[["dispatcher"]])) {
res <- c(as.integer(stat(envir[["sock"]], "pipes")), NA, NA, NA, NA)
} else {
res <- query_dispatcher(envir[["sock"]], c(0L, 0L))
is.object(res) && return()
}
`names<-`(res, c("connections", "cumulative", "awaiting", "executing", "completed"))
}
#' Query if Daemons are Set
#'
#' Returns a logical value, whether or not daemons have been set for a given
#' compute profile.
#'
#' @inheritParams mirai
#'
#' @return Logical `TRUE` or `FALSE`.
#'
#' @examples
#' daemons_set()
#' daemons(sync = TRUE)
#' daemons_set()
#' daemons(0)
#'
#' @export
#'
daemons_set <- function(.compute = NULL) !is.null(compute_env(.compute))
#' Require Daemons
#'
#' Returns `TRUE` invisibly only if daemons are set, otherwise produces an
#' informative error for the user to set daemons, with a clickable function link
#' if the \pkg{cli} package is available.
#'
#' @inheritParams mirai
#' @param call (environment) execution environment for error attribution, e.g.
#' `environment()`. Used by \pkg{cli} for error messages.
#'
#' @return Invisibly, logical `TRUE`, or else errors.
#'
#' @examples
#' daemons(sync = TRUE)
#' (require_daemons())
#' daemons(0)
#'
#' @export
#'
require_daemons <- function(.compute = NULL, call = environment()) {
invisible(daemons_set(.compute = .compute) || stop_d(.compute, call))
}
#' With Daemons
#'
#' Evaluate an expression using a specific compute profile.
#'
#' Will error if the specified compute profile is not yet set up.
#'
#' @inheritParams require_daemons
#' @param expr (expression) to evaluate using the compute profile.
#'
#' @return For **with_daemons**: the return value of `expr`. \cr
#' For **local_daemons**: invisible NULL.
#'
#' @examplesIf interactive()
#' daemons(1, dispatcher = FALSE, .compute = "cpu")
#' daemons(1, dispatcher = FALSE, .compute = "gpu")
#'
#' with_daemons("cpu", {
#' m1 <- mirai(Sys.getpid())
#' })
#'
#' with_daemons("gpu", {
#' m2 <- mirai(Sys.getpid())
#' m3 <- mirai(Sys.getpid(), .compute = "cpu")
#' local_daemons("cpu")
#' m4 <- mirai(Sys.getpid())
#' })
#'
#' m1[]
#'
#' m2[] # different to m1
#'
#' m3[] # same as m1
#' m4[] # same as m1
#'
#' with_daemons("cpu", daemons(0))
#' with_daemons("gpu", daemons(0))
#'
#' @export
#'
with_daemons <- function(.compute, expr) {
require_daemons(.compute = .compute, call = environment())
prev_profile <- .[["cp"]]
`[[<-`(., "cp", .compute)
on.exit(`[[<-`(., "cp", prev_profile))
expr
}
#' @param frame (environment) scope for the compute profile setting.
#'
#' @rdname with_daemons
#' @export
#'
local_daemons <- function(.compute, frame = parent.frame()) {
require_daemons(.compute = .compute, call = frame)
prev_profile <- .[["cp"]]
`[[<-`(., "cp", .compute)
defer(`[[<-`(., "cp", prev_profile), envir = frame)
}
#' Create Serialization Configuration
#'
#' Returns a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once set by passing to the `serial` argument of [daemons()], the functions
#' apply to all mirai requests for that compute profile.
#'
#' This feature utilises the 'refhook' system of R native serialization.
#'
#' @param class (character) class name(s) for custom serialization, e.g.
#' `'ArrowTabular'` or `c('torch_tensor', 'ArrowTabular')`.
#' @param sfunc (function | list) serialization function(s) accepting a
#' reference object and returning a raw vector.
#' @param ufunc (function | list) unserialization function(s) accepting a raw
#' vector and returning a reference object.
#'
#' @return A list comprising the configuration. This should be passed to the
#' `serial` argument of [daemons()].
#'
#' @examples
#' cfg <- serial_config("test_cls", function(x) serialize(x, NULL), unserialize)
#' cfg
#'
#' cfg2 <- serial_config(
#' c("class_one", "class_two"),
#' list(function(x) serialize(x, NULL), function(x) serialize(x, NULL)),
#' list(unserialize, unserialize)
#' )
#' cfg2
#'
#' @export
#'
serial_config <- serial_config
#' Register Serialization Configuration
#'
#' Registers a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once registered, the functions apply to all [daemons()] calls where the
#' `serial` argument is `NULL`.
#'
#' @inheritParams serial_config
#'
#' @return Invisible NULL.
#'
#' @export
#'
register_serial <- function(class, sfunc, ufunc) {
cfg <- serial_config(class, sfunc, ufunc)
reg <- .[["serial"]]
`[[<-`(., "serial", lapply(seq_along(cfg), function(i) c(reg[[i]], cfg[[i]])))
invisible()
}
# internals --------------------------------------------------------------------
# Simplified version of withr standalone `defer()`
defer <- function(expr, envir) {
thunk <- as.call(list(function() expr))
do.call(on.exit, list(thunk, add = TRUE, after = FALSE), envir = envir)
}
compute_env <- function(x) ..[[if (is.null(x)) .[["cp"]] else x]]
configure_tls <- function(url, tls, pass, envir) {
purl <- parse_url(url)
sch <- purl[["scheme"]]
if ((startsWith(sch, "tls") || startsWith(sch, "wss")) && is.null(tls)) {
cert <- write_cert(cn = purl[["hostname"]])
`[[<-`(envir, "tls", cert[["client"]])
tls <- cert[["server"]]
}
list(tls, if (length(tls)) tls_config(server = tls, pass = pass))
}
create_profile <- function(envir, .compute, n, dots, sync) {
`[[<-`(envir, "n", n)
`[[<-`(envir, "dots", dots)
`[[<-`(envir, "sync", sync)
`[[<-`(envir, "compute", .compute)
`[[<-`(.., .compute, envir)
}
init_envir_stream <- function(seed) {
.advance()
oseed <- globalenv()[[".Random.seed"]]
on.exit(`[[<-`(globalenv(), ".Random.seed", oseed))
RNGkind("L'Ecuyer-CMRG")
if (length(seed)) {
set.seed(seed)
}
envir <- new.env(hash = FALSE, parent = ..)
`[[<-`(envir, "stream", globalenv()[[".Random.seed"]])
`[[<-`(envir, "seed", seed)
}
req_socket <- function(url, tls = NULL) {
`opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", 0L)
}
parse_dots <- function(envir, ...) {
...length() || return("")
dots <- list(...)
if (any(names(dots) == "tlscert")) {
`[[<-`(envir, "tls", dots[["tlscert"]])
}
dots <- dots[as.logical(lapply(dots, function(x) is.logical(x) || is.numeric(x)))]
length(dots) || return("")
sprintf(",%s", paste(names(dots), dots, sep = "=", collapse = ","))
}
parse_tls <- function(tls) {
switch(
length(tls) + 1L,
"",
sprintf(",tlscert=\"%s\"", tls),
sprintf(",tlscert=c(\"%s\",\"%s\")", tls[1L], tls[2L])
)
}
libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]
args_daemon_direct <- function(url, dots, rs, tls = NULL) {
sprintf(
"mirai::daemon(\"%s\",dispatcher=FALSE%s%s,rs=c(%s))",
url,
dots,
parse_tls(tls),
paste0(rs, collapse = ",")
)
}
args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls))
}
args_dispatcher <- function(urld, url, n) {
sprintf(".libPaths(\"%s\");mirai::dispatcher(\"%s\",url=\"%s\",n=%d)", libp(), urld, url, n)
}
launch_daemon <- function(args) system2(.command, args = c("-e", shQuote(args)), wait = FALSE)
query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
r <- send(sock, command, mode = send_mode, block = block)
r && return(r)
recv(sock, mode = recv_mode, block = block)
}
sync_with <- function(cv, message_key, sync = 0L) {
while (!until(cv, .limit_long)) {
message(sprintf(._[[message_key]], sync <- sync + .limit_long_secs))
}
}
launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
cv <- cv()
urld <- local_url()
sock <- req_socket(urld)
pipe_notify(sock, cv, add = TRUE)
local <- is.numeric(url)
n <- if (local) url else 0L
if (local) {
url <- local_url()
}
system2(
.command,
args = c("--default-packages=NULL", "--vanilla", "-e", shQuote(args_dispatcher(urld, url, n))),
wait = FALSE
)
if (is.null(serial)) {
serial <- .[["serial"]]
}
if (is.list(serial)) {
`opt<-`(sock, "serial", serial)
}
`[[<-`(envir, "cv", cv)
`[[<-`(envir, "sock", sock)
`[[<-`(envir, "dispatcher", urld)
data <- list(Sys.getenv("R_DEFAULT_PACKAGES"), tls, pass, serial, envir[["stream"]])
sync_with(cv, "sync_dispatcher")
pipe_notify(sock, NULL, add = TRUE)
send(sock, data, mode = 1L, block = TRUE)
if (local) {
launch_args <- args_daemon_disp(url, dots)
for (i in seq_len(n)) {
launch_daemon(launch_args)
}
}
raio <- recv_aio(sock, mode = 2L, cv = cv)
sync_with(cv, "sync_dispatcher")
`[[<-`(envir, "url", collect_aio(raio))
}
launch_daemons <- function(seq, dots, envir) {
cv <- cv()
urld <- local_url()
sock <- req_socket(urld)
pipe_notify(sock, cv, add = TRUE)
for (i in seq) {
launch_daemon(args_daemon_direct(urld, dots, maybe_next_stream(envir)))
}
`[[<-`(envir, "cv", cv)
`[[<-`(envir, "sock", sock)
`[[<-`(envir, "url", urld)
for (i in seq) {
sync_with(cv, "sync_daemons")
}
pipe_notify(sock, NULL, add = TRUE)
}
create_sock <- function(envir, url, tls) {
sock <- req_socket(url, tls = tls)
`[[<-`(envir, "sock", sock)
`[[<-`(envir, "url", attr(attr(sock, "listener")[[1L]], "url"))
}
send_signal <- function(envir) {
signals <- if (is.null(envir[["dispatcher"]])) {
stat(envir[["sock"]], "pipes")
} else {
query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
}
for (i in seq_len(signals)) {
send(envir[["sock"]], ._scm_., mode = 2L)
msleep(.sleep_signal)
}
}
dispatcher_status <- function(envir) {
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
is.object(status) && return(status)
list(
connections = status[1L],
daemons = envir[["url"]],
mirai = c(awaiting = status[3L], executing = status[4L], completed = status[5L])
)
}
stop_d <- function(.compute, call) {
profile <- is.character(.compute)
msg <- if (profile) {
sprintf("No daemons set for the '%s' compute profile.", .compute)
} else {
"No daemons set."
}
try <- if (profile) {
sprintf("mirai::daemons(6, .compute = \"%s\")", .compute)
} else {
"mirai::daemons(6)"
}
cli_enabled || stop(sprintf("%s\nUse e.g. %s to set 6 local daemons.", msg, try), call. = FALSE)
cli::cli_abort(c(msg, sprintf("Use e.g. {.run %s} to set 6 local daemons.", try)), call = call)
}
._scm_. <- as.raw(c(
0x42,
0x0a,
0x03,
0x00,
0x00,
0x00,
0x02,
0x03,
0x04,
0x00,
0x00,
0x05,
0x03,
0x00,
0x05,
0x00,
0x00,
0x00,
0x55,
0x54,
0x46,
0x2d,
0x38,
0xfc,
0x00,
0x00,
0x00
))
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.