R/cfuns.R

# Source file iterator for c.source
source.apply <- function(flist) {
	lapply(flist,source)
}

# Source file iterator for c.sourceCpp
sourceCpp.apply <- function(flist) {
	lapply(flist,Rcpp::sourceCpp)
}

#' Portable cluster style call function
#'
#' Wrapper function for clusterCall, Rhpc_worker_call and a FORK implementation of
#' these functions. Note that this is not at all the same as base::call().
#' @param FUN a function object
#' @param ... other arguments, possibly named, to be passed to FUN
#' @export c.call
c.call <- function(FUN, ...) {

	if (get("use.cluster", envir = cluster)) {
		cl.call <- get("cluster.call", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.call(cl, FUN, ...)
	}else {
		ncall <- list(call("FUN", c(...)))
		ncall <- rep(ncall, times = get("cores", envir = cluster))
		ncall <- do.call(list, ncall)

		c.lapply(ncall, eval)
	}

}

#' Set options on all worker nodes
#'
#' Drop-in replacement for options() function, but called on all cluster nodes and master if cluster is used
#' @param ... any options can be defined, using 'name = value'. However, only the ones listed in ?options are used in base R. 
#' Options can also be passed by giving a single unnamed argument which is a named list.
#' @export c.options
c.options <- function(...) {

	if (get("use.cluster", envir = cluster)) {
		cl.call <- get("cluster.call", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		out <- cl.call(cl, options, ...)
		out[[length(out) + 1]] <- options(...)
		out
	}else {
		options(...)
	}

}

#' Get options on all worker nodes
#'
#' Drop-in replacement for getOption() function, but called on all cluster nodes and master if cluster is used
#' @param x a character string holding an option name
#' @param default if the specified option is not set in the options list, this value is returned. This facilitates retrieving an option and checking whether it is set and setting it separately if not.
#' @export c.getOption
c.getOption <- function(x, default = NULL) {

	if (get("use.cluster", envir = cluster)) {
		cl.call <- get("cluster.call", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		out <- cl.call(cl, getOptions, x = x, default = default)
		out[[length(out) + 1]] <- getOptions(x = x, default = default)
		out

		cl.call(cl, options, ...)
	}else {
		getOption(x = x, default = default)
	}

}

#' Source function for cluster parallel
#'
#' Wrapper function for c.call(source, FILE).
#' @param ... string objects containing paths to files to source. 
#' @param push logical of whether or not to push the export list, defaults to TRUE
#' @param clear logical of whether or not to erase the export list and add the included options to it, defaults to FALSE
#' @param on.main logical set to TRUE to source to the main process in a cluster, defaults to TRUE, no effect for FORK
#' @param on.worker logical set to TRUE to source to the Worker process in a cluster, defaults to TRUE, no effect for FORK
#' @export c.source
#' @examples
#' FILE <- "/home/user/useful.R"
#' c.source(FILE, push=FALSE)                    # adds object 'vec' to list of things to export, but doesn't export it
#' c.source("/home/user/useful.R", push=FALSE)   # adds object 'vec' to list of things to export, but doesn't export it
#' c.source(push=T)                              # export all objects in the to.export list
#' c.source("vec")                               # add "vec" to the to.export list, then export all items in list
c.source <- function(..., push=TRUE, clear=FALSE, on.main=TRUE, on.worker=TRUE) {

	obj <- list(...)
	obj <- unlist(obj)

	to.source <- get("source.list", envir = cluster)

	if (clear == TRUE) {
		to.source <- list()
	}

	if (length(obj) > 0) {
		to.source <- append(obj, to.source)
		assign("source.list", to.source, envir = cluster)
	}

	if (push == TRUE) {

		if (get("use.cluster", envir = cluster)) {

			if (on.main) source.apply(to.source)

			if (on.worker) {

				cl.call <- get("cluster.call", envir = cluster)
				cl <- get("cluster.object", envir = cluster)
				cl.call(cl, source.apply, to.source )
			
			}

		}else{

			source.apply(to.source)

		}
		# Flush the list after all its elements have been sourced
		to.source <- list()
		assign("source.list", to.source, envir = cluster)

	}

}

#' SourceCPP function for cluster parallel
#'
#' Wrapper function for c.call(sourceCpp, FILE).
#' @param ... string objects containing paths to files to source. 
#' @param push logical of whether or not to push the export list, defaults to TRUE
#' @param clear logical of whether or not to erase the export list and add the included options to it, defaults to FALSE
#' @param on.main logical set to TRUE to source to the main process in a cluster, defaults to TRUE, no effect for FORK
#' @param on.worker logical set to TRUE to source to the Worker process in a cluster, defaults to TRUE, no effect for FORK
#' @export c.sourceCpp
#' @examples
#' FILE <- "/home/user/useful.cpp"
#' c.sourceCpp(FILE, push=FALSE)                    # adds files in FILE to list of things to export, but doesn't export it
#' c.sourceCpp("/home/user/useful.cpp", push=FALSE) # adds files in FILE to list of things to export, but doesn't export it
#' c.sourceCpp(push=T)                              # source all files in the in the 'sourceCpp' list
#' c.sourceCpp("vec")                               # add "vec" to the to.export list, then export all items in list
c.sourceCpp <- function(..., push=TRUE, clear=FALSE, on.main=TRUE, on.worker=TRUE) {

	obj <- list(...)
	obj <- unlist(obj)

	to.sourceCpp <- get("sourceCpp.list", envir = cluster)

	if (clear == TRUE) {
		to.sourceCpp <- list()
	}

	if (length(obj) > 0) {
		to.sourceCpp <- append(obj, to.sourceCpp)
		assign("sourceCpp.list", to.sourceCpp, envir = cluster)
	}

	if (push == TRUE) {

		if (get("use.cluster", envir = cluster)) {

			if (on.main) sourceCpp.apply(to.sourceCpp)

			if (on.worker) {

				cl.call <- get("cluster.call", envir = cluster)
				cl <- get("cluster.object", envir = cluster)
				cl.call(cl, sourceCpp.apply, to.sourceCpp )
			
			}

		}else{

			sourceCpp.apply(to.sourceCpp)

		}
		# Flush the list after all its elements have been sourced
		to.sourceCpp <- list()
		assign("sourceCpp.list", to.sourceCpp, envir = cluster)

	}

}
bamonroe/ctools documentation built on May 11, 2019, 6:19 p.m.