
Defines functions pipeline_run, pipeline_clean, pipeline_run_bare

Documented in pipeline_run, pipeline_clean, pipeline_run_bare

#' @rdname rave-pipeline
#' @export
pipeline_run <- function(
  # Purpose: assemble a self-contained runner function (`fun`) whose body is
  # built with bquote(), then hand it to a `PipelineResult` object; the visible
  # tail either launches `fun` through callr::r_bg() or calls it directly.
  #
  # NOTE(review): this listing appears to have lost lines (the end of the
  # signature, closing braces/parentheses and the heads of several calls).
  # The comments below describe only the statements that are visible.
  #
  # Arguments (as used by the visible code):
  #   pipe_dir       pipeline directory; defaults to env var RAVE_PIPELINE,
  #                  falling back to "."; passed to activate_pipeline()
  #   scheduler      one of "none", "future", "clustermq" (match.arg)
  #   type           one of "smart", "callr", "vanilla" (match.arg)
  #   envir          environment stored in `args$envir`; shared scripts are
  #                  sourced into it
  #   callr_function captured unevaluated with substitute(); may be overridden
  #                  by `scheduler`/`type` below
  #   names          target names to build; NULL/empty leaves `args$names` NULL
  #   async          default for `progress_quiet` and `shiny_auto_close`
  #   check_interval stored on the result object as `res$check_interval`
  #   progress_*     forwarded to dipsaus::progress2()
  #   return_values  not referenced in the visible part of the body
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  scheduler = c("none", "future", "clustermq"),
  type = c("smart", "callr", "vanilla"),
  envir = new.env(parent = globalenv()),
  callr_function = NULL,
  names = NULL,
  async = FALSE,
  check_interval = 0.5,
  progress_quiet = !async,
  progress_max = NA,
  progress_title = "Running pipeline",
  return_values = TRUE,

  # NOTE(review): the remainder of the signature and the opening `{` are not
  # visible here.
  pipe_dir <- activate_pipeline(pipe_dir)

  # Resolve the enumerated choices to a single string each
  type <- match.arg(type)
  scheduler <- match.arg(scheduler)
  # Keep the caller's expression (not its value); it is evaluated later inside
  # `fun` via eval()
  callr_function <- substitute(callr_function)

  # Current clustermq backend option, NA when the option is unset
  clustermq_scheduler <- getOption('clustermq.scheduler', NA)
  if(scheduler == "clustermq"){
    # if(!identical(clustermq_scheduler, "LOCAL")){
    #   callr_function <- NULL
    # }
    # With the clustermq scheduler the callr function is always dropped
    callr_function <- NULL

  # `type` takes precedence over the user-supplied callr_function:
  # "vanilla" -> NULL, "callr" -> the expression `callr::r`
  if(type == "vanilla"){
    callr_function <- NULL
  } else if (type == "callr") {
    callr_function <- quote(callr::r)

  # Arguments later passed to the targets make function through do.call().
  # `callr_function` starts as NULL here and is assigned inside `fun`.
  args <- list(
    names = names,
    envir = envir,
    callr_function = NULL,

  subprocess <- TRUE
  # `fun` gets the global environment as its enclosure, so it does not close
  # over this call frame; the values it needs are spliced into its body by
  # bquote() through the `.( )` markers.
  fun <- function(subprocess = TRUE){}
  environment(fun) <- globalenv()
  body(fun) <- bquote({
    # Package internals are reached through the namespace object `ns`
    ns <- asNamespace('raveio')
    # Evaluate the captured callr_function expression
    callr_function <- eval(.(callr_function))
    args <- .(args)
    # NOTE(review): the extra indentation suggests an enclosing condition line
    # is missing above this assignment.
      args$callr_function <- callr_function
    clustermq_scheduler <- .(clustermq_scheduler)

    # Validate requested target names against the pipeline's target names
    all_names <- ns$pipeline_target_names(pipe_dir = .(pipe_dir))
    if(length(args$names)) {
        # NOTE(review): the condition guarding this stop() is not visible here
        stop("pipeline_run: `names` must be NULL or characters")
      missing_names <- args$names[!args$names %in% all_names]
      if(length(missing_names)) {
        stop("pipeline_run: the following `names` cannot be found: ", paste(missing_names, collapse = ", "))
    } else {
      # Empty `names` is normalized to NULL
      args$names <- NULL

    if(subprocess) {
        # NOTE(review): these look like name = value pairs of a call whose
        # head is missing (presumably options(...)) - confirm against upstream
        "future.fork.enable" = FALSE,
        "dipsaus.no.fork" = TRUE,
        "dipsaus.cluster.backup" = "multisession"

    # Load shared functions into envir
    # (files under <pipe_dir>/R matching "^shared-.*\\.R", case-insensitive,
    # sourced in sorted order)
    shared_libs <- list.files(file.path(.(pipe_dir), "R"), pattern = "^shared-.*\\.R",
                              full.names = TRUE, ignore.case = TRUE)
    lapply(sort(shared_libs), function(f) {
      source(file = f, local = args$envir, chdir = TRUE)

    # if(dir.exists(file.path(.(pipe_dir), "py"))) {
    #   pipeline_py_module(pipe_dir = .(pipe_dir),
    #                      convert = FALSE)
    # }

    # For type "smart", `local` is rebound to ns$with_future_parallel, so the
    # `local({ ... })` calls in make() go through that function instead of
    # base::local
    if(.(type) == "smart"){
      local <- ns$with_future_parallel
    # make(): call `fun` with `args`, optionally wrapped in `local`.
    # NOTE(review): `expr =`, `tar_condition_file =` and `error =` look like
    # arguments of a tryCatch() whose head is missing - confirm.
    make <- function(fun, use_local = TRUE) {
          expr = {
            if( use_local ) {
              local({ do.call(fun, args) })
            } else {
              do.call(fun, args)
          `tar_condition_file` = function(e) {
            # destroy and try again, and throw all other errors
            targets::tar_destroy(ask = FALSE, destroy = "meta")
            if( use_local ) {
              local({ do.call(fun, args) })
            } else {
              do.call(fun, args)
          error = function( e ) {

            # For targets run errors, strip the leading
            # "Error running targets::tar_make ... Last error:" boilerplate
            # from the message
            if(inherits(e, "tar_condition_run")) {
              # remove ANSI code
              msg <- trimws(dipsaus::ansi_strip(e$message), which = "left")

              if(startsWith(msg, "Error running targets::tar_make")) {
                msg <- gsub("^Error running targets::tar_make.*help\\.html[\n \t]{0,}Last error:[\n \t]{0,1}", "", msg)
                e$message <- msg


      # Re-emit warnings recorded in the targets metadata, one R warning per
      # row of the table
      warn_table <- as.data.frame(targets::tar_meta(fields = warnings, complete_only = TRUE))
      if( nrow(warn_table) ) {
        for(ii in seq_len(nrow(warn_table))) {
          msg <- sprintf("Caveat in target [%s]: %s", warn_table$name[[ii]], warn_table$warnings[[ii]])
          warning(msg, call. = FALSE)

    # Dispatch on the scheduler chosen by the caller
    if("none" == .(scheduler)){
      make( targets::tar_make )
    } else if("future" == .(scheduler)){
      # Worker count comes from the raveio option "max_worker" (default 1L)
      args$workers <- ns$raveio_getopt("max_worker", default = 1L)
      make( targets::tar_make_future )
      # local({ do.call(targets::tar_make_future, args) })
    } else {

      # clustermq: fall back to "multiprocess" when no backend option was set,
      # then write the choice to the 'clustermq.scheduler' option
      if(is.na(clustermq_scheduler)) {
        clustermq_scheduler <- "multiprocess"
      options('clustermq.scheduler' = clustermq_scheduler)
      if(identical(clustermq_scheduler, "LOCAL")){
        make( targets::tar_make_clustermq )
        # local({ do.call(targets::tar_make_clustermq, args) })
      } else {
        # Non-LOCAL backends set `workers` and bypass the `local` wrapper
        args$workers <- ns$raveio_getopt("max_worker", default = 1L)
        # do.call(targets::tar_make_clustermq, args)
        make( targets::tar_make_clustermq, use_local = FALSE )



  # Result object for this run; receives the polling interval and the
  # requested target names
  res <- PipelineResult$new(path = pipe_dir, verbose = TRUE)
  res$check_interval <- check_interval
  res$names <- names

  # When progress is shown and no maximum was given, derive it from the number
  # of requested names or from the number of pipeline targets.
  # NOTE(review): the inner condition choosing between the two is not visible.
  if(!progress_quiet && is.na(progress_max)){
      progress_max <- length(names)
    } else {
      progress_max <- length(pipeline_target_names(pipe_dir = pipe_dir))
  res$progressor <- dipsaus::progress2(
    progress_title, max = progress_max, shiny_auto_close = !async,
    quiet = progress_quiet


      # NOTE(review): the branching condition and the call that receives
      # `async =` / `expr =` are missing. Visible: one branch starts `fun` in a
      # background R process via callr::r_bg(), the other calls `fun`
      # in-process with subprocess = FALSE.
      async = TRUE,
      expr = {
        callr::r_bg(func = fun, args = list(subprocess = TRUE),
                    package = FALSE, poll_connection = TRUE,
                    supervise = TRUE, error = "error")
  } else {
      async = FALSE,
      expr = {
        fun(subprocess = FALSE)

#' @rdname rave-pipeline
#' @export
pipeline_clean <- function(
    pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
    destroy = c("all", "cloud", "local", "meta", "process",
                "progress", "objects", "scratch", "workspaces"),
    ask = FALSE
    ) {
  # Remove stored data of a pipeline by delegating to targets::tar_destroy().
  #
  #   pipe_dir  pipeline directory; defaults to env var RAVE_PIPELINE, falling
  #             back to "."
  #   destroy   which part of the data store to remove; must be one of the
  #             listed choices, the first ("all") being the default
  #   ask       forwarded to targets::tar_destroy(); FALSE by default
  #
  # Returns whatever targets::tar_destroy() returns.

  # Collapse the choice vector to one validated string
  destroy <- match.arg(destroy)

  # Activate the pipeline before touching its store
  # (presumably this points `targets` at `pipe_dir` - confirm in
  # activate_pipeline)
  pipe_dir <- activate_pipeline(pipe_dir)

  targets::tar_destroy(ask = ask, destroy = destroy)
}

#' @rdname rave-pipeline
#' @export
pipeline_run_bare <- function(
  # Purpose: run the pipeline's targets directly in the current session
  # (no PipelineResult / background process is created in the visible code)
  # and optionally return the values read back with pipeline_read().
  #
  # NOTE(review): this listing appears to have lost lines (closing
  # braces/parentheses and the heads of several calls). The comments below
  # describe only the statements that are visible.
  #
  # Arguments (as used by the visible code):
  #   pipe_dir       pipeline directory; defaults to env var RAVE_PIPELINE,
  #                  falling back to "."; passed to activate_pipeline()
  #   scheduler      one of "none", "future", "clustermq" (match.arg)
  #   type           one of "smart", "callr", "vanilla" (match.arg)
  #   envir          environment stored in `args$envir`; shared scripts are
  #                  sourced into it
  #   callr_function captured unevaluated with substitute(); may be overridden
  #                  by `scheduler`/`type` below
  #   names          target names to build; NULL/empty means all targets when
  #                  reading results
  #   return_values  when TRUE, the result of pipeline_read() is returned
  #   ...            not referenced in the visible part of the body
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  scheduler = c("none", "future", "clustermq"),
  type = c("smart", "callr", "vanilla"),
  envir = new.env(parent = globalenv()),
  callr_function = NULL,
  names = NULL, return_values = TRUE,
  ...) {
  pipe_dir <- activate_pipeline(pipe_dir)

  # Resolve the enumerated choices to a single string each
  type <- match.arg(type)
  scheduler <- match.arg(scheduler)
  # Keep the caller's expression rather than its value
  callr_function <- substitute(callr_function)

  # Current clustermq backend option, NA when the option is unset
  clustermq_scheduler <- getOption('clustermq.scheduler', NA)
  if(scheduler == "clustermq"){
    # if(!identical(clustermq_scheduler, "LOCAL")){
    #   callr_function <- NULL
    # }
    # With the clustermq scheduler the callr function is always dropped
    callr_function <- NULL

  # `type` takes precedence over the user-supplied callr_function:
  # "vanilla" -> NULL, "callr" -> the expression `callr::r`
  if(type == "vanilla"){
    callr_function <- NULL
  } else if (type == "callr") {
    callr_function <- quote(callr::r)

  # Validate requested target names against the pipeline's target names
  all_names <- pipeline_target_names(pipe_dir = pipe_dir)

  if(length(names)) {
      # NOTE(review): the condition guarding this stop() is not visible here
      stop("pipeline_run_bare: `names` must be NULL or characters")
    missing_names <- names[!names %in% all_names]
    if(length(missing_names)) {
      stop("pipeline_run_bare: the following `names` cannot be found: ", paste(missing_names, collapse = ", "))
  } else {
    # Empty `names` is normalized to NULL
    names <- NULL

  # For type "smart", `local` is rebound to with_future_parallel, so the
  # `local({ ... })` calls in make() go through that function instead of
  # base::local
  if(type == "smart") {
    local <- with_future_parallel

  # Arguments passed to the targets make function through do.call()
  args <- list(
    names = names,
    envir = envir,
    callr_function = callr_function,

  # Load shared functions into envir
  # (files under <pipe_dir>/R matching "^shared-.*\\.R", case-insensitive,
  # sourced in sorted order)
  shared_libs <- list.files(file.path(pipe_dir, "R"), pattern = "^shared-.*\\.R",
                            full.names = TRUE, ignore.case = TRUE)
  lapply(sort(shared_libs), function(f) {
    source(file = f, local = args$envir, chdir = TRUE)

  # Python modules loaded from here will be null pointers in targets
  # if(dir.exists(file.path(pipe_dir, "py"))) {
  #   pipeline_py_module(pipe_dir = pipe_dir,
  #                      convert = FALSE)
  # }

  # make(): call `fun` with `args`, optionally wrapped in `local`.
  # NOTE(review): `expr =`, `tar_condition_file =` and `error =` look like
  # arguments of a tryCatch() whose head is missing - confirm.
  make <- function(fun, use_local = TRUE) {
        expr = {
          if( use_local ) {
            local({ do.call(fun, args) })
          } else {
            do.call(fun, args)
        `tar_condition_file` = function(e) {
          # destroy and try again, and throw all other errors
          targets::tar_destroy(ask = FALSE, destroy = "meta")
          if( use_local ) {
            local({ do.call(fun, args) })
          } else {
            do.call(fun, args)
        error = function( e ) {

          # For targets run errors, strip the leading
          # "Error running targets::tar_make ... Last error:" boilerplate
          # from the message
          if(inherits(e, "tar_condition_run")) {
            # remove ANSI code
            msg <- trimws(dipsaus::ansi_strip(e$message), which = "left")

            if(startsWith(msg, "Error running targets::tar_make")) {
              msg <- gsub("^Error running targets::tar_make.*help\\.html[\n \t]{0,}Last error:[\n \t]{0,1}", "", msg)
              e$message <- msg

    # Re-emit warnings recorded in the targets metadata, one R warning per row
    # of the table
    warn_table <- as.data.frame(targets::tar_meta(fields = warnings, complete_only = TRUE))
    if( nrow(warn_table) ) {
      for(ii in seq_len(nrow(warn_table))) {
        msg <- sprintf("Caveat in target [%s]: %s", warn_table$name[[ii]], warn_table$warnings[[ii]])
        warning(msg, call. = FALSE)

  # Dispatch on the scheduler.
  # NOTE(review): the switch selector line and the head of the third arm
  # (presumably `scheduler,` and `"clustermq" = {`) are not visible - confirm.
  switch (
    "none" = {
      make( targets::tar_make )
    "future" = {
      # Worker count comes from the raveio option "max_worker" (default 1L)
      args$workers <- raveio_getopt("max_worker", default = 1L)
      make( targets::tar_make_future )
      # Fall back to "multiprocess" when no clustermq backend option was set,
      # then write the choice to the 'clustermq.scheduler' option
      if(is.na(clustermq_scheduler)) {
        clustermq_scheduler <- "multiprocess"
      options('clustermq.scheduler' = clustermq_scheduler)
      if(identical(clustermq_scheduler, "LOCAL")){
        make( targets::tar_make_clustermq )
        # local({ do.call(targets::tar_make_clustermq, args) })
      } else {
        # Non-LOCAL backends set `workers` and bypass the `local` wrapper
        args$workers <- raveio_getopt("max_worker", default = 1L)
        # do.call(targets::tar_make_clustermq, args)
        make( targets::tar_make_clustermq, use_local = FALSE )

  # Read in names
  # (no names requested -> read every target of the pipeline)
  if(!length(names)) {
    names <- pipeline_target_names(pipe_dir = pipe_dir)

  if( return_values ) {
    return(pipeline_read(var_names = names, pipe_dir = pipe_dir))
  } else {
    # NOTE(review): the body of this branch is not visible in this listing

Try the raveio package in your browser

Any scripts or data that you put into this service are public.

raveio documentation built on July 26, 2023, 5:29 p.m.