Nothing
#' R6 class representing Cohort object.
#'
#' Cohort object is designed to make operations on source data possible.
#' @param source Source object created with \link{set_source}.
#' @param run_flow If `TRUE`, data flow is run after the operation is completed.
#' @param hook List of hooks describing methods before/after the Cohort is created.
#' See \link{hooks} for more details.
Cohort <- R6::R6Class(
"Cohort",
public = list(
#' @description
#' Create Cohort object.
#' @param ... Steps definition (optional). Can be also defined as a sequence of
#' filters - the filters will be added to the first step.
#' @return The object of class `Cohort`.
initialize = function(source, ..., run_flow = FALSE,
hook = list(
pre = get_hook("pre_cohort_hook"),
post = get_hook("post_cohort_hook")
)) {
run_hooks(hook$pre, self, private)
if (!missing(source)) {
private$init_source(source, ...)
run_hooks(hook$post, self, private)
if (run_flow) {
self$run_flow()
}
}
},
#' @description
#' Add Source to Cohort object.
add_source = function(source) {
private$init_source(source)
},
#' @description
#' Update Source in the Cohort object.
#' @param keep_steps If `TRUE`, steps definition remains unchanged when updating source.
#' If `FALSE` steps configuration is deleted.
#' If vector of type integer, specified steps will remain.
update_source = function(source, keep_steps = !has_steps(source), run_flow = FALSE,
hook = list(
pre = get_hook("pre_update_source_hook"),
post = get_hook("post_update_source_hook")
)) {
run_hooks(hook$pre, self, private, keep_steps)
private$data_objects <- list()
private$source <- NULL
private$cache <- NULL
if (identical(keep_steps, FALSE)) {
private$steps <- list()
private$init_source(source)
} else {
state <- self$get_state(json = FALSE)
if (!isTRUE(keep_steps)) {
state <- state[as.integer(keep_steps)]
}
steps <- list()
for (step_state in state) {
steps[[step_state$step]] <- do.call(
step,
step_state$filters %>% purrr::map(~do.call(filter, .))
)
}
do.call(private$init_source, append(list(source = source), steps))
}
run_hooks(hook$post, self, private, keep_steps)
if (run_flow) {
self$run_flow()
}
},
#' @description
#' Return Source object attached to Cohort.
get_source = function() {
private$source
},
#' @description
#' Add filtering step definition
#' @param step Step definition created with \link{step}.
add_step = function(step, run_flow = FALSE,
hook = list(
pre = get_hook("pre_add_step_hook"),
post = get_hook("post_add_step_hook")
)) {
new_step_id <- as.character(as.integer(self$last_step_id()) + 1)
run_hooks(hook$pre, self, private, new_step_id)
private$steps[new_step_id] <- step %>%
attach_step_id(new_step_id) %>%
list() %>%
purrr::map(eval_step_filters, source = private$source)
names(private$steps[new_step_id]) <- new_step_id
run_hooks(hook$post, self, private, new_step_id)
if (run_flow) {
self$run_flow(min_step = new_step_id)
}
},
#' @description
#' Copy selected step.
#' @param step_id Id of the step to be copied. If missing the last step is taken.
#' The copied step is added as the last one in the Cohort.
#' @param filters List of Source-evaluated filters to copy to new step.
copy_step = function(step_id, filters, run_flow = FALSE) {
if (missing(step_id)) {
step_id <- self$last_step_id()
}
if (!missing(filters)) {
step_id <- self$last_step_id()
step_config <- list(
step = next_step(step_id),
filters = purrr::map(filters, get_filter_state, extra_fields = NULL)
)
} else {
step_config <- self$get_state(step_id, json = FALSE)[[1]]
step_config$step <- next_step(step_id)
}
self$add_step(
do.call(
step,
step_config$filters %>% purrr::map(~do.call(filter, .))
)
)
if (run_flow) {
self$run_flow(min_step = step_config$step)
}
},
#' @description
#' Remove filtering step definition
#' @param step_id Id of the step to remove.
remove_step = function(step_id, run_flow = FALSE,
hook = list(
pre = get_hook("pre_rm_step_hook"),
post = get_hook("post_rm_step_hook")
)) {
if (missing(step_id)) {
step_id <- self$last_step_id()
}
run_hooks(hook$pre, self, private, step_id)
step_id <- as.character(step_id)
clear_data_ids <- steps_range(step_id, rev(names(private$steps))[1])
private$steps[[step_id]] <- NULL
private$cache[clear_data_ids] <- NULL
private$data_objects[clear_data_ids] <- NULL
private$steps <- adjust_names(private$steps)
private$steps <- purrr::imodify(private$steps, readjust_step)
if (!is.null(private$steps) && run_flow) {
self$run_flow(min_step = step_id)
}
run_hooks(hook$post, self, private, step_id)
},
#' @description
#' Add filter definition
#' @param filter Filter definition created with \link{filter}.
#' @param step_id Id of the step to add the filter to.
#' If missing, filter is added to the last step.
add_filter = function(filter, step_id, run_flow = FALSE) {
if (missing(step_id)) {
step_id <- self$last_step_id()
if (step_id == "0") {
step_id <- "1"
}
}
step_id <- as.character(step_id)
evaled_filter <- eval_filter(filter, step_id, private$source)
private$steps[[step_id]]$filters[[evaled_filter$id]] <- evaled_filter
private$steps[[step_id]]$id <- step_id
if (run_flow) {
self$run_flow(min_step = step_id)
}
},
#' @description
#' Remove filter definition
#' @param step_id Id of the step from which filter should be removed.
#' @param filter_id Id of the filter to be removed.
remove_filter = function(step_id, filter_id, run_flow = FALSE) {
step_id <- as.character(step_id)
filter_id <- as.character(filter_id)
private$steps[[step_id]]$filters[[filter_id]] <- NULL
if (length(private$steps[[step_id]]$filters) == 0) {
self$remove_step(step_id, run_flow)
} else {
if (run_flow) {
self$run_flow(min_step = step_id)
}
}
},
#' @description
#' Update filter definition
#' @param step_id Id of the step where filter is defined.
#' @param filter_id Id of the filter to be updated.
#' @param ... Filter parameters that should be updated.
#' @param active Mark filter as active (`TRUE`) or inactive (`FALSE`).
update_filter = function(step_id, filter_id, ..., active, run_flow = FALSE) {
step_id <- as.character(step_id)
filter_id <- as.character(filter_id)
filter_env <- environment(private$steps[[step_id]]$filters[[filter_id]]$filter_data)
new_args <- list(...)
if (any(static_params %in% names(new_args))) {
warning(glue::glue("Cannot modify filter {paste(sQuote(static_params), collapse = ', ')} parameters."))
}
params_to_change <- setdiff(names(new_args), static_params)
any_changed <- FALSE # checking if any parameter changed to optimize calculations
for (param_name in params_to_change) {
new_val <- new_args[[param_name]]
if (!identical(filter_env[[param_name]], new_val)) {
any_changed <- TRUE
filter_env[[param_name]] <- new_val
}
}
if (!missing(active)) {
filter_env[["active"]] <- active
}
if (run_flow && (!missing(active) || any_changed)) {
self$run_flow(step_id)
}
},
#' @description
#' Reset filter to its default values.
#' @param step_id Id of the step where filter is defined.
#' @param filter_id Id of the filter which should be cleared.
clear_filter = function(step_id, filter_id, run_flow = FALSE) {
step_id <- as.character(step_id)
filterd_id <- as.character(filter_id)
do.call(
self$update_filter,
append(
list(step_id = step_id, filter_id = filter_id, run_flow = run_flow),
self$get_filter(step_id, filter_id)$get_defaults(
self$get_data(step_id, collect = FALSE, state = "pre"),
self$get_cache(step_id, filter_id, state = "pre")
)
)
)
},
#' @description
#' Reset all filters included in selected step.
#' @param step_id Id of the step where filters should be cleared.
clear_step = function(step_id, run_flow = FALSE) {
step_id <- as.character(step_id)
for (filter_id in names(self$get_step(step_id)$filters)) {
self$clear_filter(step_id, filter_id, run_flow = FALSE)
}
if (run_flow) {
self$run_flow(step_id)
}
},
#' @description
#' Sum up Cohort configuration - Source, steps definition and evaluated data.
sum_up_state = function() {
list(
source = !is.null(private$source),
source_call = attr(private$source, "call"),
source_vars = names(private$source$attributes),
n_steps = length(private$steps),
steps_structure = step_filter_state(private$steps, method = names, raw = TRUE),
n_filters = step_filter_state(private$steps),
n_evaled_data = length(private$data_objects)
)
},
#' @description
#' Get Cohort configuration state.
#' @param step_id If provided, the selected step state is returned.
#' @param json If TRUE, return state in JSON format.
#' @param extra_fields Names of extra fields included in filter to be added to state.
get_state = function(step_id, json = FALSE, extra_fields = NULL) {
if (missing(step_id)) {
step_id <- names(private$steps)
}
get_filters_state <- function(filters) {
filters %>% purrr::map(get_filter_state, extra_fields = extra_fields) %>% unname()
}
filters_state <- private$steps[step_id] %>%
purrr::imap(~ list(step = .y, filters = get_filters_state(.x$filters))) %>%
unname()
if (json) {
return(jsonlite::toJSON(filters_state, auto_unbox = TRUE, na = "string", null = "null"))
}
return(filters_state)
},
#' Restore Cohort configuration.
#' @param state List or JSON string containing steps and filters configuration.
#' @param modifier Function two parameters combining the previous and provided state.
#' The returned state is then restored.
restore = function(state, modifier = function(prev_state, state) {state},
run_flow = FALSE, hook = list(
pre = get_hook("pre_restore_hook"),
post = get_hook("post_restore_hook")
)) {
self$attributes$pre_restore_state <- self$get_state(json = FALSE)
run_hooks(hook$pre, self, private, state)
if (is.null(state)) {
return(invisible(FALSE))
}
if (is.character(state)) {
state <- jsonlite::fromJSON(txt = state, simplifyVector = TRUE, simplifyMatrix = FALSE, simplifyDataFrame = FALSE)
}
state <- modifier(self$attributes$pre_restore_state, state)
private$steps <- NULL
private$cache <- NULL
private$data_objects <- private$data_objects["0"]
na_fix <- function(params) {
params %>%
purrr::modify_if(~ identical(., "NA"), ~ NA)
}
for (step_state in state) {
for (filter_state in step_state$filters) {
if (filter_state$type == "date_range") {
filter_state$range <- na_fix(filter_state$range)
filter_state$range <- as.Date(filter_state$range)
}
add_filter(
self,
do.call(filter, na_fix(filter_state)),
step_id = step_state$step
)
}
}
if (run_flow) {
self$run_flow()
}
run_hooks(hook$post, self, private, state)
},
#' @description
#' Get step related data
#' @param step_id Id of the step from which to source data.
#' @param state Return data before ("pre") or after ("post") step filtering?
#' @param collect Return raw data source (`FALSE`) object or collected (to R memory) data (`TRUE`).
get_data = function(step_id, state = "post", collect = TRUE) {
if (missing(step_id)) {
step_id <- self$last_step_id()
}
data_id <- as.character(step_id)
if (state == "pre") {
data_id <- prev_step(step_id)
}
if (collect) {
.collect_data(private$source, private$data_objects[[data_id]])
} else {
private$data_objects[[data_id]]
}
},
#' @description
#' Plot filter specific data summary.
#' @param step_id Id of the step where filter is defined.
#' @param filter_id Id of the filter for which the plot should be returned
#' @param ... Another parameters passed to filter specific method.
#' @param state Generate plot on data before ("pre") or after ("post") step filtering?
plot_data = function(step_id, filter_id, ..., state = "post") {
data_id <- as.character(step_id)
if (state == "pre") {
data_id <- prev_step(step_id)
}
private$steps[[step_id]]$filters[[filter_id]]$plot_data(
private$data_objects[[data_id]],
...
)
},
#' @description
#' Show attrition plot.
#' @param ... Source specific parameters required to generate attrition.
#' @param percent Should attrition changes be presented with percentage values.
show_attrition = function(..., percent = FALSE) {
keep_active_state <- function(step_state) {
step_state$filters <- step_state$filters %>%
purrr::keep(~.$active)
step_state
}
get_filter_meta <- function(filter_state) {
filter_state$name <- filter_state$id
filter_state$value_name <- filter_state$input_param
filter_state$value <- filter_state[[filter_state$input_param]]
return(filter_state)
}
active_states <- self$get_state(json = FALSE, extra_fields = "input_param") %>%
purrr::map(keep_active_state)
attrition_labels <- .get_attrition_label(
source = self$get_source(),
step_id = "0",
step_filters = NULL,
...
)
for (active_state in active_states) {
attrition_labels[length(attrition_labels) + 1] <- .get_attrition_label(
source = self$get_source(),
step_id = active_state$step,
step_filters = purrr::map(active_state$filters, get_filter_meta),
...
)
}
attrition_count <- .get_attrition_count(
source = self$get_source(),
data_stats = private$cache,
...
)
attrition_table <- get_attrition_coords(
attrition_labels,
attrition_count,
percent = percent
)
get_attrition_plot(attrition_table)
},
#' @description
#' Get Cohort related statistics.
#' @param step_id When `filter_id` specified, `step_id` precises from which step the filter comes from.
#' Otherwise data from specified step is used to calculate required statistics.
#' @param filter_id If not missing, filter related data statistics are returned.
#' @param ... Specific parameters passed to filter related method.
#' @param state Should the stats be calculated on data before ("pre") or after ("post")
#' filtering in specified step.
get_stats = function(step_id, filter_id, ..., state = "post") {
data_id <- as.character(step_id)
if (state == "pre") {
data_id <- prev_step(step_id)
}
if (missing(filter_id)) {
return(
.get_stats(private$source, private$data_objects[[data_id]])
)
}
private$steps[[step_id]]$filters[[filter_id]]$get_stats(
private$data_objects[[data_id]],
...
)
},
#' @description
#' Show source data or filter description
#'
#' @param field Name of the source description field provided as `description` argument to \link{set_source}.
#' If missing, `step_id` and `filter_id` are used to return filter description.
#' @param filter_id Id of the filter to return description of.
#' @param step_id Id of the filter step to return description of.
#' @param modifier A function taking the description as argument.
#' The function can be used to modify its argument (convert to html, display in browser etc.).
show_help = function(
field, step_id, filter_id,
modifier = getOption("cb_help_modifier", default = function(x) x)
) {
description <- NULL
if (!missing(field)) {
if (is.null(self$get_source()$description)) return(NULL)
description <- self$get_source()$description[[field]]
}
if (!missing(step_id) && !missing(filter_id)) {
filter <- self$get_filter(step_id, filter_id)
description <- filter$get_params("description")
}
return(modifier(description))
},
#' @description
#' Return reproducible data filtering code.
#' @param include_source If `TRUE` source generating code will be included.
#' @param include_methods Which methods definition should be included in the result.
#' @param include_action Which action should be returned in the result.
#' `pre_filtering`/`.post_filtering` - to include data transformation before/after filtering.
#' s`run_binding` - data binding transformation.
#' @param modifier A function taking data frame (storing reproducible code metadata) as
#' an argument, and returning data frame with `expr` column which is then
#' combined into a single expression (final result of `get_code`).
#' See \link{.repro_code_tweak}.
#' @param mark_step Include information which filtering step is performed.
#' @param ... Other parameters passed to \link[formatR]{tidy_source}.
get_code = function(
include_source = TRUE, include_methods = c(".pre_filtering", ".post_filtering", ".run_binding"),
include_action = c("pre_filtering", "post_filtering", "run_binding"),
modifier = .repro_code_tweak, mark_step = TRUE, ...) {
source_type <- class(private$source)[1]
# todo improve
fun_args <- environment()
code_params <- c(
"include_source", "include_methods", "include_action", "modifier", "mark_step"
) %>%
stats::setNames(nm = .) %>%
purrr::map(
~if (is.null(self$attributes[[.x]]) & !.x %in% names(self$attributes)) {
fun_args[[.x]]
} else {
self$attributes[[.x]]
}
)
code_components <- list()
for (extra_method in code_params$include_methods) {
code_components <- append(
code_components,
type_expr(type = "meta", expr = method_to_expr(extra_method, source_type))
)
}
if (code_params$include_source) {
code_components <- append(
code_components,
type_expr(type = "source", expr = get_source_expr(source_type, self, private))
)
}
for (step_id in names(self$get_step())) {
if (code_params$mark_step) {
code_components <- append(
code_components,
type_expr(
type = "step_init", step = step_id,
expr = rlang::expr(step_id <- !!step_id)
)
)
}
# todo add binding keys
if ("run_binding" %in% code_params$include_action) {
code_components <- append(
code_components,
type_expr(
type = "run_binding", step = step_id,
expr = rlang::expr(
pre_data_object <- data_object
)
)
)
}
if ("pre_filtering" %in% code_params$include_action) {
code_components <- append(
code_components,
type_expr(
type = "pre_filtering", step = step_id,
expr = rlang::expr(
data_object <- .pre_filtering(source, data_object, !!step_id)
)
)
)
}
active_filters <- private$steps[[step_id]]$filters %>%
purrr::keep(~ .x$get_params("active"))
for (filter in active_filters) {
filter_params <- filter$get_params()
code_components <- append(
code_components,
type_expr(
type = "filtering", step = step_id,
expr = parse_filter_expr(filter),
!!!filter_params
)
)
}
if ("post_filtering" %in% code_params$include_action) {
code_components <- append(
code_components,
type_expr(
type = "post_filtering", step = step_id,
expr = rlang::expr(
data_object <- .post_filtering(source, data_object, !!step_id)
)
)
)
}
if ("run_binding" %in% code_params$include_action) {
code_components <- append(
code_components,
type_expr(
type = "run_binding", step = step_id,
expr = rlang::expr(
for (binding_key in binding_keys) {
data_object <- .run_binding(
source, binding_key,
pre_data_object, data_object
)
}
)
)
)
}
}
code_components_df <- code_components %>%
purrr::map_dfr(function(x) x) %>%
dplyr::filter(purrr::map_lgl(expr, ~!is.null(.)))
code_components_df <- code_params$modifier(private$source, code_components_df)
# todo code include once?
res_quote <- combine_expressions(unlist(code_components_df$expr))
formatR::tidy_source(
text = as.character(res_quote)[-1],
...
)
},
#' @description
#' Trigger data calculations sequentially.
#' @param min_step Step id starting from the calculation will be started.
run_flow = function(min_step,
hook = list(pre = get_hook("pre_run_flow_hook"), post = get_hook("post_run_flow_hook"))) {
run_hooks(hook$pre, self, private)
if (missing(min_step)) {
min_step <- 1
}
min_step <- min(length(private$data_objects), as.integer(min_step)) # make sure all steps data is evaluated
steps_to_execute <- steps_range(min_step, length(private$steps))
for (data_idx in steps_to_execute) {
self$run_step(data_idx)
}
run_hooks(hook$post, self, private)
},
#' @description
#' Trigger data calculations for selected step.
#' @param step_id Id of the step for which to run data calculation.
run_step = function(step_id,
hook = list(
pre = get_hook("pre_run_step_hook"),
post = get_hook("post_run_step_hook")
)) {
step_id <- as.character(step_id)
run_hooks(hook$pre, self, private, step_id)
temp_data_object <- .pre_filtering(
source = private$source,
data_object = private$data_objects[[prev_step(step_id)]],
step_id = step_id
)
active_filters <- self$list_active_filters(step_id)
for (filter_id in active_filters) {
data_filter <- self$get_filter(step_id, filter_id)
temp_data_object <- temp_data_object %>%
data_filter$filter_data()
}
private$data_objects[[step_id]] <- .post_filtering(
source = private$source,
data_object = temp_data_object,
step_id = step_id
)
self$bind_data(step_id)
private$data_objects[[step_id]] <- .post_binding(
source = private$source,
data_object = private$data_objects[[step_id]],
step_id = step_id
)
filter_ids <- names(self$get_step(step_id)$filters)
is_cached <- !is.null(self$get_cache(step_id, state = "pre"))
# todo make sure is_cached logic is correct
if (!is_cached) {
self$update_cache(step_id, state = "pre")
}
self$update_cache(step_id, state = "post")
for (filter_id in filter_ids) {
is_cached <- !is.null(self$get_cache(step_id, filter_id, state = "pre"))
if (!is_cached) {
self$update_cache(step_id, filter_id, state = "pre")
}
}
for (filter_id in active_filters) {
self$update_cache(step_id, filter_id, state = "post")
}
run_hooks(hook$post, self, private, step_id)
},
#' @description
#' Run data binding for selected step.
#' See more at \link{binding-keys}.
#' @param step_id Id of the step for which to bind the data.
bind_data = function(step_id) {
binding_keys <- private$source$binding_keys
if (is.null(binding_keys)) {
return(FALSE)
}
for (binding_key in binding_keys) {
private$data_objects[[step_id]] <- .run_binding(
private$source, binding_key,
self$get_data(step_id, state = "pre", FALSE),
self$get_data(step_id, state = "post", FALSE)
)
}
return(TRUE)
},
#' @description
#' Print defined steps configuration.
describe_state = function() {
if (length(private$steps) == 0) {
cat("No steps configuration found.")
} else {
private$steps %>% purrr::walk(print_step)
}
},
#' @description
#' Get selected step configuration.
#' @param step_id Id of the step to be returned.
get_step = function(step_id) {
if (!missing(step_id)) {
private$steps[[as.character(step_id)]]
} else {
private$steps
}
},
#' @description
#' Get selected filter configuration.
#' @param step_id Id of the step where filter is defined.
#' @param filter_id If of the filter to be returned.
#' @param method Custom function taking filters list as argument.
get_filter = function(step_id, filter_id, method = function(x) x) {
if (!missing(filter_id)) {
method(private$steps[[as.character(step_id)]]$filters[[filter_id]])
} else {
method(private$steps[[as.character(step_id)]]$filters)
}
},
#' @description
#' Update filter or step cache.
#' Caching is saving step and filter attached data statistics such as number of
#' data rows, filter choices or frequencies.
#' @param step_id Id of the step for which caching should be applied.
#' If `filter_id` is not missing, the parameter describes id of the step where filter should be found.
#' @param filter_id Id of the filter for which caching should be applied.
#' @param state Should caching be done on data before ("pre") or after ("post")
#' filtering in specified step.
update_cache = function(step_id, filter_id, state = "post") {
cache_id <- step_id
if (state == "pre") {
cache_id <- prev_step(step_id)
}
if (missing(filter_id)) {
prev_cache <- private$cache[[cache_id]]
cache_changed <- FALSE
private$cache[[cache_id]] <- .get_stats(private$source, self$get_data(step_id, state, FALSE))
if (!identical(prev_cache, private$cache[[cache_id]])) {
cache_changed <- TRUE
}
private$cache[[cache_id]]$changed <- cache_changed
} else {
filter <- self$get_filter(step_id, filter_id)
prev_cache <- private$cache[[cache_id]]$filters[[filter_id]]
cache_changes <- FALSE
private$cache[[cache_id]]$filters[[filter_id]] <- filter$get_stats(self$get_data(step_id, state, FALSE))
if (!identical(prev_cache, private$cache[[cache_id]]$filters[[filter_id]])) {
cache_changed <- TRUE
}
private$cache[[cache_id]]$filters[[filter_id]]$changed <- cache_changed
}
},
#' @description
#' Return step of filter specific cache.
#' @param step_id Id of the step for which cached data should be returned
#' If `filter_id` is not missing, the parameter describes id of the step where filter should be found.
#' @param filter_id Id of the filter for which cache data should be returned.
#' @param state Should cache be returned on data before ("pre") or after ("post")
#' filtering in specified step.
get_cache = function(step_id, filter_id, state = "post") {
step_id <- as.character(step_id)
if (state == "pre") {
step_id <- prev_step(step_id)
}
if (missing(filter_id)) {
private$cache[[step_id]]
} else {
private$cache[[step_id]]$filters[[filter_id]]
}
},
#' @description
#' List active filters included in selected step.
#' @param step_id Id of the step where filters should be found.
list_active_filters = function(step_id) {
get_active_filters <- function(step_id, self) {
active_names <- self$get_filter(step_id) %>%
purrr::keep(~ .x$get_params("active")) %>%
names()
active_names
}
if (missing(step_id)) {
names(self$get_step()) %>%
purrr::map(get_active_filters, self = self) %>%
unlist()
} else {
step_id <- as.character(step_id)
get_active_filters(step_id, self)
}
},
#' @description
#' Return id of the last existing step in Cohort.
last_step_id = function() {
as.character(length(private$steps))
},
#' @description
#' Helper method enabling to run non-standard operation on Cohort object.
#' @param modifier Function of two arguments `self` and `private`.
modify = function(modifier) {
modifier(self, private)
},
#' @field attributes List of Cohort attributes defined while creating a new Cohort object.
attributes = list()
),
private = list(
source = NULL,
steps = list(),
cache = list(),
data_objects = list(),
init_source = function(source, ...) {
private$source <- source
private$steps <- register_steps_and_filters(source, ...)
initial_data <- .init_step(source)
if (!is.null(initial_data)) {
# important note: data objects are indexed from 0, whereas steps and filters from 1
private$data_objects[["0"]] <- initial_data
}
}
)
)
#' Create new 'Cohort' object
#'
#' Cohort object is designed to make operations on source data possible.
#' @param source Source object created with \link{set_source}.
#' @param run_flow If `TRUE`, data flow is run after the operation is completed.
#' @param hook List of hooks describing methods before/after the Cohort is created.
#' See \link{hooks} for more details.
#' @param ... Steps definition (optional). Can be also defined as a sequence of
#' filters - the filters will be added to the first step.
#' @return The object of class `Cohort`.
#'
#' @name create-cohort
#' @export
cohort <- function(source, ..., run_flow = FALSE,
hook = list(
pre = get_hook("pre_cohort_hook"),
post = get_hook("post_cohort_hook")
)) {
Cohort$new(source, ..., run_flow = run_flow, hook = hook)
}
#' @title Managing the Cohort object
#'
#' @description
#' The list of methods designed for managing the Cohort configuration and state.
#'
#' \itemize{
#' \item{\link{add_source} - Add source to Cohort object.}
#' \item{\link{update_source} - Update Cohort object source.}
#' \item{\link{add_step} - Add step to Cohort object.}
#' \item{\link{rm_step} - Remove step from Cohort object.}
#' \item{\link{add_filter} - Add filter to Cohort step.}
#' \item{\link{rm_filter} - Remove filter from Cohort step.}
#' \item{\link{update_filter} - Update filter configuration.}
#' \item{\link{run} - Run data filtering.}
#' }
#'
#' @return The object of class `Cohort` having the modified configuration dependent on the used method.
#' @name managing-cohort
NULL
#' Add source to Cohort object.
#'
#' When Cohort object has been created without source, the method allows to
#' attach it.
#'
#' @param x Cohort object.
#' @param source Source object to be attached.
#' @return The `Cohort` class object with `Source` attached to it.
#'
#' @seealso \link{managing-cohort}
#' @export
add_source <- function(x, source) {
x$add_source(source)
return(invisible(x))
}
#' Update source in Cohort object.
#'
#' @param x Cohort object.
#' @param source Source object to be updated in Cohort.
#' @param keep_steps If `TRUE`, steps definition remain unchanged when updating source.
#' If `FALSE` steps configuration is deleted.
#' If vector of type integer, specified steps will remain.
#' @param run_flow If `TRUE`, data flow is run after the source is updated.
#' @return The `Cohort` class object with updated `Source` definition.
#'
#' @seealso \link{managing-cohort}
#' @export
update_source <- function(x, source, keep_steps = !has_steps(source), run_flow = FALSE) {
x$update_source(source, keep_steps, run_flow)
return(invisible(x))
}
#' Add filtering step definition
#'
#' @param x An object to add step to.
#' @param step Step definition created with \link{step}.
#' @param ... Other parameters passed to specific S3 method.
#' @return Method dependent object (i.e. `Cohort` or `Source`) having new step added.
#'
#' @seealso \link{managing-cohort}, \link{managing-source}
#' @export
add_step <- function(x, step, ...) {
UseMethod("add_step", x)
}
#' @rdname add_step
#' @param run_flow If `TRUE`, data flow is run after the step is added.
#' @param hook List of hooks describing methods to run before/after the step is added.
#' See \link{hooks} for more details.
#' @export
add_step.Cohort <- function(x, step, run_flow = FALSE,
hook = list(
pre = get_hook("pre_add_step_hook"),
post = get_hook("post_add_step_hook")
), ...) {
x$add_step(step, run_flow = run_flow, hook = hook)
return(invisible(x))
}
#' Remove filtering step definition
#'
#' @param x An object from which step should be removed.
#' @param step_id Id of the step to remove.
#' @param ... Other parameters passed to specific S3 method.
#' @return Method dependent object (i.e. `Cohort` or `Source`) having selected step removed.
#'
#' @seealso \link{managing-cohort}, \link{managing-source}
#' @export
rm_step <- function(x, step_id, ...) {
UseMethod("rm_step", x)
}
#' @rdname rm_step
#' @param run_flow If `TRUE`, data flow is run after the step is removed.
#' @param hook List of hooks describing methods before/after the Cohort is created.
#' See \link{hooks} for more details.
#' @export
rm_step.Cohort <- function(x, step_id, run_flow = FALSE,
hook = list(
pre = get_hook("pre_rm_step_hook"),
post = get_hook("post_rm_step_hook")
), ...) {
x$remove_step(step_id, run_flow = run_flow, hook = hook)
return(invisible(x))
}
#' Add filter definition
#'
#' @param x An object to add filter to.
#' @param filter Filter definition created with \link{filter}.
#' @param step_id Id of the step to add the filter to.
#' If missing, filter is added to the last step.
#' @param ... Other parameters passed to specific S3 method.
#' @return Method dependent object (i.e. `Cohort` or `Source`) having filter added in selected step.
#'
#' @seealso \link{managing-cohort}, \link{managing-source}
#' @export
add_filter <- function(x, filter, step_id, ...) {
UseMethod("add_filter", x)
}
#' @rdname add_filter
#' @param run_flow If `TRUE`, data flow is run after the filter is added.
#' @export
add_filter.Cohort <- function(x, filter, step_id, run_flow = FALSE, ...) {
x$add_filter(filter, step_id, run_flow)
return(invisible(x))
}
#' Remove filter definition
#'
#' @param x An object from which filter should be removed.
#' @param step_id Id of the step from which filter should be removed.
#' @param filter_id Id of the filter to be removed.
#' @param ... Other parameters passed to specific S3 method.
#' @return Method dependent object (i.e. `Cohort` or `Source`) having selected filter removed.
#'
#' @seealso \link{managing-cohort}, \link{managing-source}
#' @export
rm_filter <- function(x, step_id, filter_id, ...) {
UseMethod("rm_filter", x)
}
#' @rdname rm_filter
#' @param run_flow If `TRUE`, data flow is run after the filter is removed.
#' @export
rm_filter.Cohort <- function(x, step_id, filter_id, run_flow = FALSE, ...) {
x$remove_filter(step_id, filter_id, run_flow)
return(invisible(x))
}
#' Update filter definition
#'
#' @param x An object in which the filter should be updated.
#' @param step_id Id of the step where filter is defined.
#' @param filter_id Id of the filter to be updated.
#' @param ... Filter parameters that should be updated.
#' @return Method dependent object (i.e. `Cohort` or `Source`) having selected filter updated.
#'
#' @seealso \link{managing-cohort}, \link{managing-source}
#' @export
update_filter <- function(x, step_id, filter_id, ...) {
UseMethod("update_filter", x)
}
#' @rdname update_filter
#' @param run_flow If `TRUE`, data flow is run after the filter is updated.
#' @export
update_filter.Cohort <- function(x, step_id, filter_id, ..., run_flow = FALSE) {
x$update_filter(step_id, filter_id, ..., run_flow = run_flow)
return(invisible(x))
}
#' Trigger data calculations.
#'
#' @param x Cohort object.
#' @param min_step_id Step id starting from the calculation will be started.
#' Used only when `step_id` is missing.
#' @param step_id Id of the step for which to run data calculation.
#' @return The object of class `Cohort` having up-to-date data based on the Cohort state.
#'
#' @seealso \link{managing-cohort}
#' @export
run <- function(x, min_step_id, step_id) {
if (!missing(step_id)) {
x$run_step(step_id)
return(invisible(x))
}
x$run_flow(min_step_id)
return(invisible(x))
}
#' Cohort related methods
#'
#' @description
#' The list of methods designed for getting Cohort-related details.
#'
#' \itemize{
#' \item{\link{plot_data} - Plot filter related Cohort data.}
#' \item{\link{stat} - Get Cohort related statistics.}
#' \item{\link{code} - Return reproducible data filtering code.}
#' \item{\link{get_data} - Get step related data.}
#' \item{\link{sum_up} - Sum up Cohort state.}
#' \item{\link{get_state} - Save Cohort state.}
#' \item{\link{restore} - Restore Cohort state.}
#' \item{\link{attrition} - Show attrition plot.}
#' \item{\link{description} - Show Source or filter related description.}
#' }
#'
#' @return Various type outputs dependent on the selected method.
#' See each method documentation for details.
#' @name cohort-methods
NULL
#' Plot filter related Cohort data.
#'
#' For specified filter the method calls filter-related plot method to present data.
#'
#' @param x Cohort object.
#' @param step_id Id of step in which the filter was defined..
#' @param filter_id Filter id.
#' @param ... Another parameters passed to filter plotting method.
#' @param state Generate plot based on data before ("pre") or after ("post") filtering.
#' @return Filter-specific plot.
#'
#' @seealso \link{cohort-methods}
#' @export
plot_data <- function(x, step_id, filter_id, ..., state = "post") {
x$plot_data(step_id, filter_id, ..., state = state)
}
#' Get Cohort related statistics.
#'
#' Display data statistics related to specified step or filter.
#'
#' @param x Cohort object.
#' @param step_id When `filter_id` specified, `step_id` precises from which step the filter comes from.
#' Otherwise data from specified step is used to calculate required statistics.
#' @param filter_id If not missing, filter related data statistics are returned.
#' @param ... Specific parameters passed to filter related method.
#' @param state Should the stats be calculated on data before ("pre") or after ("post")
#' filtering in specified step.
#' @return List of filter-specific values summing up underlying filter data.
#'
#' @seealso \link{cohort-methods}
#' @export
stat <- function(x, step_id, filter_id, ..., state = "post") {
x$get_stats(step_id, filter_id, ..., state = state)
}
#' Return reproducible data filtering code.
#'
#' @param x Cohort object.
#' @param include_source If `TRUE` source generating code will be included.
#' @param include_methods Which methods definition should be included in the result.
#' @param include_action Which action should be returned in the result.
#' `pre_filtering`/`.post_filtering` - to include data transformation before/after filtering.
#' s`run_binding` - data binding transformation.
#' @param modifier A function taking data frame (storing reproducible code metadata) as
#' an argument, and returning data frame with `expr` column which is then
#' combined into a single expression (final result of `get_code`).
#' See \link{.repro_code_tweak}.
#' @param mark_step Include information which filtering step is performed.
#' @param ... Other parameters passed to \link[formatR]{tidy_source}.
#' @return \link[formatR]{tidy_source} output storing reproducible code for generating final step data.
#'
#' @seealso \link{cohort-methods}
#' @export
code <- function(x, include_source = TRUE, include_methods = c(".pre_filtering", ".post_filtering", ".run_binding"),
include_action = c("pre_filtering", "post_filtering", "run_binding"),
modifier = .repro_code_tweak, mark_step = TRUE, ...) {
x$get_code(include_source = include_source, include_methods = include_methods,
include_action = include_action,
modifier = modifier, mark_step = mark_step, ...)
}
#' Get step related data
#'
#' @param x Cohort object.
#' @param step_id Id of the step from which to source data.
#' @param state Return data before ("pre") or after ("post") step filtering?
#' @param collect Return raw data source (`FALSE`) object or collected (to R memory) data (`TRUE`).
#' @return Subset of Source-specific data connection object or its evaluated version.
#'
#' @seealso \link{cohort-methods}
#' @export
get_data <- function(x, step_id, state = "post", collect = FALSE) {
x$get_data(step_id, state = state, collect = collect)
}
#' Sum up Cohort state.
#'
#' @param x Cohort object.
#' @return None (invisible NULL). Printed summary of Cohort state.
#'
#' @seealso \link{cohort-methods}
#' @export
sum_up <- function(x) {
x$describe_state()
}
#' Get Cohort configuration state.
#'
#' @param x Cohort object.
#' @param step_id If provided, the selected step state is returned.
#' @param json If TRUE, return state in JSON format.
#' @param extra_fields Names of extra fields included in filter to be added to state.
#' @return List object of character string being the list convertion to JSON format.
#'
#' @seealso \link{cohort-methods}
#' @export
get_state <- function(x, step_id, json = FALSE, extra_fields = NULL) {
x$get_state(step_id = step_id, json = json, extra_fields = extra_fields)
}
#' Restore Cohort object.
#'
#' The method allows to restore Cohort object with provided configuration state.
#'
#' @param x Cohort object.
#' @param state List or JSON string containing steps and filters configuration.
#' See \link{get_state}.
#' @param modifier Function two parameters combining the previous and provided state.
#' The returned state is then restored.
#' @param run_flow If TRUE, filtering flow is applied when the operation is finished.
#' @return The `Cohort` class object having the state restored based on provided config.
#'
#' @seealso \link{cohort-methods}
#' @export
restore <- function(x, state, modifier = function(prev_state, state) state, run_flow = FALSE) {
x$restore(state = state, modifier = modifier, run_flow = run_flow)
return(invisible(x))
}
#' Show attrition plot.
#'
#' @param x Cohort object.
#' @param ... Source specific parameters required to generate attrition.
#' @param percent Should attrition changes be presented with percentage values.
#' @return Plot object of class `ggplot`.
#'
#' @seealso \link{cohort-methods}
#' @export
attrition <- function(x, ..., percent = FALSE) {
x$show_attrition(..., percent = percent)
}
#' Show source data or filter description
#'
#' If defined allows to check the provided description related to source data or configured filters.
#'
#' @param x Cohort object.
#' @param field Name of the source description field provided as `description` argument to \link{set_source}.
#' If missing, `step_id` and `filter_id` are used to return filter description.
#' @param filter_id Id of the filter to return description of.
#' @param step_id Id of the filter step to return description of.
#' @param modifier A function taking the description as argument.
#' The function can be used to modify its argument (convert to html, display in browser etc.).
#' @return Any object (or its subset) attached to Source of filter via description argument.
#'
#' @seealso \link{cohort-methods}
#' @export
description <- function(x, field, step_id, filter_id,
modifier = getOption("cb_help_modifier", default = function(x) x)) {
x$show_help(field = field, step_id = step_id, filter_id = filter_id, modifier = modifier)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.