R/parallel.R

Defines functions node_unresolved print.miraiNode print.miraiCluster recvOneData.miraiCluster recvData.miraiNode sendData.miraiNode stop_cluster make_cluster

Documented in make_cluster stop_cluster

# Copyright (C) 2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.

# mirai x parallel -------------------------------------------------------------

#' Make Mirai Cluster
#'
#' \code{make_cluster} creates a cluster of type 'miraiCluster', which may be
#'     used as a cluster object for any function in the \pkg{parallel} base
#'     package such as \code{\link[parallel]{clusterApply}} or
#'     \code{\link[parallel]{parLapply}}.
#'
#' @param n integer number of nodes (automatically launched on the local machine
#'     unless 'url' is supplied).
#' @param url [default NULL] (specify for remote nodes) the character URL on the
#'     host for remote nodes to dial into, including a port accepting incoming
#'     connections, e.g. 'tcp://10.75.37.40:5555'. Specify a URL with the scheme
#'     'tls+tcp://' to use secure TLS connections.
#' @param remote [default NULL] (specify to launch remote nodes) a remote launch
#'     configuration generated by \code{\link{remote_config}} or \code{\link{ssh_config}}.
#'     If not supplied, nodes may be deployed manually on remote resources.
#' @param ... additional arguments passed onto \code{\link{daemons}}.
#'
#' @return For \strong{make_cluster}: An object of class 'miraiCluster' and
#'     'cluster'. Each 'miraiCluster' has an automatically assigned ID and 'n'
#'     nodes of class 'miraiNode'. If 'url' is supplied but not 'remote', the
#'     shell commands for deployment of nodes on remote resources are printed
#'     in interactive sessions.
#'
#'     For \strong{stop_cluster}: invisible NULL.
#'
#' @section Remote Nodes:
#'
#'     Specify 'url' and 'n' to set up a host connection for remote nodes to
#'     dial into. 'n' defaults to one if not specified.
#'
#'     Also specify 'remote' to launch the nodes using a configuration generated
#'     by \code{\link{remote_config}} or \code{\link{ssh_config}}. In this case,
#'     the number of nodes is inferred from the configuration provided and 'n'
#'     is disregarded.
#'
#'     If 'remote' is not supplied, the shell commands for deploying nodes
#'     manually on remote resources are automatically printed in interactive
#'     sessions.
#'
#'     \code{\link{launch_remote}} may be called at any time on a 'miraiCluster'
#'     to return the shell commands for deployment of all nodes, or on a
#'     'miraiNode' to return the command for a single node.
#'
#' @section Status:
#'
#'     Call \code{\link{status}} on a 'miraiCluster' to check the number of
#'     currently active connections as well as the host URL.
#'
#' @note Requires R >= 4.4 (currently R-devel). Clusters created with this
#'     function will not work with prior R versions. The functionality is
#'     experimental prior to release of R 4.4 and the interface is consequently
#'     subject to change at any time.
#'
#'     The default behaviour of clusters created by this function is
#'     designed to map as closely as possible to clusters created by the
#'     \pkg{parallel} package. However, '...' arguments are passed onto
#'     \code{\link{daemons}} for additional customisation if desired, although
#'     resultant behaviour may not be supported.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' cl <- make_cluster(2)
#' cl
#' cl[[1L]]
#'
#' Sys.sleep(0.5)
#' status(cl)
#'
#' stop_cluster(cl)
#'
#' }
#'
#' @export
#'
make_cluster <- function(n, url = NULL, remote = NULL, ...) {

  # Each cluster gets its own compute profile, named from the current number
  # of entries held in `..` (e.g. "`0`", "`1`", ...).
  id <- sprintf("`%d`", length(..))

  if (is.character(url)) {

    length(url) == 1L || stop(.messages[["single_url"]])

    # Validate 'n' BEFORE launching daemons: previously an invalid 'n' was only
    # detected after daemons() had set up the profile, leaving it running with
    # no cluster object returned to stop it.
    if (!length(remote)) {
      if (missing(n)) n <- 1L
      is.numeric(n) || stop(.messages[["numeric_n"]])
    }

    daemons(url = url, remote = remote, dispatcher = FALSE, resilience = FALSE, cleanup = FALSE, ..., .compute = id)

    if (length(remote)) {
      # With a remote launch configuration the number of nodes is inferred
      # from its 'args' element; any supplied 'n' is disregarded.
      args <- remote[["args"]]
      n <- if (is.list(args)) length(args) else 1L
    } else if (interactive()) {
      # Nodes are to be deployed manually: show the commands for doing so.
      cat("Shell commands for deployment on nodes:\n\n", file = stdout())
      print(launch_remote(rep(..[[id]][["urls"]], n), .compute = id))
    }

  } else {
    is.numeric(n) || stop(.messages[["numeric_n"]])
    daemons(n = n, dispatcher = FALSE, resilience = FALSE, cleanup = FALSE, ..., .compute = id)
  }

  # Notify the profile's condition variable on pipe removal (not addition),
  # setting its flag; recvOneData.miraiCluster() treats a FALSE return from
  # wait() on this cv as node failure.
  pipe_notify(..[[id]][["sock"]], cv = ..[[id]][["cv"]], add = FALSE, remove = TRUE, flag = TRUE)

  # Each node is an empty environment carrying its index and the cluster ID;
  # sendData.miraiNode() later stores the node's current mirai inside it.
  cl <- vector(mode = "list", length = n)
  for (i in seq_along(cl))
    cl[[i]] <- `attributes<-`(new.env(), list(class = "miraiNode", node = i, id = id))

  `attributes<-`(cl, list(class = c("miraiCluster", "cluster"), id = id))

}

