R/dispatcher.R

Defines functions get_and_reset_env init_monitor query_status query_dispatcher sub_real_port new_tokenized_url auto_tokenized_url saisei dispatcher

Documented in dispatcher saisei

# Copyright (C) 2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.

# mirai ------------------------------------------------------------------------

#' Dispatcher
#'
#' Dispatches tasks from a host to multiple daemons for processing, using a FIFO
#'     scheduling rule, queuing tasks as required. Daemon / dispatcher settings
#'     may be controlled by \code{\link{daemons}} and this function should not
#'     need to be invoked directly.
#'
#' @inheritParams daemon
#' @param host the character host URL to dial (where tasks are sent from),
#'     including the port to connect to (and optionally for websockets, a path),
#'     e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param url (optional) the character URL or vector of URLs dispatcher should
#'     listen at, including the port to connect to (and optionally for websockets,
#'     a path), e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#'     Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Tasks are
#'     sent to daemons dialled into these URLs. If not supplied, 'n' local
#'     inter-process URLs will be assigned automatically.
#' @param n (optional) if specified, the integer number of daemons to listen for.
#'     Otherwise 'n' will be inferred from the number of URLs supplied in 'url'.
#'     Where a single URL is supplied and 'n' > 1, 'n' unique URLs will be
#'     automatically assigned for daemons to dial into.
#' @param ... (optional) additional arguments passed through to \code{\link{daemon}}.
#'     These include 'autoexit', 'cleanup', 'maxtasks', 'idletime', 'walltime'
#'     and 'timerstart'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#'     default FALSE will error if a connection is not immediately possible
#'     (e.g. \code{\link{daemons}} has yet to be called on the host, or the
#'     specified port is not open etc.). Specifying TRUE continues retrying
#'     (indefinitely) if not immediately successful, which is more resilient but
#'     can mask potential connection issues.
#' @param token [default FALSE] if TRUE, appends a unique 24-character token
#'     to each URL path the dispatcher listens at (not applicable for TCP URLs
#'     which do not accept a path).
#' @param tls [default NULL] (required for secure TLS connections) \strong{either}
#'     the character path to a file containing the PEM-encoded TLS certificate
#'     and associated private key (may contain additional certificates leading
#'     to a validation chain, with the TLS certificate first), \strong{or} a
#'     length 2 character vector comprising [i] the TLS certificate (optionally
#'     certificate chain) and [ii] the associated private key.
#' @param pass [default NULL] (required only if the private key supplied to 'tls'
#'     is encrypted with a password) For security, should be provided through a
#'     function that returns this value, rather than directly.
#' @param monitor (for package internal use only) do not set this parameter.
#'
#' @return Invisible NULL.
#'
#' @details The network topology is such that a dispatcher acts as a gateway
#'     between the host and daemons, ensuring that tasks received from the host
#'     are dispatched on a FIFO basis for processing. Tasks are queued at the
#'     dispatcher to ensure tasks are only sent to daemons that can begin
#'     immediate execution of the task.
#'
#' @export
#'
dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
                       token = FALSE, tls = NULL, pass = NULL, rs = NULL,
                       monitor = NULL) {

  # Number of daemon slots: an explicit numeric 'n' wins, otherwise one slot
  # per supplied URL. With neither supplied the result is 0 and we error.
  n <- if (is.numeric(n)) as.integer(n) else length(url)
  n > 0L || stop(.messages[["missing_url"]])

  # Host-facing side: one 'rep' socket dialled into 'host', sharing a single
  # condition variable 'cv' that drives the main loop below.
  # NOTE(review): with flag = TRUE on pipe removal, wait(cv) in the main loop
  # presumably returns FALSE once the host connection drops - confirm against
  # the nanonext documentation.
  cv <- cv()
  sock <- socket(protocol = "rep")
  on.exit(reap(sock))
  pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
  dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial)

  # Per-slot state, all of length 'n':
  #   basenames   - URL of each slot before any token is appended
  #   servernames - URL each slot is actually listening at
  #   activestore - previous iteration's 'activevec' (see main loop)
  #   instance / assigned / complete - integer counters reported to the monitor
  #   serverfree  - logical, initialised to all TRUE (!0L is TRUE)
  #   active      - per-slot condition variables
  #   servers     - per-slot daemon-facing sockets
  #   queue       - per-slot list holding a host context and its pending aio(s)
  auto <- is.null(url)
  vectorised <- length(url) == n
  seq_n <- seq_len(n)
  basenames <- servernames <- character(n)
  activestore <- instance <- complete <- assigned <- integer(n)
  serverfree <- !integer(n)
  active <- servers <- queue <- vector(mode = "list", length = n)
  if (auto) {
    # No URL supplied: daemons are launched locally (see launch_daemon() call
    # in the loop below) and '...' is pre-processed once for that purpose.
    dots <- parse_dots(...)
  } else {
    baseurl <- parse_url(url)
    # Schemes beginning with "t" (the roxygen above states TCP URLs do not
    # accept a path): slots are distinguished by port rather than path, and
    # tokens are disabled. A port of "0" yields a vector of zeros.
    if (substr(baseurl[["scheme"]], 1L, 1L) == "t") {
      ports <- if (baseurl[["port"]] == "0") integer(n) else seq.int(baseurl[["port"]], length.out = n)
      token <- FALSE
    } else {
      ports <- NULL
    }

    # For 'wss' / 'tls' schemes with no 'tls' argument, fall back to values
    # read from (and then removed from) environment variables.
    if (substr(baseurl[["scheme"]], 1L, 3L) %in% c("wss", "tls") && is.null(tls)) {
      tls <- get_and_reset_env("MIRAI_TEMP_FIELD1")
      if (length(tls))
        tls <- c(tls, get_and_reset_env("MIRAI_TEMP_FIELD2"))
    }
    if (length(tls)) {
      if (is.null(pass))
        pass <- get_and_reset_env("MIRAI_TEMP_VAR")
      tls <- tls_config(server = tls, pass = pass)
      # Drop the local reference to the password once the config is built.
      pass <- NULL
    }
  }

  # Environment passed to next_stream() below; when 'rs' is non-empty it is
  # stored (as integer) under the name "stream".
  envir <- new.env(hash = FALSE)
  if (length(rs))
    `[[<-`(envir, "stream", as.integer(rs))

  # Set up one listening socket per slot.
  for (i in seq_n) {
    # Base URL for this slot: the automatic scheme, the i-th supplied URL,
    # the single URL with "/i" appended (non-"t" schemes), or the single URL
    # with its first port replaced by the i-th port.
    burl <- if (auto) .urlscheme else
      if (vectorised) url[i] else
        if (is.null(ports)) sprintf("%s/%d", url, i) else
          sub(ports[1L], ports[i], url, fixed = TRUE)
    basenames[i] <- burl
    nurl <- if (auto) auto_tokenized_url() else if (token) new_tokenized_url(burl) else burl
    nsock <- req_socket(NULL)
    # Each slot gets its own condition variable 'ncv'; pipe events on the
    # slot's socket are also forwarded to the shared 'cv' via cv2.
    ncv <- cv()
    pipe_notify(nsock, cv = ncv, cv2 = cv, add = TRUE, remove = TRUE, flag = FALSE)
    lock(nsock, cv = ncv)
    listen(nsock, url = nurl, tls = tls, error = TRUE)
    listener <- attr(nsock, "listener")[[1L]]
    # Port "0" on the first slot: read back the port actually bound and
    # substitute it into the recorded URLs. When a single URL was supplied,
    # 'url' itself is rewritten as well, which affects the URLs derived for
    # later slots in this loop.
    if (i == 1L && !auto && parse_url(opt(listener, "url"))[["port"]] == "0") {
      realport <- opt(listener, "tcp-bound-port")
      servernames[i] <- sub_real_port(port = realport, url = nurl)
      if (!vectorised || n == 1L) {
        url <- sub_real_port(port = realport, url = url)
        basenames[1L] <- sub_real_port(port = realport, url = burl)
      }
    } else {
      servernames[i] <- opt(listener, "url")
    }

    # In automatic mode, start a local daemon that dials into this URL.
    auto && launch_daemon(nurl, dots, next_stream(envir))

    servers[[i]] <- nsock
    active[[i]] <- ncv
    # Open a host-side context and start an async receive on it; a queue
    # entry of length 2 (ctx, req) denotes "waiting for / holding a request".
    ctx <- .context(sock)
    req <- recv_aio_signal(ctx, cv = cv, mode = 8L)
    queue[[i]] <- list(ctx = ctx, req = req)
  }

  # Registered only after the loop above has completed.
  on.exit(lapply(servers, reap), add = TRUE, after = TRUE)

  # Optional control channel: enabled when 'monitor' is a character URL.
  # After an initial receive, replies with this process' PID followed by the
  # slot URLs, then arms an async receive for control commands.
  ctrchannel <- is.character(monitor)
  if (ctrchannel) {
    sockc <- socket(protocol = "rep")
    on.exit(reap(sockc), add = TRUE, after = FALSE)
    dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
    recv(sockc, mode = 6L, block = .timelimit) && stop(.messages[["sync_timeout"]])
    send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
    cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
  }

  # Main event loop, run with interrupts suspended. Each iteration is woken
  # by a signal on the shared 'cv'.
  suspendInterrupts(
    repeat {

      # Exit the loop when wait() returns a falsy value.
      wait(cv) || break

      # Derive per-slot connection status from the parity of each slot's
      # condition-variable value.
      # NOTE(review): presumably each connect / disconnect event bumps the
      # value by one, so an odd value means a daemon is currently connected -
      # confirm against nanonext pipe_notify() semantics.
      cv_values <- as.integer(lapply(active, cv_value))
      activevec <- cv_values %% 2L
      # Slots that went from 0 to 1 since the last iteration: bump their
      # instance counter (taking abs() to clear any negative marker set by a
      # URL regeneration below) and mark them free.
      changes <- (activevec - activestore) > 0L
      activestore <- activevec
      if (any(changes)) {
        instance[changes] <- abs(instance[changes]) + 1L
        serverfree <- serverfree | changes
      }

      # Control channel handling: runs only when a control message has
      # arrived, and ends the iteration via 'next'.
      ctrchannel && !unresolved(cmessage) && {
        i <- .subset2(cmessage, "value")
        if (i) {
          if (i > 0L && !activevec[[i]]) {
            # Positive index and slot not active: replace only the listener
            # with one at a freshly tokenized URL; the socket is kept.
            reap(attr(servers[[i]], "listener")[[1L]])
            attr(servers[[i]], "listener") <- NULL
            data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
            instance[i] <- -abs(instance[i])
            listen(servers[[i]], url = data, tls = tls, error = TRUE)

          } else if (i < 0L) {
            # Negative index: regenerate regardless of connection status by
            # closing the whole socket and creating a replacement that reuses
            # the slot's existing condition variable.
            i <- -i
            reap(servers[[i]])
            servers[[i]] <- nsock <- req_socket(NULL)
            pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE, flag = FALSE)
            lock(nsock, cv = active[[i]])
            data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
            instance[i] <- -abs(instance[i])
            listen(nsock, url = data, tls = tls, error = TRUE)

          } else {
            # Positive index but slot active: reply with an empty string.
            data <- ""

          }
        } else {
          # Zero: status query. Reply with five integer columns flattened
          # into one vector (index, online, instance, assigned, complete).
          data <- as.integer(c(seq_n, activevec, instance, assigned, complete))
        }
        send_aio(sockc, data = data, mode = 2L)
        cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
        next
      }

      # Completed tasks: a queue entry longer than 2 has been dispatched
      # ('res' and 'daemon' added below); once 'res' resolves, return the
      # result to the host on the entry's context.
      for (i in seq_n)
        if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["res"]])) {
          req <- .subset2(queue[[i]][["res"]], "value")
          if (is.object(req)) req <- serialize(req, NULL)
          send_aio(queue[[i]][["ctx"]], data = req, mode = 2L)
          q <- queue[[i]][["daemon"]]
          # If the first byte of the result equals .nextmode, send NULL to
          # that daemon on a throwaway context and do NOT mark the slot free;
          # otherwise the slot becomes free again.
          # NOTE(review): presumably .nextmode marks a daemon that is about
          # to exit / be replaced - confirm against daemon().
          if (req[1L] == .nextmode) {
            ctx <- .context(servers[[q]])
            send(ctx, data = NULL, mode = 2L, block = FALSE)
            reap(ctx)
          } else {
            serverfree[q] <- TRUE
          }
          complete[q] <- complete[q] + 1L
          # Reset the queue entry to a fresh context with a pending receive.
          ctx <- .context(sock)
          req <- recv_aio_signal(ctx, cv = cv, mode = 8L)
          queue[[i]] <- list(ctx = ctx, req = req)
        }

      # Slots that are both free and currently connected.
      free <- which(serverfree & activevec)

      # Dispatch: for each free slot, hand it the first queue entry (in index
      # order) that holds a received but not yet dispatched request.
      if (length(free))
        for (q in free)
          for (i in seq_n) {
            if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) {
              queue[[i]][["res"]] <- request_signal(.context(servers[[q]]), data = .subset2(queue[[i]][["req"]], "value"), cv = cv, send_mode = 2L, recv_mode = 8L)
              queue[[i]][["daemon"]] <- q
              serverfree[q] <- FALSE
              assigned[q] <- assigned[q] + 1L
              break
            }
            # NOTE(review): serverfree[q] is only set FALSE immediately
            # before the 'break' above, so this guard looks like it can
            # never trigger - confirm.
            serverfree[q] || break
          }

    }
  )

}

