#' Generate a full DQ report, v2
#'
#' @param study_data [data.frame] the data frame that contains the measurements
#' @param meta_data [data.frame] the data frame that contains metadata
#' attributes of study data
#' @param label_col [variable attribute] the name of the column in the metadata
#' with labels of variables
#' @param cores [integer] number of CPU cores to use, or a named list with
#' arguments for [parallelMap::parallelStart], or NULL if parallelization
#' has already been started by the caller.
#' Can also be a cluster.
#' @param debug_parallel [logical] print blocks currently evaluated in parallel
#' @param meta_data_segment [data.frame] -- optional: Segment level metadata
#' @param meta_data_dataframe [data.frame] -- optional: Data frame level
#' metadata
#' @param all_calls [list] a list of calls
#' @param meta_data_cross_item [data.frame] -- optional: cross-item level
#' metadata
#' @param resp_vars [variable list] the names of the measurement variables
#' for the report.
#' @param filter_result_slots [character] regular expressions; an indicator
#' function's result is used for the report only if its name matches one of
#' these. If of length zero, no filtering is performed.
#' @inheritParams dq_report2
#'
#' @return a [dataquieR_resultset2]. Can be printed, creating an RMarkdown report.
#'
#' @family reporting_functions
#' @concept process
#' @keywords internal
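#' @examples
#' \dontrun{
#' # Minimal sketch with hypothetical inputs: in practice, dq_report2()
#' # prepares all_calls (a named list of quoted indicator function calls)
#' # and invokes this helper internally.
#' r <- util_evaluate_calls(
#'   all_calls = all_calls,
#'   study_data = study_data,
#'   meta_data = meta_data,
#'   label_col = LABEL,
#'   meta_data_segment = meta_data_segment,
#'   meta_data_dataframe = meta_data_dataframe,
#'   meta_data_cross_item = meta_data_cross_item,
#'   resp_vars = meta_data[[LABEL]],
#'   filter_result_slots = ".*",
#'   cores = 2,
#'   debug_parallel = FALSE,
#'   mode = "queue",
#'   mode_args = list(step = 15)
#' )
#' }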
util_evaluate_calls <-
function(all_calls,
study_data,
meta_data,
label_col,
meta_data_segment,
meta_data_dataframe,
meta_data_cross_item,
resp_vars,
filter_result_slots,
cores,
debug_parallel,
mode = c("default", "futures", "queue", "parallel"),
mode_args) {
if (length(mode_args) > 0) {
if (!is.list(mode_args) || is.null(names(mode_args)) ||
any(util_empty(names(mode_args)))) {
util_message("%s needs to be a named list",
sQuote("mode_args"))
mode_args <- list()
}
} else {
mode_args <- list()
}
mode <- util_match_arg(mode)
invisible(force(parallelMap::parallelGetOptions()$settings$mode))
# Ensure that parallelMap is loaded here: it sets some options onLoad that
# we want to overwrite below. Overwriting them without having the package
# loaded would let parallelMap clobber our settings if it is actually loaded
# after the option-setting code below.
# maybe also https://cran.r-project.org/web/packages/parabar/readme/README.html
# TODO: add all the objects that Square2 writes, to be fully compatible
r <- NULL
util_ensure_suggested("parallel")
util_setup_rstudio_job(
"Computing dq_report2, parallel computation")
progress_msg("Cluster setup", "initializing parallel mode, if applicable")
if (mode == "queue") {
if ((eval.parent(call("missing", as.symbol("cores"))) &&
identical(cores,
eval.parent(formals(rlang::caller_fn())$cores))) ||
missing(cores)) {
cores <- util_detect_cores()
}
if (length(cores) != 1 ||
!util_is_integer(cores) ||
is.na(cores) ||
cores > util_detect_cores()) {
cores <- util_detect_cores()
util_message(c("For mode %s, %s can only be an integer(1) <= %d,",
"it to its maximum"),
dQuote(mode),
sQuote("cores"),
cores)
}
q <- util_queue_cluster_setup(n_nodes = cores,
progress = progress,
debug_parallel = debug_parallel)
} else {
q <- NULL
}
if (length(all_calls) > 0) {
if (is.null(q)) {
if (!is.null(cores)) {
if (inherits(cores, "cluster")) {
old_def_cl <- parallel::getDefaultCluster()
parallel::setDefaultCluster(cores)
on.exit(parallel::setDefaultCluster(old_def_cl), add = TRUE)
old_o_def_cl <- options(
parallelMap.cpus = length(parallel::getDefaultCluster()),
parallelMap.load.balancing = TRUE,
parallelMap.mode = "socket"
)
on.exit(options(old_o_def_cl), add = TRUE)
} else if (inherits(cores, "list")) {
suppressMessages(do.call(parallelMap::parallelStart, cores))
on.exit(suppressMessages(parallelMap::parallelStop()), add = TRUE)
} else {
suppressMessages(parallelMap::parallelStart("socket", cpus = cores,
logging = FALSE,
load.balancing = TRUE))
on.exit(suppressMessages(parallelMap::parallelStop()), add = TRUE)
}
cores <- NULL
} else if (getOption("parallelMap.mode") != "BatchJobs" &&
getOption("parallelMap.mode") != "batchtools" &&
!is.null(parallel::getDefaultCluster())) {
old_o_def_cl <- options(
parallelMap.cpus = length(parallel::getDefaultCluster()),
parallelMap.load.balancing = TRUE,
parallelMap.mode = "socket"
)
on.exit(options(old_o_def_cl), add = TRUE)
}
oldO <- options(parallelMap.show.info = FALSE)
on.exit(options(oldO), add = TRUE)
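# Backend abstraction: parlib, parexp and par_eval_q load packages, export
# objects, and evaluate expressions on the compute nodes, regardless of
# whether parallelMap (this branch) or the queue (else branch) is used.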
parlib <- parallelMap::parallelLibrary
parexp <- parallelMap::parallelExport
par_eval_q <- function(expr) {
do.call(
parallel::clusterEvalQ,
list(cl = NULL, expr)
)
}
} else {
parlib <- function(lib) {
q$workerEval(function(lib) {
require(lib, character.only = TRUE)
}, list(lib = lib))
}
parexp <- function(...) {
q$export(...)
}
par_eval_q <- function(expr) { # may not work as expected
q$workerEval(function(expr) {
eval(expr)
}, list(expr = substitute(expr)))
}
}
progress_msg("Cluster setup: initializing parallel mode, if applicable", "loading library")
parlib(utils::packageName())
progress_msg("Cluster setup: initializing parallel mode, if applicable",
"consolidating data types...")
## Adjust data type and compute int_data_type_matrix before
int_datatype_matrix_res <- int_datatype_matrix(
study_data = study_data,
meta_data = meta_data,
label_col = label_col
)
study_data <- util_adjust_data_type(
study_data = study_data,
meta_data = meta_data,
relevant_vars_for_warnings = NULL)
progress_msg("Cluster setup: initializing parallel mode, if applicable", "exporting data")
suppressWarnings(parexp("study_data", "meta_data", "label_col", "meta_data_segment", "meta_data_dataframe", "meta_data_cross_item"))
.options <- options() # options to be copied to the child processes
.options <- .options[startsWith(names(.options), "dataquieR.")] # keep only dataquieR options
progress_msg("Cluster setup: initializing parallel mode, if applicable", "exporting options")
suppressWarnings(parexp(".options"))
progress_msg("Cluster setup: initializing parallel mode, if applicable", "exporting data frame cache")
suppressWarnings(parexp(".dataframe_environment"))
progress_msg("Cluster setup: initializing parallel mode, if applicable", "exporting other caches")
if (getOption("dataquieR.precomputeStudyData", default = FALSE)) {
cache_as_list <- as.list(.cache[[".cache"]])
study_data_cache <- as.list(.study_data_cache)
} else {
cache_as_list <- list()
study_data_cache <- list()
}
suppressWarnings(parexp("cache_as_list"))
suppressWarnings(parexp("study_data_cache"))
progress_msg("Cluster setup: initializing parallel mode, if applicable", "finalizing setup of compute nodes")
if (!is.null(q) || !is.null(parallel::getDefaultCluster())) {
par_eval_q(options(.options))
par_eval_q(dataquieR::prep_add_data_frames(
data_frame_list = as.list(.dataframe_environment)))
par_eval_q({
assign(
x = ".cache",
envir = get(".cache", envir = asNamespace("dataquieR")),
value = as.environment(cache_as_list))
list2env(study_data_cache, get(".study_data_cache",
envir = asNamespace("dataquieR")))
})
u8 <- par_eval_q(l10n_info()[["UTF-8"]])
u8 <- vapply(u8, identity, FUN.VALUE = logical(1))
if (!all(u8)) {
util_warning(c("%d of the %d cluster nodes do not support",
"UTF-8, this may cause trouble with the encoding.",
"For Windows nodes, you should use R > 4.2.0",
"on all nodes. Also, all nodes should use a UTF-8",
"character set by default (see Sys.setlocale())"),
sum(!u8), length(u8))
}
} else {
dataquieR::prep_add_data_frames(
data_frame_list = as.list(.dataframe_environment))
assign(
x = ".cache",
envir = get(".cache", envir = asNamespace("dataquieR")),
value = as.environment(cache_as_list))
}
# parallelMap::parallelLapply(1:10, function(x) {dataquieR::prep_list_dataframes()})
# This exports the static data to the cluster. It is always done, even if the
# data is already available; the functions do not modify the exported data.
currentCpus <- parallelMap::parallelGetOptions()$settings$cpus
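# Preset the worker's filter_result_slots default, so the setting travels
# with the function itself to the compute nodes.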
worker <- util_eval_to_dataquieR_result
formals(worker)$filter_result_slots <- filter_result_slots
force(formals(worker)$filter_result_slots)
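# Choose the worker's enclosing environment per execution mode: a fresh
# child of the package namespace for local mode, the current frame for
# multicore (forked) workers, and the package namespace otherwise,
# presumably to keep the serialized closure small.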
if (parallelMap::parallelGetOptions()$settings$mode == "local") {
environment(worker) <- new.env(parent = asNamespace(utils::packageName()))
} else if (parallelMap::parallelGetOptions()$settings$mode == "multicore") {
environment(worker) <- environment()
} else {
environment(worker) <- asNamespace(utils::packageName())
}
progress_msg("Computation", "computing report")
n_nodes <- max(1, as.integer(currentCpus[[1]]), na.rm = TRUE)
if (mode == "futures") { # have/use futures
r <- util_parallel_futures(
all_calls = all_calls,
n_nodes = n_nodes,
progress = progress,
worker = worker,
debug_parallel = debug_parallel
)
} else if (mode == "queue") {
step <- 15
if ("step" %in% names(mode_args)) {
step <- mode_args[["step"]]
if (length(step) != 1 || !is.numeric(step) ||
!is.vector(step) || !is.finite(step) || !util_is_integer(step) ||
step <= 0 || step > 10000) {
util_message(
c("%s needs to be a positive scalar integer value <= %d, falling",
"back to default %d"),
dQuote("step"),
10000,
15)
step <- 15
}
}
r <- q$compute_report(all_calls = all_calls,
worker = worker,
step = step)
} else {
r <- util_parallel_classic(
all_calls = all_calls,
n_nodes = n_nodes,
progress = progress,
worker = worker,
debug_parallel = debug_parallel
)
}
util_message(
sprintf("%s [%s], %s", Sys.time(), "INFO",
"DQ -- done")
) # TODO: Use RStudio job if available
}
progress_msg("Computation", "finalizing report")
function_names <- vapply(lapply(all_calls, `[[`, 1), as.character,
FUN.VALUE = character(1))
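# Attach provenance to every result: computing function name, column name
# (cn) and the original call; NULL results from crashed jobs are replaced
# by an error result.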
r[] <- mapply(function_names, all_calls, r, attr(all_calls, "cn"),
SIMPLIFY = FALSE,
FUN = function(nm, cl, r, cn) {
# handle single jobs that crashed unexpectedly
if (is.null(r)) {
r <-
util_eval_to_dataquieR_result(
util_error(paste("No result available for unknown reasons",
"(out of memory? try to reduce the number",
"of parallel running jobs using the",
"`cores` argument)")),
filter_result_slots = ".*")
}
attr(r, "function_name") <- nm
attr(r, "cn") <- cn
attr(r, "call") <- cl
r
})
# overwrite the result for int_datatype_matrix_res
if (!is.null(r)) {
idtm_variable_labels <-
vapply(lapply(r[startsWith(names(r), "int_datatype_matrix.")],
attr, "call"), attr, "entity_name",
FUN.VALUE = character(1))
r[startsWith(names(r), "int_datatype_matrix.")] <-
lapply(idtm_variable_labels, function(lab) {
res <- r[[paste0("int_datatype_matrix.", lab)]]
res$SummaryTable <-
int_datatype_matrix_res$SummaryTable[
int_datatype_matrix_res$SummaryTable$Variables == lab,
, FALSE
]
res$SummaryData <-
int_datatype_matrix_res$SummaryData[
int_datatype_matrix_res$SummaryData$Variables == lab,
, FALSE
]
res$ReportSummaryTable <-
int_datatype_matrix_res$ReportSummaryTable[
int_datatype_matrix_res$ReportSummaryTable$Variables == lab,
, FALSE
]
res
})
}
# overwrite the result for int_datatype_matrix_res / DONE
function_names <- setNames(nm = function_names)
aliases <- gsub("\\..*$", "", names(r))
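# Build an alias-by-variable grid of call arguments; variables without a
# call for an alias get an empty named list.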
matrix_list <- lapply(setNames(nm = setdiff(aliases,
attr(all_calls,
"multivariatcol"))),
function(alias) {
lapply(setNames(nm = resp_vars),
function(vn) {
if (is.call(
all_calls[[paste0(alias, ".", vn)]])) {
rlang::call_args(
all_calls[[paste0(alias, ".", vn)]]
)
} else {
setNames(list(), nm = character(0))
}
})
})
function_names <- function_names[!duplicated(aliases)]
aliases <- aliases[!duplicated(aliases)]
descriptions <- .manual$descriptions[function_names]
if (length(descriptions) == 0) {
descriptions <- rep(NA_character_, length(function_names))
}
descriptions[vapply(descriptions, is.null, FUN.VALUE = logical(1))] <-
NA_character_
descriptions <- unlist(descriptions, recursive = FALSE)
function_alias_map <- data.frame(
fk_report = rep(NA_integer_, length(aliases)),
alias = aliases,
acronym = util_abbreviate(aliases),
description = descriptions,
fk_function = rep(NA_integer_, length(aliases)),
# TODO: check if .manual actually exists
function_description = descriptions,
name = function_names,
stringsAsFactors = FALSE)
attr(matrix_list, "function_alias_map") <- function_alias_map
fn <- unique(function_names)
function2category <- setNames(c(
des = "Descriptors",
int = "Integrity",
com = "Completeness",
con = "Consistency",
acc = "Accuracy"
)[gsub(
"_.*$",
"",
fn)],
nm = fn)
attr(matrix_list, "function2category") <- function2category
dim_ranks <- setNames(c(
int = 1,
com = 2,
con = 3,
acc = 4,
des = 9
)[gsub(
"_.*$",
"",
aliases)],
nm = aliases)
drr <- rank(dim_ranks)
fnr <- rank(function_names)
alr <- rank(aliases)
base <- max(c(drr, fnr, alr, 10), na.rm = TRUE)
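# Encode a stable column order as one number per alias, a mixed-radix
# number with radix `base`: dimension rank is most significant, then
# function name rank, then alias rank. E.g., with base = 10, drr = 2,
# fnr = 3, alr = 1: (2 * 100 + 3 * 10 + 1) * 10 = 2310.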
attr(matrix_list, "col_indices") <- setNames(
(drr * base * base + fnr * base + alr) * 10,
nm = aliases)
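# Row order follows VARIABLE_ORDER from the item-level metadata; if that
# column is missing, fall back to the metadata row order (spaced by 10).
# Variables without an explicit order are appended after the largest one.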
vo <- as.numeric(meta_data[[VARIABLE_ORDER]])
if (0 == length(vo)) {
vo <- seq_len(nrow(meta_data)) * 10
}
offset <- max(vo, na.rm = TRUE) + 1
if (is.infinite(offset))
offset <- 1
vo[util_empty(vo)] <- offset + seq_len(sum(util_empty(vo)))
attr(matrix_list, "row_indices") <- setNames(vo,
nm = meta_data[[label_col]])
if (!length(r)) {
r <- list()
}
attr(r, "matrix_list") <- matrix_list
attr(r, "label_col") <- label_col
attr(r, "meta_data") <- meta_data # one discapency from SQ2 reults, here: We have *always* v2.0 metadata, here. Will not yet write a back-converter, as long as this is not really needed.
attr(r, "meta_data_segment") <- meta_data_segment
attr(r, "meta_data_dataframe") <- meta_data_dataframe
attr(r, "meta_data_cross_item") <- meta_data_cross_item
# Include all tables referred to by the standard metadata ----
meta_data_frames <-
grep("^meta_data", names(attributes(r)), value = TRUE)
refs <- unname(unlist(lapply(meta_data_frames,
function(mdf_name) {
mdf <- attr(r, mdf_name)
cls <- grep("_TABLE$", colnames(mdf), value = TRUE)
if (length(cls)) {
unlist(mdf[, cls], recursive = TRUE)
} else {
character(0)
}
}), recursive = FALSE))
refs <- refs[!util_empty(refs)]
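# Normalize the table references: trim whitespace around SPLIT_CHAR and
# keep at most one SPLIT_CHAR-separated suffix (e.g., a sheet name),
# dropping everything after a second SPLIT_CHAR.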
refs <- gsub(sprintf("\\s*\\%s\\s*", SPLIT_CHAR),
SPLIT_CHAR, refs)
refs <- gsub(sprintf("(\\%s.*?)\\%s.*$", SPLIT_CHAR, SPLIT_CHAR),
"\\1", refs)
refs <- unique(sort(refs))
refs <- lapply(setNames(nm = refs), function(dfn) {
r <- NULL
try(
r <- prep_get_data_frame(dfn),
silent = TRUE
)
r
})
attr(r, "referred_tables") <- refs
# add information on the dimension names ----
attr(r, "study_data_dimnames") <- dimnames(study_data)
attr(r, "cn") <- attr(all_calls, "cn")
attr(r, "rn") <- attr(all_calls, "rn")
# class is to be compatible with Square2 -----
class(r) <- union(dataquieR_resultset_class2,
"square_results")
progress_msg("Computation", "finished")
# Return report ----
r
}