R/concurrent.R

Defines functions doInStataCluster doInStataClusterLB stopStataCluster startStataCluster

Documented in doInStataCluster doInStataClusterLB startStataCluster stopStataCluster

#' Start a "cluster" of multiple Stata instances
#' 
#' A convenience wrapper around \code{\link[RStataLink]{startStata}} that calls it
#' \code{n} times with the same arguments.
#' @param n Number of instances: a single, finite, non-negative number. Default value
#' determined by \code{\link[parallel]{detectCores}}. That function can return \code{NA}
#' when the number of cores cannot be determined; in that case supply \code{n} explicitly.
#' @param ... Arguments passed to \code{\link[RStataLink]{startStata}}.
#' @return A list of objects of S3 class 'StataID' if all Stata instances confirm that
#' they are ready, else an error.
#' @export
startStataCluster <- function(n=parallel::detectCores(), ...) {
	# Fail early with a clear message instead of replicate()'s cryptic
	# "invalid 'length' argument" (e.g. when detectCores() returns NA).
	if (!is.numeric(n) || length(n) != 1L || !is.finite(n) || n < 0)
		stop("`n` must be a single, finite, non-negative number ",
			 "(the number of Stata instances to start).", call.=FALSE)
	# replicate() wraps its expression in `function(...)`, so a bare `...` there
	# would not refer to this function's dots -- capture them in a list first.
	args <- list(...)
	replicate(n,
			  do.call(startStata, args),
			  simplify=FALSE)
}

#' Stop multiple Stata instances
#' 
#' A convenience wrapper around \code{\link[RStataLink]{stopStata}} that calls it on
#' every element of \code{cl}.
#' @param cl A "cluster" -- a list of objects of S3 class 'StataID', possibly
#' generated by \code{\link[RStataLink]{startStataCluster}}.
#' @param ... Other arguments passed to \code{\link[RStataLink]{stopStata}}.
#' @return Invisibly, a list with the value returned by \code{\link[RStataLink]{stopStata}}
#' for each element of \code{cl}.
#' @export
stopStataCluster <- function(cl, ...) {
	# inherits() rather than class() == 'StataID': for objects carrying more than
	# one class, sapply(cl, class) yields a matrix/list and valid input is rejected.
	stopifnot(all(vapply(cl, inherits, logical(1), what='StataID')))
	invisible(lapply(cl, stopStata, ...))
}


#' Execute multiple pieces of Stata code (tasks/jobs) in a "cluster" of multiple Stata
#' instances -- a "load balancing" version
#' 
#' This is a \code{\link[parallel]{parLapplyLB}}-like wrapper around
#' \code{\link[RStataLink]{doInStata}} that allows executing
#' multiple pieces of Stata code in a parallel/concurrent manner.
#' Each job is submitted (with \code{future=TRUE}) to the first instance in \code{cl}
#' that reports being ready. If no instance is ready, all of them are polled again
#' after a 0.01 s pause. Once every job has been submitted, the results are collected
#' with \code{getStataFuture}.
#' @param cl A "cluster" -- a list of objects of S3 class 'StataID', possibly
#' generated by \code{\link[RStataLink]{startStataCluster}}.
#' @param X A vector (list of atomic character vectors, or an atomic character vector) of
#' jobs/tasks (Stata code) to be executed across different Stata instances.
#' @param isStataReadyTimeout Passed to argument \code{timeout} of \code{\link[RStataLink]{isStataReady}}
#' that is used internally to allocate jobs/tasks only to non-busy Stata instances.
#' Default: 1. See the warning for argument \code{timeout} in \code{\link[RStataLink]{isStataReady}}
#' documentation.
#' @param ... Further arguments passed to \code{\link[RStataLink]{doInStata}}.
#' @return A list of objects returned by \code{\link[RStataLink]{doInStata}} in the order
#' corresponding to the order of jobs/tasks in \code{X}. An error is raised if \code{X}
#' is non-empty but none of the instances in \code{cl} is ready.
#' @export
doInStataClusterLB <- function(cl, X, isStataReadyTimeout=1, ...) {
	stopifnot(all(vapply(cl, inherits, logical(1), what='StataID')),
			  all(vapply(X, is.character, logical(1))),
			  is.numeric(isStataReadyTimeout), isStataReadyTimeout>=0)
	cl <- onlyThoseReady(cl, timeout=isStataReadyTimeout)
	# With no ready instance the polling loop below would never terminate.
	if (length(X) > 0L && length(cl) == 0L)
		stop("None of the Stata instances in `cl` is ready.", call.=FALSE)
	# Submit every job first (future=TRUE); collect the results only afterwards.
	futures <- lapply(X, function(x) {
		repeat {
			for (id in cl)
				if (isStataReady(id, timeout=isStataReadyTimeout))
					return(doInStata(id=id, code=x, future=TRUE, ...))
			Sys.sleep(.01)  # every instance busy -- pause briefly, then poll again
		}
	})
	lapply(futures, getStataFuture)
}