#' Saisei (Regenerate Token)
#'
#' When using daemons with dispatcher, regenerates the token for the URL a
#'     dispatcher socket listens at.
#'
#' @param i integer index number of the URL to regenerate at dispatcher.
#' @param force [default FALSE] logical value whether to regenerate the URL even
#'     when there is an existing active connection.
#' @param .compute [default 'default'] character compute profile (each compute
#'     profile has its own set of daemons for connecting to different resources).
#'
#' @return The regenerated character URL upon success, or else NULL.
#'
#' @details When a URL is regenerated, the listener at the specified socket is
#'     closed and replaced immediately, hence this function will only be
#'     successful if there are no existing connections at the socket (i.e.
#'     'online' status shows 0), unless the argument 'force' is specified as TRUE.
#'
#'     If 'force' is specified as TRUE, the socket is immediately closed and
#'     regenerated. If this happens while a mirai is still ongoing, it will be
#'     returned as an errorValue 7 'Object closed'. This may be used to cancel a
#'     task that consistently hangs or crashes to prevent it from failing
#'     repeatedly when new daemons connect.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(1L)
#' Sys.sleep(1L)
#' status()
#' saisei(i = 1L, force = TRUE)
#' status()
#'
#' daemons(0)
#'
#' }
#'
#' @export
#'
saisei <- function(i, force = FALSE, .compute = "default") {

  # Look up the compute profile and coerce 'i' to a single integer.
  envir <- ..[[.compute]]
  length(i) <- 1L
  i <- as.integer(i)

  # Proceed only when a control socket exists, 'i' is a valid slot index and
  # the slot's URL does not begin with "t"; otherwise (including when the
  # check evaluates to NA) return NULL.
  eligible <- length(envir[["sockc"]]) && i > 0L && i <= envir[["n"]] &&
    substr(envir[["urls"]][i], 1L, 1L) != "t"
  if (!isTRUE(eligible)) return()

  # A negative index is sent to request a forced regeneration.
  command <- if (force) -i else i
  regenerated <- query_dispatcher(sock = envir[["sockc"]], command = command, mode = 9L)

  # Anything other than a non-empty character reply counts as failure.
  if (!isTRUE(is.character(regenerated) && nzchar(regenerated))) return()

  # Record the new URL for this slot and return it.
  envir[["urls"]][i] <- regenerated
  regenerated

}

