# R/ravana_mapreduce.R


#' delete_functions
#' 
#' Delete all shared functions
#' @export
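#' @examples
#' \dontrun{
#' # Minimal sketch, assuming the global `Ravana` state (DBI connection,
#' # cluster name, shared registries) has already been initialised by the
#' # package's setup routine.
#' delete_functions()
#' }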
delete_functions <- function(){
  
  if (!exists("Ravana", where = .GlobalEnv)) stop("Global variable [Ravana] does not exist!")
  
  Ravana$sharedfunctions  <<- list()
  
  X <- paste(deparse(Ravana$sharedfunctions), collapse = "\n")
  
  sql <- "UPDATE clusters SET rfunctions=?p1 WHERE clustername=?p2"
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = X, p2 = Ravana$clustername)
  
  DBI::dbExecute(Ravana$connection, SQL)
  message(sprintf("All functions deleted from the Cluster [%s]", Ravana$clustername))  
}



#' delete_objects
#' 
#' Delete all shared objects
#' @export
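#' @examples
#' \dontrun{
#' # Minimal sketch, assuming the global `Ravana` state (DBI connection,
#' # cluster name, shared registries) has already been initialised by the
#' # package's setup routine.
#' delete_objects()
#' }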
delete_objects <- function(){
  
  if (!exists("Ravana", where = .GlobalEnv)) stop("Global variable [Ravana] does not exist!")
  
  Ravana$sharedobjects  <<- list()
  
  X <- paste(deparse(Ravana$sharedobjects), collapse = "\n")
  
  sql <- "UPDATE clusters SET robjects=?p1 WHERE clustername=?p2"
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = X, p2 = Ravana$clustername)
  
  DBI::dbExecute(Ravana$connection, SQL)
  message(sprintf("All objects deleted from the Cluster [%s]", Ravana$clustername))  
}

#' share_function
#' 
#' Shares an R function with the cluster so worker nodes can execute it
#' @param rfunction R function 
#' @export
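#' @examples
#' \dontrun{
#' # Sketch, assuming `Ravana` is initialised; `square` is a hypothetical
#' # user-defined function to be executed on the worker nodes.
#' square <- function(x) x^2
#' share_function(square)
#' }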
share_function <- function(rfunction){
  
  if (!exists("Ravana", where = .GlobalEnv)) stop("Global variable [Ravana] does not exist!")
  
  rfunctionname <- deparse(substitute(rfunction))
  
  message("R function: ", rfunctionname)

  Ravana$sharedfunctions[[rfunctionname]]  <<- rfunction  
  
  X <- paste(deparse(Ravana$sharedfunctions), collapse = "\n")

  sql <- "UPDATE clusters SET rfunctions=?p1 WHERE clustername=?p2"
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = X, p2 = Ravana$clustername)
  
  DBI::dbExecute(Ravana$connection, SQL)
  message(sprintf("Function [%s] shared to Cluster [%s]", rfunctionname, Ravana$clustername))
}


#' share_object
#' 
#' Shares an R object (vector, list, matrix, data frame etc.) with the cluster
#' @param robject R object 
#' @export
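#' @examples
#' \dontrun{
#' # Sketch, assuming `Ravana` is initialised; `lookup` is a hypothetical
#' # object that mapped functions may reference on the worker nodes.
#' lookup <- data.frame(id = 1:3, label = c("a", "b", "c"))
#' share_object(lookup)
#' }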
share_object <- function(robject){
  
  if (!exists("Ravana", where = .GlobalEnv)) stop("Global variable [Ravana] does not exist!")
  
  robjectname <- deparse(substitute(robject))
  Ravana$sharedobjects[[robjectname]]  <<- robject  
  
  X <- paste(deparse(Ravana$sharedobjects), collapse = "\n")
  
  sql <- "UPDATE clusters SET robjects=?p1 WHERE clustername=?p2"
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = X, p2 = Ravana$clustername)
  
  DBI::dbExecute(Ravana$connection, SQL)
  message(sprintf("%s Object [%s] shared to Cluster [%s]", stringr::str_to_title(class(robject)), robjectname, Ravana$clustername))
}




#' ravana_map
#' 
#' Maps a task, expressed as an R function, onto each element of a list/vector or each row of a data frame.
#' @param rfunction R function to apply (must already be shared with \code{share_function})
#' @param datatomap Data to map: a vector or list (one task per element) or a data frame (one task per row)
#' @return Returns the taskid, to be passed to \code{ravana_reduce}.
#' @export
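#' @examples
#' \dontrun{
#' # Sketch, assuming `Ravana` is initialised and `square` (a hypothetical
#' # user function) has been shared; each element of the vector becomes one
#' # mapped task in the cluster's task table.
#' square <- function(x) x^2
#' share_function(square)
#' taskid <- ravana_map(square, 1:100)
#' }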

ravana_map <- function(rfunction, datatomap){
  if (!exists("Ravana", where = .GlobalEnv)) stop("Global variable [Ravana] does not exist!")  
  
  rfunctionname <-deparse(substitute(rfunction))
  if (!(rfunctionname %in% names(Ravana$sharedfunctions))) stop(sprintf("Function [%s] can't be found! Share it first with share_function().", rfunctionname))

  options(digits.secs = 6)
  options(scipen = 6)
  
  # Microsecond-resolution timestamp used as the task id
  taskid <- as.numeric(Sys.time())*1000000
  
  # A data frame is mapped row by row; anything else is coerced to a list
  if (is.data.frame(datatomap)) {
    mapdata <- split(datatomap, seq(nrow(datatomap)))
  } else {
    mapdata <- as.list(datatomap)
  }

  datalen <- length(mapdata)
  target <- 0.05
  progress <- 0
  tick <- 0
  cat(sprintf("Mapping %s Tasks                Progress [", datalen))
  for (i in 1:datalen) {
    clustername      <- Ravana$clustername
    createdby        <- paste0(Sys.info()["login"], "@", Sys.info()["nodename"])
    taskseq          <- i
    taskuid          <- uuid::UUIDgenerate()
    mappedrfunction  <- rfunctionname
    mappedparameters <- gsub("\n", " ",   deparse1(mapdata[[i]], width.cutoff = 500L))
    mappedparameters <- gsub("\t", " ",   mappedparameters)
    mappedparameters <- gsub("\r", " ",   mappedparameters)
    mappedparameters <- gsub("\\s+", " ", mappedparameters)    

    sql <- "INSERT INTO mappedtasks(taskid, taskseq, taskuid, clustername, mappedrfunction, mappedparameters, createdby) VALUES (?p1, ?p2, ?p3, ?p4, ?p5, ?p6, ?p7)"
    SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql
                              , p1 = taskid
                              , p2 = taskseq
                              , p3 = taskuid
                              , p4 = clustername
                              , p5 = mappedrfunction
                              , p6 = mappedparameters
                              , p7 = createdby)
    DBI::dbExecute(Ravana$connection, SQL)        
    #if (i %% 10 == 0) cat(" ..", i)
    
    progress <- (i*1.0/datalen)
    if (progress >= target) {
      cat('.')
      if ((tick + 1) %% 5 == 0) cat("|")
      target <- target + 0.05
      tick   <- tick + 1
    }
    
  }
  cat('.]\n')
  
  message(sprintf("\nMapping completed for [%s] with %d items..", mappedrfunction, datalen))
  return(taskid)
}


#' ravana_reduce
#' 
#' Collects (reduces) the results of a task that has already been mapped, blocking until every mapped item has been processed.
#' @param taskid Task id returned by \code{ravana_map}
#' @return Returns a data frame with the mapped parameters and their (deparsed) results.
#' @export
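#' @examples
#' \dontrun{
#' # Sketch, assuming `Ravana` is initialised, `square` (hypothetical) has
#' # been shared, and at least one worker node is running run_worker().
#' taskid  <- ravana_map(square, 1:100)
#' results <- ravana_reduce(taskid)
#' }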
ravana_reduce <- function(taskid){
  if (!exists("Ravana", where = .GlobalEnv)) stop("Can't [ravana_reduce]. Global variable [Ravana] does not exist!")
  
  sql <- 'SELECT  AVG(progress) as progress FROM mappedtasks WHERE taskid=?p1'
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = taskid)
  
  progress <- 0
  target <- 0.05
  tick <- 0
  

  cat(sprintf("Reducing Task [%s]  Progress [", taskid))
  while (progress < 1){
    res      <- DBI::dbGetQuery(Ravana$connection, SQL)
    progress <- res$progress[1]
    if (is.na(progress)) progress <- 0    # no worker has reported progress yet
    if (progress >= target) {
      cat('.')
      if ((tick + 1) %% 5 == 0) cat("|")
      target <- target + 0.05
      tick   <- tick + 1
    }
    Sys.sleep(0.2)
  }
  cat('.]\n')

  sql <- "UPDATE  mappedtasks SET status ='Closed' WHERE taskid=?p1"
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = taskid)
  res <- DBI::dbExecute(Ravana$connection, SQL)
  
  sql <- 'SELECT  mappedparameters, mappedresults FROM mappedtasks WHERE taskid=?p1'
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = taskid)
  res <- DBI::dbGetQuery(Ravana$connection, SQL)
  
  return (res)
}



execute_task <- function() {
  if (!exists("Ravana", where = .GlobalEnv)) stop("Can't [execute_task]. Global variable [Ravana] does not exist!")
  
  # Ask the cluster for the next pending task assigned to this worker node
  sql <- 'SELECT * FROM collect_task(?p1, ?p2)'  
  SQL <- DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = Ravana$clustername, p2 = Sys.info()["nodename"])
  
  res  <- DBI::dbGetQuery(Ravana$connection, SQL)
  rows <- nrow(res)
  
  if (rows > 0 && !is.na(res$taskuid[1])){
    # Rebuild the mapped parameters from their deparsed form, then run the shared function
    parameters <- eval(parse(text = res$mappedparameters[1]))

    result <- Ravana$sharedfunctions[[res$mappedrfunction[1]]](parameters)
    
    # Deparse the result so it can be stored as text in the database
    result <- deparse1(result, collapse = '\n')
    #result <- sub("\n", " ",    deparse1(result, width.cutoff = 500L))
    #result <- sub("\t", " ",    result)
    #result <- sub("\r", " ",    result)
    #result <- gsub("\\s+", " ", result)    
    
    sql = "SELECT * FROM submit_task(?p1, ?p2)"
    SQL = DBI::sqlInterpolate(DBI::ANSI(), sql, p1 = res$taskuid[1], p2 = result)
    res  <- DBI::dbGetQuery(Ravana$connection, SQL)
    rows <- length(res[,1])
    message(sprintf("Task(%s, %s) for Function [%s] completed for [%s]", res$taskid[1], res$taskseq[1], res$mappedrfunction[1], Ravana$clustername))
  }
  heartbeat()
}

task_loop <- function(){
  while (TRUE){
    execute_task()   # execute_task() also sends the worker heartbeat
    Sys.sleep(0.3)
  }
}

evtInterrupted <- function(e){
  write_log("Interruption", "Worker terminated by user!")
  message("Ravana: Worker terminated by user!")
  Disconnect()
}

evtError   <- function(e){
  write_log("Error", conditionMessage(e))
  message("[Error] ", conditionMessage(e))
}

evtFinally <- function(){
  NOOP()
}

#' run_worker
#' 
#' Starts processing available tasks as a worker node.
#' 
#' @export
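#' @examples
#' \dontrun{
#' # Sketch, assuming this session's `Ravana` global points at the same
#' # cluster used by the mapping session; blocks until interrupted.
#' run_worker()
#' }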
run_worker <- function(){
  tryCatch(task_loop()
           , interrupt = function(c){evtInterrupted(c)}
           , error     = function(c){evtError(c)}
           # `finally` takes an expression (not a handler), evaluated when task_loop() exits
           , finally   = evtFinally())  
}