R/daemons.R

Defines functions stop_d dispatcher_status send_signal create_sock launch_daemons launch_dispatcher sync_with query_dispatcher launch_daemon args_dispatcher args_daemon_disp args_daemon_direct libp parse_tls parse_dots req_socket init_envir_stream create_profile configure_tls compute_env defer register_serial local_daemons with_daemons require_daemons daemons_set info status with.miraiDaemons print.miraiDaemons daemons

Documented in daemons daemons_set info local_daemons register_serial require_daemons status with_daemons with.miraiDaemons

# mirai ------------------------------------------------------------------------

#' Daemons (Set Persistent Processes)
#'
#' Set daemons, or persistent background processes, to receive [mirai()]
#' requests. Specify `n` to create daemons on the local machine. Specify `url`
#' to receive connections from remote daemons (for distributed computing across
#' the network). Specify `remote` to optionally launch remote daemons via a
#' remote configuration. Dispatcher (enabled by default) ensures optimal
#' scheduling.
#'
#' Use `daemons(0)` to reset daemon connections:
#' \itemize{
#'   \item All connected daemons and/or dispatchers exit automatically.
#'   \item Any as yet unresolved 'mirai' will return an 'errorValue' 19
#'   (Connection reset).
#'   \item [mirai()] reverts to the default behaviour of creating a new
#'   background process for each request.
#' }
#'
#' If the host session ends, all connected dispatcher and daemon processes
#' automatically exit as soon as their connections are dropped.
#'
#' Calling [daemons()] implicitly resets any existing daemons for the compute
#' profile with `daemons(0)`. Instead, [launch_local()] or [launch_remote()] may
#' be used to add daemons at any time without resetting daemons.
#'
#' @inheritParams mirai
#' @param n (integer) number of daemons to launch.
#' @param url (character) URL at which to listen for daemon connections, e.g.
#'   'tcp://hostname:5555'. Use scheme 'tls+tcp://' for secure TLS connections
#'   (see Distributed Computing section). [host_url()] may be used to construct
#'   a valid URL.
#' @param remote (configuration) for launching remote daemons, generated by
#'   [ssh_config()], [cluster_config()], or [remote_config()].
#' @param dispatcher (logical) whether to use dispatcher for optimal FIFO
#'   scheduling. See Dispatcher section below.
#' @param ... (daemon arguments) passed to [daemon()] when launching daemons.
#'   Includes `asyncdial`, `autoexit`, `cleanup`, `output`, `maxtasks`,
#'   `idletime`, `walltime`, and `tlscert`.
#' @param sync (logical) whether to evaluate mirai synchronously in the current
#'   process for testing and debugging. When `TRUE`, other arguments except
#'   `seed` and `.compute` are disregarded.
#' @param seed (integer) for reproducible random number generation. `NULL`
#'   (default) initializes L'Ecuyer-CMRG RNG streams per daemon (statistically
#'   sound, non-reproducible). An integer value instead initializes a stream per
#'   mirai (experimental), allowing reproducible results independent of which
#'   daemon evaluates it.
#' @param serial (configuration) for custom serialization of reference objects
#'   (e.g. Arrow Tables, torch tensors), created by [serial_config()]. Requires
#'   dispatcher. `NULL` applies any configurations from [register_serial()].
#' @param tls (character) for secure TLS connections. Either a file path to
#'   PEM-encoded TLS certificate (possibly followed by other certificates in a
#'   validation chain) and private key, or a length-2 vector of (certificate,
#'   private key). `NULL` auto-generates single-use credentials.
#' @param pass (function) returning the password for an encrypted `tls` private
#'   key. Use a function rather than a direct value for security.
#'
#' @return Invisibly, logical `TRUE` when creating daemons and `FALSE` when
#'   resetting.
#'
#' @section Local Daemons:
#'
#' Setting daemons, or persistent background processes, is typically more
#' efficient as it removes the need for, and overhead of, creating new processes
#' for each mirai evaluation. It also provides control over the total number of
#' processes at any one time.
#'
#' Supply the argument `n` to set the number of daemons. New background
#' [daemon()] processes are automatically launched on the local machine
#' connecting back to the host process, either directly or via dispatcher.
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#'
#' With `dispatcher = FALSE`, daemons connect directly to the host and tasks
#' are distributed round-robin, with tasks queued at each daemon. Optimal
#' scheduling is not guaranteed, as tasks can queue at one daemon while others
#' remain idle. However, this is the most lightweight option, suited to
#' similar-length tasks or when concurrent tasks do not exceed available
#' daemons.
#'
#' @section Distributed Computing:
#'
#' Specify `url` as a character string to allow tasks to be distributed across
#' the network (`n` is only required in this case if also providing a launch
#' configuration to `remote`).
#'
#' The host / dispatcher listens at this URL, utilising a single port, and
#' [daemon()] processes dial in to this URL. Host / dispatcher automatically
#' adjusts to the number of daemons actually connected, allowing dynamic
#' upscaling / downscaling.
#'
#' The URL should have a 'tcp://' scheme, such as 'tcp://10.75.32.70:5555'.
#' Switching the URL scheme to 'tls+tcp://' automatically upgrades the
#' connection to use TLS. The auxiliary function [host_url()] may be used to
#' construct a valid host URL based on the computer's IP address.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' `[]` to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified as
#' 'tcp://\[::1\]:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://\[::1\]:0'
#' will automatically assign a free ephemeral port. Use [status()] to inspect
#' the actual assigned port at any time.
#'
#' Specify `remote` with a call to [ssh_config()], [cluster_config()] or
#' [remote_config()] to launch (programatically deploy) daemons on remote
#' machines, from where they dial back to `url`. If not launching daemons,
#' [launch_remote()] may be used to generate the shell commands for manual
#' deployment.
#'
#' @section Compute Profiles:
#'
#' If `NULL`, the `"default"` compute profile is used. Providing a character
#' value for `.compute` creates a new compute profile with the name specified.
#' Each compute profile retains its own daemons settings and operates
#' independently. Some usage examples follow:
#'
#' **local / remote** daemons may be set by specifying a host URL and
#' `.compute` as `"remote"`, creating a new compute profile. Subsequent
#' [mirai()] calls may then be sent for local computation by not specifying the
#' `.compute` argument, or for remote computation to connected daemons by
#' specifying the `.compute` argument as `"remote"`.
#'
#' **cpu / gpu** some tasks may require access to different types of daemon,
#' such as those with GPUs. In this case, `daemons()` may be called to set up
#' host URLs for CPU-only daemons and for those with GPUs, specifying the
#' `.compute` argument as `"cpu"` and `"gpu"` respectively. By supplying the
#' `.compute` argument to subsequent [mirai()] calls, tasks may be sent to
#' either `cpu` or `gpu` daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via `daemons(0)` should
#' be carried out with the desired `.compute` argument specified.
#'
#' @seealso [with_daemons()] and [local_daemons()] for managing the compute
#'   profile used locally.
#'
#' @examplesIf interactive()
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Set up dispatcher accepting TLS over TCP connections
#' daemons(url = host_url(tls = TRUE))
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' info()
#' # Reset to zero
#' daemons(0)
#'
#' # Use with() to evaluate with daemons for the duration of the expression
#' with(
#'   daemons(2),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' \dontrun{
#'
#' # Launch daemons on remotes 'nodeone' and 'nodetwo' using SSH
#' # connecting back directly to the host URL over a TLS connection:
#' daemons(
#'   url = host_url(tls = TRUE),
#'   remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo'))
#' )
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#'   n = 4,
#'   url = local_url(tcp = TRUE),
#'   remote = ssh_config('ssh://10.75.32.90', tunnel = TRUE)
#' )
#'
#' }
#'
#' @examples
#' # Synchronous mode
#' # mirai are run in the current process - useful for testing and debugging
#' daemons(sync = TRUE)
#' m <- mirai(Sys.getpid())
#' daemons(0)
#' m[]
#'
#' # Synchronous mode restricted to a specific compute profile
#' daemons(sync = TRUE, .compute = "sync")
#' with_daemons("sync", {
#'   m <- mirai(Sys.getpid())
#' })
#' daemons(0, .compute = "sync")
#' m[]
#'
#' @export
#'
daemons <- function(
  n,
  url = NULL,
  remote = NULL,
  dispatcher = TRUE,
  ...,
  sync = FALSE,
  seed = NULL,
  serial = NULL,
  tls = NULL,
  pass = NULL,
  .compute = NULL
) {
  if (is.null(.compute)) {
    .compute <- .[["cp"]]
  }
  envir <- ..[[.compute]]

  if (sync) {
    url <- local_url()
    dispatcher <- FALSE
    remote <- serial <- tls <- pass <- NULL
  }

  if (length(url)) {
    is.character(url) || stop(sprintf(._[["character_url"]], typeof(url)))
    res <- if (is.null(envir)) {
      url <- url[1L]
      envir <- init_envir_stream(seed)
      dots <- parse_dots(envir, ...)
      cfg <- configure_tls(url, tls, pass, envir)
      if (dispatcher) {
        launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass)
      } else {
        create_sock(envir, url, cfg[[2L]])
      }
      create_profile(envir, .compute, 1L, dots, sync)
      if (length(remote)) {
        withCallingHandlers(
          launch_remote(n = n, remote = remote, .compute = .compute),
          error = function(cnd) daemons(0, .compute = .compute)
        )
      }
      url
    }
  } else {
    signal <- is.null(n)
    if (signal) {
      n <- 0L
    }
    is.numeric(n) || stop(._[["numeric_n"]])
    n <- as.integer(n)

    n == 0L &&
      {
        is.null(envir) && return(invisible(FALSE))

        if (signal) {
          send_signal(envir)
        }
        reap(envir[["sock"]])
        otel_span("daemons reset", envir, links = list(envir[["otel_span"]]))
        ..[[.compute]] <- NULL -> envir
        msleep(.sleep_daemons)
        return(invisible(FALSE))
      }
    res <- if (is.null(envir)) {
      n > 0L || stop(._[["n_zero"]])
      dynGet(".mirai_within_map", ifnotfound = FALSE) && stop(._[["within_map"]])
      envir <- init_envir_stream(seed)
      dots <- parse_dots(envir, ...)
      if (dispatcher) {
        launch_dispatcher(n, dots, envir, serial)
      } else {
        launch_daemons(seq_len(n), dots, envir)
      }
      create_profile(envir, .compute, n, dots, sync)
    }
  }

  is.null(res) &&
    return({
      daemons(0L, .compute = .compute)
      daemons(
        n = n,
        url = url,
        remote = remote,
        dispatcher = dispatcher,
        ...,
        sync = sync,
        seed = seed,
        serial = serial,
        tls = tls,
        pass = pass,
        .compute = .compute
      )
    })

  `[[<-`(envir, "otel_span", otel_span("daemons set", envir))

  invisible(`class<-`(TRUE, c("miraiDaemons", .compute)))
}

