active_new <- function(
  pipeline = NULL,
  meta = NULL,
  names = NULL,
  shortcut = NULL,
  queue = NULL,
  reporter = NULL,
  seconds_meta_append = NULL,
  seconds_meta_upload = NULL,
  envir = NULL
) {
  active_class$new(
    pipeline = pipeline,
    meta = meta,
    names = names,
    shortcut = shortcut,
    queue = queue,
    reporter = reporter,
    seconds_meta_append = seconds_meta_append,
    seconds_meta_upload = seconds_meta_upload,
    envir = envir
  )
}
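# Base class for "active" algorithms: pipeline runs that can actually
# execute targets. Concrete subclasses supply run_target() (called from
# process_target() below) and the top-level scheduling loop.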
active_class <- R6::R6Class(
  classname = "tar_active",
  inherit = algorithm_class,
  portable = FALSE,
  cloneable = FALSE,
  public = list(
    envir = NULL,
    exports = NULL,
    process = NULL,
    seconds_start = NULL,
    seconds_appended_meta = -Inf,
    seconds_appended_progress = -Inf,
    seconds_meta_uploaded = -Inf,
    skipping = TRUE,
    initialize = function(
      pipeline = NULL,
      meta = NULL,
      names = NULL,
      shortcut = NULL,
      queue = NULL,
      reporter = NULL,
      seconds_meta_append = NULL,
      seconds_meta_upload = NULL,
      envir = NULL
    ) {
      super$initialize(
        pipeline = pipeline,
        meta = meta,
        names = names,
        shortcut = shortcut,
        queue = queue,
        reporter = reporter,
        seconds_meta_append = seconds_meta_append,
        seconds_meta_upload = seconds_meta_upload
      )
      self$envir <- envir
    },
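    # Make sure the local metadata store is present, synced, migrated,
    # validated, and restricted to the targets of the current pipeline.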
    ensure_meta = function() {
      new_store <- !file.exists(self$meta$store)
      self$meta$database$sync(prefer_local = TRUE, verbose = FALSE)
      self$meta$migrate_database()
      self$meta$validate()
      self$meta$preprocess(write = TRUE)
      if (new_store) {
        self$write_gitignore()
        self$write_user()
      }
      self$meta$record_imports(self$pipeline$imports, self$pipeline)
      self$meta$restrict_records(self$pipeline)
    },
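    # Write buffered metadata and progress rows to their on-disk files.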
    flush_meta = function() {
      self$meta$database$flush_rows()
      self$scheduler$progress$database$flush_rows()
    },
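    # Time-throttled version of flush_meta(): only write rows if at least
    # seconds_meta_append seconds have passed since the last write. While
    # targets are being skipped, progress writes get an additional lower
    # bound of min_seconds_appended_progress (defined at the bottom of
    # this file) to keep skipping fast. The .subset2() calls avoid method
    # dispatch overhead on this hot path.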
    flush_meta_time = function() {
      now <- time_seconds_local()
      if ((now - seconds_appended_meta) >= seconds_meta_append) {
        .subset2(.subset2(.subset2(self, "meta"), "database"), "flush_rows")()
        self$seconds_appended_meta <- now
      }
      if (skipping) {
        threshold <- max(min_seconds_appended_progress, seconds_meta_append)
      } else {
        threshold <- seconds_meta_append
      }
      if ((now - seconds_appended_progress) >= threshold) {
        progress <- .subset2(.subset2(self, "scheduler"), "progress")
        .subset2(.subset2(progress, "database"), "flush_rows")()
        self$seconds_appended_progress <- now
        self$skipping <- TRUE
      }
    },
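    # Upload staged metadata and progress data to the metadata repository,
    # e.g. cloud storage if one is configured.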
    upload_meta = function() {
      self$meta$database$upload_staged()
      self$scheduler$progress$database$upload_staged()
    },
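    # Time-throttled version of upload_meta(): only upload if at least
    # seconds_meta_upload seconds have passed since the last upload.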
    upload_meta_time = function() {
      now <- time_seconds_local()
      if ((now - seconds_meta_uploaded) >= seconds_meta_upload) {
        .subset2(self, "upload_meta")()
        self$seconds_meta_uploaded <- now
      }
    },
    sync_meta_time = function() {
      .subset2(self, "flush_meta_time")()
      .subset2(self, "upload_meta_time")()
    },
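    # For targets whose metadata needs to be current before they run
    # (see target_allow_meta()), flush and upload metadata immediately
    # instead of waiting for the next timed flush.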
    flush_upload_meta_file = function(target) {
      if (target_allow_meta(target)) {
        .subset2(self, "flush_meta")()
        .subset2(self, "upload_meta")()
      }
    },
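    # For a brand new data store, write a .gitignore that excludes
    # everything except the .gitignore file itself and meta/meta.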
    write_gitignore = function() {
      writeLines(
        c(
          "# CAUTION: do not edit this file by hand!",
          "# _targets/objects/ may have large data files,",
          "# and _targets/meta/process may have sensitive information.",
          "# It is good practice to either commit nothing from _targets/,",
          "# or if your data is not too sensitive,",
          "# commit only _targets/meta/meta.",
          "*",
          "!.gitignore",
          "!meta",
          "meta/*",
          "!meta/meta"
        ),
        path_gitignore(self$meta$store)
      )
    },
    write_user = function() {
      dir_create(path_user_dir(self$meta$store))
    },
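    # Record the current R process in the process metadata and make sure
    # no other targets process is using the same data store.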
    ensure_process = function() {
      self$process <- process_init(path_store = self$meta$store)
      self$process$assert_unique()
      self$process$record_process()
      self$process$database$upload(verbose = FALSE)
    },
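    # Build the list of objects to export to parallel workers: either the
    # whole global environment or the pipeline environment, minus internal
    # names, plus runtime context under hash-suffixed names (*_5048826d)
    # that should not collide with user-defined objects.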
    produce_exports = function(envir, path_store, is_globalenv = NULL) {
      map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises
      if (is_globalenv %|||% identical(envir, globalenv())) {
        out <- as.list(envir, all.names = TRUE)
        out <- out[fltr(names(out), ~!is_internal_name(.x, envir))]
        out[[".tar_envir_5048826d"]] <- "globalenv"
      } else {
        discard <- fltr(names(envir), ~is_internal_name(.x, envir))
        remove(list = discard, envir = envir)
        out <- list(.tar_envir_5048826d = envir)
      }
      out[[".tar_path_store_5048826d"]] <- path_store
      out[[".tar_fun_5048826d"]] <- tar_runtime$fun
      out[[".tar_options_5048826d"]] <- tar_options$export()
      out[[".tar_envvars_5048826d"]] <- tar_envvars()
      out
    },
    update_exports = function() {
      self$exports <- self$produce_exports(
        envir = self$envir,
        path_store = self$meta$store
      )
    },
    ensure_exports = function() {
      if (is.null(self$exports)) {
        self$update_exports()
      }
    },
    unload_transient = function() {
      pipeline_unload_transient(self$pipeline)
    },
    unmarshal_target = function(target) {
      builder_unmarshal_value(target)
    },
    skip_target = function(target) {
      target_skip(
        target = target,
        pipeline = .subset2(self, "pipeline"),
        scheduler = .subset2(self, "scheduler"),
        meta = .subset2(self, "meta"),
        active = TRUE
      )
    },
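    # Central per-target dispatch: given a target name from the queue,
    # decide whether to trim it from the schedule, run it (because it is
    # invalidated), or skip it as up to date.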
    process_target = function(name) {
      scheduler <- .subset2(self, "scheduler")
      progress <- .subset2(scheduler, "progress")
      pipeline <- .subset2(self, "pipeline")
      meta <- .subset2(self, "meta")
      .subset2(.subset2(scheduler, "backoff"), "reset")()
      target <- pipeline_get_target(pipeline, name)
      target_debug(target)
      target_update_depend(target, pipeline, meta)
      if (counter_exists_name(.subset2(progress, "trimmed"), name)) {
        .subset2(scheduler, "trim")(target, pipeline)
        counter_del_name(.subset2(progress, "queued"), name)
        target_update_queue(target, scheduler)
      } else if (target_should_run(target, meta)) {
        self$skipping <- inherits(target, "tar_pattern")
        self$flush_upload_meta_file(target)
        .subset2(self, "run_target")(target)
      } else {
        .subset2(self, "skip_target")(target)
      }
    },
    backoff = function() {
      self$scheduler$backoff$wait()
    },
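    # Set up a run: prune and resolve the pipeline, prepare metadata and
    # the scheduler, register the process, and announce the start.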
    start = function() {
      tar_runtime$active <- TRUE # Needs to be set here for tests.
      self$seconds_start <- time_seconds()
      pipeline_prune_names(self$pipeline, self$names)
      pipeline_resolve_auto(self$pipeline)
      self$ensure_meta()
      self$set_file_info()
      self$update_scheduler()
      self$bootstrap_shortcut_deps()
      self$ensure_process()
      self$scheduler$progress$database$reset_storage()
      self$scheduler$reporter$report_start()
    },
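    # Finalize a run: flush, deduplicate, and upload metadata, delete
    # scratch files, and report the elapsed time.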
    end = function() {
      on.exit(self$meta$database$close())
      on.exit(scheduler$progress$database$close(), add = TRUE)
      scheduler <- self$scheduler
      self$flush_meta()
      self$meta$database$deduplicate_storage()
      self$upload_meta()
      path_scratch_del(path_store = self$meta$store)
      compare_working_directories()
      seconds_elapsed <- time_seconds() - self$seconds_start
      scheduler$reporter$report_end(scheduler$progress, seconds_elapsed)
    },
    validate = function() {
      super$validate()
      if (!is.null(self$process)) {
        self$process$validate()
      }
    }
  )
)
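# Minimum number of seconds between progress flushes while targets are
# being skipped. See flush_meta_time() above.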
min_seconds_appended_progress <- 1
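# The sketch below is not part of targets: it is a hypothetical minimal
# subclass showing the intended call sequence for this class (start(),
# then process_target() once per target, then end()). It assumes the
# internal helper pipeline_get_names(), ignores the priority queue that
# real subclasses use for topological scheduling, and glosses over
# run_target(), which a real subclass (e.g. the sequential local
# algorithm) must implement to actually build a target.
demo_class <- R6::R6Class(
  classname = "tar_demo",
  inherit = active_class,
  portable = FALSE,
  cloneable = FALSE,
  public = list(
    run = function() {
      self$start()
      on.exit(self$end())
      # Process every target in the (already pruned) pipeline.
      for (name in pipeline_get_names(self$pipeline)) {
        self$process_target(name)
      }
    },
    run_target = function(target) {
      # A real subclass builds the target here and records the outcome.
    }
  )
)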