R/daemons.R

Defines functions stop_d stop_d_cli dispatcher_status send_signal create_sock sub_real_port launch_daemons launch_dispatcher query_dispatcher launch_daemon args_dispatcher args_daemon_disp args_daemon_direct libp parse_tls parse_dots req_socket init_envir_stream create_profile configure_tls compute_env register_serial local_daemons with_daemons require_daemons daemons_set info status with.miraiDaemons print.miraiDaemons daemons

Documented in daemons daemons_set info local_daemons register_serial require_daemons status with_daemons with.miraiDaemons

# mirai ------------------------------------------------------------------------

#' Daemons (Set Persistent Processes)
#'
#' Set daemons, or persistent background processes, to receive [mirai()]
#' requests. Specify `n` to create daemons on the local machine. Specify `url`
#' to receive connections from remote daemons (for distributed computing across
#' the network). Specify `remote` to optionally launch remote daemons via a
#' remote configuration. Dispatcher (enabled by default) ensures optimal
#' scheduling.
#'
#' Use `daemons(0)` to reset daemon connections:
#' \itemize{
#'   \item All connected daemons and/or dispatchers exit automatically.
#'   \item Any as yet unresolved 'mirai' will return an 'errorValue' 19
#'   (Connection reset).
#'   \item [mirai()] reverts to the default behaviour of creating a new
#'   background process for each request.
#' }
#'
#' If the host session ends, all connected dispatcher and daemon processes
#' automatically exit as soon as their connections are dropped.
#'
#' Calling [daemons()] implicitly resets any existing daemons for the compute
#' profile with `daemons(0)`. Instead, [launch_local()] or [launch_remote()] may
#' be used to add daemons at any time without resetting daemons.
#'
#' @inheritParams mirai
#' @param n integer number of daemons to launch.
#' @param url \[default NULL\] if specified, a character string comprising a URL
#'   at which to listen for remote daemons, including a port accepting incoming
#'   connections, e.g. 'tcp://hostname:5555' or 'tcp://10.75.32.70:5555'.
#'   Specify a URL with scheme 'tls+tcp://' to use secure TLS connections (for
#'   details see Distributed Computing section below). Auxiliary function
#'   [host_url()] may be used to construct a valid host URL.
#' @param remote \[default NULL\] required only for launching remote daemons, a
#'   configuration generated by [ssh_config()], [cluster_config()], or
#'   [remote_config()].
#' @param dispatcher \[default TRUE\] logical value, whether to use dispatcher.
#'   Dispatcher runs in a separate process to ensure optimal scheduling,
#'   and should normally be kept on (for details see Dispatcher section below).
#' @param ... (optional) additional arguments passed through to [daemon()] if
#'   launching daemons. These include `asyncdial`, `autoexit`, `cleanup`,
#'   `output`, `maxtasks`, `idletime`, `walltime` and `tlscert`.
#' @param seed \[default NULL\] (optional) The default of `NULL` initializes
#'   L'Ecuyer-CMRG RNG streams for each daemon, the same as base R's parallel
#'   package. Results are statistically-sound, although generally
#'   non-reproducible, as which tasks are sent to which daemons may be
#'   non-deterministic, and also depends on the number of daemons.
#'   \cr (experimental) supply an integer value to instead initialize a
#'   L'Ecuyer-CMRG RNG stream for the compute profile. This is advanced for each
#'   mirai evaluation, hence allowing for reproducible results, as the random
#'   seed is always associated with a given mirai, independently of where it is
#'   evaluated.
#' @param serial \[default NULL\] (optional, requires dispatcher) a
#'   configuration created by [serial_config()] to register serialization and
#'   unserialization functions for normally non-exportable reference objects,
#'   such as Arrow Tables or torch tensors. If `NULL`, configurations registered
#'   with [register_serial()] are automatically applied.
#' @param tls \[default NULL\] (optional for secure TLS connections) if not
#'   supplied, zero-configuration single-use keys and certificates are
#'   automatically generated. If supplied, **either** the character path to
#'   a file containing the PEM-encoded TLS certificate and associated private
#'   key (may contain additional certificates leading to a validation chain,
#'   with the TLS certificate first), **or** a length 2 character vector
#'   comprising \[i\] the TLS certificate (optionally certificate chain) and
#'   \[ii\] the associated private key.
#' @param pass \[default NULL\] (required only if the private key supplied to
#'   `tls` is encrypted with a password) For security, should be provided
#'   through a function that returns this value, rather than directly.
#'
#' @return Invisibly, logical `TRUE` when creating daemons and `FALSE` when
#'   resetting.
#'
#' @section Local Daemons:
#'
#' Setting daemons, or persistent background processes, is typically more
#' efficient as it removes the need for, and overhead of, creating new processes
#' for each mirai evaluation. It also provides control over the total number of
#' processes at any one time.
#'
#' Supply the argument `n` to set the number of daemons. New background
#' [daemon()] processes are automatically launched on the local machine
#' connecting back to the host process, either directly or via dispatcher.
#'
#' @section Dispatcher:
#'
#' By default `dispatcher = TRUE` launches a background process running
#' [dispatcher()]. Dispatcher connects to daemons on behalf of the host, queues
#' tasks, and ensures optimal FIFO scheduling. Dispatcher also enables (i) mirai
#' cancellation using [stop_mirai()] or when using a `.timeout` argument to
#' [mirai()], and (ii) the use of custom serialization configurations.
#'
#' Specifying `dispatcher = FALSE`, daemons connect directly to the host and
#' tasks are distributed in a round-robin fashion, with tasks queued at each
#' daemon. Optimal scheduling is not guaranteed as, depending on the duration of
#' tasks, they can be queued at one daemon while others remain idle. However,
#' this solution is the most resource-light, and suited to similar-length tasks,
#' or where concurrent tasks typically do not exceed available daemons.
#'
#' @section Distributed Computing:
#'
#' Specify `url` as a character string to allow tasks to be distributed across
#' the network (`n` is only required in this case if also providing a launch
#' configuration to `remote`).
#'
#' The host / dispatcher listens at this URL, utilising a single port, and
#' [daemon()] processes dial in to this URL. Host / dispatcher automatically
#' adjusts to the number of daemons actually connected, allowing dynamic
#' upscaling / downscaling.
#'
#' The URL should have a 'tcp://' scheme, such as 'tcp://10.75.32.70:5555'.
#' Switching the URL scheme to 'tls+tcp://' automatically upgrades the
#' connection to use TLS. The auxiliary function [host_url()] may be used to
#' construct a valid host URL based on the computer's IP address.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' `[]` to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified as
#' 'tcp://\[::1\]:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://\[::1\]:0'
#' will automatically assign a free ephemeral port. Use [status()] to inspect
#' the actual assigned port at any time.
#'
#' Specify `remote` with a call to [ssh_config()], [cluster_config()] or
#' [remote_config()] to launch (programmatically deploy) daemons on remote
#' machines, from where they dial back to `url`. If not launching daemons,
#' [launch_remote()] may be used to generate the shell commands for manual
#' deployment.
#'
#' @section Compute Profiles:
#'
#' If `NULL`, the `"default"` compute profile is used. Providing a character
#' value for `.compute` creates a new compute profile with the name specified.
#' Each compute profile retains its own daemons settings, and may be operated
#' independently of each other. Some usage examples follow:
#'
#' **local / remote** daemons may be set with a host URL and specifying
#' `.compute` as `"remote"`, which creates a new compute profile. Subsequent
#' [mirai()] calls may then be sent for local computation by not specifying the
#' `.compute` argument, or for remote computation to connected daemons by
#' specifying the `.compute` argument as `"remote"`.
#'
#' **cpu / gpu** some tasks may require access to different types of daemon,
#' such as those with GPUs. In this case, `daemons()` may be called to set up
#' host URLs for CPU-only daemons and for those with GPUs, specifying the
#' `.compute` argument as `"cpu"` and `"gpu"` respectively. By supplying the
#' `.compute` argument to subsequent [mirai()] calls, tasks may be sent to
#' either `cpu` or `gpu` daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via `daemons(0)` should
#' be carried out with the desired `.compute` argument specified.
#'
#' @seealso [with_daemons()] and [local_daemons()] for managing the compute
#'   profile used locally.
#'
#' @examplesIf interactive()
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set up dispatcher accepting TLS over TCP connections
#' daemons(url = host_url(tls = TRUE))
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Set host URL for remote daemons to dial into
#' daemons(url = host_url(), dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # Use with() to evaluate with daemons for the duration of the expression
#' with(
#'   daemons(2),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' \dontrun{
#'
#' # Launch daemons on remotes 'nodeone' and 'nodetwo' using SSH
#' # connecting back directly to the host URL over a TLS connection:
#' daemons(
#'   url = host_url(tls = TRUE),
#'   remote = ssh_config(c('ssh://nodeone', 'ssh://nodetwo'))
#' )
#'
#' # Launch 4 daemons on the remote machine 10.75.32.90 using SSH tunnelling:
#' daemons(
#'   n = 4,
#'   url = local_url(tcp = TRUE),
#'   remote = ssh_config('ssh://10.75.32.90', tunnel = TRUE)
#' )
#'
#' }
#'
#' @export
#'
daemons <- function(
  n,
  url = NULL,
  remote = NULL,
  dispatcher = TRUE,
  ...,
  seed = NULL,
  serial = NULL,
  tls = NULL,
  pass = NULL,
  .compute = NULL
) {
  # Fall back to the current compute profile name stored in `.` when none is
  # given, then look up that profile's environment (NULL if not yet set up).
  if (is.null(.compute)) .compute <- .[["cp"]]
  envir <- ..[[.compute]]

  if (is.character(url)) {
    # Host URL supplied: listen for daemons dialling in. Set-up only happens
    # when the profile does not exist yet; otherwise `res` stays NULL and the
    # reset-and-retry step further down takes over.
    res <- if (is.null(envir)) {
      url <- url[1L]
      envir <- init_envir_stream(seed)
      dots <- parse_dots(envir, ...)
      cfg <- configure_tls(url, tls, pass, envir)

      # The character value "none" is accepted as a synonym for FALSE.
      if (is.character(dispatcher) && dispatcher == "none") dispatcher <- FALSE
      if (dispatcher) {
        launch_dispatcher(url, dots, envir, serial, tls = cfg[[1L]], pass = pass)
      } else {
        create_sock(envir, url, cfg[[2L]])
      }
      create_profile(envir, .compute, 0L, dots)
      if (length(remote)) {
        # If launching remote daemons errors, reset the profile just created;
        # the handler is cleared again once the launch has succeeded.
        on.exit(daemons(0L, .compute = .compute))
        launch_remote(n = n, remote = remote, .compute = .compute)
        on.exit()
      }
      url
    }
  } else {
    # No URL: `n` is the number of local daemons. A NULL `n` is treated as a
    # reset (n = 0) that additionally signals connected daemons first.
    signal <- is.null(n)
    if (signal) n <- 0L
    is.numeric(n) || stop(._[["numeric_n"]])
    n <- as.integer(n)

    n == 0L && {
      # Reset request: nothing to do if the profile was never set.
      is.null(envir) && return(invisible(FALSE))

      if (signal) send_signal(envir)
      reap(envir[["sock"]])
      if (otel_tracing) otel::end_span(envir[["otel_span"]])
      # Sets both the local `envir` and the profile's registry entry to NULL.
      ..[[.compute]] <- NULL -> envir
      return(invisible(FALSE))
    }
    # As in the URL branch, set-up only happens for a profile that does not
    # exist yet; otherwise `res` is NULL.
    res <- if (is.null(envir)) {
      n > 0L || stop(._[["n_zero"]])
      # Refuse to create daemons when a `.mirai_within_map` flag is found in a
      # calling frame.
      dynGet(".mirai_within_map", ifnotfound = FALSE) && stop(._[["within_map"]])
      envir <- init_envir_stream(seed)
      dots <- parse_dots(envir, ...)

      if (is.character(dispatcher) && dispatcher == "none") dispatcher <- FALSE
      if (dispatcher) {
        launch_dispatcher(n, dots, envir, serial)
      } else {
        launch_daemons(seq_len(n), dots, envir)
      }
      create_profile(envir, .compute, n, dots)
    }
  }

  # `res` is NULL only when the profile already existed: reset it and call
  # daemons() again with the same arguments, returning that call's result.
  is.null(res) && return({
    daemons(0L, .compute = .compute)
    daemons(
      n = n,
      url = url,
      remote = remote,
      dispatcher = dispatcher,
      ...,
      seed = seed,
      serial = serial,
      tls = tls,
      pass = pass,
      .compute = .compute
    )
  })

  if (otel_tracing) {
    # Start a tracing span for this profile and keep it in the profile
    # environment; the reset path above ends it.
    `[[<-`(
      envir,
      "otel_span",
      otel::start_span(
        "mirai::daemons",
        attributes = otel::as_attributes(list(
          url = envir[["url"]],
          n = envir[["n"]],
          dispatcher = if (is.null(envir[["dispatcher"]])) "false" else "true",
          compute_profile = .compute
        ))
      )
    )
  }

  # Invisible TRUE whose class carries the profile name as its second element
  # (read back by with.miraiDaemons()).
  invisible(`class<-`(TRUE, c("miraiDaemons", .compute)))
}

