R/class-watch-dog.R

RAVEWatchDog <- R6::R6Class(
  classname = "RAVEWatchDog",
  portable = TRUE,
  private = list(
    pipeline_names = c(
      "import_lfp_native",
      "notch_filter",
      "wavelet_module",
      'reference_module'
    ),
    .raw_path = character(0),
    .job_name = character(0),
    .watch_path = character(0),
    .time_threshold = NULL,
    .project_name = character(0),
    .file_pattern = "^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$",

    .registry_cache = NULL,
    .set_status = function(item) {
      stopifnot(
        is.data.frame(item) && nrow(item) == 1 &&
          setequal(names(item), c("Filename", "Subject", "Block",
                                  "Status", "Details", "LastModified",
                                  "TaskStarted", "TaskEnded", "Directory"))
      )
      registry <- self$load_registry(update = TRUE)
      sel <- registry$Filename == item$Filename
      if(length(sel) && any(sel)) {
        registry <- registry[!sel, ]
      }
      # Directly binding two items will result in error (with NA)
      item$LastModified <- as.POSIXlt(item$LastModified)
      item$TaskStarted <- as.POSIXlt(item$TaskStarted)
      item$TaskEnded <- as.POSIXlt(item$TaskEnded)
      registry <- rbind(item, registry)
      registry <- registry[order(registry$TaskStarted, na.last = TRUE, decreasing = TRUE), ]
      private$.registry_cache <- registry

      # save to csv
      registry$LastModified <- strftime(registry$LastModified)
      registry$TaskStarted <- strftime(registry$TaskStarted)
      registry$TaskEnded <- strftime(registry$TaskEnded)
      dir_create2(dirname(self$registry_path))
      safe_write_csv(registry, file = self$registry_path, row.names = FALSE, quiet = TRUE)
    },

    .get_status = function(file, update = FALSE) {
      stopifnot(length(file) == 1 && file.exists(file.path(private$.watch_path, file)))

      mtime <- file.mtime(file.path(private$.watch_path, file))

      # get subject code, block ID
      fname <- filenames(file)
      m <- gregexec(self$file_pattern, fname, ignore.case = TRUE, useBytes = TRUE)[[1]]
      ml <- attr(m, "match.length")

      error_item <- data.frame(
        Filename = file,
        Subject = NA,
        Block = NA,
        Status = "parse-error",
        Details = "Cannot parse subject code or block ID",
        LastModified = mtime,
        TaskStarted = NA,
        TaskEnded = NA,
        Directory = private$.watch_path
      )
      if(length(m) != 3) {
        return(error_item)
      }

      subject_code <- substr(fname, m[[2]], m[[2]] + ml[[2]] - 1)
      block <- substr(fname, m[[3]], m[[3]] + ml[[3]] - 1)
      if(!nchar(subject_code) || !nchar(block)) {
        return(error_item)
      }

      if(startsWith(block, "0")) {
        block <- sprintf("block%s", block)
      }

      registry <- self$load_registry(update = update)
      if(file %in% registry$Filename) {
        sel <- which(registry$Filename == file)
        item <- registry[sel[[1]], ]
        item$LastModified <- mtime
      } else {
        item <- data.frame(
          Filename = file,
          Subject = subject_code,
          Block = block,
          Status = "ready",
          Details = "Just found the subject",
          LastModified = mtime,
          TaskStarted = NA,
          TaskEnded = NA,
          Directory = private$.watch_path
        )
      }
      item

    },

    .queue = NULL,
    .jobs = NULL,
    .max_jobs = numeric(0L)
  ),
  public = list(

    cache_path = character(0),

    NSType_LFP = "ns3",
    NSType_stimuli = "ns5",

    initialize = function(watch_path, job_name = "RAVEWatchDog"){
      if(!grepl("^[a-zA-Z0-9-]+$", job_name)) {
        stop("Watch dog name can only contain letters, digits, and dash [-].")
      }
      private$.job_name <- job_name
      private$.watch_path <- normalizePath(watch_path, mustWork = FALSE)

      # Default values
      private$.max_jobs <- 1L
      private$.jobs <- dipsaus::fastmap2()

      # We do not allow users to change raw_data_dir once created
      private$.raw_path <- raveio_getopt("raw_data_dir")

      # Automatically set cache path, it can be changed
      self$cache_path <- file.path(cache_root(), "_automation_", job_name)

      dir_create2(self$log_path)
    },

    load_registry = function(update = TRUE) {
      if(!is.data.frame(private$.registry_cache)) {
        update <- TRUE
      }

      if(update) {
        if(file.exists(self$registry_path)) {
          tbl <- utils::read.csv(self$registry_path, header = TRUE,
                                 colClasses = "character")
          colnms <- c("Filename", "Subject", "Block",
                      "Status", "Details", "LastModified",
                      "TaskStarted", "TaskEnded", "Directory")
          if(all(colnms %in% names(tbl))) {
            tbl <- tbl[, colnms]
            tbl$LastModified <- as.POSIXlt(tbl$LastModified)
            tbl$TaskStarted <- as.POSIXlt(tbl$TaskStarted)
            tbl$TaskEnded <- as.POSIXlt(tbl$TaskEnded)
            # re-order
            tbl <- tbl[order(tbl$TaskStarted, decreasing = TRUE),]
            private$.registry_cache <- tbl
            return(tbl)
          }
        }
        private$.registry_cache <- data.frame(
          Filename = character(0L),
          Subject = character(0L),
          Block = character(0L),
          Status = character(0L),
          Details = character(0L),
          LastModified = as.POSIXlt(character(0L)),
          TaskStarted = as.POSIXlt(character(0L)),
          TaskEnded = as.POSIXlt(character(0L)),
          Directory = character(0L)
        )
      }

      private$.registry_cache

    },

    check_file_registry = function() {

      # check the watch path
      fs <- list.files(
        private$.watch_path,
        all.files = FALSE,
        full.names = FALSE,
        include.dirs = FALSE,
        recursive = TRUE
      )
      fs <- fs[grepl(self$file_pattern, filenames(fs), ignore.case = TRUE)]

      if(!length(fs)) { return(character(0L)) }

      mtime <- file.mtime(file.path(private$.watch_path, fs))
      sel <- mtime >= self$time_threshold

      if(!any(sel)) { return(character(0L)) }

      fs <- fs[sel]

      registry <- self$load_registry()

      ignored_files <- registry$Filename[!registry$Status %in% c("queued")]

      fs <- fs[!fs %in% ignored_files]

      return(fs)

    },

    add_to_queue = function(files, force = FALSE) {
      files <- unique(files)
      files <- files[!is.na(files)]

      if(!length(files)) {
        return(length(private$.queue))
      }

      if(length(files) > 1) {
        lapply(files, function(file) {
          self$add_to_queue(file)
        })
        return(length(private$.queue))
      }

      # extract information
      file <- files

      tryCatch({
        item <- private$.get_status(file)

        if(force && item$Status == "running") {
          stop("A process is working on the block. Only one process can work on a block at a time.")
        }

        if(item$Status == "parse-error") {
          private$.set_status(item)
          stop(item$Details)
        }

        if(force || item$Status == "ready") {
          item$Details <- ""
          item$Status <- "queued"
          private$.set_status(item)
          if(force) {
            private$.queue <- unique(c(file, private$.queue))
            catgl("File [{file}] prepended to the task queue", level = "INFO")
          } else {
            private$.queue <- unique(c(private$.queue, file))
            catgl("File [{file}] appended to the task queue", level = "INFO")
          }
        } else if (item$Status == "queued") {
          if(!file %in% private$.queue) {
            private$.queue <- unique(c(private$.queue, file))
          }
        }
      }, error = function(e) {
        catgl("Cannot add file [{file}] to queue. Reason: ",
                      e$message, level = "ERROR")
      })


      return(length(private$.queue))

    },

    check_job_status = function() {
      jobs <- private$.jobs
      nms <- names(jobs)
      lapply(nms, function(file) {
        check <- jobs[[file]]
        if(!is.function(check)) {
          private$.queue <- private$.queue[!private$.queue %in% file]
          jobs[["@remove"]](file)
          catgl("Cannot find process handler of file [{file}]. Removed from queue", level = "WARNING")
          return()
        }
        code <- check()
        if(code == 0) {
          item <- private$.get_status(file)
          item$Status <- "finished"
          item$Details <- ""
          item$TaskEnded <- as.POSIXlt(Sys.time())
          private$.set_status(item)
          # remove from queue
          private$.queue <- private$.queue[!private$.queue %in% file]
          jobs[["@remove"]](file)
          catgl("File [{file}] finished. Removed from queue", level = "INFO")
          return()
        }
        if(code < 0) {
          item <- private$.get_status(file)
          item$Status <- "errored"
          item$Details <- paste(attr(code, "rs_exec_error"), collapse = "")
          item$TaskEnded <- as.POSIXlt(Sys.time())
          private$.set_status(item)
          # remove from queue
          private$.queue <- private$.queue[!private$.queue %in% file]
          jobs[["@remove"]](file)
          catgl("File [{file}] errored (reason: {item$Details}). Removed from queue", level = "ERROR")
          return()
        }
      })
      length(jobs)
    },

    get_pipeline_default_settings = function() {
      re <- lapply(private$pipeline_names, function(pname) {
        pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
        settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())
        settings[["@remove"]](c(
          "import_setup__project_name",
          "import_setup__subject_code",
          "force_import",
          "skip_validation",
          "import_channels__sample_rate",
          "import_channels__electrodes",
          "import_channels__electrode_file",
          "import_blocks__format",
          "import_blocks__session_block",
          "project_name",
          "subject_code",
          "electrode_group",
          "changes"
        ))
        if(pname == "notch_filter") {
          settings$diagnostic_plot_params$path <- NULL
        }
        as.list(settings, sorted = TRUE)
      })
      names(re) <- private$pipeline_names
      re
    },

    create_settings_file = function(overwrite = FALSE) {
      settings_path <- file.path(self$log_path, "settings.yaml")
      if(!overwrite && file.exists(settings_path)) {
        stop("Existing settings file already created. If you want to overwrite that file, use `overwrite=TRUE`")
      }
      backup_file(settings_path, remove = FALSE)
      save_yaml(self$get_pipeline_default_settings(), file = settings_path)
      catgl("A settings file has been created at [{settings_path}]", level = "INFO")
      settings_path
    },

    get_pipeline_settings = function(pname, file, brfile) {
      item <- private$.get_status(file)

      # load blackrock file
      electrode_table <- brfile$electrode_table
      electrodes <- electrode_table$Electrode[electrode_table$NSType == self$NSType_LFP]

      # load pipeline
      pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
      settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())

      # override user-defined settings
      settings_path <- file.path(self$log_path, "settings.yaml")
      if(file.exists(settings_path)) {
        tmp <- load_yaml(settings_path)
        dipsaus::list_to_fastmap2(as.list(tmp[[pname]]), settings)
      }

      # pipeline-specific settings
      subject_code <- sprintf("%s__%s", item$Subject, item$Block)

      switch (
        pname,
        "import_lfp_native" = {
          settings$import_setup__project_name <- self$project_name
          settings$import_setup__subject_code <- subject_code
          settings$force_import <- TRUE
          settings$skip_validation <- FALSE
          srate <- brfile$sample_rates[[self$NSType_LFP]]
          settings$import_channels__sample_rate <- srate
          settings$import_channels__electrodes <- dipsaus::deparse_svec(electrodes)
          settings$import_channels__electrode_file <- "auto"
          settings$import_blocks__format <- names(IMPORT_FORMATS)[unlist(IMPORT_FORMATS) == 'native_blackrock']
          settings$import_blocks__session_block <- item$Block
        },
        "notch_filter" = {
          graph_path <- file.path(private$.raw_path, item$Subject, item$Block, "notch-diagnostic-plots")
          settings$project_name <- self$project_name
          settings$subject_code <- subject_code

          graph_path <- dir_create2(graph_path)
          settings$diagnostic_plot_params$path <- file.path(graph_path, "notch-diagnostic-plots.pdf")
        },
        "wavelet_module" = {
          settings$project_name <- self$project_name
          settings$subject_code <- subject_code
        },
        "reference_module" = {
          settings$project_name <- self$project_name
          settings$subject_code <- subject_code
          settings$reference_name <- "[new reference]"
        },
        {}
      )

      return(settings)

    },

    process_file = function(file) {

      item <- private$.get_status(file)
      brfile <- BlackrockFile$new(path = file.path(private$.watch_path, file), block = item$Block)

      # prepare working directory
      workdir <- file.path(self$cache_path, paste0(file, ".workdir"))
      if(dir.exists(workdir)) {
        unlink(workdir, force = TRUE, recursive = TRUE)
      }
      workdir <- dir_create2(workdir)


      # copy pipelines
      for(pname in private$pipeline_names) {
        pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
        dest <- file.path(workdir, "pipelines", pipeline$pipeline_name)
        pipeline_fork(
          src = pipeline$pipeline_path,
          dest = dest,
          activate = FALSE
        )
        pipeline <- pipeline(pname, paths = file.path(workdir, "pipelines"))
        settings <- self$get_pipeline_settings(pname = pname, file = file, brfile = brfile)
        pipeline$set_settings(.list = settings)
        catgl("Set pipeline [{pname}]:", level = "DEFAULT")
        save_yaml(settings, file = stdout())
      }

      # copy files
      block_path <- file.path(private$.raw_path, item$Subject, item$Block)
      dir_create2(block_path)
      fs <- paste0(brfile$base_path, c(".nev", ".ccf", paste0(".ns", 1:9)))
      fs <- fs[file.exists(fs)]
      for(f in fs) {
        file.copy(f, file.path(block_path, basename(f)), overwrite = TRUE, recursive = FALSE, copy.date = TRUE)
      }

      fake_path <- file.path(private$.raw_path, sprintf("%s__%s", item$Subject, item$Block))
      fake_path <- dir_create2(fake_path)
      if(!file.exists(file.path(fake_path, item$Block))) {
        if(dipsaus::get_os() == "windows") {
          file.copy(block_path, to = fake_path, recursive = TRUE, copy.date = TRUE)
        } else {
          file.symlink(block_path, to = fake_path)
        }
      }

      # Make sure the subject surface files can be loaded properly?
      if(!file.exists(file.path(fake_path, 'rave-imaging'))) {
        # check if original subject has the fs recon
        imaging_path_orig <- file.path(private$.raw_path, item$Subject, 'rave-imaging')
        if(file.exists(imaging_path_orig)) {
          if(dipsaus::get_os() == "windows" || !dir.exists(imaging_path_orig)) {
            # On windows, symlink does not work well so just copy
            # On Unix, if rave-imaging is a symlink, then R (4.0) will treat
            # the path as a file. Just copy over
            file.copy(imaging_path_orig, to = fake_path,
                      recursive = TRUE, copy.date = TRUE)
          } else {
            file.symlink(imaging_path_orig, to = fake_path)
          }
        }
      }

      # set to running
      catgl("Start processing [{file}]", level = "INFO")
      item$Status <- "running"
      item$Details <- ""
      item$TaskStarted <- as.POSIXlt(Sys.time())
      item$TaskEnded <- as.POSIXlt(NA)
      private$.set_status(item)
      private$.jobs[[file]] <- dipsaus::rs_exec(
        name = file,
        focus_on_console = TRUE,
        rs = TRUE,
        wait = FALSE,
        quoted = TRUE,
        nested_ok = TRUE,
        expr = bquote({

          workdir <- .(workdir)
          cwd <- getwd()

          setwd(workdir)
          on.exit({ setwd(cwd) }, add = TRUE, after = FALSE)
          raveio <- asNamespace('raveio')

          if(dipsaus::package_installed('ravedash')){
            ravedash <- do.call('asNamespace', list('ravedash'))
            ravedash$set_logger_path(root_path = .(self$log_path), max_files = 10L)
            ravedash$logger_threshold("trace", type = 'file')
            ravedash$logger_threshold("trace", type = 'console')
          } else {
            ravedash <- NULL
          }
          blackrock_src <- .(file)

          pname <- "import_lfp_native"
          pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
          raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
          pipeline$run(async = FALSE, as_promise = FALSE,
                       scheduler = "none", type = "smart")
          raveio$catgl("[{blackrock_src}]: [{pname}] finished", level = "INFO")
          pname <- "notch_filter"
          pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
          raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
          pipeline$run(names = "apply_notch", async = FALSE, as_promise = FALSE,
                       scheduler = "none", type = "smart")

          pname <- "wavelet_module"
          pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
          raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
          pipeline$run(async = FALSE, as_promise = FALSE,
                       scheduler = "none", type = "smart")

          subject <- pipeline$read("subject")

          # generate reference if exists
          pname <- "reference_module"
          pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
          raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] (reference_table_initial) at [{pipeline$pipeline_path}]", level = "INFO")
          # check subject's localization
          elec_path <- .(file.path(private$.raw_path, item$Subject, "rave-imaging", "electrodes.csv"))
          if(!file.exists(elec_path)) {
            elec_path <- .(file.path(raveio_getopt("data_dir"), private$.project_name, item$Subject, "rave", "meta", "electrodes.csv"))
          }

          if(!file.exists(elec_path)) {
            # list all projects, try to find
            all_projects <- raveio$get_projects(refresh = TRUE)
            elec_path <- file.path(raveio_getopt("data_dir"), all_projects,
                                        .(item$Subject), "rave", "meta", "electrodes.csv")
            elec_path <- elec_path[!is.na(elec_path) & file.exists(elec_path)]
          }
          if(length(elec_path)) {
            elec_path <- elec_path[[1]]
          }

          if(length(elec_path) == 1 && !is.na(elec_path) && file.exists(elec_path)) {
            tryCatch({
              elec_path <- elec_path[[1]]
              elec_table <- utils::read.csv(elec_path)
              elec_table$Electrode <- as.integer(elec_table$Electrode)
              if(length(elec_table$Electrode) == length(subject$electrodes)) {
                o <- order(elec_table$Electrode)
                elec_table <- elec_table[o, ]
                elec_table$Electrode <- sort(subject$electrodes)
              }

              raveio$safe_write_csv(elec_table, file.path(subject$meta_path, "electrodes.csv"),
                                    row.names = FALSE)

            }, error = function(e) {
              if(is.environment(ravedash)) {
                ravedash$logger_error_condition(e, level = "warning")
              } else {
                warning(e)
              }
            })
          }
          pipeline$run(names = "reference_table_initial",
                       async = FALSE, as_promise = FALSE,
                       scheduler = "none", type = "smart")
          unsaved_meta <- file.path(subject$meta_path, "reference__unsaved.csv")
          target_meta <- file.path(subject$meta_path, "reference_auto_generated.csv")
          if(file.exists(unsaved_meta) && !file.exists(target_meta)) {
            file.copy(unsaved_meta, target_meta, overwrite = TRUE)
          }

          # make subject backward-compatible
          raveio$catgl("[{blackrock_src}]: Making data format backward-compatible", level = "INFO")
          raveio$rave_subject_format_conversion(subject$subject_id)
          raveio$catgl("[{blackrock_src}]: Done", evel = "INFO")

        })
      )

    },

    scan = function() {

      files <- self$check_file_registry()
      self$add_to_queue(files)

      # check job status
      njobs <- self$check_job_status()
      inactives <- private$.queue[!private$.queue %in% names(private$.jobs)]

      if(length(inactives) && njobs < private$.max_jobs) {
        # schedule job
        navails <- private$.max_jobs - njobs
        inactives <- inactives[seq_len(min(navails, length(inactives)))]
        for(file in inactives) {
          self$process_file(file)
        }
      }

    },

    reset_registry = function() {
      if(!interactive()) {
        stop("Cannot reset registry in non-interactive mode")
      }
      ans <- dipsaus::ask_yesno(sprintf("Clearing registry for [%s]?", private$.job_name))
      if(isTRUE(ans)) {
        unlink(self$registry_path, force = TRUE)
      }
    },

    watch = function(interval = 5) {
      interval <- as.numeric(interval)
      if(!isTRUE(interval >= 1)) {
        stop("Min interval must be >= 1 seconds")
      }

      if(dipsaus::package_installed('ravedash')){
        ravedash <- do.call('asNamespace', list('ravedash'))
        ravedash$set_logger_path(root_path = self$log_path, max_files = 10L)
        ravedash$logger_threshold("trace", type = 'file')
        ravedash$logger_threshold("trace", type = 'console')
      } else {
        ravedash <- NULL
      }

      on.exit({
        if(is.environment(ravedash)) {
          ravedash$set_logger_path(NULL)
        }
      }, add = TRUE, after = TRUE)

      # make sure directories are there
      dir_create2(self$log_path)
      dir_create2(self$cache_path)

      while(TRUE) {
        tryCatch({
          self$scan()
        }, error = function(e) {
          catgl("Error raised while the master process rescans/schedules tasks. Reason: {paste(e$message, collapse = '\n')}\nWill try again later",
                level = "ERROR")
        })
        Sys.sleep(interval)
      }
    }

  ),
  active = list(

    log_path = function() {
      file.path(private$.raw_path, "_automation", private$.job_name)
    },

    registry_path = function() {
      file.path(self$log_path, "registry.csv")
    },

    time_threshold = function(v) {
      if(!missing(v)) {

        if(length(v) != 1 || is.na(v)) {
          stop("Cannot set time threshold with invalid time")
        }
        tm <- v
        if(is.character(tm)) {
          v <- as.POSIXlt(tm)
          if(is.na(v)) {
            stop("`time_threshold` must have format of [year-month-day hour:minute:second]. For example, '2022-08-03 16:38:00'")
          }
        } else if (!inherits(tm, "POSIXlt")) {
          stop("`time_threshold` must be characters or a `POSIXlt` time object")
        }

        private$.time_threshold <- v

      }
      private$.time_threshold
    },

    project_name = function(v) {
      if(!missing(v)) {
        if(!length(v)) {
          private$.project_name <- character(0)
        } else if(length(v) > 1) {
          stop("Project name must have length of 1")
        } else if(!grepl("^[a-zA-Z0-9_]", v)) {
          stop("Project name can only contain letters, digits, and underscore [_]")
        } else {
          private$.project_name <- as.character(v)
        }
      }
      pn <- private$.project_name
      if(!length(pn)) {
        pn <- "automated"
      }
      pn
    },

    file_pattern = function(v) {
      if(!missing(v)) {
        v <- v[[1]]
        m <- gregexpr("(\\([^\\(\\)]+\\))", v, ignore.case = TRUE)
        m <- unique(m[[1]])
        if(length(m) < 2) {
          stop("File pattern must be a regular expression containing at least two keyword extractors so I can decide the subject code and session block ID. For example, regular expression ['^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$'] matches [YAB_datafile_001.nev]. RAVE will set subject code to [YAB], and block ID as [001].")
        }
        private$.file_pattern <- v
      }
      private$.file_pattern
    },

    queued = function() {
      private$.queue
    },

    max_jobs = function(v) {
      if(!missing(v)) {
        errored <- TRUE
        if(length(v) == 1) {
          v <- as.integer(v)
          if(isTRUE(v > 0)) {
            private$.max_jobs <- v
            errored <- FALSE
          }
        }

        if(errored) {
          stop("Cannot set `max_jobs`, the value must be a positive integer")
        }

      }
      private$.max_jobs
    }

  )
)


