R/daemons.R

Defines functions stop_d stop_d_cli dispatcher_status send_signal check_store_sock_url store_sock_url sub_real_port store_dispatcher launch_daemons launch_dispatcher query_dispatcher launch_daemon wa5 wa4 wa3 wa2 libp parse_tls parse_dots parse_dispatcher req_socket init_envir_stream create_profile configure_tls register_serial require_daemons daemons_set status with.miraiDaemons print.miraiDaemons daemons

Documented in daemons daemons_set register_serial require_daemons status with.miraiDaemons

# mirai ------------------------------------------------------------------------

#' Daemons (Set Persistent Processes)
#'
#' Set daemons, or persistent background processes, to receive [mirai()]
#' requests. Specify `n` to create daemons on the local machine. Specify `url`
#' to receive connections from remote daemons (for distributed computing across
#' the network). Specify `remote` to optionally launch remote daemons via a
#' remote configuration. Dispatcher (enabled by default) ensures optimal
#' scheduling.
#'
#' Use `daemons(0)` to reset daemon connections:
#' \itemize{
#'   \item All connected daemons and/or dispatchers exit automatically.
#'   \item \pkg{mirai} reverts to the default behaviour of creating a new
#'   background process for each request.
#'   \item Any unresolved 'mirai' will return an 'errorValue' 19 (Connection
#'   reset) after a reset.
#'   \item Daemons must be reset before calling `daemons()` with revised
#'   settings for a compute profile. Daemons may be added at any time by using
#'   [launch_local()] or [launch_remote()] without needing to revise daemons
#'   settings.
#' }
#'
#' If the host session ends, all connected dispatcher and daemon processes
#' automatically exit as soon as their connections are dropped (unless the
#' daemons were started with `autoexit = FALSE`). If a daemon is processing
#' a task, it will exit as soon as the task is complete.
#'
#' To reset persistent daemons started with `autoexit = FALSE`, use
#' `daemons(NULL)` instead, which also sends exit signals to all connected
#' daemons prior to resetting.
#'
#' For historical reasons, `daemons()` with no arguments (other than
#' optionally `.compute`) returns the value of [status()].
#'
#' @inheritParams mirai
#' @inheritParams dispatcher
#' @param n integer number of daemons to launch.
#' @param url \[default NULL\] if specified, a character string comprising a URL
#'   at which to listen for remote daemons, including a port accepting incoming
#'   connections, e.g. 'tcp://hostname:5555' or 'tcp://10.75.32.70:5555'.
#'   Specify a URL with scheme 'tls+tcp://' to use secure TLS connections (for
#'   details see Distributed Computing section below). Auxiliary function
#'   [host_url()] may be used to construct a valid host URL.
#' @param remote \[default NULL\] required only for launching remote daemons, a
#'   configuration generated by [remote_config()] or [ssh_config()].
#' @param dispatcher \[default TRUE\] logical value, whether to use dispatcher.
#'   Dispatcher runs in a separate process to ensure optimal scheduling,
#'   and should normally be kept on (for details see Dispatcher section below).
#' @param ... (optional) additional arguments passed through to
#'   [daemon()] if launching daemons. These include `asyncdial`, `autoexit`,
#'   `cleanup`, `output`, `maxtasks`, `idletime` and `walltime`.
#' @param seed \[default NULL\] (optional) supply a random seed (single value,
#'   interpreted as an integer). This is used to inititalise the L'Ecuyer-CMRG
#'   RNG streams sent to each daemon. Note that reproducible results can be
#'   expected only for `dispatcher = FALSE`, as the unpredictable timing of task
#'   completions would otherwise influence the tasks sent to each daemon. Even
#'   for `dispatcher = FALSE`, reproducibility is not guaranteed if the order in
#'   which tasks are sent is not deterministic.
#' @param serial \[default NULL\] (optional, requires dispatcher) a
#'   configuration created by [serial_config()] to register serialization and
#'   unserialization functions for normally non-exportable reference objects,
#'   such as Arrow Tables or torch tensors. If `NULL`, configurations registered
#'   with [register_serial()] are automatically applied.
#' @param tls \[default NULL\] (optional for secure TLS connections) if not
#'   supplied, zero-configuration single-use keys and certificates are
#'   automatically generated. If supplied, **either** the character path to
#'   a file containing the PEM-encoded TLS certificate and associated private
#'   key (may contain additional certificates leading to a validation chain,
#'   with the TLS certificate first), **or** a length 2 character vector
#'   comprising \[i\] the TLS certificate (optionally certificate chain) and
#'   \[ii\] the associated private key.
#'
#' @return The integer number of daemons launched locally (zero if specifying
#'   `url` or using a remote launcher).
#'
#' @section Local Daemons:
#'
#' Daemons provide a potentially more efficient solution for asynchronous
#' operations as new processes no longer need to be created on an *ad hoc*
#' basis.
#'
#' Supply the argument `n` to set the number of daemons. New background
#' [daemon()] processes are automatically created on the local machine
#' connecting back to the host process, either directly or via dispatcher.
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#'
#' Specifying `dispatcher = FALSE`, daemons connect directly to the host and
#' tasks are distributed in a round-robin fashion, with tasks queued at each
#' daemon. Optimal scheduling is not guaranteed as, depending on the duration of
#' tasks, they can be queued at one daemon while others remain idle. However,
#' this solution is the most resource-light, and suited to similar-length tasks,
#' or where concurrent tasks typically do not exceed available daemons.
#'
#' @section Distributed Computing:
#'
#' Specifying `url` as a character string allows tasks to be distributed across
#' the network. `n` is only required in this case if providing a launch
#' configuration to `remote` to launch remote daemons.
#'
#' Supply a URL with a 'tcp://' scheme, such as 'tcp://10.75.32.70:5555'. The
#' host / dispatcher listens at this address, utilising a single port.
#' Individual daemons (started with [daemon()]) may then dial in to this URL.
#' Host / dispatcher automatically adjusts to the number of daemons actually
#' connected, allowing dynamic upscaling or downscaling as required.
#'
#' Switching the URL scheme to 'tls+tcp://' automatically upgrades the
#' connection to use TLS. The auxiliary function [host_url()] may be used to
#' construct a valid host URL based on the computer's IP address.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' `[]` to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified as
#' 'tcp://\[::1\]:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://\[::1\]:0'
#' will automatically assign a free ephemeral port. Use [status()] to inspect
#' the actual assigned port at any time.
#'
#' Specify `remote` with a call to [remote_config()] or [ssh_config()] to launch
#' daemons on remote machines. Otherwise, [launch_remote()] may be used to
#' generate the shell commands to deploy daemons manually on remote resources.
#'
#' @section Compute Profiles:
#'
#' If `NULL`, the `"default"` compute profile is used. Providing a character value
#' for `.compute` creates a new compute profile with the name specified. Each
#' compute profile retains its own daemons settings, and may be operated
#' independently of each other. Some usage examples follow:
#'
#' **local / remote** daemons may be set with a host URL and specifying
#' `.compute` as `"remote"`, which creates a new compute profile. Subsequent
#' [mirai()] calls may then be sent for local computation by not specifying the
#' `.compute` argument, or for remote computation to connected daemons by
#' specifying the `.compute` argument as `"remote"`.
#'
#' **cpu / gpu** some tasks may require access to different types of daemon,
#' such as those with GPUs. In this case, `daemons()` may be called to set up
#' host URLs for CPU-only daemons and for those with GPUs, specifying the
#' `.compute` argument as `"cpu"` and `"gpu"` respectively. By supplying the
#' `.compute` argument to subsequent [mirai()] calls, tasks may be sent to
#' either `cpu` or `gpu` daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via `daemons(0)` should
#' be carried out with the desired `.compute` argument specified.
#'
#' @examplesIf interactive()
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set up dispatcher accepting TLS over TCP connections
#' daemons(url = host_url(tls = TRUE))
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Use with() to evaluate with daemons for the duration of the expression
#' with(
#'   daemons(2),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' \dontrun{
#'
#' # Launch daemons on remotes 'nodeone' and 'nodetwo' using SSH
#' # connecting back directly to the host URL over a TLS connection:
#' daemons(
#'   url = host_url(tls = TRUE),
#'   remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo'))
#' )
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#'   n = 4,
#'   url = local_url(tcp = TRUE),
#'   remote = ssh_config('ssh://10.75.32.90', tunnel = TRUE)
#' )
#'
#' }
#'
#' @export
#'
daemons <- function(
  n,
  url = NULL,
  remote = NULL,
  dispatcher = TRUE,
  ...,
  seed = NULL,
  serial = NULL,
  tls = NULL,
  pass = NULL,
  .compute = NULL
) {
  missing(n) && missing(url) && return(status(.compute))

  if (is.null(.compute)) .compute <- .[["cp"]]
  envir <- ..[[.compute]]

  if (is.character(url)) {
    if (is.null(envir)) {
      url <- url[1L]
      envir <- init_envir_stream(seed)
      launches <- 0L
      dots <- parse_dots(...)
      output <- attr(dots, "output")

      switch(
        parse_dispatcher(dispatcher),
        {
          tls <- configure_tls(url, tls, pass, envir)
          sock <- req_socket(url, tls = tls)
          check_store_sock_url(envir, sock)
        },
        {
          cv <- cv()
          tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE)
          urld <- local_url()
          sock <- req_socket(urld)
          if (is.null(serial)) serial <- .[["serial"]]
          if (is.list(serial)) `opt<-`(sock, "serial", serial)
          args <- wa5(urld, url, dots)
          res <- launch_dispatcher(sock, args, output, serial, tls = tls, pass = pass)
          store_dispatcher(envir, sock, cv, urld, res)
        },
        stop(._[["dispatcher_args"]])
      )
      create_profile(envir, .compute, launches, dots)
      if (length(remote)) {
        on.exit(daemons(0L, .compute = .compute))
        launch_remote(
          n = n,
          remote = remote,
          ...,
          tls = envir[["tls"]],
          .compute = .compute
        )
        on.exit()
      }
    } else {
      stop(sprintf(._[["daemons_set"]], .compute))
    }
  } else {
    signal <- is.null(n)
    if (signal) n <- 0L
    is.numeric(n) || stop(._[["numeric_n"]])
    n <- as.integer(n)

    if (n == 0L) {
      is.null(envir) && return(0L)

      if (signal) send_signal(envir)
      reap(envir[["sock"]])
      ..[[.compute]] <- NULL -> envir
    } else if (is.null(envir)) {
      n > 0L || stop(._[["n_zero"]])
      dynGet(".mirai_within_map", ifnotfound = FALSE) && stop(._[["within_map"]])
      envir <- init_envir_stream(seed)
      urld <- local_url()
      dots <- parse_dots(...)
      output <- attr(dots, "output")
      switch(
        parse_dispatcher(dispatcher),
        {
          sock <- req_socket(urld)
          launch_daemons(seq_len(n), sock, urld, dots, envir, output)
          store_sock_url(envir, sock, urld)
        },
        {
          cv <- cv()
          sock <- req_socket(urld)
          if (is.null(serial)) serial <- .[["serial"]]
          if (is.list(serial)) `opt<-`(sock, "serial", serial)
          args <- wa4(urld, n, envir[["stream"]], dots)
          res <- launch_dispatcher(sock, args, output, serial)
          store_dispatcher(envir, sock, cv, urld, res)
        },
        stop(._[["dispatcher_args"]])
      )
      create_profile(envir, .compute, n, dots)
    } else {
      stop(sprintf(._[["daemons_set"]], .compute))
    }
  }

  is.null(envir) && return(0L)
  `class<-`(envir[["n"]], c("miraiDaemons", .compute))
}