#' @export
#'
print.miraiDaemons <- function(x, ...) {
  # Drop the class attribute so only the underlying value is printed.
  bare <- unclass(x)
  print(bare)
}

#' With Mirai Daemons
#'
#' Evaluate an expression with daemons that last for the duration of the
#' expression. Ensure each mirai within the statement is explicitly called (or
#' their values collected) so that daemons are not reset before they have all
#' completed.
#'
#' This function is an S3 method for the generic [with()] for class
#' 'miraiDaemons'.
#'
#' @param data a call to [daemons()].
#' @param expr an expression to evaluate.
#' @param ... not used.
#'
#' @return The return value of `expr`.
#'
#' @examplesIf interactive()
#' with(
#'   daemons(2, dispatcher = FALSE),
#'   {
#'     m1 <- mirai(Sys.getpid())
#'     m2 <- mirai(Sys.getpid())
#'     cat(m1[], m2[], "\n")
#'   }
#' )
#'
#' status()
#'
#' @export
#'
with.miraiDaemons <- function(data, expr, ...) {
  saved_profile <- .[["cp"]]
  # On exit: reset the daemons of the temporary profile, then restore the
  # compute profile that was current on entry.
  on.exit({
    daemons(0L)
    `[[<-`(., "cp", saved_profile)
  })
  # The profile name is carried as the second class element of `data`.
  profile <- class(data)[2L]
  `[[<-`(., "cp", profile)
  expr
}