# internals --------------------------------------------------------------------

# Builds a URL by appending a freshly generated random(12L) token to the
# package URL scheme prefix '.urlscheme'.
auto_tokenized_url <- function() {
  token <- random(12L)
  strcat(.urlscheme, token)
}

# Appends "/" followed by a freshly generated random(12L) token to 'url'.
new_tokenized_url <- function(url) {
  token <- random(12L)
  sprintf("%s/%s", url, token)
}

# Replaces a wildcard port "0" in 'url' with 'port'. The pattern matches the
# first "0" that directly follows ":" and is followed by either "/" or the
# end of the string, so zeros inside a host address or a longer port number
# are left alone.
sub_real_port <- function(port, url) {
  sub(pattern = "(?<=:)0(?![^/])", replacement = port, x = url, perl = TRUE)
}

# Sends 'command' over 'sock' and returns whatever is received in reply,
# decoded using receive mode 'mode'. Both calls use '.timelimit' as their
# 'block' argument; the return value of send() is not checked.
query_dispatcher <- function(sock, command, mode) {
  send(sock, data = command, mode = 2L, block = .timelimit)
  reply <- recv(sock, mode = mode, block = .timelimit)
  reply
}

# Requests status from dispatcher (command 0L). A reply that is.object()
# is returned unchanged; otherwise the reply is shaped into an n x 5 matrix
# with the stored URLs as row names.
query_status <- function(envir) {
  status <- query_dispatcher(sock = envir[["sockc"]], command = 0L, mode = 5L)
  if (is.object(status)) return(status)
  attributes(status) <- list(
    dim = c(envir[["n"]], 5L),
    dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete"))
  )
  status
}

# Performs the initial exchange over the control socket and records the
# result in 'envir': the socket itself, the URLs (all reply elements but the
# first) and the pid (first element, as integer). Errors if the reply
# is.object(). Returns 'envir'.
init_monitor <- function(sockc, envir) {
  reply <- query_dispatcher(sockc, command = FALSE, mode = 2L)
  if (is.object(reply)) stop(.messages[["sync_timeout"]])
  envir[["sockc"]] <- sockc
  envir[["urls"]] <- reply[-1L]
  envir[["pid"]] <- as.integer(reply[1L])
  envir
}

# Reads environment variable 'x'. If it holds a non-empty value, the variable
# is unset and the value returned; otherwise returns NULL invisibly and
# leaves the environment untouched.
get_and_reset_env <- function(x) {
  value <- Sys.getenv(x)
  if (!nzchar(value)) return(invisible())
  Sys.unsetenv(x)
  value
}

# Single raw byte (0x07) that dispatcher() compares against the first byte
# of a result received from a daemon.
.nextmode <- as.raw(0x07L)

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on Nov. 16, 2023, 5:08 p.m.