# R/daemon.R

# Defines functions perform_cleanup parse_cleanup dial_and_sync_socket .daemon daemon

# Documented in daemon .daemon

# Copyright (C) 2022-2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.

# mirai ------------------------------------------------------------------------

#' Daemon Instance
#'
#' Starts up an execution daemon to receive \code{\link{mirai}} requests. Awaits
#'     data, evaluates an expression in an environment containing the supplied
#'     data, and returns the value to the host caller. Daemon settings may be
#'     controlled by \code{\link{daemons}} and this function should not need to
#'     be invoked directly, unless deploying manually on remote resources.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#'     port to connect to (and optionally for websockets, a path), e.g.
#'     'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param autoexit [default TRUE] logical value, whether the daemon should
#'     exit automatically when its socket connection ends (see 'Persistence'
#'     section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#'     the global environment and restore loaded packages and options to an
#'     initial state after each evaluation. For more granular control, also
#'     accepts an integer value (see 'Cleanup Options' section below).
#' @param output [default FALSE] logical value, to output generated stdout /
#'     stderr if TRUE, or else discard if FALSE. Specify as TRUE in the '...'
#'     argument to \code{\link{daemons}} or \code{\link{launch_local}} to provide
#'     redirection of output to the host process (applicable only for local
#'     daemons when not using dispatcher).
#' @param maxtasks [default Inf] the maximum number of tasks to execute (task
#'     limit) before exiting.
#' @param idletime [default Inf] maximum idle time, since completion of the last
#'     task (in milliseconds) before exiting.
#' @param walltime [default Inf] soft walltime, or the minimum amount of real
#'     time taken (in milliseconds) before exiting.
#' @param timerstart [default 0L] number of completed tasks after which to start
#'     the timer for 'idletime' and 'walltime'. 0L implies timers are started
#'     upon launch.
#' @param ... reserved but not currently used.
#' @param tls [default NULL] required for secure TLS connections over 'tls+tcp://'
#'     or 'wss://'. \strong{Either} the character path to a file containing
#'     X.509 certificate(s) in PEM format, comprising the certificate authority
#'     certificate chain starting with the TLS certificate and ending with the
#'     CA certificate, \strong{or} a length 2 character vector comprising [i]
#'     the certificate authority certificate chain and [ii] the empty character
#'     \code{''}.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#'     automatically using L'Ecuyer-CMRG RNG streams generated by the host
#'     process and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @details The network topology is such that daemons dial into the host or
#'     dispatcher, which listens at the 'url' address. In this way, network
#'     resources may be added or removed dynamically and the host or
#'     dispatcher automatically distributes tasks to all available daemons.
#'
#' @section Persistence:
#'
#'     The 'autoexit' argument governs persistence settings for the daemon. The
#'     default TRUE ensures that it will exit cleanly under all circumstances
#'     once its socket connection has ended.
#'
#'     Setting to FALSE allows the daemon to persist indefinitely even when
#'     there is no longer a socket connection. This allows a host session to end
#'     and a new session to connect at the URL where the daemon is dialled in.
#'     Daemons must be terminated with \code{daemons(NULL)} in this case, which
#'     sends an exit signal to all connected daemons.
#'
#'     Persistence also implies that dials are performed asynchronously, which
#'     means retries are attempted (indefinitely) if not immediately successful.
#'     This is resilient behaviour but can mask potential connection issues.
#'
#' @section Cleanup Options:
#'
#'     The 'cleanup' argument also accepts an integer value, which operates an
#'     additive bitmask: perform cleanup of the global environment (1L), reset
#'     loaded packages to an initial state (2L), restore options to an initial
#'     state (4L), and perform garbage collection (8L).
#'
#'     As an example, to perform cleanup of the global environment and garbage
#'     collection, specify 9L (1L + 8L). The default argument value of TRUE
#'     performs all actions apart from garbage collection and is equivalent to a
#'     value of 7L.
#'
#'     Caution: do not reset options but not loaded packages if packages set
#'     options on load.
#'
#' @export
#'
daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
                   maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L,
                   ..., tls = NULL, rs = NULL) {

  # Condition variable shared by the pipe notification and the asynchronous
  # receives below.
  cv <- cv()
  sock <- socket(protocol = "rep")
  # Always release the socket when this function exits.
  on.exit(reap(sock))
  # Only when autoexit is TRUE: register a pipe-removal notification on 'cv'
  # (remove = TRUE, flag = TRUE). Presumably this is what makes wait(cv) in the
  # loop below return FALSE once the connection ends - confirm against the
  # pipe_notify() documentation.
  autoexit && pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
  # Build a client TLS configuration only if 'tls' was supplied.
  if (length(tls)) tls <- tls_config(client = tls)
  # Dial into 'url'; the dial is asynchronous only when persisting
  # (autoexit = FALSE). The helper waits on its own condition variable for a
  # pipe-add notification before returning.
  dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)

  # Install the RNG state supplied by the host, if any, as .Random.seed.
  if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
  # Cap idletime at walltime; an infinite idletime is replaced by NULL before
  # being passed as the receive 'timeout' (presumably NULL means no timeout).
  if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
  # Expand 'cleanup' into a length-4 vector of per-action flags.
  cleanup <- parse_cleanup(cleanup)
  if (!output) {
    # Divert both stdout and the message stream to the null device, and undo
    # both diversions (then close the connection) on exit. add = TRUE keeps the
    # earlier reap(sock) handler in place.
    devnull <- file(nullfile(), open = "w", blocking = FALSE)
    sink(file = devnull)
    sink(file = devnull, type = "message")
    on.exit({
      sink(type = "message")
      sink()
      close(devnull)
    }, add = TRUE)
  }
  # Snapshot the initial options ("op"), search path ("se") and the variable
  # name to keep in the global environment ("vars") into '.', which
  # perform_cleanup() reads back. '.' is defined elsewhere - presumably a
  # package-level environment, since the result is not assigned.
  `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", ".Random.seed")
  # Number of tasks evaluated so far.
  count <- 0L
  # Reference time for the walltime check.
  start <- mclock()

  repeat {

    # A new context is opened for every request, and the task is received
    # asynchronously with 'idletime' as the timeout.
    ctx <- .context(sock)
    aio <- recv_aio_signal(ctx, cv = cv, mode = 1L, timeout = idletime)
    # Leave the loop when wait() returns FALSE.
    wait(cv) || break
    ._mirai_. <- .subset2(aio, "data")
    # Anything other than an environment is not a task (presumably a timeout
    # or error value from the receive). While fewer than 'timerstart' tasks
    # have completed, the timer is restarted and the daemon keeps waiting;
    # otherwise the daemon exits.
    is.environment(._mirai_.) || {
      count < timerstart && {
        start <- mclock()
        next
      }
      break
    }
    # Evaluate the task expression stored as ".expr" within the received
    # environment; errors and interrupts are converted by the handlers into
    # values that are sent back instead of propagating.
    data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
                     error = mk_mirai_error, interrupt = mk_interrupt_error)
    count <- count + 1L

    # Final task: either the task limit is reached, or more than 'timerstart'
    # tasks have completed and 'walltime' has elapsed since 'start'. The result
    # is sent with mode 3L rather than 1L, a further receive on the socket is
    # awaited (timeout .timelimit, defined elsewhere), and the loop ends
    # without performing cleanup.
    (count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
      send(ctx, data = data, mode = 3L)
      data <- recv_aio_signal(sock, cv = cv, mode = 8L, timeout = .timelimit)
      wait(cv)
      break
    }

    # Normal path: return the result, then reset state for the next task.
    send(ctx, data = data, mode = 1L)
    perform_cleanup(cleanup)
    # Keep resetting the timer until 'timerstart' tasks have completed.
    if (count <= timerstart) start <- mclock()

  }

}

#' dot Daemon
#'
#' Implements an ephemeral executor for the remote process.
#'
#' @inheritParams daemon
#'
#' @return Invisible NULL.
#'
#' @keywords internal
#' @export
#'
.daemon <- function(url) {

  # Reply socket dialled straight into 'url'; released when the function exits.
  sock <- socket(protocol = "rep", dial = url, autostart = NA)
  on.exit(reap(sock))

  # Blocking receive of the single task environment. The name '._mirai_.' and
  # the eval() call are deliberately left verbatim in case the deparsed call is
  # surfaced by the error handlers (defined elsewhere).
  ._mirai_. <- recv(sock, mode = 1L, block = TRUE)
  result <- tryCatch(
    eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
    error = mk_mirai_error,
    interrupt = mk_interrupt_error
  )

  # Blocking send of the value (or error value) back to the caller.
  send(sock, data = result, mode = 1L, block = TRUE)
  # Pause for 2000 ms before the socket is reaped on exit - presumably to give
  # the reply time to be transmitted.
  msleep(2000L)

}

# internals --------------------------------------------------------------------

# Dial 'sock' into 'url' and block until a pipe-add notification is signalled.
#
# sock       the socket to dial from.
# url        the URL to dial into.
# asyncdial  if TRUE the dial is started with autostart = TRUE, otherwise with
#            autostart = NA.
# tls        optional TLS configuration passed through to dial().
#
# Returns the value of wait() on the internal condition variable.
dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) {
  connected <- cv()
  # Signal 'connected' when a pipe is added to the socket (add = TRUE only).
  pipe_notify(sock, cv = connected, add = TRUE, remove = FALSE, flag = FALSE)
  # TRUE when 'asyncdial' is TRUE, NA otherwise.
  autostart <- asyncdial || NA
  dial(sock, url = url, autostart = autostart, tls = tls, error = TRUE)
  wait(connected)
}

# Expand the 'cleanup' argument into a length-4 vector of per-action flags, in
# the order: global environment, loaded packages, options, garbage collection.
#
# A logical value switches the first three actions on or off together and never
# requests garbage collection. Any other value is treated as an additive
# bitmask (1, 2, 4, 8); each element of the result is then zero when the
# corresponding bit is unset and non-zero when it is set.
parse_cleanup <- function(cleanup) {
  if (is.logical(cleanup)) {
    c(cleanup, cleanup, cleanup, FALSE)
  } else {
    # Lowest bit via integer remainder, the remaining bits via raw bitwise AND.
    env_bit <- as.integer(cleanup) %% 2L
    mask <- as.raw(cleanup)
    c(
      env_bit,
      mask & as.raw(2L),
      mask & as.raw(4L),
      mask & as.raw(8L)
    )
  }
}

# Run the cleanup actions selected by 'cleanup', a length-4 vector of flags as
# produced by parse_cleanup(). Each action compares against, or restores from,
# the values stored in '.' (defined elsewhere):
#   [1] remove every object in the global environment not named in "vars"
#   [2] detach (and unload) every search path entry not present in "se"
#   [3] re-apply the options stored in "op"
#   [4] run garbage collection
perform_cleanup <- function(cleanup) {
  if (cleanup[1L]) {
    present <- names(.GlobalEnv)
    rm(list = present[!present %in% .[["vars"]]], envir = .GlobalEnv)
  }
  if (cleanup[2L]) {
    attached <- search()
    lapply(
      attached[!attached %in% .[["se"]]],
      detach,
      unload = TRUE,
      character.only = TRUE
    )
  }
  if (cleanup[3L]) {
    options(.[["op"]])
  }
  if (cleanup[4L]) gc(verbose = FALSE)
}

# Try the mirai package in your browser

# Any scripts or data that you put into this service are public.

# mirai documentation built on Nov. 16, 2023, 5:08 p.m.