#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @param .compute \[default NULL\] character value for the compute profile to
#'   query, or NULL to query the 'default' profile.
#'
#'   **or** a 'miraiCluster' to obtain its status.
#'
#' @return A named list comprising:
#'   \itemize{
#'     \item **connections** - integer number of active daemon connections.
#'     \item **daemons** - character URL at which host / dispatcher is
#'     listening, or else `0L` if daemons have not yet been set.
#'     \item **mirai** (present only if using dispatcher) - a named integer
#'     vector comprising: **awaiting** - number of tasks queued for execution at
#'     dispatcher, **executing** - number of tasks sent to a daemon for
#'     execution, and **completed** - number of tasks for which the result has
#'     been received (either completed or cancelled).
#'   }
#'
#' @section Events:
#'
#'   If dispatcher is used combined with daemon IDs, an additional element
#'   **events** will report the positive integer ID when the daemon connects and
#'   the negative value when it disconnects. Only the events since the previous
#'   status query are returned.
#'
#' @seealso [info()] for more succinct information statistics.
#'
#' @examplesIf interactive()
#' status()
#' daemons(url = "tcp://[::1]:0")
#' status()
#' daemons(0)
#'
#' @export
#'
status <- function(.compute = NULL) {
  # A list argument is queried through its "id" attribute.
  if (is.list(.compute)) {
    return(status(attr(.compute, "id")))
  }

  envir <- compute_env(.compute)
  if (is.null(envir)) {
    return(list(connections = 0L, daemons = 0L))
  }

  # With a dispatcher, the detailed status comes from the dispatcher itself.
  if (!is.null(envir[["dispatcher"]])) {
    return(dispatcher_status(envir))
  }

  pipes <- stat(envir[["sock"]], "pipes")
  list(connections = as.integer(pipes), daemons = envir[["url"]])
}

