# RAVEWatchDog <- R6::R6Class(
# classname = "RAVEWatchDog",
# portable = TRUE,
# private = list(
# pipeline_names = c(
# "import_lfp_native",
# "notch_filter",
# "wavelet_module",
# 'reference_module'
# ),
# .raw_path = character(0),
# .job_name = character(0),
# .watch_path = character(0),
# .time_threshold = NULL,
# .project_name = character(0),
# .file_pattern = "^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$",
#
# .registry_cache = NULL,
# .set_status = function(item) {
# stopifnot(
# is.data.frame(item) && nrow(item) == 1 &&
# setequal(names(item), c("Filename", "Subject", "Block",
# "Status", "Details", "LastModified",
# "TaskStarted", "TaskEnded", "Directory"))
# )
# registry <- self$load_registry(update = TRUE)
# sel <- registry$Filename == item$Filename
# if(length(sel) && any(sel)) {
# registry <- registry[!sel, ]
# }
# # Directly binding two items will result in error (with NA)
# item$LastModified <- as.POSIXlt(item$LastModified)
# item$TaskStarted <- as.POSIXlt(item$TaskStarted)
# item$TaskEnded <- as.POSIXlt(item$TaskEnded)
# registry <- rbind(item, registry)
# registry <- registry[order(registry$TaskStarted, na.last = TRUE, decreasing = TRUE), ]
# private$.registry_cache <- registry
#
# # save to csv
# registry$LastModified <- strftime(registry$LastModified)
# registry$TaskStarted <- strftime(registry$TaskStarted)
# registry$TaskEnded <- strftime(registry$TaskEnded)
# dir_create2(dirname(self$registry_path))
# safe_write_csv(registry, file = self$registry_path, row.names = FALSE, quiet = TRUE)
# },
#
# .get_status = function(file, update = FALSE) {
# stopifnot(length(file) == 1 && file.exists(file.path(private$.watch_path, file)))
#
# mtime <- file.mtime(file.path(private$.watch_path, file))
#
# # get subject code, block ID
# fname <- filenames(file)
# m <- gregexec(self$file_pattern, fname, ignore.case = TRUE, useBytes = TRUE)[[1]]
# ml <- attr(m, "match.length")
#
# error_item <- data.frame(
# Filename = file,
# Subject = NA,
# Block = NA,
# Status = "parse-error",
# Details = "Cannot parse subject code or block ID",
# LastModified = mtime,
# TaskStarted = NA,
# TaskEnded = NA,
# Directory = private$.watch_path
# )
# if(length(m) != 3) {
# return(error_item)
# }
#
# subject_code <- substr(fname, m[[2]], m[[2]] + ml[[2]] - 1)
# block <- substr(fname, m[[3]], m[[3]] + ml[[3]] - 1)
# if(!nchar(subject_code) || !nchar(block)) {
# return(error_item)
# }
#
# if(startsWith(block, "0")) {
# block <- sprintf("block%s", block)
# }
#
# registry <- self$load_registry(update = update)
# if(file %in% registry$Filename) {
# sel <- which(registry$Filename == file)
# item <- registry[sel[[1]], ]
# item$LastModified <- mtime
# } else {
# item <- data.frame(
# Filename = file,
# Subject = subject_code,
# Block = block,
# Status = "ready",
# Details = "Just found the subject",
# LastModified = mtime,
# TaskStarted = NA,
# TaskEnded = NA,
# Directory = private$.watch_path
# )
# }
# item
#
# },
#
# .queue = NULL,
# .jobs = NULL,
# .max_jobs = numeric(0L)
# ),
# public = list(
#
# cache_path = character(0),
#
# NSType_LFP = "ns3",
# NSType_stimuli = "ns5",
#
# initialize = function(watch_path, job_name = "RAVEWatchDog"){
# if(!grepl("^[a-zA-Z0-9-]+$", job_name)) {
# stop("Watch dog name can only contain letters, digits, and dash [-].")
# }
# private$.job_name <- job_name
# private$.watch_path <- normalizePath(watch_path, mustWork = FALSE)
#
# # Default values
# private$.max_jobs <- 1L
# private$.jobs <- dipsaus::fastmap2()
#
# # We do not allow users to change raw_data_dir once created
# private$.raw_path <- raveio_getopt("raw_data_dir")
#
# # Automatically set cache path, it can be changed
# self$cache_path <- file.path(cache_root(), "_automation_", job_name)
#
# dir_create2(self$log_path)
# },
#
# load_registry = function(update = TRUE) {
# if(!is.data.frame(private$.registry_cache)) {
# update <- TRUE
# }
#
# if(update) {
# if(file.exists(self$registry_path)) {
# tbl <- utils::read.csv(self$registry_path, header = TRUE,
# colClasses = "character")
# colnms <- c("Filename", "Subject", "Block",
# "Status", "Details", "LastModified",
# "TaskStarted", "TaskEnded", "Directory")
# if(all(colnms %in% names(tbl))) {
# tbl <- tbl[, colnms]
# tbl$LastModified <- as.POSIXlt(tbl$LastModified)
# tbl$TaskStarted <- as.POSIXlt(tbl$TaskStarted)
# tbl$TaskEnded <- as.POSIXlt(tbl$TaskEnded)
# # re-order
# tbl <- tbl[order(tbl$TaskStarted, decreasing = TRUE),]
# private$.registry_cache <- tbl
# return(tbl)
# }
# }
# private$.registry_cache <- data.frame(
# Filename = character(0L),
# Subject = character(0L),
# Block = character(0L),
# Status = character(0L),
# Details = character(0L),
# LastModified = as.POSIXlt(character(0L)),
# TaskStarted = as.POSIXlt(character(0L)),
# TaskEnded = as.POSIXlt(character(0L)),
# Directory = character(0L)
# )
# }
#
# private$.registry_cache
#
# },
#
# check_file_registry = function() {
#
# # check the watch path
# fs <- list.files(
# private$.watch_path,
# all.files = FALSE,
# full.names = FALSE,
# include.dirs = FALSE,
# recursive = TRUE
# )
# fs <- fs[grepl(self$file_pattern, filenames(fs), ignore.case = TRUE)]
#
# if(!length(fs)) { return(character(0L)) }
#
# mtime <- file.mtime(file.path(private$.watch_path, fs))
# sel <- mtime >= self$time_threshold
#
# if(!any(sel)) { return(character(0L)) }
#
# fs <- fs[sel]
#
# registry <- self$load_registry()
#
# ignored_files <- registry$Filename[!registry$Status %in% c("queued")]
#
# fs <- fs[!fs %in% ignored_files]
#
# return(fs)
#
# },
#
# add_to_queue = function(files, force = FALSE) {
# files <- unique(files)
# files <- files[!is.na(files)]
#
# if(!length(files)) {
# return(length(private$.queue))
# }
#
# if(length(files) > 1) {
# lapply(files, function(file) {
# self$add_to_queue(file)
# })
# return(length(private$.queue))
# }
#
# # extract information
# file <- files
#
# tryCatch({
# item <- private$.get_status(file)
#
# if(force && item$Status == "running") {
# stop("A process is working on the block. Only one process can work on a block at a time.")
# }
#
# if(item$Status == "parse-error") {
# private$.set_status(item)
# stop(item$Details)
# }
#
# if(force || item$Status == "ready") {
# item$Details <- ""
# item$Status <- "queued"
# private$.set_status(item)
# if(force) {
# private$.queue <- unique(c(file, private$.queue))
# catgl("File [{file}] prepended to the task queue", level = "INFO")
# } else {
# private$.queue <- unique(c(private$.queue, file))
# catgl("File [{file}] appended to the task queue", level = "INFO")
# }
# } else if (item$Status == "queued") {
# if(!file %in% private$.queue) {
# private$.queue <- unique(c(private$.queue, file))
# }
# }
# }, error = function(e) {
# catgl("Cannot add file [{file}] to queue. Reason: ",
# e$message, level = "ERROR")
# })
#
#
# return(length(private$.queue))
#
# },
#
# check_job_status = function() {
# jobs <- private$.jobs
# nms <- names(jobs)
# lapply(nms, function(file) {
# check <- jobs[[file]]
# if(!is.function(check)) {
# private$.queue <- private$.queue[!private$.queue %in% file]
# jobs[["@remove"]](file)
# catgl("Cannot find process handler of file [{file}]. Removed from queue", level = "WARNING")
# return()
# }
# code <- check()
# if(code == 0) {
# item <- private$.get_status(file)
# item$Status <- "finished"
# item$Details <- ""
# item$TaskEnded <- as.POSIXlt(Sys.time())
# private$.set_status(item)
# # remove from queue
# private$.queue <- private$.queue[!private$.queue %in% file]
# jobs[["@remove"]](file)
# catgl("File [{file}] finished. Removed from queue", level = "INFO")
# return()
# }
# if(code < 0) {
# item <- private$.get_status(file)
# item$Status <- "errored"
# item$Details <- paste(attr(code, "rs_exec_error"), collapse = "")
# item$TaskEnded <- as.POSIXlt(Sys.time())
# private$.set_status(item)
# # remove from queue
# private$.queue <- private$.queue[!private$.queue %in% file]
# jobs[["@remove"]](file)
# catgl("File [{file}] errored (reason: {item$Details}). Removed from queue", level = "ERROR")
# return()
# }
# })
# length(jobs)
# },
#
# get_pipeline_default_settings = function() {
# re <- lapply(private$pipeline_names, function(pname) {
# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
# settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())
# settings[["@remove"]](c(
# "import_setup__project_name",
# "import_setup__subject_code",
# "force_import",
# "skip_validation",
# "import_channels__sample_rate",
# "import_channels__electrodes",
# "import_channels__electrode_file",
# "import_blocks__format",
# "import_blocks__session_block",
# "project_name",
# "subject_code",
# "electrode_group",
# "changes"
# ))
# if(pname == "notch_filter") {
# settings$diagnostic_plot_params$path <- NULL
# }
# as.list(settings, sorted = TRUE)
# })
# names(re) <- private$pipeline_names
# re
# },
#
# create_settings_file = function(overwrite = FALSE) {
# settings_path <- file.path(self$log_path, "settings.yaml")
# if(!overwrite && file.exists(settings_path)) {
# stop("Existing settings file already created. If you want to overwrite that file, use `overwrite=TRUE`")
# }
# backup_file(settings_path, remove = FALSE)
# save_yaml(self$get_pipeline_default_settings(), file = settings_path)
# catgl("A settings file has been created at [{settings_path}]", level = "INFO")
# settings_path
# },
#
# get_pipeline_settings = function(pname, file, brfile) {
# item <- private$.get_status(file)
#
# # load blackrock file
# electrode_table <- brfile$electrode_table
# electrodes <- electrode_table$Electrode[electrode_table$NSType == self$NSType_LFP]
#
# # load pipeline
# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
# settings <- dipsaus::list_to_fastmap2(pipeline$get_settings())
#
# # override user-defined settings
# settings_path <- file.path(self$log_path, "settings.yaml")
# if(file.exists(settings_path)) {
# tmp <- load_yaml(settings_path)
# dipsaus::list_to_fastmap2(as.list(tmp[[pname]]), settings)
# }
#
# # pipeline-specific settings
# subject_code <- sprintf("%s__%s", item$Subject, item$Block)
#
# switch (
# pname,
# "import_lfp_native" = {
# settings$import_setup__project_name <- self$project_name
# settings$import_setup__subject_code <- subject_code
# settings$force_import <- TRUE
# settings$skip_validation <- FALSE
# srate <- brfile$sample_rates[[self$NSType_LFP]]
# settings$import_channels__sample_rate <- srate
# settings$import_channels__electrodes <- dipsaus::deparse_svec(electrodes)
# settings$import_channels__electrode_file <- "auto"
# settings$import_blocks__format <- names(IMPORT_FORMATS)[unlist(IMPORT_FORMATS) == 'native_blackrock']
# settings$import_blocks__session_block <- item$Block
# },
# "notch_filter" = {
# graph_path <- file.path(private$.raw_path, item$Subject, item$Block, "notch-diagnostic-plots")
# settings$project_name <- self$project_name
# settings$subject_code <- subject_code
#
# graph_path <- dir_create2(graph_path)
# settings$diagnostic_plot_params$path <- file.path(graph_path, "notch-diagnostic-plots.pdf")
# },
# "wavelet_module" = {
# settings$project_name <- self$project_name
# settings$subject_code <- subject_code
# },
# "reference_module" = {
# settings$project_name <- self$project_name
# settings$subject_code <- subject_code
# settings$reference_name <- "[new reference]"
# },
# {}
# )
#
# return(settings)
#
# },
#
# process_file = function(file) {
#
# item <- private$.get_status(file)
# brfile <- BlackrockFile$new(path = file.path(private$.watch_path, file), block = item$Block)
#
# # prepare working directory
# workdir <- file.path(self$cache_path, paste0(file, ".workdir"))
# if(dir.exists(workdir)) {
# unlink(workdir, force = TRUE, recursive = TRUE)
# }
# workdir <- dir_create2(workdir)
#
#
# # copy pipelines
# for(pname in private$pipeline_names) {
# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines"))
# dest <- file.path(workdir, "pipelines", pipeline$pipeline_name)
# pipeline_fork(
# src = pipeline$pipeline_path,
# dest = dest,
# activate = FALSE
# )
# pipeline <- pipeline(pname, paths = file.path(workdir, "pipelines"))
# settings <- self$get_pipeline_settings(pname = pname, file = file, brfile = brfile)
# pipeline$set_settings(.list = settings)
# catgl("Set pipeline [{pname}]:", level = "DEFAULT")
# save_yaml(settings, file = stdout())
# }
#
# # copy files
# block_path <- file.path(private$.raw_path, item$Subject, item$Block)
# dir_create2(block_path)
# fs <- paste0(brfile$base_path, c(".nev", ".ccf", paste0(".ns", 1:9)))
# fs <- fs[file.exists(fs)]
# for(f in fs) {
# file.copy(f, file.path(block_path, basename(f)), overwrite = TRUE, recursive = FALSE, copy.date = TRUE)
# }
#
# fake_path <- file.path(private$.raw_path, sprintf("%s__%s", item$Subject, item$Block))
# fake_path <- dir_create2(fake_path)
# if(!file.exists(file.path(fake_path, item$Block))) {
# if(dipsaus::get_os() == "windows") {
# file.copy(block_path, to = fake_path, recursive = TRUE, copy.date = TRUE)
# } else {
# file.symlink(block_path, to = fake_path)
# }
# }
#
# # Make sure the subject surface files can be loaded properly?
# if(!file.exists(file.path(fake_path, 'rave-imaging'))) {
# # check if original subject has the fs recon
# imaging_path_orig <- file.path(private$.raw_path, item$Subject, 'rave-imaging')
# if(file.exists(imaging_path_orig)) {
# if(dipsaus::get_os() == "windows" || !dir.exists(imaging_path_orig)) {
# # On windows, symlink does not work well so just copy
# # On Unix, if rave-imaging is a symlink, then R (4.0) will treat
# # the path as a file. Just copy over
# file.copy(imaging_path_orig, to = fake_path,
# recursive = TRUE, copy.date = TRUE)
# } else {
# file.symlink(imaging_path_orig, to = fake_path)
# }
# }
# }
#
# # set to running
# catgl("Start processing [{file}]", level = "INFO")
# item$Status <- "running"
# item$Details <- ""
# item$TaskStarted <- as.POSIXlt(Sys.time())
# item$TaskEnded <- as.POSIXlt(NA)
# private$.set_status(item)
# private$.jobs[[file]] <- dipsaus::rs_exec(
# name = file,
# focus_on_console = TRUE,
# rs = TRUE,
# wait = FALSE,
# quoted = TRUE,
# nested_ok = TRUE,
# expr = bquote({
#
# workdir <- .(workdir)
# cwd <- getwd()
#
# setwd(workdir)
# on.exit({ setwd(cwd) }, add = TRUE, after = FALSE)
# raveio <- asNamespace('raveio')
#
# if(dipsaus::package_installed('ravedash')){
# ravedash <- do.call('asNamespace', list('ravedash'))
# ravedash$set_logger_path(root_path = .(self$log_path), max_files = 10L)
# ravedash$logger_threshold("trace", type = 'file')
# ravedash$logger_threshold("trace", type = 'console')
# } else {
# ravedash <- NULL
# }
# blackrock_src <- .(file)
#
# pname <- "import_lfp_native"
# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
# pipeline$run(async = FALSE, as_promise = FALSE,
# scheduler = "none", type = "smart")
# raveio$catgl("[{blackrock_src}]: [{pname}] finished", level = "INFO")
# pname <- "notch_filter"
# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
# pipeline$run(names = "apply_notch", async = FALSE, as_promise = FALSE,
# scheduler = "none", type = "smart")
#
# pname <- "wavelet_module"
# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO")
# pipeline$run(async = FALSE, as_promise = FALSE,
# scheduler = "none", type = "smart")
#
# subject <- pipeline$read("subject")
#
# # generate reference if exists
# pname <- "reference_module"
# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines"))
# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] (reference_table_initial) at [{pipeline$pipeline_path}]", level = "INFO")
# # check subject's localization
# elec_path <- .(file.path(private$.raw_path, item$Subject, "rave-imaging", "electrodes.csv"))
# if(!file.exists(elec_path)) {
# elec_path <- .(file.path(raveio_getopt("data_dir"), private$.project_name, item$Subject, "rave", "meta", "electrodes.csv"))
# }
#
# if(!file.exists(elec_path)) {
# # list all projects, try to find
# all_projects <- raveio$get_projects(refresh = TRUE)
# elec_path <- file.path(raveio_getopt("data_dir"), all_projects,
# .(item$Subject), "rave", "meta", "electrodes.csv")
# elec_path <- elec_path[!is.na(elec_path) & file.exists(elec_path)]
# }
# if(length(elec_path)) {
# elec_path <- elec_path[[1]]
# }
#
# if(length(elec_path) == 1 && !is.na(elec_path) && file.exists(elec_path)) {
# tryCatch({
# elec_path <- elec_path[[1]]
# elec_table <- utils::read.csv(elec_path)
# elec_table$Electrode <- as.integer(elec_table$Electrode)
# if(length(elec_table$Electrode) == length(subject$electrodes)) {
# o <- order(elec_table$Electrode)
# elec_table <- elec_table[o, ]
# elec_table$Electrode <- sort(subject$electrodes)
# }
#
# raveio$safe_write_csv(elec_table, file.path(subject$meta_path, "electrodes.csv"),
# row.names = FALSE)
#
# }, error = function(e) {
# if(is.environment(ravedash)) {
# ravedash$logger_error_condition(e, level = "warning")
# } else {
# warning(e)
# }
# })
# }
# pipeline$run(names = "reference_table_initial",
# async = FALSE, as_promise = FALSE,
# scheduler = "none", type = "smart")
# unsaved_meta <- file.path(subject$meta_path, "reference__unsaved.csv")
# target_meta <- file.path(subject$meta_path, "reference_auto_generated.csv")
# if(file.exists(unsaved_meta) && !file.exists(target_meta)) {
# file.copy(unsaved_meta, target_meta, overwrite = TRUE)
# }
#
# # make subject backward-compatible
# raveio$catgl("[{blackrock_src}]: Making data format backward-compatible", level = "INFO")
# raveio$rave_subject_format_conversion(subject$subject_id)
# raveio$catgl("[{blackrock_src}]: Done", level = "INFO")
#
# })
# )
#
# },
#
# scan = function() {
#
# files <- self$check_file_registry()
# self$add_to_queue(files)
#
# # check job status
# njobs <- self$check_job_status()
# inactives <- private$.queue[!private$.queue %in% names(private$.jobs)]
#
# if(length(inactives) && njobs < private$.max_jobs) {
# # schedule job
# navails <- private$.max_jobs - njobs
# inactives <- inactives[seq_len(min(navails, length(inactives)))]
# for(file in inactives) {
# self$process_file(file)
# }
# }
#
# },
#
# reset_registry = function() {
# if(!interactive()) {
# stop("Cannot reset registry in non-interactive mode")
# }
# ans <- dipsaus::ask_yesno(sprintf("Clearing registry for [%s]?", private$.job_name))
# if(isTRUE(ans)) {
# unlink(self$registry_path, force = TRUE)
# }
# },
#
# watch = function(interval = 5) {
# interval <- as.numeric(interval)
# if(!isTRUE(interval >= 1)) {
# stop("Min interval must be >= 1 seconds")
# }
#
# if(dipsaus::package_installed('ravedash')){
# ravedash <- do.call('asNamespace', list('ravedash'))
# ravedash$set_logger_path(root_path = self$log_path, max_files = 10L)
# ravedash$logger_threshold("trace", type = 'file')
# ravedash$logger_threshold("trace", type = 'console')
# } else {
# ravedash <- NULL
# }
#
# on.exit({
# if(is.environment(ravedash)) {
# ravedash$set_logger_path(NULL)
# }
# }, add = TRUE, after = TRUE)
#
# # make sure directories are there
# dir_create2(self$log_path)
# dir_create2(self$cache_path)
#
# while(TRUE) {
# tryCatch({
# self$scan()
# }, error = function(e) {
# catgl("Error raised while the master process rescans/schedules tasks. Reason: {paste(e$message, collapse = '\n')}\nWill try again later",
# level = "ERROR")
# })
# Sys.sleep(interval)
# }
# }
#
# ),
# active = list(
#
# log_path = function() {
# file.path(private$.raw_path, "_automation", private$.job_name)
# },
#
# registry_path = function() {
# file.path(self$log_path, "registry.csv")
# },
#
# time_threshold = function(v) {
# if(!missing(v)) {
#
# if(length(v) != 1 || is.na(v)) {
# stop("Cannot set time threshold with invalid time")
# }
# tm <- v
# if(is.character(tm)) {
# v <- as.POSIXlt(tm)
# if(is.na(v)) {
# stop("`time_threshold` must have format of [year-month-day hour:minute:second]. For example, '2022-08-03 16:38:00'")
# }
# } else if (!inherits(tm, "POSIXlt")) {
# stop("`time_threshold` must be characters or a `POSIXlt` time object")
# }
#
# private$.time_threshold <- v
#
# }
# private$.time_threshold
# },
#
# project_name = function(v) {
# if(!missing(v)) {
# if(!length(v)) {
# private$.project_name <- character(0)
# } else if(length(v) > 1) {
# stop("Project name must have length of 1")
# } else if(!grepl("^[a-zA-Z0-9_]+$", v)) {
# stop("Project name can only contain letters, digits, and underscore [_]")
# } else {
# private$.project_name <- as.character(v)
# }
# }
# pn <- private$.project_name
# if(!length(pn)) {
# pn <- "automated"
# }
# pn
# },
#
# file_pattern = function(v) {
# if(!missing(v)) {
# v <- v[[1]]
# m <- gregexpr("(\\([^\\(\\)]+\\))", v, ignore.case = TRUE)
# m <- unique(m[[1]])
# if(length(m) < 2) {
# stop("File pattern must be a regular expression containing at least two keyword extractors so I can decide the subject code and session block ID. For example, regular expression ['^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$'] matches [YAB_datafile_001.nev]. RAVE will set subject code to [YAB], and block ID as [001].")
# }
# private$.file_pattern <- v
# }
# private$.file_pattern
# },
#
# queued = function() {
# private$.queue
# },
#
# max_jobs = function(v) {
# if(!missing(v)) {
# errored <- TRUE
# if(length(v) == 1) {
# v <- as.integer(v)
# if(isTRUE(v > 0)) {
# private$.max_jobs <- v
# errored <- FALSE
# }
# }
#
# if(errored) {
# stop("Cannot set `max_jobs`, the value must be a positive integer")
# }
#
# }
# private$.max_jobs
# }
#
# )
# )
#
#' Monitors 'BlackRock' output folder and automatically import data into 'RAVE'
#' @description Automatically import 'BlackRock' files from designated folder
#' and perform 'Notch' filters, 'Wavelet' transform; also generate epoch,
#' reference files.
#' @param watch_path the folder to watch
#' @param project_name the project name to generate
#' @param task_name the watcher's name
#' @param scan_interval scan the directory every \code{scan_interval} seconds,
#' cannot be lower than 1
#' @param time_threshold time-threshold of files: all files with modified
#' time prior to this threshold will be ignored; default is current time
#' @param max_jobs maximum concurrent imports, default is 1
#' @param as_job whether to run in 'RStudio' background job or to block the
#' session when monitoring; default is auto-detected
#' @param dry_run whether to dry-run the code (instead of executing the
#' scripts, return the watcher's instance and open the settings file);
#' default is false
#' @param config_open whether to open the pipeline configuration file; default
#' is equal to \code{dry_run}
#' @returns When \code{dry_run} is true, then the watcher's instance will be
#' returned; otherwise nothing will be returned.
#' @noRd
# NOTE(review): the `RAVEWatchDog` R6 class and `auto_process_blackrock`
# implementation in this file are intentionally disabled (kept as commented
# code above and below). This `NULL` is a placeholder object so the roxygen
# block above remains syntactically attached to something parseable.
NULL
# auto_process_blackrock <- function(
# watch_path, project_name = "automated", task_name = "RAVEWatchDog",
# scan_interval = 10, time_threshold = Sys.time(), max_jobs = 1L,
# as_job = NA, dry_run = FALSE, config_open = dry_run
# ) {
#
# time_threshold <- as.POSIXlt(time_threshold)
# time_threshold <- time_threshold[!is.na(time_threshold)]
# if(!length(time_threshold)) {
# time_threshold <- as.POSIXlt(Sys.time())
# } else {
# time_threshold <- time_threshold[[1]]
# }
#
# if(!isFALSE(as_job)) {
# as_job <- dipsaus::rs_avail()
# }
#
# fun <- dipsaus::new_function2(body = bquote({
#
# raveio <- asNamespace('raveio')
# watcher <- raveio$RAVEWatchDog$new(
# watch_path = .(watch_path),
# job_name = .(task_name)
# )
# watcher$time_threshold <- .(time_threshold)
# watcher$max_jobs <- .(max_jobs)
# watcher$project_name <- .(project_name)
#
# settings_path <- file.path(watcher$log_path, "settings.yaml")
# if(!file.exists(settings_path)) {
# watcher$create_settings_file()
# }
#
# return(watcher)
#
# }), quote_type = "quote")
#
# watcher <- fun()
# if( config_open ) {
#
# settings_path <- file.path(watcher$log_path, "settings.yaml")
# if(!file.exists(settings_path)) {
# watcher$create_settings_file()
# }
#
# try({
# dipsaus::rs_edit_file(settings_path)
# catgl("Watcher's settings file has been opened. Please check the settings, and edit if necessary. All auto-discovered BlackRock files will be preprocessed using this settings file.", level = "INFO")
# }, silent = TRUE)
#
# }
#
# if( dry_run ) {
# return(watcher)
# }
#
#
#
# if(as_job) {
# dipsaus::rs_exec(
# bquote({
# fun <- dipsaus::new_function2(body = .(body(fun)))
# # return(fun)
# watcher <- fun()
# watcher$watch(interval = .(scan_interval))
# }),
# quoted = TRUE,
# rs = TRUE,
# name = task_name,
# focus_on_console = TRUE
# )
# } else {
# watcher$watch(interval = scan_interval)
# }
#
# }
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.