Nothing
#' Partition as set of tables into a list.
#'
#' Partition a set of tables into a list of sets of tables. Note: removes rownames.
#'
#' @param tables_used character, names of tables to look for.
#' @param partition_column character, name of column to partition by (tables should not have NAs in this column).
#' @param ... force later arguments to bind by name.
#' @param source_usage optional named map from tables_used names to sets of columns used.
#' @param source_limit optional numeric scalar limit on rows wanted every source.
#' @param tables named map from tables_used names to data.frames.
#' @param env environment to also look for tables named by tables_used
#' @return list of names maps of data.frames partitioned by partition_column.
#'
#' @seealso \code{\link{execute_parallel}}
#'
#' @examples
#'
#' d1 <- data.frame(a = 1:5, g = c(1, 1, 2, 2, 2))
#' d2 <- data.frame(x = 1:3, g = 1:3)
#' d3 <- data.frame(y = 1)
#' partition_tables(c("d1", "d2", "d3"), "g", tables = list(d1 = d1, d2 = d2, d3 = d3))
#'
#' @export
#'
partition_tables <- function(tables_used,
partition_column,
...,
source_usage = NULL,
source_limit = NULL,
tables = NULL,
env = NULL) {
wrapr::stop_if_dot_args(substitute(list(...)), "rqdatatable::partition_tables")
if((!is.character(partition_column)) || (length(partition_column)!=1)) {
stop("rqdatatable::partition_tables partition column must be a single string")
}
# make sure all needed tables are in the ntables list
ntables <- list()
for(ni in tables_used) {
if(!is.null(tables)) {
ti <- tables[[ni]]
}
if(is.null(ti) && (!is.null(env))) {
ti <- get(ni, envir = env) # should throw if not found
}
if(is.null(ti)) {
stop(paste("rqdatatable::partition_tables could not find table:", ni))
}
# front-load some error checking
if(!is.data.frame(ti)) {
stop(paste("rqdatatable::partition_tables all arguments must resolve to data.frames",
ni, paste(class(ti), collapse = " ")))
}
nti <- colnames(ti)
if(!is.null(source_usage)) {
nsi <- source_usage[[ni]]
missing <- setdiff(nsi, nti)
if(length(missing)>0) {
stop(paste("rqdatatable::ex_data_table_parallel missing required columns",
ni, paste(missing, collapse = " ")))
}
if((partition_column %in% nti) && (!(partition_column %in% nsi))) {
# preserve partition column
nsi <- c(nsi, partition_column)
}
} else {
nsi <- nti
}
if(is.null(source_limit) || (nrow(ti)>source_limit)) {
ti <- ti[ , nsi, drop = FALSE]
} else {
ti <- ti[seq_len(source_limit), nsi, drop = FALSE]
}
rownames(ti) <- NULL
ntables[[ni]] <- ti
}
ti <- NULL
env <- NULL
tables <- NULL
# get a list of values of the partition column
levels <- character(0)
for(ni in names(ntables)) {
ti <- ntables[[ni]]
if(partition_column %in% colnames(ti)) {
if(any(is.na(ti[[partition_column]]))) {
stop(paste("rqdatatable::ex_data_table_parallel NAs in partation_column for table ",
ni))
}
levels <- unique(c(levels, as.character(ti[[partition_column]])))
}
}
if(length(levels)<=0) {
stop(paste("rqdatatable::ex_data_table_parallel no values found for partition column", partition_column))
}
split_tabs <- lapply(
names(ntables),
function(ni) {
ti <- ntables[[ni]]
if(partition_column %in% colnames(ti)) {
split(ti,
factor(as.character(ti[[partition_column]]), levels = levels),
drop = FALSE)
} else {
NULL
}
})
names(split_tabs) <- names(ntables)
# build a list of tablesets
tablesets <- lapply(levels,
function(li) {
nti <- ntables
for(ni in names(ntables)) {
ti <- ntables[[ni]]
if(partition_column %in% colnames(ti)) {
ti <- split_tabs[[ni]][[li]]
}
rownames(ti) <- NULL
nti[[ni]] <- ti
}
nti
})
names(tablesets) <- levels
tablesets
}
#' Execute f in parallel partitioned by partition_column.
#'
#' Execute f in parallel partitioned by \code{partition_column}, see
#' \code{\link{partition_tables}} for details.
#'
#' @param tables named map of tables to use.
#' @param partition_column character name of column to partition on
#' @param f function to apply to each tableset signature is function takes a single argument that is a named list of data.frames.
#' @param ... force later arguments to bind by name.
#' @param cl parallel cluster.
#' @param debug logical if TRUE use lapply instead of parallel::clusterApplyLB.
#' @param env environment to look for values in.
#' @return list of f evaluations.
#'
#' @seealso \code{\link{partition_tables}}
#'
#' @examples
#'
#' if(requireNamespace("parallel", quietly = TRUE)) {
#' cl <- parallel::makeCluster(2)
#'
#' d <- data.frame(x = 1:5, g = c(1, 1, 2, 2 ,2))
#' f <- function(dl) {
#' d <- dl$d
#' d$s <- sqrt(d$x)
#' d
#' }
#' r <- execute_parallel(list(d = d), f,
#' partition_column = "g",
#' cl = cl) %.>%
#' do.call(rbind, .) %.>%
#' print(.)
#'
#' parallel::stopCluster(cl)
#' }
#'
#' @export
#'
execute_parallel <- function(tables,
f,
partition_column,
...,
cl = NULL,
debug = FALSE,
env = parent.frame()) {
force(env)
tablesets <- partition_tables(names(tables),
partition_column = partition_column,
tables = tables,
env = env)
if(debug || (!requireNamespace("parallel", quietly = TRUE))) {
res <- lapply(tablesets, f)
} else {
# dispatch the operation in parallel
res <- parallel::clusterApplyLB(cl, tablesets, f)
}
names(res) <- names(tablesets)
res
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.