#' Information Statistics
#'
#' Retrieve statistics for the specified compute profile.
#'
#' The returned statistics are:
#'
#' - Connections: active daemon connections.
#' - Cumulative: total daemons that have ever connected.
#' - Awaiting: mirai tasks currently queued for execution at dispatcher.
#' - Executing: mirai tasks currently being evaluated on a daemon.
#' - Completed: mirai tasks that have been completed or cancelled.
#'
#' For non-dispatcher daemons: only 'connections' will be available and the
#' other values will be `NA`.
#'
#' @inheritParams mirai
#'
#' @return Named integer vector or else `NULL` if the compute profile is yet to
#'   be set up.
#'
#' @seealso [status()] for more verbose status information.
#'
#' @examplesIf interactive()
#' info()
#' daemons(1)
#' info()
#' daemons(0)
#'
#' @export
#'
info <- function(.compute = NULL) {
  envir <- compute_env(.compute)
  if (is.null(envir)) {
    return()
  }

  stats <- if (is.null(envir[["dispatcher"]])) {
    # Without dispatcher only the connection count is known.
    c(as.integer(stat(envir[["sock"]], "pipes")), NA, NA, NA, NA)
  } else {
    reply <- query_dispatcher(envir[["sock"]], c(0L, 0L))
    # A classed reply is not a statistics vector; report nothing.
    if (is.object(reply)) {
      return()
    }
    reply
  }

  names(stats) <- c("connections", "cumulative", "awaiting", "executing", "completed")
  stats
}

#' Query if Daemons are Set
#'
#' Returns a logical value, whether or not daemons have been set for a given
#' compute profile.
#'
#' @inheritParams mirai
#'
#' @return Logical `TRUE` or `FALSE`.
#'
#' @examplesIf interactive()
#' daemons_set()
#' daemons(1)
#' daemons_set()
#' daemons(0)
#'
#' @export
#'
daemons_set <- function(.compute = NULL) {
  # Daemons are set exactly when the profile has an environment registered.
  envir <- compute_env(.compute)
  !is.null(envir)
}