#' @export
#'
print.miraiDaemons <- function(x, ...) print(unclass(x))

#' With Mirai Daemons
#'
#' Evaluate an expression with daemons that last for the duration of the
#' expression. Ensure each mirai within the statement is explicitly called (or
#' their values collected) so that daemons are not reset before they have all
#' completed.
#'
#' This function is an S3 method for the generic [with()] for class
#' 'miraiDaemons'.
#'
#' @param data (miraiDaemons) return value from [daemons()].
#' @param expr (expression) to evaluate with daemons active.
#' @param ... unused.
#'
#' @return The return value of `expr`.
#'
#' @examplesIf interactive()
#' with(
#'   daemons(2, dispatcher = FALSE),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' Sys.getpid()
#'
#' @export
#'
with.miraiDaemons <- function(data, expr, ...) {
  prev_profile <- .[["cp"]]
  `[[<-`(., "cp", class(data)[2L])
  on.exit({
    daemons(0L)
    `[[<-`(., "cp", prev_profile)
  })
  expr
}

#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @param .compute (character | miraiCluster) compute profile name, or `NULL`
#'   for 'default'. Also accepts a 'miraiCluster'.
#'
#' @return A named list comprising:
#'   \itemize{
#'     \item **connections** - integer number of active daemon connections.
#'     \item **daemons** - character URL at which host / dispatcher is
#'     listening, or else `0L` if daemons have not yet been set.
#'     \item **mirai** (present only if using dispatcher) - a named integer
#'     vector comprising: **awaiting** - number of tasks queued for execution at
#'     dispatcher, **executing** - number of tasks sent to a daemon for
#'     execution, and **completed** - number of tasks for which the result has
#'     been received (either completed or cancelled).
#'   }
#'
#' @seealso [info()] for more succinct information statistics.
#'
#' @keywords internal
#' @export
#'
status <- function(.compute = NULL) {
  is.list(.compute) && return(status(attr(.compute, "id")))
  envir <- compute_env(.compute)
  is.null(envir) && return(list(connections = 0L, daemons = 0L))
  is.null(envir[["dispatcher"]]) || return(dispatcher_status(envir))
  list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = envir[["url"]])
}