#' @export
#'
print.miraiDaemons <- function(x, ...) print(unclass(x))

#' With Mirai Daemons
#'
#' Evaluate an expression with daemons that last for the duration of the
#' expression. Ensure each mirai within the statement is explicitly called (or
#' their values collected) so that daemons are not reset before they have all
#' completed.
#'
#' This function is an S3 method for the generic [with()] for class
#' 'miraiDaemons'.
#'
#' @param data a call to [daemons()].
#' @param expr an expression to evaluate.
#' @param ... not used.
#'
#' @return The return value of `expr`.
#'
#' @examplesIf interactive()
#' with(
#'   daemons(2, dispatcher = FALSE),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' status()
#'
#' @export
#'
with.miraiDaemons <- function(data, expr, ...) {
  prev_profile <- .[["cp"]]
  `[[<-`(., "cp", class(data)[2L])
  on.exit({
    daemons(0L)
    `[[<-`(., "cp", prev_profile)
  })
  expr
}

#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @param .compute \[default NULL\] character value for the compute profile to
#'   query, or NULL to query the 'default' profile.
#'
#'   **or** a 'miraiCluster' to obtain its status.
#'
#' @return A named list comprising:
#'   \itemize{
#'     \item **connections** - integer number of active daemon connections.
#'     \item **daemons** - character URL at which host / dispatcher is
#'     listening, or else `0L` if daemons have not yet been set.
#'     \item **mirai** (present only if using dispatcher) - a named integer
#'     vector comprising: **awaiting** - number of tasks queued for execution at
#'     dispatcher, **executing** - number of tasks sent to a daemon for
#'     execution, and **completed** - number of tasks for which the result has
#'     been received (either completed or cancelled).
#'   }
#'
#' @section Events:
#'
#'   If dispatcher is used combined with daemon IDs, an additional element
#'   **events** will report the positive integer ID when the daemon connects and
#'   the negative value when it disconnects. Only the events since the previous
#'   status query are returned.
#'
#' @examplesIf interactive()
#' status()
#' daemons(url = "tcp://[::1]:0")
#' status()
#' daemons(0)
#'
#' @export
#'
status <- function(.compute = NULL) {
  if (is.null(.compute)) .compute <- .[["cp"]]
  is.list(.compute) && return(status(attr(.compute, "id")))
  envir <- ..[[.compute]]
  is.null(envir) && return(list(connections = 0L, daemons = 0L))
  is.null(envir[["dispatcher"]]) || return(dispatcher_status(envir))
  list(
    connections = as.integer(stat(envir[["sock"]], "pipes")),
    daemons = envir[["url"]]
  )
}