#' Require Daemons
#'
#' Returns TRUE only if daemons are set, otherwise produces an informative
#' error for the user to set daemons, with a clickable function link if the
#' \CRANpkg{cli} package is available.
#'
#' @inheritParams mirai
#' @param call (only used if the \CRANpkg{cli} package is installed) the
#'   execution environment of a currently running function, e.g.
#'   `environment()`. The function will be mentioned in error messages as the
#'   source of the error.
#'
#' @return Logical `TRUE`, or else errors.
#'
#' @note
#' Previously the arguments were reversed with `call` coming before `.compute`.
#' Specifying an environment to the first argument works for the time being,
#' although is deprecated and will be defunct in a future version.
#'
#' @examplesIf interactive()
#' daemons(1)
#' require_daemons()
#' daemons(0)
#'
#' @export
#'
require_daemons <- function(.compute = NULL, call = environment()) {
  ensure_cli_initialized()
  # Deprecated argument order: an environment passed first is really `call`,
  # and a character `call` (if any) is really the compute profile.
  if (is.environment(.compute)) {
    env_arg <- .compute
    .compute <- if (is.character(call)) call
    call <- env_arg
  }
  daemons_set(.compute = .compute) || .[["require_daemons"]](.compute, call)
}

#' With Daemons
#'
#' Evaluate an expression using a specific compute profile.
#'
#' Will error if the specified compute profile is not yet set up.
#'
#' @inheritParams require_daemons
#' @param expr an expression to evaluate.
#'
#' @return For **with_daemons**: the return value of `expr`. \cr
#'   For **local_daemons**: invisible NULL.
#'
#' @examplesIf interactive()
#' daemons(1, dispatcher = FALSE, .compute = "cpu")
#' daemons(1, dispatcher = FALSE, .compute = "gpu")
#'
#' with_daemons("cpu", {
#'   s1 <- status()
#'   m1 <- mirai(Sys.getpid())
#' })
#'
#' with_daemons("gpu", {
#'   s2 <- status()
#'   m2 <- mirai(Sys.getpid())
#'   m3 <- mirai(Sys.getpid(), .compute = "cpu")
#'   local_daemons("cpu")
#'   m4 <- mirai(Sys.getpid())
#' })
#'
#' s1$daemons
#' m1[]
#'
#' s2$daemons
#' m2[] # different to m1
#'
#' m3[] # same as m1
#' m4[] # same as m1
#'
#' with_daemons("cpu", daemons(0))
#' with_daemons("gpu", daemons(0))
#'
#' @export
#'
with_daemons <- function(.compute, expr) {
  require_daemons(.compute = .compute, call = environment())
  saved_profile <- .[["cp"]]
  # Restore the previously current compute profile however we leave.
  on.exit(`[[<-`(., "cp", saved_profile))
  `[[<-`(., "cp", .compute)
  # `expr` is evaluated only now, with `.compute` as the current profile.
  expr
}

#' @param frame \[default parent.frame()\] the frame (environment) to which the
#'   daemons compute profile is scoped.
#'
#' @rdname with_daemons
#' @export
#'
local_daemons <- function(.compute, frame = parent.frame()) {
  # Errors unless daemons are already set for `.compute`.
  require_daemons(.compute = .compute, call = frame)
  prev_profile <- .[["cp"]]
  # Make `.compute` the current compute profile.
  `[[<-`(., "cp", .compute)
  # A call to an anonymous function that restores the previous profile; the
  # closure captures `prev_profile` from this function's environment.
  expr <- as.call(list(function() `[[<-`(., "cp", prev_profile)))
  # Register that call as an exit handler of `frame` rather than of this
  # function. The positional TRUE, FALSE are on.exit()'s `add` and `after`:
  # keep existing handlers and run this one before them.
  do.call(on.exit, list(expr, TRUE, FALSE), envir = frame)
}

#' Create Serialization Configuration
#'
#' Returns a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once set by passing to the `serial` argument of [daemons()], the functions
#' apply to all mirai requests for that compute profile.
#'
#' This feature utilises the 'refhook' system of R native serialization.
#'
#' @param class a character string (or vector) of the class of object custom
#'   serialization functions are applied to, e.g. `'ArrowTabular'` or
#'   `c('torch_tensor', 'ArrowTabular')`.
#' @param sfunc a function (or list of functions) that accepts a reference
#'   object inheriting from `class` and returns a raw vector.
#' @param ufunc a function (or list of functions) that accepts a raw vector and
#'   returns a reference object.
#'
#' @return A list comprising the configuration. This should be passed to the
#'   `serial` argument of [daemons()].
#'
#' @examples
#' cfg <- serial_config("test_cls", function(x) serialize(x, NULL), unserialize)
#' cfg
#'
#' cfg2 <- serial_config(
#'   c("class_one", "class_two"),
#'   list(function(x) serialize(x, NULL), function(x) serialize(x, NULL)),
#'   list(unserialize, unserialize)
#' )
#' cfg2
#'
#' @export
#'
serial_config <- serial_config # rebinds an existing `serial_config` under the same name so it can be documented and exported here; presumably imported from another package -- TODO confirm origin