#' Information Statistics
#'
#' Retrieve statistics for the specified compute profile.
#'
#' The returned statistics are:
#'
#' - Connections: active daemon connections.
#' - Cumulative: total daemons that have ever connected.
#' - Awaiting: mirai tasks currently queued for execution at dispatcher.
#' - Executing: mirai tasks currently being evaluated on a daemon.
#' - Completed: mirai tasks that have been completed or cancelled.
#'
#' For non-dispatcher daemons: only 'connections' will be available and the
#' other values will be `NA`.
#'
#' @inheritParams mirai
#'
#' @return Named integer vector or else `NULL` if the compute profile is yet to
#'   be set up.
#'
#' @examples
#' info()
#' daemons(sync = TRUE)
#' info()
#' daemons(0)
#'
#' @export
#'
info <- function(.compute = NULL) {
  envir <- compute_env(.compute)
  is.null(envir) && return()
  if (is.null(envir[["dispatcher"]])) {
    res <- c(as.integer(stat(envir[["sock"]], "pipes")), NA, NA, NA, NA)
  } else {
    res <- query_dispatcher(envir[["sock"]], c(0L, 0L))
    is.object(res) && return()
  }
  `names<-`(res, c("connections", "cumulative", "awaiting", "executing", "completed"))
}

#' Query if Daemons are Set
#'
#' Returns a logical value, whether or not daemons have been set for a given
#' compute profile.
#'
#' @inheritParams mirai
#'
#' @return Logical `TRUE` or `FALSE`.
#'
#' @examples
#' daemons_set()
#' daemons(sync = TRUE)
#' daemons_set()
#' daemons(0)
#'
#' @export
#'
daemons_set <- function(.compute = NULL) !is.null(compute_env(.compute))