#' Stop Mirai Cluster
#'
#' \code{stop_cluster} stops a cluster created by \code{make_cluster}.
#'
#' @param cl a 'miraiCluster'.
#'
#' @rdname make_cluster
#' @export
#'
stop_cluster <- function(cl) {

  # Reset (n = 0) the daemons of the compute profile backing this cluster.
  profile <- attr(cl, "id")
  daemons(0L, .compute = profile)

  invisible(NULL)

}

#' @method stopCluster miraiCluster
#' @export
#'
stopCluster.miraiCluster <- function(cl) stop_cluster(cl) # parallel::stopCluster() method: delegates to stop_cluster()

#' @method sendData miraiNode
#' @export
#'
sendData.miraiNode <- function(node, data) {

  # Refuse to send work to a cluster whose compute profile is no longer active.
  profile <- attr(node, "id")
  if (!length(..[[profile]]))
    stop(.messages[["cluster_inactive"]])

  # The payload is read from data[["data"]], whose 'fun', 'args' and
  # (optional) 'tag' elements are used below.
  msg <- data[["data"]]
  tag <- msg[["tag"]]
  tagged <- !is.null(tag)

  # The expression is evaluated by a daemon, where 'node' and 'data' are bound
  # to the function and argument list passed alongside it. Only tagged calls
  # request a signal on completion (.signal).
  job <- mirai(
    do.call(node, data, quote = TRUE),
    node = msg[["fun"]],
    data = msg[["args"]],
    .signal = tagged,
    .compute = profile
  )

  # Keep the tag on the mirai so it can be read back with the result.
  if (tagged)
    assign("tag", tag, envir = job)

  # The node is an environment: store the mirai in it and return the node.
  node[["mirai"]] <- job
  node

}

#' @method recvData miraiNode
#' @export
#'
recvData.miraiNode <- function(node) {
  # Wait on the mirai most recently stored in this node by sendData.miraiNode();
  # whatever call_mirai() returns is handed straight back to parallel.
  current <- .subset2(node, "mirai")
  call_mirai(current)
}

#' @method recvOneData miraiCluster
#' @export
#'
recvOneData.miraiCluster <- function(cl) {

  id <- attr(cl, "id")

  # make_cluster() registers this cv with pipe_notify(remove = TRUE,
  # flag = TRUE). A FALSE return from wait() is treated as node failure:
  # the cluster is stopped before erroring.
  wait(..[[id]][["cv"]]) || {
    stop_cluster(cl)
    stop(.messages[["nodes_failed"]])
  }

  # Select the first node whose mirai is resolved and not yet collected.
  # vapply() yields a typed logical vector, rather than passing a list to
  # which.min() and relying on its implicit list-to-double coercion.
  node <- which.min(vapply(cl, node_unresolved, logical(1L)))
  m <- .subset2(.subset2(cl, node), "mirai")

  # A mirai is an environment, so stripping its class modifies the very object
  # held by the node. node_unresolved() treats non-object values as
  # unresolved, so this result is not selected again.
  `class<-`(m, NULL)

  list(node = node, value = m)

}

#' @export
#'
print.miraiCluster <- function(x, ...) {

  # The cluster ID is taken from the first node rather than from the cluster's
  # own 'id' attribute.
  # NOTE(review): presumably so that subsets of a cluster (which may not keep
  # the 'id' attribute) still print - confirm.
  id <- attr(.subset2(x, 1L), "id")
  is_active <- as.logical(length(..[[id]]))

  summary_text <- sprintf(
    "< miraiCluster >\n - cluster ID: %s\n - nodes: %d\n - active: %s\n",
    id, length(x), is_active
  )
  cat(summary_text, file = stdout())

  invisible(x)

}

#' @export
#'
print.miraiNode <- function(x, ...) {

  # Show the node's index and the ID of the cluster it belongs to.
  node_index <- attr(x, "node")
  cluster_id <- attr(x, "id")
  cat(
    sprintf("< miraiNode >\n - node: %d\n - cluster ID: %s\n", node_index, cluster_id),
    file = stdout()
  )

  invisible(x)

}

# internals --------------------------------------------------------------------

node_unresolved <- function(node) {
  # A node counts as unresolved when its mirai is still pending, or when it
  # holds no classed object: either nothing has been sent yet (NULL), or the
  # mirai's class was stripped when its result was collected.
  job <- .subset2(node, "mirai")
  pending <- unresolved(job)
  pending || !is.object(job)
}

Try the mirai package in your browser

Any scripts or data that you put into this service are public.

mirai documentation built on Nov. 16, 2023, 5:08 p.m.