#' Daemons Set
#'
#' Returns a logical value, whether or not daemons have been set for a given
#' compute profile.
#'
#' @inheritParams status
#'
#' @return Logical `TRUE` or `FALSE`.
#'
#' @examplesIf interactive()
#' daemons_set()
#' daemons(1)
#' daemons_set()
#' daemons(0)
#'
#' @export
#'
daemons_set <- function(.compute = NULL) {
  if (is.null(.compute)) .compute <- .[["cp"]]
  !is.null(..[[.compute]])
}

#' Require Daemons
#'
#' Returns TRUE only if daemons are set, otherwise produces an informative
#' error for the user to set daemons, with a clickable function link if the
#' \CRANpkg{cli} package is available.
#'
#' @param call (only used if the \CRANpkg{cli} package is installed) the
#'   execution environment of a currently running function, e.g.
#'   `environment()`. The function will be mentioned in error messages as the
#'   source of the error.
#' @inheritParams status
#'
#' @return Logical `TRUE`, or else errors.
#'
#' @examplesIf interactive()
#' daemons(1)
#' require_daemons()
#' daemons(0)
#'
#' @export
#'
require_daemons <- function(call = environment(), .compute = NULL) {
  ensure_cli_initialized()
  daemons_set(.compute = .compute) || .[["require_daemons"]](call)
}

