R/capply.R

# Apply family of functions for ctools

#' Portable apply function
#'
#' Wrapper function for clusterApply(), Rhpc_lapply(), and an implementation of 'mcapply'.
#' @param X an object with dim > 1, each element of which is individually passed to FUN
#' @param MARGIN either 1 for row and 2 for column.
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.apply
#' @examples
#' mat <- matrix(1:12, 3, 4)
#' c.apply(mat, 1, sum)	# a parallel implementation of rowSums
#'                      # Note that the serial version is likely faster for this function
c.apply <- function(X, MARGIN, FUN, ...) {

	if (get("use.cluster", envir = cluster)) {
		cl.apply <- get("cluster.apply", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.apply(cl, X, MARGIN, FUN, ...)
	} else if (get("use.fork", envir = cluster)) {
		# Cpp function to turn X into a list with each element either a row or column
		# of X depending on MARGIN
		X <- array2list(X,MARGIN)
		res <- parallel::mclapply(X, FUN, ..., mc.cores = get("cores", envir = cluster))

		if (MARGIN == 1) {
			res <- do.call(cbind, res)
		} else{
			res <- do.call(rbind, res)
		}
		if (dim(res)[MARGIN] == 1) c(res)
		else res

	} else {
		apply(X = X, MARGIN = MARGIN, FUN = FUN, ...)
	}

}

#' Portable apply function with load balancing
#'
#' Wrapper function for clusterApplyLB(), Rhpc_apply(), and an implementation of 'mcapply' with load balancing.
#' Note that for MPI there is no load balancing.
#' @param X an object with dim > 1, each element of which is individually passed to FUN
#' @param MARGIN either 1 for row and 2 for collumn.
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.applyLB
#' @examples
#' mat <- matrix(1:12, 3, 4)
#' c.applyLB(mat, 1, sum)	# a parallel implementation of rowSums
#'                        # Note that the serial version is likely faster for this function
c.applyLB <- function(X, MARGIN, FUN, ...) {

	if (get("use.cluster", envir = cluster)) {
		cl.applyLB <- get("cluster.applyLB", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.applyLB(cl, X, MARGIN, FUN, ...)
	} else if (get("use.fork", envir = cluster)) {
		# Cpp function to turn X into a list with each element either a row or column
		# of X depending on MARGIN
		Y <- array2list(X,MARGIN)
		rm(X)
		res <- parallel::mclapply(Y, FUN, ..., mc.preschedule = FALSE, mc.cores = get("cores", envir = cluster))

		if (MARGIN == 1) {
			res <- do.call(cbind, res)
		} else{
			res <- do.call(rbind, res)
		}
		if (dim(res)[MARGIN] == 1) c(res)
		else res

	} else {
		apply(X = X, MARGIN = MARGIN, FUN = FUN, ...)
	}

}

#' Portable lapply function
#'
#' Wrapper function for parLapply(), mclapply(), and Rhpc_lapply().
#' @param X a list object, each element of which is individually passed to FUN
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.lapply
#' @examples
#' vec <- 1:10
#' c.lapply(vec, function(x) { x + 1})	# adds 1 to every element of vec, returns list
c.lapply <- function(X, FUN, ...) {

	if (get("use.cluster", envir = cluster)) {
		cl.lapply <- get("cluster.lapply", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.lapply(cl, X, FUN, ...)
	} else if (get("use.fork", envir = cluster)) {
		parallel::mclapply(X = X, FUN = FUN, ..., mc.cores = get("cores", envir = cluster))
	} else {
		lapply(X = X, FUN = FUN, ...)
	}
}

#' Portable lapply function with load balancing
#'
#' Wrapper function for parLapplyLB(), mclapply(mc.preschedule = FALSE), and Rhpc_lapplyLB().
#' @param X a list object, each element of which is individually passed to FUN
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.lapplyLB
#' @examples
#' vec <- 1:10
#' c.lapply(vec, function(x) { x + 1})	# adds 1 to every element of vec, returns list
c.lapplyLB <- function(X, FUN, ...) {

	if (get("use.cluster", envir = cluster)) {
		cl.lapplyLB <- get("cluster.lapplyLB", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.lapplyLB(cl, X, FUN, ...)
	} else if (get("use.fork", envir = cluster)) {
		parallel::mclapply(X = X, FUN = FUN, ..., mc.cores = get("cores", envir = cluster), mc.preschedule = FALSE)
	} else {
		lapply(X = X, FUN = FUN, ...)
	}
}


#' Portable sapply function
#'
#' Wrapper function for parSapply(), and Rhpc_sapply(), and an implementaction of mcsapply.
#' @param X a vector (or atomic list) object, each element of which is individually passed to FUN
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.sapply
#' @examples
#' vec <- 1:10
#' c.lapply(vec, function(x) { x + 1})	# adds 1 to every element of vec, returns list
c.sapply <- function(X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE) {

	if (get("use.cluster", envir = cluster)) {
		cl.sapply <- get("cluster.lapplyLB", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.sapply(cl, X, FUN, ..., simplify = simplify, USE.NAMES = USE.NAMES)
	} else if (get("use.fork", envir = cluster)) {
		res <- parallel::mclapply(X = X, FUN = FUN, ...)
		if (USE.NAMES && is.character(X) && is.null(names(res))) {
			names(res) <- X
		}
		if (!identical(simplify, FALSE) && length(res)) {
			simplify2array(res, higher = (simplify == "array"))
		}
		else res
	}
	else{
		sapply(X = X, FUN = FUN, ... , simplify = simplify, USE.NAMES = USE.NAMES)
	
	}
}

#' Portable sapply function with load balancing
#'
#' Wrapper function for parSapplyLB(), and Rhpc_sapplyLB(), and an implementaction of mcsapply with load balancing.
#' @param X a vector (or atomic list) object, each element of which is individually passed to FUN
#' @param FUN a function object, takes X as first argument
#' @param ... other arguments, possibly named, to be passed to every iteration of FUN
#' @export c.sapplyLB
#' @examples
#' vec <- 1:10
#' c.lapply(vec, function(x) { x + 1})	# adds 1 to every element of vec, returns list
c.sapplyLB <- function(X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE) {

	if (get("use.cluster", envir = cluster)) {
		cl.sapplyLB <- get("cluster.lapplyLB", envir = cluster)
		cl <- get("cluster.object", envir = cluster)
		cl.sapplyLB(cl, X, FUN, ..., simplify = simplify, USE.NAMES = USE.NAMES)
	} else if (get("use.fork", envir = cluster)) {
		res <- parallel::mclapply(X = X, FUN = FUN, ..., mc.cores = get("cores", envir = cluster), mc.preschedule = FALSE)
		if (USE.NAMES && is.character(X) && is.null(names(res))) {
			names(res) <- X
		}
		if (!identical(simplify, FALSE) && length(res)) {
			simplify2array(res, higher = (simplify == "array"))
		}
		else res
	}
	else{
		sapply(X = X, FUN = FUN, ... , simplify = simplify, USE.NAMES = USE.NAMES)
	}

}
bamonroe/ctools documentation built on May 11, 2019, 6:19 p.m.