#' Register Serialization Configuration
#'
#' Registers a serialization configuration, which may be set to perform custom
#' serialization and unserialization of normally non-exportable reference
#' objects, allowing these to be used seamlessly between different R sessions.
#' Once registered, the functions apply to all [daemons()] calls where the
#' `serial` argument is `NULL`.
#'
#' @inheritParams serial_config
#'
#' @return Invisible NULL.
#'
#' @export
#'
register_serial <- function(class, sfunc, ufunc) {
  incoming <- serial_config(class, sfunc, ufunc)
  existing <- .[["serial"]]
  # Append each component of the new configuration to the matching component
  # of whatever has been registered so far.
  append_at <- function(i) c(existing[[i]], incoming[[i]])
  combined <- lapply(seq_along(incoming), append_at)
  `[[<-`(., "serial", combined)
  invisible()
}

# internals --------------------------------------------------------------------

# Look up the environment of compute profile `x`, using the current profile
# name stored in `.` when `x` is NULL. Yields NULL if no such profile is set.
compute_env <- function(x) {
  profile <- if (is.null(x)) .[["cp"]] else x
  ..[[profile]]
}

# Resolve the TLS material for `url`.
#
# For a URL whose scheme starts with "wss" or "tls" and no `tls` supplied, a
# certificate is generated for the URL's hostname: its client part is stored
# in `envir` as "tls" and its server part is used as `tls`.
#
# Returns a list of two: the (possibly generated) `tls` value, and a TLS
# configuration built from it, or NULL when there is no TLS material.
configure_tls <- function(url, tls, pass, envir, config = TRUE) {
  parsed <- parse_url(url)
  scheme <- parsed[["scheme"]]
  secure <- startsWith(scheme, "wss") || startsWith(scheme, "tls")
  if (secure && is.null(tls)) {
    generated <- write_cert(cn = parsed[["hostname"]])
    `[[<-`(envir, "tls", generated[["client"]])
    tls <- generated[["server"]]
  }
  tls_cfg <- if (length(tls)) {
    tls_config(server = tls, pass = pass)
  }
  list(tls, tls_cfg)
}

# Record the daemon count and parsed daemon arguments in `envir`, then
# register `envir` under the profile name `.compute` in `..`.
create_profile <- function(envir, .compute, n, dots) {
  envir[["n"]] <- n
  envir[["dots"]] <- dots
  # Functional form kept as the last expression so its value is returned.
  `[[<-`(.., .compute, envir)
}

# Create a new profile environment holding an L'Ecuyer-CMRG RNG stream.
#
# The global `.Random.seed` is saved first and put back afterwards, so the
# switch of RNG kind (and optional `set.seed(seed)`) is only used to capture
# the stream. Returns the new environment with "stream" and "seed" set.
init_envir_stream <- function(seed) {
  .advance()
  saved_seed <- .GlobalEnv[[".Random.seed"]]
  RNGkind("L'Ecuyer-CMRG")
  if (length(seed)) {
    set.seed(seed)
  }
  stream <- .GlobalEnv[[".Random.seed"]]
  `[[<-`(.GlobalEnv, ".Random.seed", saved_seed)

  envir <- new.env(hash = FALSE, parent = ..)
  `[[<-`(envir, "stream", stream)
  `[[<-`(envir, "seed", seed)
}

# Open a "req" socket listening at `url` (optionally with a TLS configuration)
# and set its "req:resend-time" option to 0. Returns the socket.
req_socket <- function(url, tls = NULL) {
  sock <- socket("req", listen = url, tls = tls)
  opt(sock, "req:resend-time") <- 0L
  sock
}

# Turn the `...` arguments into a string of ",name=value" pairs for splicing
# into a daemon launch call.
#
# A `tlscert` argument, if present, is stored in `envir` as "tls". Only
# logical and numeric arguments are kept in the string. Returns "" when there
# is nothing to pass on.
parse_dots <- function(envir, ...) {
  if (...length() == 0L) {
    return("")
  }
  args <- list(...)
  if ("tlscert" %in% names(args)) {
    `[[<-`(envir, "tls", args[["tlscert"]])
  }
  is_simple <- vapply(
    args,
    function(x) is.logical(x) || is.numeric(x),
    logical(1L)
  )
  args <- args[is_simple]
  if (length(args) == 0L) {
    return("")
  }
  pairs <- paste(names(args), args, sep = "=", collapse = ",")
  sprintf(",%s", pairs)
}

# Format `tls` as a ",tlscert=..." fragment for a daemon launch call:
# "" for length 0, a quoted string for length 1, a quoted c() pair for
# length 2. Any other length falls through the switch and yields NULL.
parse_tls <- function(tls) {
  switch(
    as.character(length(tls)),
    "0" = "",
    "1" = sprintf(",tlscert=\"%s\"", tls),
    "2" = sprintf(",tlscert=c(\"%s\",\"%s\")", tls[1L], tls[2L])
  )
}