#' Execute multiple pieces of Stata code (tasks/jobs) in a "cluster" of multiple Stata
#' instances
#' 
#' This is a \code{\link[parallel]{parLapply}}-like wrapper around
#' \code{\link[RStataLink]{doInStata}} that allows executing
#' multiple pieces of Stata code in a parallel/concurrent manner.
#' Jobs are assigned to the ready instances round-robin (job 1 to instance 1, job 2 to
#' instance 2, ..., wrapping around) and submitted with \code{future=TRUE). Their
#' results are then collected with \code{getStataFuture}.
#' @param cl A "cluster" -- a list of objects of S3 class 'StataID', possibly
#' generated by \code{\link[RStataLink]{startStataCluster}}.
#' @param X A vector (list of atomic character vectors, or an atomic character vector) of
#' jobs/tasks (Stata code) to be executed across different Stata instances.
#' @param isStataReadyTimeout Passed to argument \code{timeout} of \code{\link[RStataLink]{isStataReady}}
#' that is used internally to allocate jobs/tasks only to non-busy Stata instances.
#' Default: 1. See the warning for argument \code{timeout} in \code{\link[RStataLink]{isStataReady}}
#' documentation.
#' @param ... Further arguments passed to \code{\link[RStataLink]{doInStata}}.
#' @return A list of objects returned by \code{\link[RStataLink]{doInStata}} in the order
#' corresponding to the order of jobs/tasks in \code{X} (an empty list if \code{X} is empty).
#' An error is raised if \code{X} is non-empty but none of the instances in \code{cl} is ready.
#' @export
doInStataCluster <- function(cl, X, isStataReadyTimeout=1, ...) {
	stopifnot(all(vapply(cl, inherits, logical(1), what='StataID')),
			  all(vapply(X, is.character, logical(1))),
			  is.numeric(isStataReadyTimeout), isStataReadyTimeout>=0)
	args <- list(...)
	cl <- onlyThoseReady(cl, timeout=isStataReadyTimeout)
	nJobs <- length(X)
	# Nothing to run; mapply() would otherwise fail on zero-length inputs mixed
	# with the length-one `future` argument.
	if (nJobs == 0L) return(list())
	if (length(cl) == 0L)
		stop("None of the Stata instances in `cl` is ready.", call.=FALSE)
	# Exactly one instance per job, cycling through `cl` in order
	# (recycles `cl` when there are more jobs, truncates it when there are fewer).
	cl <- cl[rep_len(seq_along(cl), nJobs)]
	futures <- mapply(doInStata, id=cl, code=X, future=TRUE,
					  SIMPLIFY=FALSE, MoreArgs=args)
	lapply(futures, getStataFuture)
}
alekrutkowski/RStataLink documentation built on March 22, 2023, 2:18 a.m.