Nothing
# Copyright (C) 2022-2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.
# mirai ------------------------------------------------------------------------
#' Daemon Instance
#'
#' Starts up an execution daemon to receive \code{\link{mirai}} requests. Awaits
#' data, evaluates an expression in an environment containing the supplied
#' data, and returns the value to the host caller. Daemon settings may be
#' controlled by \code{\link{daemons}} and this function should not need to
#' be invoked directly, unless deploying manually on remote resources.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param autoexit [default TRUE] logical value, whether the daemon should
#' exit automatically when its socket connection ends (see 'Persistence'
#' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore loaded packages and options to an
#' initial state after each evaluation. For more granular control, also
#' accepts an integer value (see 'Cleanup Options' section below).
#' @param output [default FALSE] logical value, to output generated stdout /
#' stderr if TRUE, or else discard if FALSE. Specify as TRUE in the '...'
#' argument to \code{\link{daemons}} or \code{\link{launch_local}} to provide
#' redirection of output to the host process (applicable only for local
#' daemons when not using dispatcher).
#' @param maxtasks [default Inf] the maximum number of tasks to execute (task
#' limit) before exiting.
#' @param idletime [default Inf] maximum idle time, since completion of the last
#' task (in milliseconds) before exiting.
#' @param walltime [default Inf] soft walltime, or the minimum amount of real
#' time taken (in milliseconds) before exiting.
#' @param timerstart [default 0L] number of completed tasks after which to start
#' the timer for 'idletime' and 'walltime'. 0L implies timers are started
#' upon launch.
#' @param ... reserved but not currently used.
#' @param tls [default NULL] required for secure TLS connections over 'tls+tcp://'
#' or 'wss://'. \strong{Either} the character path to a file containing
#' X.509 certificate(s) in PEM format, comprising the certificate authority
#' certificate chain starting with the TLS certificate and ending with the
#' CA certificate, \strong{or} a length 2 character vector comprising [i]
#' the certificate authority certificate chain and [ii] the empty character
#' \code{''}.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host
#' process and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @details The network topology is such that daemons dial into the host or
#' dispatcher, which listens at the 'url' address. In this way, network
#' resources may be added or removed dynamically and the host or
#' dispatcher automatically distributes tasks to all available daemons.
#'
#' @section Persistence:
#'
#' The 'autoexit' argument governs persistence settings for the daemon. The
#' default TRUE ensures that it will exit cleanly under all circumstances
#' once its socket connection has ended.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when
#' there is no longer a socket connection. This allows a host session to end
#' and a new session to connect at the URL where the daemon is dialled in.
#' Daemons must be terminated with \code{daemons(NULL)} in this case, which
#' sends an exit signal to all connected daemons.
#'
#' Persistence also implies that dials are performed asynchronously, which
#' means retries are attempted (indefinitely) if not immediately successful.
#' This is resilient behaviour but can mask potential connection issues.
#'
#' @section Cleanup Options:
#'
#' The 'cleanup' argument also accepts an integer value, which operates an
#' additive bitmask: perform cleanup of the global environment (1L), reset
#' loaded packages to an initial state (2L), restore options to an initial
#' state (4L), and perform garbage collection (8L).
#'
#' As an example, to perform cleanup of the global environment and garbage
#' collection, specify 9L (1L + 8L). The default argument value of TRUE
#' performs all actions apart from garbage collection and is equivalent to a
#' value of 7L.
#'
#' Caution: do not reset options but not loaded packages if packages set
#' options on load.
#'
#' @export
#'
daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L,
..., tls = NULL, rs = NULL) {
cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
autoexit && pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)
if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
cleanup <- parse_cleanup(cleanup)
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
`[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", ".Random.seed")
count <- 0L
start <- mclock()
repeat {
ctx <- .context(sock)
aio <- recv_aio_signal(ctx, cv = cv, mode = 1L, timeout = idletime)
wait(cv) || break
._mirai_. <- .subset2(aio, "data")
is.environment(._mirai_.) || {
count < timerstart && {
start <- mclock()
next
}
break
}
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
count <- count + 1L
(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
send(ctx, data = data, mode = 3L)
data <- recv_aio_signal(sock, cv = cv, mode = 8L, timeout = .timelimit)
wait(cv)
break
}
send(ctx, data = data, mode = 1L)
perform_cleanup(cleanup)
if (count <= timerstart) start <- mclock()
}
}
#' dot Daemon
#'
#' Implements an ephemeral executor for the remote process.
#'
#' @inheritParams daemon
#'
#' @return Invisible NULL.
#'
#' @keywords internal
#' @export
#'
.daemon <- function(url) {
sock <- socket(protocol = "rep", dial = url, autostart = NA)
on.exit(reap(sock))
._mirai_. <- recv(sock, mode = 1L, block = TRUE)
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
send(sock, data = data, mode = 1L, block = TRUE)
msleep(2000L)
}
# internals --------------------------------------------------------------------
dial_and_sync_socket <- function(sock, url, asyncdial, tls = NULL) {
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE, remove = FALSE, flag = FALSE)
dial(sock, url = url, autostart = asyncdial || NA, tls = tls, error = TRUE)
wait(cv)
}
parse_cleanup <- function(cleanup) {
is.logical(cleanup) ||
return(c(as.integer(cleanup) %% 2L, (clr <- as.raw(cleanup)) & as.raw(2L), clr & as.raw(4L), clr & as.raw(8L)))
c(cleanup, cleanup, cleanup, FALSE)
}
perform_cleanup <- function(cleanup) {
if (cleanup[1L]) rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
if (cleanup[2L]) lapply((new <- search())[!new %in% .[["se"]]], detach, unload = TRUE, character.only = TRUE)
if (cleanup[3L]) options(.[["op"]])
if (cleanup[4L]) gc(verbose = FALSE)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.