#' Monitors 'BlackRock' output folder and automatically import data into 'RAVE'
#' @description Automatically import 'BlackRock' files from designated folder
#' and perform 'Notch' filters, 'Wavelet' transform; also generate epoch,
#' reference files.
#' @param watch_path the folder to watch
#' @param project_name the project name to generate
#' @param task_name the watcher's name
#' @param scan_interval scan the directory every \code{scan_interval} seconds,
#' cannot be lower than 1
#' @param time_threshold time-threshold of files: all files with modified
#' time prior to this threshold will be ignored; default is current time
#' @param max_jobs maximum concurrent imports, default is 1
#' @param as_job whether to run in 'RStudio' background job or to block the
#' session when monitoring; default is auto-detected
#' @param dry_run whether to dry-run the code (instead of executing the
#' scripts, return the watcher's instance and open the settings file);
#' default is false
#' @param config_open whether to open the pipeline configuration file; default
#' is equal to \code{dry_run}
#' @returns When \code{dry_run} is true, then the watcher's instance will be
#' returned; otherwise nothing will be returned.
#' @export
auto_process_blackrock <- function(
    watch_path, project_name = "automated", task_name = "RAVEWatchDog",
    scan_interval = 10, time_threshold = Sys.time(), max_jobs = 1L,
    as_job = NA, dry_run = FALSE, config_open = dry_run
) {

  time_threshold <- as.POSIXlt(time_threshold)
  time_threshold <- time_threshold[!is.na(time_threshold)]
  if(!length(time_threshold)) {
    time_threshold <- as.POSIXlt(Sys.time())
  } else {
    time_threshold <- time_threshold[[1]]
  }

  if(!isFALSE(as_job)) {
    as_job <- dipsaus::rs_avail()
  }

  fun <- dipsaus::new_function2(body = bquote({

    raveio <- asNamespace('raveio')
    watcher <- raveio$RAVEWatchDog$new(
      watch_path = .(watch_path),
      job_name = .(task_name)
    )
    watcher$time_threshold <- .(time_threshold)
    watcher$max_jobs <- .(max_jobs)
    watcher$project_name <- .(project_name)

    settings_path <- file.path(watcher$log_path, "settings.yaml")
    if(!file.exists(settings_path)) {
      watcher$create_settings_file()
    }

    return(watcher)

  }), quote_type = "quote")

  watcher <- fun()
  if( config_open ) {

    settings_path <- file.path(watcher$log_path, "settings.yaml")
    if(!file.exists(settings_path)) {
      watcher$create_settings_file()
    }

    try({
      dipsaus::rs_edit_file(settings_path)
      catgl("Watcher's settings file has been opened. Please check the settings, and edit if necessary. All auto-discovered BlackRock files will be preprocessed using this settings file.", level = "INFO")
    }, silent = TRUE)

  }

  if( dry_run ) {
    return(watcher)
  }



  if(as_job) {
    dipsaus::rs_exec(
      bquote({
        fun <- dipsaus::new_function2(body = .(body(fun)))
        # return(fun)
        watcher <- fun()
        watcher$watch(interval = .(scan_interval))
      }),
      quoted = TRUE,
      rs = TRUE,
      name = task_name,
      focus_on_console = TRUE
    )
  } else {
    watcher$watch(interval = scan_interval)
  }

}
beauchamplab/raveio documentation built on May 5, 2024, 1:03 a.m.