Nothing
# Copyright (C) 2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.
# mirai x parallel -------------------------------------------------------------
#' Make Mirai Cluster
#'
#' \code{make_cluster} creates a cluster of type 'miraiCluster', which may be
#' used as a cluster object for any function in the \pkg{parallel} base
#' package such as \code{\link[parallel]{clusterApply}} or
#' \code{\link[parallel]{parLapply}}.
#'
#' @param n integer number of nodes (automatically launched on the local machine
#' unless 'url' is supplied).
#' @param url [default NULL] (specify for remote nodes) the character URL on the
#' host for remote nodes to dial into, including a port accepting incoming
#' connections, e.g. 'tcp://10.75.37.40:5555'. Specify a URL with the scheme
#' 'tls+tcp://' to use secure TLS connections.
#' @param remote [default NULL] (specify to launch remote nodes) a remote launch
#' configuration generated by \code{\link{remote_config}} or \code{\link{ssh_config}}.
#' If not supplied, nodes may be deployed manually on remote resources.
#' @param ... additional arguments passed onto \code{\link{daemons}}.
#'
#' @return For \strong{make_cluster}: An object of class 'miraiCluster' and
#' 'cluster'. Each 'miraiCluster' has an automatically assigned ID and 'n'
#' nodes of class 'miraiNode'. If 'url' is supplied but not 'remote', the
#' shell commands for deployment of nodes on remote resources is printed in
#' interactive sessions.
#'
#' For \strong{stop_cluster}: invisible NULL.
#'
#' @section Remote Nodes:
#'
#' Specify 'url' and 'n' to set up a host connection for remote nodes to
#' dial into. 'n' defaults to one if not specified.
#'
#' Also specify 'remote' to launch the nodes using a configuration generated
#' by \code{\link{remote_config}} or \code{\link{ssh_config}}. In this case,
#' the number of nodes is inferred from the configuration provided and 'n'
#' is disregarded.
#'
#' If 'remote' is not supplied, the shell commands for deploying nodes
#' manually on remote resources are automatically printed in interactive
#' sessions.
#'
#' \code{\link{launch_remote}} may be called at any time on a 'miraiCluster'
#' to return the shell commands for deployment of all nodes, or on a
#' 'miraiNode' to return the command for a single node.
#'
#' @section Status:
#'
#' Call \code{\link{status}} on a 'miraiCluster' to check the number of
#' currently active connections as well as the host URL.
#'
#' @note Requires R >= 4.4 (currently R-devel). Clusters created with this
#' function will not work with prior R versions. The functionality is
#' experimental prior to release of R 4.4 and the interface is consequently
#' subject to change at any time.
#'
#' The default behaviour of clusters created by this function is
#' designed to map as closely as possible to clusters created by the
#' \pkg{parallel} package. However, '...' arguments are passed onto
#' \code{\link{daemons}} for additional customisation if desired, although
#' resultant behaviour may not be supported.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' cl <- make_cluster(2)
#' cl
#' cl[[1L]]
#'
#' Sys.sleep(0.5)
#' status(cl)
#'
#' stop_cluster(cl)
#'
#' }
#'
#' @export
#'
make_cluster <- function(n, url = NULL, remote = NULL, ...) {
id <- sprintf("`%d`", length(..))
printLaunchCmd <- FALSE
if (is.character(url)) {
length(url) == 1L || stop(.messages[["single_url"]])
daemons(url = url, remote = remote, dispatcher = FALSE, resilience = FALSE, cleanup = FALSE, ..., .compute = id)
if (length(remote)) {
args <- remote[["args"]]
n <- if (is.list(args)) length(args) else 1L
} else {
if (missing(n)) n <- 1L
is.numeric(n) || stop(.messages[["numeric_n"]])
if (interactive()) {
cat("Shell commands for deployment on nodes:\n\n", file = stdout())
print(launch_remote(rep(..[[id]][["urls"]], n), .compute = id))
}
}
} else {
is.numeric(n) || stop(.messages[["numeric_n"]])
daemons(n = n, dispatcher = FALSE, resilience = FALSE, cleanup = FALSE, ..., .compute = id)
}
pipe_notify(..[[id]][["sock"]], cv = ..[[id]][["cv"]], add = FALSE, remove = TRUE, flag = TRUE)
cl <- vector(mode = "list", length = n)
for (i in seq_along(cl))
cl[[i]] <- `attributes<-`(new.env(), list(class = "miraiNode", node = i, id = id))
`attributes<-`(cl, list(class = c("miraiCluster", "cluster"), id = id))
}
#' Stop Mirai Cluster
#'
#' \code{stop_cluster} stops a cluster created by \code{make_cluster}.
#'
#' @param cl a 'miraiCluster'.
#'
#' @rdname make_cluster
#' @export
#'
stop_cluster <- function(cl) {
daemons(0L, .compute = attr(cl, "id"))
invisible()
}
#' @method stopCluster miraiCluster
#' @export
#'
stopCluster.miraiCluster <- stop_cluster
#' @method sendData miraiNode
#' @export
#'
sendData.miraiNode <- function(node, data) {
length(..[[attr(node, "id")]]) || stop(.messages[["cluster_inactive"]])
value <- data[["data"]]
has_tag <- !is.null(value[["tag"]])
m <- mirai(do.call(node, data, quote = TRUE), node = value[["fun"]], data = value[["args"]],
.signal = has_tag, .compute = attr(node, "id"))
if (has_tag)
assign("tag", value[["tag"]], m)
`[[<-`(node, "mirai", m)
}
#' @method recvData miraiNode
#' @export
#'
recvData.miraiNode <- function(node) call_mirai(.subset2(node, "mirai"))
#' @method recvOneData miraiCluster
#' @export
#'
recvOneData.miraiCluster <- function(cl) {
wait(..[[attr(cl, "id")]][["cv"]]) || {
stop_cluster(cl)
stop(.messages[["nodes_failed"]])
}
node <- which.min(lapply(cl, node_unresolved))
m <- .subset2(.subset2(cl, node), "mirai")
`class<-`(m, NULL)
list(node = node, value = m)
}
#' @export
#'
print.miraiCluster <- function(x, ...) {
id <- attr(.subset2(x, 1L), "id")
cat(sprintf("< miraiCluster >\n - cluster ID: %s\n - nodes: %d\n - active: %s\n",
id, length(x), as.logical(length(..[[id]]))), file = stdout())
invisible(x)
}
#' @export
#'
print.miraiNode <- function(x, ...) {
cat(sprintf("< miraiNode >\n - node: %d\n - cluster ID: %s\n", attr(x, "node"), attr(x, "id")), file = stdout())
invisible(x)
}
# internals --------------------------------------------------------------------
node_unresolved <- function(node) {
m <- .subset2(node, "mirai")
unresolved(m) || !is.object(m)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.