#' Require Daemons
#'
#' Returns `TRUE` invisibly only if daemons are set, otherwise produces an
#' informative error for the user to set daemons, with a clickable function link
#' if the \pkg{cli} package is available.
#'
#' @inheritParams mirai
#' @param call (environment) execution environment for error attribution, e.g.
#'   `environment()`. Used by \pkg{cli} for error messages.
#'
#' @return Invisibly, logical `TRUE`, or else errors.
#'
#' @examples
#' daemons(sync = TRUE)
#' (require_daemons())
#' daemons(0)
#'
#' @export
#'
require_daemons <- function(.compute = NULL, call = environment()) {
  invisible(daemons_set(.compute = .compute) || stop_d(.compute, call))
}

#' With Daemons
#'
#' Evaluate an expression using a specific compute profile.
#'
#' Will error if the specified compute profile is not yet set up.
#'
#' @inheritParams require_daemons
#' @param expr (expression) to evaluate using the compute profile.
#'
#' @return For **with_daemons**: the return value of `expr`. \cr
#'   For **local_daemons**: invisible NULL.
#'
#' @examplesIf interactive()
#' daemons(1, dispatcher = FALSE, .compute = "cpu")
#' daemons(1, dispatcher = FALSE, .compute = "gpu")
#'
#' with_daemons("cpu", {
#'   m1 <- mirai(Sys.getpid())
#' })
#'
#' with_daemons("gpu", {
#'   m2 <- mirai(Sys.getpid())
#'   m3 <- mirai(Sys.getpid(), .compute = "cpu")
#'   local_daemons("cpu")
#'   m4 <- mirai(Sys.getpid())
#' })
#'
#' m1[]
#'
#' m2[] # different to m1
#'
#' m3[] # same as m1
#' m4[] # same as m1
#'
#' with_daemons("cpu", daemons(0))
#' with_daemons("gpu", daemons(0))
#'
#' @export
#'
with_daemons <- function(.compute, expr) {
  require_daemons(.compute = .compute, call = environment())
  prev_profile <- .[["cp"]]
  `[[<-`(., "cp", .compute)
  on.exit(`[[<-`(., "cp", prev_profile))
  expr
}

#' @param frame (environment) scope for the compute profile setting.
#'
#' @rdname with_daemons
#' @export
#'
local_daemons <- function(.compute, frame = parent.frame()) {
  require_daemons(.compute = .compute, call = frame)
  prev_profile <- .[["cp"]]
  `[[<-`(., "cp", .compute)
  defer(`[[<-`(., "cp", prev_profile), envir = frame)
}