# First entry of `lp` (by default the library paths) that contains a "mirai"
# directory or file; NA if none does.
libp <- function(lp = .libPaths()) {
  has_mirai <- file.exists(file.path(lp, "mirai"))
  lp[has_mirai][1L]
}

# Shell-quoted R expression that starts a daemon connecting directly to `url`
# (no dispatcher), with the parsed extra arguments, optional TLS certificate
# and the RNG stream `rs` spliced in.
args_daemon_direct <- function(url, dots, rs, tls = NULL) {
  stream <- paste0(rs, collapse = ",")
  expr <- sprintf(
    "mirai::daemon(\"%s\",dispatcher=FALSE%s%s,rs=c(%s))",
    url,
    dots,
    parse_tls(tls),
    stream
  )
  shQuote(expr)
}

# Shell-quoted R expression that starts a daemon dialling `url`, with the
# parsed extra arguments and optional TLS certificate spliced in. `rs` is
# accepted but not used.
args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
  expr <- sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls))
  shQuote(expr)
}

# Shell-quoted R expression that prepends the library containing mirai to the
# library paths and then starts a dispatcher for host URL `urld`, daemon URL
# `url` and `n` daemons.
args_dispatcher <- function(urld, url, n) {
  template <- ".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\",n=%d)"
  expr <- sprintf(template, libp(), urld, url, n)
  shQuote(expr)
}

# Start a background process running `.command -e <args>` without waiting
# for it to finish.
launch_daemon <- function(args) {
  cmd_args <- c("-e", args)
  system2(.command, args = cmd_args, wait = FALSE)
}

# Send `command` over `sock` and return the reply. If the send yields a
# non-zero (truthy) result, that result is returned instead of receiving.
query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) {
  sent <- send(sock, command, mode = send_mode, block = block)
  if (sent) {
    return(sent)
  }
  recv(sock, mode = recv_mode, block = block)
}

# Launch a dispatcher process and complete the start-up exchange with it.
#
# `url` is either a character URL for daemons to dial into, or a number: the
# count of local daemons to launch (a local URL is then generated for them).
# On return `envir` holds "cv", "sock", "dispatcher" (the host <-> dispatcher
# URL), "pid" and "url" (the latter two taken from the dispatcher's reply).
launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
  cv <- cv()
  # Socket on which the host listens for the dispatcher.
  urld <- local_url()
  sock <- req_socket(urld)
  # NOTE(review): presumably makes `cv` get signalled on connection events of
  # `sock`, which the wait loop below relies on -- confirm against nanonext.
  pipe_notify(sock, cv, add = TRUE)
  # A numeric `url` means "n local daemons".
  local <- is.numeric(url)
  n <- if (local) url else 0L
  if (local) url <- local_url()
  system2(
    .command,
    args = c("--default-packages=NULL", "--vanilla", "-e", args_dispatcher(urld, url, n)),
    wait = FALSE
  )
  # Fall back to the registered serialization configuration when none given.
  if (is.null(serial)) serial <- .[["serial"]]
  if (is.list(serial)) `opt<-`(sock, "serial", serial)
  `[[<-`(envir, "cv", cv)
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "dispatcher", urld)
  # Start-up payload sent to the dispatcher once the wait below completes.
  data <- list(Sys.getenv("R_DEFAULT_PACKAGES"), tls, pass, serial, envir[["stream"]])
  sync <- 0L

  # Wait on `cv`, emitting a progress message with the accumulated wait after
  # each `.limit_long` timeout.
  while(!until(cv, .limit_long))
    message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))

  # Detach the notification from `cv` before sending the payload.
  pipe_notify(sock, NULL, add = TRUE)
  send(sock, data, mode = 1L, block = TRUE)
  if (local) {
    # All local daemons are launched with the same arguments.
    launch_args <- args_daemon_disp(url, dots)
    for (i in seq_len(n)) {
      launch_daemon(launch_args)
    }
  }
  # Wait for the dispatcher's reply, again reporting progress on timeouts.
  req <- recv_aio(sock, mode = 2L, cv = cv)
  while(!until(cv, .limit_long))
    message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))

  res <- collect_aio(req)
  # First element of the reply is stored as the pid, second as the URL.
  `[[<-`(envir, "pid", as.integer(res[1L]))
  `[[<-`(envir, "url", res[2L])
}

# Launch one local daemon per element of `seq`, each connecting directly to a
# freshly created host socket, and wait once per daemon on the condition
# variable. Stores the socket and its URL in `envir`.
launch_daemons <- function(seq, dots, envir) {
  notifier <- cv()
  listen_at <- local_url()
  sock <- req_socket(listen_at)
  pipe_notify(sock, notifier, add = TRUE)

  for (i in seq) {
    stream <- maybe_next_stream(envir)
    launch_daemon(args_daemon_direct(listen_at, dots, stream))
  }

  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "url", listen_at)

  # Emit a progress message with the accumulated wait after every timeout.
  waited <- 0L
  for (i in seq) {
    while (!until(notifier, .limit_long)) {
      waited <- waited + .limit_long_secs
      message(sprintf(._[["sync_daemons"]], waited))
    }
  }
  pipe_notify(sock, NULL, add = TRUE)
}

