R/class_active.R

Defines functions active_new

active_new <- function(
  pipeline = NULL,
  meta = NULL,
  names = NULL,
  shortcut = NULL,
  queue = NULL,
  reporter = NULL,
  seconds_meta_append = NULL,
  seconds_meta_upload = NULL,
  envir = NULL
) {
  active_class$new(
    pipeline = pipeline,
    meta = meta,
    names = names,
    shortcut = shortcut,
    queue = queue,
    reporter = reporter,
    seconds_meta_append = seconds_meta_append,
    seconds_meta_upload = seconds_meta_upload,
    envir = envir
  )
}

active_class <- R6::R6Class(
  classname = "tar_active",
  inherit = algorithm_class,
  portable = FALSE,
  cloneable = FALSE,
  public = list(
    envir = NULL,
    exports = NULL,
    process = NULL,
    seconds_start = NULL,
    seconds_appended_meta = -Inf,
    seconds_appended_progress = -Inf,
    seconds_meta_uploaded = -Inf,
    skipping = TRUE,
    initialize = function(
      pipeline = NULL,
      meta = NULL,
      names = NULL,
      shortcut = NULL,
      queue = NULL,
      reporter = NULL,
      seconds_meta_append = NULL,
      seconds_meta_upload = NULL,
      envir = NULL
    ) {
      super$initialize(
        pipeline = pipeline,
        meta = meta,
        names = names,
        shortcut = shortcut,
        queue = queue,
        reporter = reporter,
        seconds_meta_append = seconds_meta_append,
        seconds_meta_upload = seconds_meta_upload
      )
      self$envir <- envir
    },
    ensure_meta = function() {
      new_store <- !file.exists(self$meta$store)
      self$meta$database$sync(prefer_local = TRUE, verbose = FALSE)
      self$meta$migrate_database()
      self$meta$validate()
      self$meta$preprocess(write = TRUE)
      if (new_store) {
        self$write_gitignore()
        self$write_user()
      }
      self$meta$record_imports(self$pipeline$imports, self$pipeline)
      self$meta$restrict_records(self$pipeline)
    },
    flush_meta = function() {
      self$meta$database$flush_rows()
      self$scheduler$progress$database$flush_rows()
    },
    flush_meta_time = function() {
      now <- time_seconds_local()
      if ((now - seconds_appended_meta) >= seconds_meta_append) {
        .subset2(.subset2(.subset2(self, "meta"), "database"), "flush_rows")()
        self$seconds_appended_meta <- now
      }
      if (skipping) {
        threshold <- max(min_seconds_appended_progress, seconds_meta_append)
      } else {
        threshold <- seconds_meta_append
      }
      if ((now - seconds_appended_progress) >= threshold) {
        progress <- .subset2(.subset2(self, "scheduler"), "progress")
        .subset2(.subset2(progress, "database"), "flush_rows")()
        self$seconds_appended_progress <- now
        self$skipping <- TRUE
      }
    },
    upload_meta = function() {
      self$meta$database$upload_staged()
      self$scheduler$progress$database$upload_staged()
    },
    upload_meta_time = function() {
      now <- time_seconds_local()
      if ((now - seconds_meta_uploaded) >= seconds_meta_upload) {
        .subset2(self, "upload_meta")()
        self$seconds_meta_uploaded <- now
      }
    },
    sync_meta_time = function() {
      .subset2(self, "flush_meta_time")()
      .subset2(self, "upload_meta_time")()
    },
    flush_upload_meta_file = function(target) {
      if (target_allow_meta(target)) {
        .subset2(self, "flush_meta")()
        .subset2(self, "upload_meta")()
      }
    },
    write_gitignore = function() {
      writeLines(
        c(
          "# CAUTION: do not edit this file by hand!",
          "# _targets/objects/ may have large data files,",
          "# and _targets/meta/process may have sensitive information.",
          "# It is good pratice to either commit nothing from _targets/,",
          "# or if your data is not too sensitive,",
          "# commit only _targets/meta/meta.",
          "*",
          "!.gitignore",
          "!meta",
          "meta/*",
          "!meta/meta"
        ),
        path_gitignore(self$meta$store)
      )
    },
    write_user = function() {
      dir_create(path_user_dir(self$meta$store))
    },
    ensure_process = function() {
      self$process <- process_init(path_store = self$meta$store)
      self$process$assert_unique()
      self$process$record_process()
      self$process$database$upload(verbose = FALSE)
    },
    produce_exports = function(envir, path_store, is_globalenv = NULL) {
      map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises
      if (is_globalenv %|||% identical(envir, globalenv())) {
        out <- as.list(envir, all.names = TRUE)
        out <- out[fltr(names(out), ~!is_internal_name(.x, envir))]
        out[[".tar_envir_5048826d"]] <- "globalenv"
      } else {
        discard <- fltr(names(envir), ~is_internal_name(.x, envir))
        remove(list = discard, envir = envir)
        out <- list(.tar_envir_5048826d = envir)
      }
      out[[".tar_path_store_5048826d"]] <- path_store
      out[[".tar_fun_5048826d"]] <- tar_runtime$fun
      out[[".tar_options_5048826d"]] <- tar_options$export()
      out[[".tar_envvars_5048826d"]] <- tar_envvars()
      out
    },
    update_exports = function() {
      self$exports <- self$produce_exports(
        envir = self$envir,
        path_store = self$meta$store
      )
    },
    ensure_exports = function() {
      if (is.null(self$exports)) {
        self$update_exports()
      }
    },
    unload_transient = function() {
      pipeline_unload_transient(self$pipeline)
    },
    unmarshal_target = function(target) {
      builder_unmarshal_value(target)
    },
    skip_target = function(target) {
      target_skip(
        target = target,
        pipeline = .subset2(self, "pipeline"),
        scheduler = .subset2(self, "scheduler"),
        meta = .subset2(self, "meta"),
        active = TRUE
      )
    },
    process_target = function(name) {
      scheduler <- .subset2(self, "scheduler")
      progress <- .subset2(scheduler, "progress")
      pipeline <- .subset2(self, "pipeline")
      meta <- .subset2(self, "meta")
      .subset2(.subset2(scheduler, "backoff"), "reset")()
      target <- pipeline_get_target(pipeline, name)
      target_debug(target)
      target_update_depend(target, pipeline, meta)
      if (counter_exists_name(.subset2(progress, "trimmed"), name)) {
        .subset2(scheduler, "trim")(target, pipeline)
        counter_del_name(.subset2(progress, "queued"), name)
        target_update_queue(target, scheduler)
      } else if (target_should_run(target, meta)) {
        self$skipping <- inherits(target, "tar_pattern")
        self$flush_upload_meta_file(target)
        .subset2(self, "run_target")(target)
      } else {
        .subset2(self, "skip_target")(target)
      }
    },
    backoff = function() {
      self$scheduler$backoff$wait()
    },
    start = function() {
      tar_runtime$active <- TRUE # Needs to be set here for tests.
      self$seconds_start <- time_seconds()
      pipeline_prune_names(self$pipeline, self$names)
      pipeline_resolve_auto(self$pipeline)
      self$ensure_meta()
      self$set_file_info()
      self$update_scheduler()
      self$bootstrap_shortcut_deps()
      self$ensure_process()
      self$scheduler$progress$database$reset_storage()
      self$scheduler$reporter$report_start()
    },
    end = function() {
      on.exit(self$meta$database$close())
      on.exit(scheduler$progress$database$close(), add = TRUE)
      scheduler <- self$scheduler
      self$flush_meta()
      self$meta$database$deduplicate_storage()
      self$upload_meta()
      path_scratch_del(path_store = self$meta$store)
      compare_working_directories()
      seconds_elapsed <- time_seconds() - self$seconds_start
      scheduler$reporter$report_end(scheduler$progress, seconds_elapsed)
    },
    validate = function() {
      super$validate()
      if (!is.null(self$process)) {
        self$process$validate()
      }
    }
  )
)

min_seconds_appended_progress <- 1

Try the targets package in your browser

Any scripts or data that you put into this service are public.

targets documentation built on June 8, 2025, 10:24 a.m.