#' Create Serialization Configuration
#'
#' Returns a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once set by passing to the `serial` argument of [daemons()], the functions
#' apply to all mirai requests for that compute profile.
#'
#' This feature utilises the 'refhook' system of R native serialization.
#'
#' @param class (character) class name(s) for custom serialization, e.g.
#'   `'ArrowTabular'` or `c('torch_tensor', 'ArrowTabular')`.
#' @param sfunc (function | list) serialization function(s) accepting a
#'   reference object and returning a raw vector.
#' @param ufunc (function | list) unserialization function(s) accepting a raw
#'   vector and returning a reference object.
#'
#' @return A list comprising the configuration. This should be passed to the
#'   `serial` argument of [daemons()].
#'
#' @examples
#' cfg <- serial_config("test_cls", function(x) serialize(x, NULL), unserialize)
#' cfg
#'
#' cfg2 <- serial_config(
#'   c("class_one", "class_two"),
#'   list(function(x) serialize(x, NULL), function(x) serialize(x, NULL)),
#'   list(unserialize, unserialize)
#' )
#' cfg2
#'
#' @export
#'
serial_config <- serial_config

#' Register Serialization Configuration
#'
#' Registers a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once registered, the functions apply to all [daemons()] calls where the
#' `serial` argument is `NULL`.
#'
#' @inheritParams serial_config
#'
#' @return Invisible NULL.
#'
#' @export
#'
register_serial <- function(class, sfunc, ufunc) {
  cfg <- serial_config(class, sfunc, ufunc)
  reg <- .[["serial"]]
  `[[<-`(., "serial", lapply(seq_along(cfg), function(i) c(reg[[i]], cfg[[i]])))
  invisible()
}

# internals --------------------------------------------------------------------

# Simplified version of withr standalone `defer()`
defer <- function(expr, envir) {
  thunk <- as.call(list(function() expr))
  do.call(on.exit, list(thunk, add = TRUE, after = FALSE), envir = envir)
}

compute_env <- function(x) ..[[if (is.null(x)) .[["cp"]] else x]]

configure_tls <- function(url, tls, pass, envir) {
  purl <- parse_url(url)
  sch <- purl[["scheme"]]
  if ((startsWith(sch, "tls") || startsWith(sch, "wss")) && is.null(tls)) {
    cert <- write_cert(cn = purl[["hostname"]])
    `[[<-`(envir, "tls", cert[["client"]])
    tls <- cert[["server"]]
  }
  list(tls, if (length(tls)) tls_config(server = tls, pass = pass))
}

create_profile <- function(envir, .compute, n, dots, sync) {
  `[[<-`(envir, "n", n)
  `[[<-`(envir, "dots", dots)
  `[[<-`(envir, "sync", sync)
  `[[<-`(envir, "compute", .compute)
  `[[<-`(.., .compute, envir)
}

init_envir_stream <- function(seed) {
  .advance()
  oseed <- globalenv()[[".Random.seed"]]
  on.exit(`[[<-`(globalenv(), ".Random.seed", oseed))
  RNGkind("L'Ecuyer-CMRG")
  if (length(seed)) {
    set.seed(seed)
  }
  envir <- new.env(hash = FALSE, parent = ..)
  `[[<-`(envir, "stream", globalenv()[[".Random.seed"]])
  `[[<-`(envir, "seed", seed)
}