# Replace a wildcard port "0" in `url` with `port`. The "0" must directly
# follow a colon and be followed by "/" or the end of the string.
sub_real_port <- function(port, url) {
  wildcard_port <- "(?<=:)0(?![^/])"
  sub(wildcard_port, port, url, perl = TRUE)
}

# Open the host socket listening at `url` and store it, together with the
# listener's actual URL, in `envir`. If the listener URL has port "0", the
# port the listener actually bound to is substituted in.
create_sock <- function(envir, url, tls) {
  sock <- req_socket(url, tls = tls)
  listener <- attr(sock, "listener")[[1L]]
  bound_url <- opt(listener, "url")
  wildcard <- parse_url(bound_url)[["port"]] == "0"
  if (wildcard) {
    real_port <- opt(listener, "tcp-bound-port")
    bound_url <- sub_real_port(real_port, bound_url)
  }
  `[[<-`(envir, "sock", sock)
  `[[<-`(envir, "url", bound_url)
}

# Send the signal payload `._scm_.` over the profile's socket, once per
# current daemon connection, pausing 10 ms between sends.
#
# The connection count comes from the socket's "pipes" statistic when no
# dispatcher is in use, otherwise from the first element of the dispatcher's
# status reply. If the dispatcher query fails, nothing is sent.
#
# @param envir the compute profile environment (uses "sock" and "dispatcher").
# @return NULL, invisibly.
send_signal <- function(envir) {
  sock <- envir[["sock"]]
  signals <- if (is.null(envir[["dispatcher"]])) {
    stat(sock, "pipes")
  } else {
    reply <- query_dispatcher(sock, c(0L, 0L))
    # A classed reply is an error value, not a status vector (same check as in
    # info() and dispatcher_status()). Indexing it would turn the error code
    # into a bogus signal count, so bail out instead.
    is.object(reply) && return(invisible())
    reply[1L]
  }
  for (i in seq_len(signals)) {
    send(sock, ._scm_., mode = 2L)
    msleep(10L)
  }
}

# Query the dispatcher and shape its reply into the list returned by status().
# A classed reply is returned unchanged. Elements beyond the fifth, if any,
# are reported as `events`.
dispatcher_status <- function(envir) {
  stats <- query_dispatcher(envir[["sock"]], c(0L, 0L))
  if (is.object(stats)) {
    return(stats)
  }

  tasks <- c(
    awaiting = stats[3L],
    executing = stats[4L],
    completed = stats[5L]
  )
  result <- list(
    connections = stats[1L],
    daemons = envir[["url"]],
    mirai = tasks
  )
  if (length(stats) > 5L) {
    result[["events"]] <- stats[6:length(stats)]
  }
  result
}

# Abort via cli with a "no daemons set" message that includes a runnable
# suggestion, naming the compute profile when `.compute` is a character value.
stop_d_cli <- function(.compute, call) {
  msg <- if (is.character(.compute)) {
    c(
      sprintf("No daemons set for the '%s' compute profile.", .compute),
      sprintf("Use e.g. {.run mirai::daemons(6, .compute = \"%s\")} to set 6 local daemons.", .compute)
    )
  } else {
    c(
      "No daemons set.",
      "Use e.g. {.run mirai::daemons(6)} to set 6 local daemons."
    )
  }
  cli::cli_abort(msg, call = call)
}

# Plain-text counterpart of stop_d_cli(): raise a "no daemons set" error,
# naming the compute profile when `.compute` is a character value. `call` is
# accepted for a matching signature but not used; the error carries no call.
stop_d <- function(.compute, call) {
  msg <- if (is.character(.compute)) {
    sprintf("No daemons set for the '%1$s' compute profile.\nUse e.g. mirai::daemons(6, .compute = \"%1$s\") to set 6 local daemons.", .compute)
  } else {
    "No daemons set.\nUse e.g. mirai::daemons(6) to set 6 local daemons."
  }
  stop(msg, call. = FALSE)
}

# Raw payload that send_signal() sends over the profile socket. The leading
# bytes 0x42 0x0a are ASCII "B\n" and 0x55 0x54 0x46 0x2d 0x38 spell "UTF-8",
# so this looks like a pre-serialized R object in R's binary serialization
# format -- TODO confirm which value it encodes and how daemons react to it.
._scm_. <- as.raw(c(0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x02, 0x03, 0x04, 0x00, 0x00, 0x05, 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0xfc, 0x00, 0x00, 0x00))

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on Sept. 9, 2025, 5:51 p.m.