#' Create Serialization Configuration
#'
#' Returns a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once set by passing to the `serial` argument of [daemons()], the functions
#' apply to all mirai requests for that compute profile.
#'
#' This feature utilises the 'refhook' system of R native serialization.
#'
#' @param class a character string (or vector) of the class of object custom
#'   serialization functions are applied to, e.g. `'ArrowTabular'` or
#'   `c('torch_tensor', 'ArrowTabular')`.
#' @param sfunc a function (or list of functions) that accepts a reference
#'   object inheriting from `class` and returns a raw vector.
#' @param ufunc a function (or list of functions) that accepts a raw vector and
#'   returns a reference object.
#'
#' @return A list comprising the configuration. This should be passed to the
#'   `serial` argument of [daemons()].
#'
#' @examples
#' cfg <- serial_config("test_cls", function(x) serialize(x, NULL), unserialize)
#' cfg
#'
#' cfg2 <- serial_config(
#'   c("class_one", "class_two"),
#'   list(function(x) serialize(x, NULL), function(x) serialize(x, NULL)),
#'   list(unserialize, unserialize)
#' )
#' cfg2
#'
#' @export
#'
serial_config <- serial_config

#' Register Serialization Configuration
#'
#' Registers a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once registered, the functions apply to all [daemons()] calls where the
#' `serial` argument is `NULL`.
#'
#' @inheritParams serial_config
#'
#' @return Invisible NULL.
#'
#' @export
#'
register_serial <- function(class, sfunc, ufunc) {
  cfg <- serial_config(class, sfunc, ufunc)
  reg <- .[["serial"]]
  `[[<-`(., "serial", lapply(1:3, function(i) c(reg[[i]], cfg[[i]])))
  invisible()
}