req_socket <- function(url, tls = NULL) {
  `opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", 0L)
}

parse_dots <- function(envir, ...) {
  ...length() || return("")
  dots <- list(...)
  if (any(names(dots) == "tlscert")) {
    `[[<-`(envir, "tls", dots[["tlscert"]])
  }
  dots <- dots[as.logical(lapply(dots, function(x) is.logical(x) || is.numeric(x)))]
  length(dots) || return("")
  sprintf(",%s", paste(names(dots), dots, sep = "=", collapse = ","))
}

parse_tls <- function(tls) {
  switch(
    length(tls) + 1L,
    "",
    sprintf(",tlscert=\"%s\"", tls),
    sprintf(",tlscert=c(\"%s\",\"%s\")", tls[1L], tls[2L])
  )
}

libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]

args_daemon_direct <- function(url, dots, rs, tls = NULL) {
  sprintf(
    "mirai::daemon(\"%s\",dispatcher=FALSE%s%s,rs=c(%s))",
    url,
    dots,
    parse_tls(tls),
    paste0(rs, collapse = ",")
  )
}

args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
  sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls))
}

args_dispatcher <- function(urld, url, n) {
  sprintf(".libPaths(\"%s\");mirai::dispatcher(\"%s\",url=\"%s\",n=%d)", libp(), urld, url, n)
}

launch_daemon <- function(args) system2(.command, args = c("-e", shQuote(args)), wait = FALSE)

query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
  r <- send(sock, command, mode = send_mode, block = block)
  r && return(r)
  recv(sock, mode = recv_mode, block = block)
}

sync_with <- function(cv, message_key, sync = 0L) {
  while (!until(cv, .limit_long)) {
    message(sprintf(._[[message_key]], sync <- sync + .limit_long_secs))
  }
}

launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
  cv <- cv()
  urld <- local_url()
  sock <- req_socket(urld)
  pipe_notify(sock, cv, add = TRUE)
  local <- is.numeric(url)
  n <- if (local) url else 0L
  if (local) {
    url <- local_url()
  }
  system2(
    .command,
    args = c("--default-packages=NULL", "--vanilla", "-e", shQuote(args_dispatcher(urld, url, n))),
    wait = FALSE
  )
  if (is.null(serial)) {
    serial <- .[["serial"]]
  }
  if (is.list(serial)) {
    `opt<-`(sock, "serial", serial)
  }
  `[[<-`(envir, "cv", cv)
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "dispatcher", urld)
  data <- list(Sys.getenv("R_DEFAULT_PACKAGES"), tls, pass, serial, envir[["stream"]])

  sync_with(cv, "sync_dispatcher")

  pipe_notify(sock, NULL, add = TRUE)
  send(sock, data, mode = 1L, block = TRUE)
  if (local) {
    launch_args <- args_daemon_disp(url, dots)
    for (i in seq_len(n)) {
      launch_daemon(launch_args)
    }
  }
  raio <- recv_aio(sock, mode = 2L, cv = cv)
  sync_with(cv, "sync_dispatcher")

  `[[<-`(envir, "url", collect_aio(raio))
}

launch_daemons <- function(seq, dots, envir) {
  cv <- cv()
  urld <- local_url()
  sock <- req_socket(urld)
  pipe_notify(sock, cv, add = TRUE)
  for (i in seq) {
    launch_daemon(args_daemon_direct(urld, dots, maybe_next_stream(envir)))
  }
  `[[<-`(envir, "cv", cv)
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "url", urld)
  for (i in seq) {
    sync_with(cv, "sync_daemons")
  }
  pipe_notify(sock, NULL, add = TRUE)
}

create_sock <- function(envir, url, tls) {
  sock <- req_socket(url, tls = tls)
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "url", attr(attr(sock, "listener")[[1L]], "url"))
}

send_signal <- function(envir) {
  signals <- if (is.null(envir[["dispatcher"]])) {
    stat(envir[["sock"]], "pipes")
  } else {
    query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
  }
  for (i in seq_len(signals)) {
    send(envir[["sock"]], ._scm_., mode = 2L)
    msleep(.sleep_signal)
  }
}

dispatcher_status <- function(envir) {
  status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
  is.object(status) && return(status)
  list(
    connections = status[1L],
    daemons = envir[["url"]],
    mirai = c(awaiting = status[3L], executing = status[4L], completed = status[5L])
  )
}

stop_d <- function(.compute, call) {
  profile <- is.character(.compute)
  msg <- if (profile) {
    sprintf("No daemons set for the '%s' compute profile.", .compute)
  } else {
    "No daemons set."
  }
  try <- if (profile) {
    sprintf("mirai::daemons(6, .compute = \"%s\")", .compute)
  } else {
    "mirai::daemons(6)"
  }
  cli_enabled || stop(sprintf("%s\nUse e.g. %s to set 6 local daemons.", msg, try), call. = FALSE)
  cli::cli_abort(c(msg, sprintf("Use e.g. {.run %s} to set 6 local daemons.", try)), call = call)
}

._scm_. <- as.raw(c(
  0x42,
  0x0a,
  0x03,
  0x00,
  0x00,
  0x00,
  0x02,
  0x03,
  0x04,
  0x00,
  0x00,
  0x05,
  0x03,
  0x00,
  0x05,
  0x00,
  0x00,
  0x00,
  0x55,
  0x54,
  0x46,
  0x2d,
  0x38,
  0xfc,
  0x00,
  0x00,
  0x00
))

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on Feb. 13, 2026, 9:07 a.m.