# internals --------------------------------------------------------------------

configure_tls <- function(url, tls, pass, envir, returnconfig = TRUE) {
  purl <- parse_url(url)
  sch <- purl[["scheme"]]
  if ((startsWith(sch, "wss") || startsWith(sch, "tls")) && is.null(tls)) {
    cert <- write_cert(cn = purl[["hostname"]])
    `[[<-`(envir, "tls", cert[["client"]])
    tls <- cert[["server"]]
  }
  cfg <- if (length(tls)) tls_config(server = tls, pass = pass)
  returnconfig || return(tls)
  cfg
}

create_profile <- function(envir, .compute, n, dots) {
  `[[<-`(envir, "n", n)
  `[[<-`(envir, "dots", dots)
  `[[<-`(.., .compute, envir)
}

init_envir_stream <- function(seed) {
  .advance()
  oseed <- .GlobalEnv[[".Random.seed"]]
  RNGkind("L'Ecuyer-CMRG")
  if (length(seed)) set.seed(seed)
  envir <- `[[<-`(
    new.env(hash = FALSE, parent = ..),
    "stream",
    .GlobalEnv[[".Random.seed"]]
  )
  `[[<-`(.GlobalEnv, ".Random.seed", oseed)
  envir
}

req_socket <- function(url, tls = NULL)
  `opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", 0L)

parse_dispatcher <- function(x)
  if (is.logical(x)) 1L + (!is.na(x) && x) else if (is.character(x)) 1L else 3L

parse_dots <- function(...) {
  ...length() || return("")
  dots <- list(...)
  dots <- dots[as.logical(lapply(
    dots,
    function(x) is.logical(x) || is.numeric(x)
  ))]
  length(dots) || return("")
  dnames <- names(dots)
  out <- sprintf(",%s", paste(dnames, dots, sep = "=", collapse = ","))
  is.logical(dots[["output"]]) && dots[["output"]] && return(`attr<-`(out, "output", ""))
  out
}

parse_tls <- function(tls)
  switch(
    length(tls) + 1L,
    "",
    sprintf(",tls=\"%s\"", tls),
    sprintf(",tls=c(\"%s\",\"%s\")", tls[1L], tls[2L])
  )

libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]

wa2 <- function(url, dots, rs, tls = NULL)
  shQuote(sprintf(
    "mirai::daemon(\"%s\",dispatcher=FALSE%s%s,rs=c(%s))",
    url,
    dots,
    parse_tls(tls),
    paste0(rs, collapse = ",")
  ))

wa3 <- function(url, dots, rs = NULL, tls = NULL)
  shQuote(sprintf(
    "mirai::daemon(\"%s\"%s%s)",
    url,
    dots,
    parse_tls(tls)
  ))

wa4 <- function(urld, n, rs, dots)
  shQuote(sprintf(
    ".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)",
    libp(),
    urld,
    n,
    paste0(rs, collapse = ","),
    dots
  ))

wa5 <- function(urld, url, dots)
  shQuote(sprintf(
    ".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)",
    libp(),
    urld,
    url,
    dots
  ))

launch_daemon <- function(args, output)
  system2(
    .command,
    args = c("-e", args),
    stdout = output,
    stderr = output,
    wait = FALSE
  )

query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
  r <- send(sock, command, mode = send_mode, block = block)
  r && return(r)
  recv(sock, mode = recv_mode, block = block)
}

launch_dispatcher <- function(sock, args, output, serial, tls = NULL, pass = NULL) {
  pkgs <- Sys.getenv("R_DEFAULT_PACKAGES")
  system2(
    .command,
    args = c("--default-packages=NULL", "--vanilla", "-e", args),
    stdout = output,
    stderr = output,
    wait = FALSE
  )
  sync <- 0L
  cv <- cv()
  pipe_notify(sock, cv, add = TRUE)
  while(!until(cv, .limit_long))
    message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
  pipe_notify(sock, NULL, add = TRUE)
  res <- request(.context(sock), list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, cv = cv)
  while(!until(cv, .limit_long))
    message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
  collect_aio(res)
}

launch_daemons <- function(seq, sock, urld, dots, envir, output) {
  cv <- cv()
  pipe_notify(sock, cv, add = TRUE)
  for (i in seq)
    launch_daemon(wa2(urld, dots, next_stream(envir)), output)
  sync <- 0L
  for (i in seq)
    while(!until(cv, .limit_long))
      message(sprintf(._[["sync_daemons"]], sync <- sync + .limit_long_secs))
  pipe_notify(sock, NULL, add = TRUE)
}

store_dispatcher <- function(envir, sock, cv, urld, res) {
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "dispatcher", urld)
  `[[<-`(envir, "cv", cv)
  `[[<-`(envir, "stream", NULL)
  `[[<-`(envir, "url", res[2L])
  `[[<-`(envir, "pid", as.integer(res[1L]))
}

sub_real_port <- function(port, url)
  sub("(?<=:)0(?![^/])", port, url, perl = TRUE)

store_sock_url <- function(envir, sock, url) {
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "url", url)
}

check_store_sock_url <- function(envir, sock) {
  listener <- attr(sock, "listener")[[1L]]
  url <- opt(listener, "url")
  if (parse_url(url)[["port"]] == "0")
    url <- sub_real_port(opt(listener, "tcp-bound-port"), url)
  store_sock_url(envir, sock, url)
}

send_signal <- function(envir) {
  signals <- if (is.null(envir[["dispatcher"]])) stat(envir[["sock"]], "pipes") else
    query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
  for (i in seq_len(signals)) {
    send(envir[["sock"]], ._scm_., mode = 2L)
    msleep(10L)
  }
}

dispatcher_status <- function(envir) {
  status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
  is.object(status) && return(status)
  out <- list(
    connections = status[1L],
    daemons = envir[["url"]],
    mirai = c(
      awaiting = status[2L],
      executing = status[3L],
      completed = status[4L] - status[2L] - status[3L]
    )
  )
  if (length(status) > 4L)
    out <- c(out, list(events = status[5:length(status)]))
  out
}

stop_d_cli <- function(call)
  cli::cli_abort("No daemons set - use e.g. {.run mirai::daemons(6)} to set 6 local daemons.", call = call)

stop_d <- function(call)
  stop("No daemons set - use e.g. mirai::daemons(6) to set 6 local daemons.", call. = FALSE)

._scm_. <- as.raw(c(0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x02, 0x03, 0x04, 0x00, 0x00, 0x05, 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0xfc, 0x00, 0x00, 0x00))

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on June 26, 2025, 1:08 a.m.