R/StepBase.R

#' @importFrom digest digest
#' @importFrom methods new
#' @importFrom stats runif
#' @importFrom stats na.omit
#' @importFrom methods is
#' @importFrom utils write.table
#' @importFrom methods setClass
#' @importFrom parallel makeCluster
#' @importFrom parallel parLapply
#' @importFrom parallel stopCluster
#' @name Step-class
#' @rdname  Step-class
#' @title Methods for Step objects
#' @description
#' Users can call Step object operation methods below
#' to obtain information in objects.
#' @details
#' \code{Step} is a S4 class for generating Step S4 objects.
#' All Step objects generated by child classes inherit from Step.
#' To generate new Step objects,
#' a function wrapper with fixed arguments needs to be implemented.
#'  Users use this function to generate new Step functions rather
#'  than Step S4 class to generate objects.
#' @author Zheng Wei
#' @seealso

#' \code{\link{setGenome}}
#' \code{\link{setThreads}}


#' @param .Object \code{Step} object scalar.
#' Step object is returned by functions in each step.
#' @param x \code{Step} object scalar.
#' Step object is returned by functions in each step.
#' @param name \code{Character} scalar.
#' Name can be one of inputList, outputList, paramList, allList, propList or
#' the item names of inputList, outputList or  paramList
#' @param prevSteps \code{List} list of Step objects
#' @param pipeName \code{Character} scalar.
#' The pipeline name that this step belongs to. Default: NULL. It will be replace by the only pipeline name.
#' @param type \code{Character} scalar.
#' Valid types of parameters including "input", "output" and "other"
#' @param item \code{Character} scalar.
#' The items in parameter list (input, output and other) or report list.
#' @param value any type scalar.
#' The value to be set for corresponding item in a list.
#' @param originPath \code{Character} scalar.
#' The file name for output file is based on this original path name.
#' @param regexSuffixName \code{Character} scalar. The suffix for replacement.
#' @param suffix \code{Character} scalar. The new suffix for the file.
#' @param msg \code{Character} scalar. The message to write into log file.
#' @param isWarnning \code{Logical} scalar. Set this message as warning message.
#' Default: FALSE
#' @param appendLog \code{Logical} scalar. Append to the log file.
#' Default: TRUE
#' @param showMsg \code{Logical} scalar. Show the message on screen.
#' Default: TRUE
#' @param attachedTypes \code{Logical} scalar.
#' Show the new type name or show the original type name
#' Default: TRUE
#' @param filename \code{Character} scaler.
#' The name of file under step working directiory
#' @param ... Additional arguments, currently unused.
#' @return the function and result of functions:
#'
#' @examples
#'
#'
#' library(BSgenome)
#' library(rtracklayer)
#' library(magrittr)
#'
#' # generate new Step : RandomRegionOnGenome
#' setClass(Class = "RandomRegionOnGenome",
#'          contains = "Step"
#' )
#'
#' setMethod(
#'     f = "init",
#'     signature = "RandomRegionOnGenome",
#'     definition = function(.Object,prevSteps = list(),...){
#'         # All arguments in function randomRegionOnGenome
#'         # will be passed from "..."
#'         # so get the arguments from "..." first.
#'         allparam <- list(...)
#'         sampleNumb <- allparam[["sampleNumb"]]
#'         regionLen <- allparam[["regionLen"]]
#'         genome <- allparam[["genome"]]
#'         outputBed <- allparam[["outputBed"]]
#'         # no previous steps for this step so ingnore the "prevSteps"
#'         # begin to set input parameters
#'         # no input for this step
#'         # begin to set output parameters
#'         if(is.null(outputBed)){
#'             output(.Object)$outputBed <-
#'                 getStepWorkDir(.Object,"random.bed")
#'         }else{
#'             output(.Object)$outputBed <- outputBed
#'         }
#'         # begin to set other parameters
#'         param(.Object)$regionLen <-  regionLen
#'         param(.Object)$sampleNumb <- sampleNumb
#'         if(is.null(genome)){
#'             param(.Object)$bsgenome <-  getBSgenome(getGenome())
#'         }else{
#'             param(.Object)$bsgenome <-  getBSgenome(genome)
#'         }
#'         # don't forget to return .Object
#'         .Object
#'     }
#' )
#'
#' setMethod(
#'     f = "processing",
#'     signature = "RandomRegionOnGenome",
#'     definition = function(.Object,...){
#'         # All arguments are set in .Object
#'         # so we can get them from .Object
#'         allparam <- list(...)
#'         sampleNumb <- getParam(.Object,"sampleNumb")
#'         regionLen <- getParam(.Object,"regionLen")
#'         bsgenome <- getParam(.Object,"bsgenome")
#'         outputBed <- getParam(.Object,"outputBed")
#'         # begin the calculation
#'         chrlens <-seqlengths(bsgenome)
#'         selchr <- grep("_|M",names(chrlens),invert=TRUE)
#'         chrlens <- chrlens[selchr]
#'         startchrlens <- chrlens - regionLen
#'         spchrs <- sample(x = names(startchrlens),
#'         size =  sampleNumb, replace = TRUE,
#'         prob = startchrlens / sum(startchrlens))
#'         gr <- GRanges()
#'         for(chr in names(startchrlens)){
#'             startpt <- sample(x = 1:startchrlens[chr],
#'             size = sum(spchrs == chr),replace = FALSE)
#'             gr <- c(gr,GRanges(seqnames = chr,
#'             ranges = IRanges(start = startpt, width = 1000)))
#'         }
#'         result <- sort(gr,ignore.strand=TRUE)
#'         rtracklayer::export.bed(object = result, con =  outputBed)
#'         # don't forget to return .Object
#'         .Object
#'     }
#' )
#'
#'
#'
#' setMethod(
#'     f = "genReport",
#'     signature = "RandomRegionOnGenome",
#'     definition = function(.Object, ...){
#'         .Object
#'     }
#' )
#'
#'
#'
#'
#'
#'
#' # This function is exported in NAMESPACE for user to use
#' randomRegionOnGenome <- function(sampleNumb, regionLen = 1000,
#'                                  genome = NULL, outputBed = NULL, ...){
#'     allpara <- c(list(Class = "RandomRegionOnGenome", prevSteps = list()),
#'                  as.list(environment()),list(...))
#'     step <- do.call(new,allpara)
#'     invisible(step)
#' }
#'
#'
#' # generate another new Step : OverlappedRandomRegion
#' setClass(Class = "OverlappedRandomRegion",
#'          contains = "Step"
#' )
#'
#' setMethod(
#'     f = "init",
#'     signature = "OverlappedRandomRegion",
#'     definition = function(.Object,prevSteps = list(),...){
#'         # All arguments in function overlappedRandomRegion and
#'         # runOerlappedRandomRegion will be passed from "..."
#'         # so get the arguments from "..." first.
#'         allparam <- list(...)
#'         inputBed <- allparam[["inputBed"]]
#'         randomBed <- allparam[["randomBed"]]
#'         outputBed <- allparam[["outputBed"]]
#'         # inputBed can obtain from previous step object when running
#'         # runOerlappedRandomRegion
#'         if(length(prevSteps)>0){
#'             prevStep <- prevSteps[[1]]
#'             input(.Object)$randomBed <-  getParam(prevStep,"outputBed")
#'         }
#'         # begin to set input parameters
#'         if(!is.null(inputBed)){
#'             input(.Object)$inputBed <- inputBed
#'         }
#'         if(!is.null(randomBed)){
#'             input(.Object)$randomBed <- randomBed
#'         }
#'         # begin to set output parameters
#'         # the output is recemended to set under the step work directory
#'         if(!is.null(outputBed)){
#'             output(.Object)$outputBed <-  outputBed
#'         }else{
#'             output(.Object)$outputBed <-
#'                 getAutoPath(.Object, getParam(.Object, "inputBed"),
#'                             "bed", suffix = "bed")
#'             # the path can also be generate in this way
#'             # ib <- getParam(.Object,"inputBed")
#'             # output(.Object)$outputBed <-
#'             #    file.path(getStepWorkDir(.Object),
#'             #    paste0(substring(ib,1,nchar(ib)-3), "bed"))
#'         }
#'         # begin to set other parameters
#'         # no other parameters
#'         # don't forget to return .Object
#'
#'
#'         .Object
#'     }
#' )
#' setMethod(
#'     f = "processing",
#'     signature = "OverlappedRandomRegion",
#'     definition = function(.Object,...){
#'         # All arguments are set in .Object
#'         # so we can get them from .Object
#'         allparam <- list(...)
#'         inputBed <- getParam(.Object,"inputBed")
#'         randomBed <- getParam(.Object,"randomBed")
#'         outputBed <- getParam(.Object,"outputBed")
#'
#'         # begin the calculation
#'         gr1 <- import.bed(con = inputBed)
#'         gr2 <- import.bed(con = randomBed)
#'         gr <- second(findOverlapPairs(gr1,gr2))
#'         export.bed(gr,con = outputBed)
#'         # don't forget to return .Object
#'         .Object
#'     }
#' )
#'
#'
#' setMethod(
#'     f = "genReport",
#'     signature = "OverlappedRandomRegion",
#'     definition = function(.Object, ...){
#'         .Object
#'     }
#' )
#'
#'
#'
#'
#' # This function is exported in NAMESPACE for user to use
#' overlappedRandomRegion <- function(inputBed, randomBed,
#'                                    outputBed = NULL, ...){
#'     allpara <- c(list(Class = "OverlappedRandomRegion",
#'         prevSteps = list()),as.list(environment()),list(...))
#'     step <- do.call(new,allpara)
#'     invisible(step)
#' }
#'
#' setGeneric("runOverlappedRandomRegion",
#'            function(prevStep,
#'                     inputBed,
#'                     randomBed = NULL,
#'                     outputBed = NULL,
#'                     ...) standardGeneric("runOverlappedRandomRegion"))
#'
#'
#'
#' setMethod(
#'     f = "runOverlappedRandomRegion",
#'     signature = "Step",
#'     definition = function(prevStep,
#'                           inputBed,
#'                           randomBed = NULL,
#'                           outputBed = NULL,
#'                           ...){
#'         allpara <- c(list(Class = "OverlappedRandomRegion",
#'             prevSteps = list(prevStep)),as.list(environment()),list(...))
#'         step <- do.call(new,allpara)
#'         invisible(step)
#'     }
#' )
#'
#' # add to graph
#' addEdges(edges = c("RandomRegionOnGenome","OverlappedRandomRegion"),
#'          argOrder = 1)
#' # begin to test pipeline
#' setGenome("hg19")
#' # generate test BED file
#' test_bed <- file.path(tempdir(),"test.bed")
#' library(rtracklayer)
#' export.bed(GRanges("chr7:1-127473000"),test_bed)
#'
#'
#' rd <- randomRegionOnGenome(10000)
#' overlap <- runOverlappedRandomRegion(rd, inputBed = test_bed)
#'
#' randombed <- getParam(rd,"outputBed")
#'
#' randombed
#'
#' overlap1 <-
#'     overlappedRandomRegion(inputBed = test_bed, randomBed = randombed)
#'
#' clearStepCache(overlap1)
#' overlap1 <-
#'     overlappedRandomRegion(inputBed = test_bed, randomBed = randombed)
#' clearStepCache(rd)
#' clearStepCache(overlap1)
#' rd <- randomRegionOnGenome(10000) %>%
#' runOverlappedRandomRegion(inputBed = test_bed)
#'
#' stepName(rd)
#' stepID(rd)
#'
#'
#' isReady(rd)

#' @export Step
Step <- setClass(Class = "Step",
                 slots = list(
                     argv = "list",
                     paramList = "list",
                     inputList = "list",
                     outputList = "list",
                     propList = "list",
                     reportList = "list",
                     stepName = "character",
                     stepBaseClass = "character",
                     finish = "logical",
                     timeStampStart="POSIXct",
                     timeStampEnd="POSIXct",
                     id = "integer",
                     pipeName = "character",
                     loaded = "logical",
                     isReportStep = "logical",
                     initParam = "list",
                     processingParam = "list"
                 ),
                 prototype = c(argv = list(),
                               paramList = list(),
                               inputList = list(),
                               outputList = list(),
                               propList = list(),
                               reportList = list(),
                               stepName = "Step",
                               stepBaseClass = "Step",
                               finish = FALSE,
                               timeStampStart=Sys.time(),
                               timeStampEnd=Sys.time(),
                               id = 0L,
                               pipeName = character(),
                               loaded = FALSE,
                               isReportStep = FALSE,
                               initParam = list(),
                               processingParam = list())
)
setMethod(f = "initialize",
          signature = "Step",
          definition = function(.Object,prevSteps = list(), stepDefName = NULL,
                                stepPipeName = NULL, isReportStep = FALSE, ...){



              argv <- c(as.list(environment()),list(...))

              .Object@stepBaseClass <- stepType(.Object, attachedTypes = FALSE)
              beforeInit <- !is.null(argv[["beforeInit"]])
              afterInit <- !is.null(argv[["afterInit"]])
              beforeProcessing <- !is.null(argv[["beforeProcessing"]])
              afterProcessing <- !is.null(argv[["afterProcessing"]])
              testGenReport <- !is.null(argv[["testGenReport"]])

              .Object@initParam <- list("test")
              .Object@processingParam <- list("test")

              argv[["beforeInit"]] <- NULL
              argv[["afterInit"]] <- NULL
              argv[["beforeProcessing"]] <- NULL
              argv[["afterProcessing"]] <- NULL
              argv[["testGenReport"]] <- NULL

              msgBoxBegin()

              stopifnot(is.logical(isReportStep))
              .Object@isReportStep <- isReportStep

              argv[["prevSteps"]] <- NULL

# check if it is an Step object
              stopifnot(is(prevSteps,"list"))
              lapply(prevSteps, function(obj){
                  if(!is.null(obj)){
                      stopifnot(inherits(obj,"Step"))
                  }
              })





# set pipeName
              inputPipeNameList <- NULL
              stopifnot(is.list(prevSteps))
              if(is.null(stepPipeName)){
                  if(length(prevSteps)>0){
                      pipeNameList <- lapply(prevSteps, function(prevStep){
                          if(!is.null(prevStep) && !isReady(prevStep)){
                              stop(paste(stepName(prevStep),
                                         "is not ready"))
                          }
                          else if(is.null(prevStep)){
                              #stop("previous step can not be NULL")
                              return(NULL)
                          }
                          return(pipeName(prevStep))
                      })
                      inputPipeNameList <- pipeNameList
                      .Object@pipeName <- sort(unique(unlist(pipeNameList)))
                  }else{
                      .Object@pipeName <- getPipeName()
                  }
              }else{
                  stopifnot(is.character(stepPipeName))
#                  tmp<-getPipeName()
                  setPipeName(stepPipeName)
                  .Object@pipeName <- stepPipeName
#                  setPipeName(tmp)
              }

              # initialize step name
              if(is.null(stepDefName)){
                  .Object@stepName <- paste0(paste0(pipeName(.Object),collapse = "_"),"_",stepType(.Object))
              }else{
                  .Object@stepName <- stepDefName
              }



              # use parameters from ... to overwrite the fix parameters
              argvother <- argv[startsWith(names(argv),
                                           paste0(stepName(.Object), "."))]
              rs <- lapply(names(argvother), function(a){
                  a0 <- substring(a,1 + nchar(paste0(stepName(.Object),".")))
                  if(sum(names(argv)==a0)>0){
                      return(argvother[[a]])
                  }else{
                      stop(paste(a," is not parameter of Step ", stepType(.Object),stepName(.Object)))
                  }
              })
              names(rs) <- names(argvother)
              sel <- setdiff(names(argv),names(argvother))
              argv <- c(argv[sel],rs)
              .Object@argv <- argv




# set propery pass from previous pipeline(only the pipeline name regist in .Object@pipeName)
              argSize <- length(prevSteps)
              if(length(.Object@propList)==0){
                  .Object@propList <- list()
                  .Object@propList[[pipeName(.Object)]] <- list()
              }
              if(argSize>0){
                  # lapply(seq_len(argSize), function(i){
                  #     if(!is.null(prevSteps[[i]]) && !isReady(prevSteps[[i]])){
                  #         stop(paste(stepName(prevSteps[[i]]),
                  #                    "is not ready"))
                  #     }
                  #     if(!is.null(prevSteps[[i]])){
                  #         lapply(seq_len(length(pipeName(.Object))), function(j){
                  #             lapply(seq_len(length(pipeName(prevSteps[[i]]))), function(k){
                  #                 if(pipeName(.Object)[j] == pipeName(prevSteps[[i]])[k]){
                  #                     property(.Object,pipeNameIdx = j) <- property(prevSteps[[i]], pipeNameIdx = k)
                  #                     message(property(.Object,pipeNameIdx = j))
                  #                 }
                  #             })
                  #             print(.Object@propList)
                  #         })
                  #     }
                  # })
                  for(i in seq_len(argSize)){
                      if(!is.null(prevSteps[[i]])){
                          for(j in seq_len(length(pipeName(.Object)))){
                              for(k in seq_len(length(pipeName(prevSteps[[i]])))){
                                  if(pipeName(.Object)[j] == pipeName(prevSteps[[i]])[k]){
                                      property(.Object,pipeName = pipeName(.Object)[j]) <- property(prevSteps[[i]], pipeName = pipeName(prevSteps[[i]])[k])
                                  }
                              }
                          }
                      }
                  }
              }



              nameObjList <- getOption("pipeFrameConfig.nameObjList")
              if(is.null(nameObjList)){
                  nameObjList <- list()
              }
              if(!is.null(nameObjList[[stepName(.Object)]])){
                  .Object@id <- nameObjList[[stepName(.Object)]]@id
              }else{
                  count <- getOption("pipeFrameConfig.count")
                  .Object@id <- count
              }
              options(pipeFrameConfig.allowChangeJobDir = FALSE)

              inputPrevSteps <- prevSteps
              prevSteps <- list()
              # for(i in 1:10){
              #     s <- getPrevSteps(stepName = stepName(.Object),i)
              #     if(is.null(s)){
              #         break
              #     }
              #     tt <- nameObjList[[
              #         paste0(s,"_",paste0(.Object@pipeName,collapse = "_"))]]
              #     if(is.null(tt)){
              #         stop(paste("Step", s, " is required for", stepName,
              #                    "please calculate Step",s,"first"))
              #     }else{
              #         prevSteps<-c(prevSteps,list(tt))
              #     }
              # }
              if(argSize > 0 && !isReportStep){
                  if(length(pipeName(.Object))==1 && length(inputPrevSteps) > 0){
                      # in the sample pipeline, auto add remaining prevSteps that follow the graph
                      prevSteps <- lapply(seq_len(10), function(i){
                          s <- getPrevSteps(stepType = stepType(.Object),i)
                          if(is.null(s)){
                              return(NULL)
                          }else{
                              candidateName <- lapply(nameObjList, function(x){
                                  if((stepType(x, attachedTypes = FALSE) %in% s) &&
                                     length(intersect(pipeName(.Object), pipeName(x)))>0){
                                      return(stepName(x))
                                  }else{
                                      return(NULL)
                                  }
                              })
                              candidateName <- unlist(candidateName)
                              if(length(candidateName) == 0) {
                                  stop(paste("There is not any finish step like",paste(s,collapse = " or "),"in this pipeline"))
                              }

                              if(length(inputPrevSteps)>=i && !is.null(inputPrevSteps[[i]])){
                                  candidateType <- lapply(candidateName, function(x){
                                      return(stepType(nameObjList[[x]]))
                                  })
                                  if(stepType(inputPrevSteps[[i]]) %in% candidateType){
                                      return(inputPrevSteps[[i]])
                                  }
                              }
                              candidateID <- lapply(candidateName, function(x){
                                  return(stepID(nameObjList[[x]]))
                              })
                              candidateID <- unlist(candidateID)
                              sel <-  order(candidateID,decreasing = TRUE)[1]
                              latestname <- candidateName[sel]
                              return(nameObjList[[latestname]])

                          }
                      })
                      for(i in sort(seq_len(9),decreasing = TRUE)){
                          if(is.null(prevSteps[[i]])){
                              prevSteps[[i]] <- NULL
                          }else{
                              break
                          }
                      }
                  } else {
                      # previous step come from different pipeline or there are more than one previous step from same pipeline. check if it is the previous step
                      for(i in seq_len(10)){
                          s <- getPrevSteps(stepType = stepType(.Object),i)
                          if(!is.null(s)){
                              if(sum(stepType(inputPrevSteps[[i]],attachedTypes = FALSE) %in% s)<0.5){
                                  stop(paste(stepName(inputPrevSteps[[i]]),"'s step type is not the ",i, "parameter of", stepType(.Object)))
                              }
                          }
                      }
                      prevSteps <- inputPrevSteps

                  }
               }else if(isReportStep){
                  prevSteps <- lapply(nameObjList, function(x){
                      if(length(intersect(pipeName(x),pipeName(.Object)))>0){
                          return(x)
                      }
                  })
              }

              if(!dir.exists(getStepWorkDir(.Object))){
                  dir.create(getStepWorkDir(.Object))
              }


              argv <- c(list(.Object = .Object,prevSteps = prevSteps),argv)
              writeLog(.Object, paste0("Step Name:",.Object@stepName))

              if(beforeInit){
                  .Object@initParam <- argv
                  return(.Object)
              }
              obj_return_from_init <- do.call(init,argv)
              stopifnot(is(obj_return_from_init,stepType(.Object)))
              .Object <- obj_return_from_init
              if(afterInit){
                  .Object@initParam <- argv
                  return(.Object)
              }
              paramValidation(.Object)
              if(beforeProcessing){
                  .Object@processingParam <- prevSteps
                  return(.Object)
              }
              if(isReportStep){
                  obj_return_from_process<-process(.Object, prevSteps = prevSteps)
              }else{
                  if(!is.null(argv[["cmdline"]])){
                      writeLog(.Object, paste("Running command line:"))
                      writeLog(.Object, argv[["cmdline"]])
                      system(argv[["cmdline"]])
                  }else if(!is.null(argv[["callback"]])){
                      func <- argv[["callback"]]
                      func()
                  }else{
                      obj_return_from_process<-process(.Object)
                  }

              }


              stopifnot(is(obj_return_from_process,stepType(.Object)))
              .Object <- obj_return_from_process
              if(afterProcessing){
                  .Object@processingParam <- prevSteps
                  return(.Object)
              }

              if(testGenReport){
                  obj_return_from_genReport <- genReport(.Object, ...)
                  stopifnot(is(obj_return_from_genReport,stepType(.Object)))
                  .Object <- obj_return_from_genReport
              }


              if(is.null(nameObjList[[stepName(.Object)]])){
                  count <- getOption("pipeFrameConfig.count")
                  options(pipeFrameConfig.count = count+1L)
              }
              nameObjList[[stepName(.Object)]] <- .Object
              options(pipeFrameConfig.nameObjList = nameObjList)


              stepreport <- getOption("pipeFrameConfig.report")
              if(is.null(stepreport)){
                  stepreport <- FALSE
                  options(pipeFrameConfig.report = FALSE)
              }
              if(stepreport){
                  pipelineReport <- file.path(getJobDir(),"Report.Rmd")
                  print(pipelineReport)
                  file.copy(from = system.file(package = "pipeFrame", "extdata","Report.Rmd"), to  = pipelineReport,overwrite = TRUE)

                  isrp <- lapply(nameObjList, function(obj){
                      return(obj@isReportStep)
                  })
                  isrp <- unlist(isrp)
                  isrp <- sum(isrp)
                  cat("\n\n", "# Pipeline Reports", file = pipelineReport, append=TRUE, sep = "\n\n")
                  if(isrp == 0){
                      cat("\n\n", "There is no report steps in this pipeline.", file = pipelineReport, append=TRUE, sep = "\n\n")
                  }
                  lapply(nameObjList, function(obj){
                      if(obj@isReportStep){
                          cat("\n\n",
                              paste("##",stepName(obj)),
                              "Click link to visit report:",
                              file = pipelineReport, append=TRUE, sep = "\n\n")
                          reportIdx <- grep("Report|report",names(output(obj)))
                          count <- 1
                          lapply(reportIdx, function(idx){
                              reportname <-names(output(obj))[idx]
                              reportFile <- output(obj)[[idx]]
                              if(startsWith(reportFile,getJobDir())){
                                  reportFile <- file.path("./",substring(reportFile, 1+nchar(getJobDir())))
                              }
                              cat("\n\n", paste0("[Report",count,"](",reportFile,")"),
                                  file = pipelineReport, append=TRUE, sep = "\n\n")
                              count <- count + 1
                          })
                      }
                  })
                  cat("\n\n", "# Steps Information", file = pipelineReport, append=TRUE, sep = "\n\n")
                  lapply(nameObjList, function(obj){
                      cat("\n\n",
                          paste("## ",stepName(obj)),
                          "### Inputs",
                          file = pipelineReport, append=TRUE, sep = "\n\n")
                      inputValue <- input(obj)
                      if(length(inputValue)==0){
                          cat("\n\n",
                              "There are no input directories for this step.",
                              file = pipelineReport, append=TRUE, sep = "\n\n")
                      }else{
                          lapply(names(inputValue), function(x){
                              cat("\n\n",
                                  paste("-- ", x),
                                  paste(inputValue[[x]],collapse = "\n\n"),
                                  file = pipelineReport, append=TRUE, sep = "\n\n")
                          })
                      }


                      cat("\n\n",
                          "### Outputs",
                          file = pipelineReport, append=TRUE, sep = "\n\n")
                      outputValue <- output(obj)
                      if(length(outputValue) == 0){
                          cat("\n\n",
                              "There are no ouput directories for this step.",
                              file = pipelineReport, append=TRUE, sep = "\n\n")
                      }else{
                          lapply(names(outputValue), function(x){
                              cat("\n\n",
                                  paste("-- ", x),
                                  paste(outputValue[[x]],collapse = "\n\n"),
                                  file = pipelineReport, append=TRUE, sep = "\n\n")
                          })
                      }

                      cat("\n\n",
                          "### Other Parameters",
                          file = pipelineReport, append=TRUE, sep = "\n\n")
                      paramValue <- param(obj)
                      if(length(paramValue) == 0){
                          cat("\n\n",
                              "There are no other parameters for this step.",
                              file = pipelineReport, append=TRUE, sep = "\n\n")
                      }else{
                          lapply(names(paramValue), function(n){
                              x <- paramValue[[n]]

                              if(is.character(x)||
                                 is.numeric(x)||
                                 is.logical(x)||
                                 is.factor(x)||
                                 is.null(x)){
                                  val <- x
                                  if(length(x)>1){
                                      val <- paste("a vector started with",x[1])
                                  }
                                  if(is.character(val)){
                                      val <- paste0("\"", val, "\"")
                                  }
                                  x <- val
                              }else{
                                  x <- paste("An object of",class(x))
                              }

                              cat("\n\n",
                                  paste("-- ", n),
                                  paste(x, collapse = "\n\n"),
                                  file = pipelineReport, append=TRUE, sep = "\n\n")
                          })
                      }

                  })
                  saveRDS(object = nameObjList, file = file.path(getJobDir(),".stepobj.rds"))
                  rmarkdown::render(input = pipelineReport, output_file = "Report.html")

              }
              msgBoxDone()
              .Object
          })


setGeneric(name = "init",
           def = function(.Object,prevSteps = list(),...){
               standardGeneric("init")
           })


#' @return \item{init}{(For package developer only)
#' A Step child class object with initialized input,
#' output and other parameters}
#' @rdname Step-class
#' @aliases init
#' @export
setMethod(f = "init",
          signature = "Step",
          definition = function(.Object,prevSteps = list(),...){
             stop("`init` function need to be implemented for inherit classes")
          })



setGeneric(name = "process",
           def = function(.Object,...){
               standardGeneric("process")
           })


setMethod(f = "process",
          signature = "Step",
          definition = function(.Object,...){
              if(file.exists(getStepWorkDir(.Object, "ignore.modify"))){
                  writeLog(.Object,paste0("The step:`",.Object@stepName,
                                          "`do not check result files."))
                  writeLog(.Object, "because file 'ignore.modify' in step directory is detected." )
                  writeLog(.Object, "Ignore checking modified result for this step.")
                  writeLog(.Object, paste0("Please confirm the format of each files match the original ones"))
                  obj_return_from_genReport <- genReport(.Object, ...)
                  stopifnot(is(obj_return_from_genReport,stepType(.Object)))
                  .Object <- obj_return_from_genReport
                  .Object <- setFinish(.Object)
              }else {
                  writeLog(.Object,"Begin to check if it is finished...")
                  ifexist <- FALSE
                  md5filepath <- NULL
                  objfiles <- dir(getStepWorkDir(.Object),pattern = "^pipeFrame.obj.*rds")
                  if(length(objfiles)>0){
                      md5filepath <- getParamMD5Path(.Object)
                      if(file.exists(md5filepath)){
                          ifexist <- TRUE
                      }
                  }


                  if(ifexist){
                      writeLog(.Object,paste0("The step:`",.Object@stepName,
                                              "` was finished. Nothing to do."))
                      writeLog(.Object,
                               paste0("If you need to redo or rerun this step,",
                                      "please call 'clearStepCache(YourStepObject)'",
                                      "or remove file: ", md5filepath))
                      pipeFrameObj <- loadStep(md5filepath,regClass = FALSE)
                      input(pipeFrameObj) <- input(.Object)
                      output(pipeFrameObj) <- output(.Object)
                      param(pipeFrameObj) <- param(.Object)
                      .Object <- pipeFrameObj
                      obj_return_from_genReport <- genReport(.Object, ...)
                      stopifnot(is(obj_return_from_genReport,stepType(.Object)))
                      .Object <- obj_return_from_genReport
                      saveRDS(.Object, file = md5filepath)
                      .Object@loaded <- TRUE
                  }else{
                      writeLog(.Object,as.character(Sys.time()))
                      writeLog(.Object, "New step. Start processing data: ")
                      .Object@timeStampStart<-Sys.time()
                      unlink(file.path(getStepWorkDir(.Object), "pipeFrame.obj.*.rds"), force = TRUE)
                      obj_return_from_processing <- processing(.Object, ...)
                      stopifnot(is(obj_return_from_processing, stepType(.Object)))
                      .Object <- obj_return_from_processing
                      .Object@timeStampEnd<-Sys.time()
                      .Object@reportList$timeStampStart <-
                          .Object@timeStampStart
                      .Object@reportList$timeStampEnd <-
                          .Object@timeStampEnd
                      .Object <- setFinish(.Object)
                      md5filepath <- getParamMD5Path(.Object)
                      saveRDS(.Object, file = md5filepath)
                      obj_return_from_genReport <- genReport(.Object, ...)
                      stopifnot(is(obj_return_from_genReport,stepType(.Object)))
                      .Object <- obj_return_from_genReport
                      saveRDS(.Object, file = md5filepath)
                      writeLog(.Object, "All results have been saved.")
                  }
              }

              .Object
          })



setGeneric(name = "stepName",
           def = function(.Object,...){
               standardGeneric("stepName")
           })

#' @return \item{stepName}{get Step object Character name}
#' @rdname Step-class
#' @aliases stepName
#' @export
setMethod(f = "stepName",
          signature = "Step",
          definition = function(.Object,...){
              return(.Object@stepName)
          })


setGeneric(name = "stepType",
           def = function(.Object, attachedTypes = TRUE, ...){
               standardGeneric("stepType")
           })

#' @return \item{stepType}{get Step object Character type name (class name)}
#' @rdname Step-class
#' @aliases stepType
#' @export
setMethod(f = "stepType",
          signature = "Step",
          definition = function(.Object, attachedTypes = TRUE, ...){
              typers <- (as.character(class(.Object)))
              if(attachedTypes){
                  return(typers)
              }else{
                  return(getAttachedStep(typers))
              }
          })

setGeneric(name = "pipeName",
           def = function(.Object,...){
               standardGeneric("pipeName")
           })

#' @return \item{pipeName}{get Step object pipe name}
#' @rdname Step-class
#' @aliases pipeName
#' @export
setMethod(f = "pipeName",
          signature = "Step",
          definition = function(.Object,...){
              return(.Object@pipeName)
          })






setGeneric(name = "input",
           def = function(.Object, ...)
               standardGeneric("input")
)

#' @rdname Step-class
#' @return \item{input}{get input list}
#' @aliases  input
#' @export
setMethod(f = "input",
          signature = "Step",
          definition = function(.Object){
              return(.Object@inputList)
          })


setGeneric(name = "input<-",
           def = function(.Object, ..., value)
               standardGeneric("input<-")
)


#' @rdname Step-class
#' @return \item{input<-}{set input list}
#' @aliases  input<-
#' @export
setReplaceMethod(f = "input",
                 signature = "Step",
                 definition = function(.Object, value){
                     if(0!=length(intersect(names(input(.Object)),names(output(.Object))))){
                         stop(paste("new input is in outputList "))
                     }
                     if(0!=length(intersect(names(input(.Object)),names(param(.Object))))){
                         stop(paste("new input is in paramList "))
                     }
                     .Object@inputList <- value
                     .Object
                 })







setGeneric(name = "output",
           def = function(.Object, ...)
               standardGeneric("output")
)

#' @rdname Step-class
#' @return \item{output}{get output list}
#' @aliases  output
#' @export
setMethod(f = "output",
          signature = "Step",
          definition = function(.Object){
              return(.Object@outputList)
          })


setGeneric(name = "output<-",
           def = function(.Object, ..., value)
               standardGeneric("output<-")
)

#' @rdname Step-class
#' @return \item{output<-}{set output list}
#' @aliases  output<-
#' @export
setReplaceMethod(f = "output",
                 signature = "Step",
                 definition = function(.Object, value){
                     if(0!=length(intersect(names(output(.Object)),names(input(.Object))))){
                         stop(paste("new output is in inputList "))
                     }
                     if(0!=length(intersect(names(output(.Object)),names(param(.Object))))){
                         stop(paste("new output is in paramList "))
                     }
                     .Object@outputList <- value
                     .Object
                 })








setGeneric(name = "param",
           def = function(.Object, ...)
               standardGeneric("param")
)

#' @rdname Step-class
#' @return \item{param}{get other parameters list}
#' @aliases  param
#' @export
setMethod(f = "param",
          signature = "Step",
          definition = function(.Object){
              return(.Object@paramList)
          })



setGeneric(name = "param<-",
           def = function(.Object, value)
               standardGeneric("param<-")
)

#' @rdname Step-class
#' @return \item{param<-}{set other parameters list}
#' @aliases  param<-
#' @export
setReplaceMethod(f = "param",
                 signature = "Step",
                 definition = function(.Object, value){
                     if(0!=length(intersect(names(param(.Object)),names(input(.Object))))){
                         stop(paste("new other parameter is in inputList "))
                     }
                     if(0!=length(intersect(names(param(.Object)),names(output(.Object))))){
                         stop(paste("new other parameter is in outputList "))
                     }
                     .Object@paramList<- value
                     .Object
                 })



setGeneric(name = "property",
           def = function(.Object, ..., pipeName = NULL)
               standardGeneric("property")
)

#' @rdname Step-class
#' @return \item{property}{get property list}
#' @aliases  property
#' @export
setMethod(f = "property",
          signature = "Step",
          definition = function(.Object, ..., pipeName = NULL){
              if(is.null(pipeName)){
                  if(length(.Object@pipeName)==1){
                      pipeName <- .Object@pipeName
                  }else{
                      stop("object with multi-pipeName, pipeName need to be specified")
                  }
              }
              stopifnot(is.character(pipeName))
              return(.Object@propList[[pipeName]])
          })



setGeneric(name = "property<-",
           def = function(.Object,..., pipeName = NULL, value)
               standardGeneric("property<-")
)

#' @rdname Step-class
#' @return \item{property<-}{set property list}
#' @aliases  property<-
#' @export
setReplaceMethod(f = "property",
                 signature = "Step",
                 definition = function(.Object,pipeName = NULL, value){
                     if(is.null(pipeName)){
                         if(length(.Object@pipeName)==1){
                             pipeName <- .Object@pipeName
                         }else{
                             stop("object with multi-pipeName, pipeName need to be specified")
                         }
                     }
                     .Object@propList[[pipeName]] <- value
                     .Object
                 })





setGeneric(name = "report",
           def = function(.Object, ...)
               standardGeneric("report")
)

#' @rdname Step-class
#' @return \item{report}{get report list}
#' @aliases  report
#' @export
setMethod(f = "report",
          signature = "Step",
          definition = function(.Object){
              return(.Object@reportList)
          })



setGeneric(name = "report<-",
           def = function(.Object, ..., value)
               standardGeneric("report<-")
)

#' @rdname Step-class
#' @return \item{report<-}{set report list}
#' @aliases  report<-
#' @export
setReplaceMethod(f = "report",
                 signature = "Step",
                 definition = function(.Object, value){
                     .Object@reportList <- value
                     .Object
                 })



setGeneric(name = "argv",
           def = function(.Object, ...)
               standardGeneric("argv")
)

#' @rdname Step-class
#' @return \item{argv}{get arguments list}
#' @aliases  argv
#' @export
setMethod(f = "argv",
          signature = "Step",
          definition = function(.Object){
              return(.Object@argv)
          })

setGeneric(name = "argv<-",
           def = function(.Object, ..., value)
               standardGeneric("argv<-")
)

# not export
# @rdname Step-class
# @return \item{argv<-}{set argumnets list}
# @aliases  argv<-
# @export
setReplaceMethod(f = "argv",
                 signature = "Step",
                 definition = function(.Object, value){
                     .Object@argv <- value
                     .Object
                 })



#' @rdname Step-class
#' @return \item{$}{get inputList, outputList, paramList, allList, propList
#' or any item value in inputList, outputList or paramList}
#' @aliases  $
#' @export
setMethod(f = "$",
          signature = "Step",
          definition = function(x, name){
              if(name == "inputList"){
                  return(input(x))
              }else if(name == "outputList"){
                  return(output(x))
              }else if(name == "paramList"){
                  return(param(x))
              }else if(name == "allList"){
                  return(c(input(x), output(x), param(x)))
              }else if(name == "propList"){
                  return(property(x))
              }else if(name == "reportList"){
                  return(report(x))
              }else if(name == "argv"){
                  return(argv(x))
              }else{
                  return(getParam(x,name))
              }
          })



#' @rdname Step-class
#' @return \item{$<-}{set inputList, outputList, paramList, allList, propList
#' or any item value in inputList, outputList or paramList}
#' @aliases  $<-
#' @export
setReplaceMethod(f = "$",
          signature = "Step",
          definition = function(x, name, value){
              if(name == "inputList"){
                  input(x) <- value
              }else if(name == "outputList"){
                  output(x) <- value
              }else if(name == "paramList"){
                  param(x) <- value
              }else if(name == "allList"){
                  stop("`allList` can not be set")
              }else if(name == "propList"){
                  property(x) <- value
              }else if(name == "reportList"){
                  report(x) <- value
              }else if(name == "argv"){
                  argv(x) <- value
              }else{
                  stop(paste(name,"is not available for $<-"))
              }
              x
          })


setGeneric(name = "getParam",
           def = function(.Object,item,...){
               standardGeneric("getParam")
           })

#' @rdname Step-class
#' @return \item{getParam}{Get parameter value set by process function.
#' See \code{getParamItems} to obtain valid items for query.}
#' @aliases  getParam
#' @export
setMethod(f = "getParam",
          signature = "Step",
          definition = function(.Object,item,
                                type = c("input","output","other"),...){
              type <- unique(type)
              # for(t in type){
              #     t1 <- match.arg(t,c("input","output","other"))
              #     if(t1 == "input"){
              #         if(!is.null(input(.Object)[[item]])){
              #             return(input(.Object)[[item]])
              #         }
              #     }else if(t1 == "output"){
              #         if(!is.null(output(.Object)[[item]])){
              #             return(output(.Object)[[item]])
              #         }
              #     }else{
              #         if(!is.null(param(.Object)[[item]])){
              #             return(param(.Object)[[item]])
              #         }
              #     }
              # }

              lapply(type, function(t){
                  t1 <- match.arg(t,c("input","output","other"))
              })
              if("input" %in% type){
                  if(!is.null(input(.Object)[[item]])){
                      return(input(.Object)[[item]])
                  }
              }
              if("output" %in% type){
                  if(!is.null(output(.Object)[[item]])){
                      return(output(.Object)[[item]])
                  }
              }
              if("other" %in% type){
                  if(!is.null(param(.Object)[[item]])){
                      return(param(.Object)[[item]])
                  }
              }
              return(NULL)
          })

setGeneric(name = "getParamItems",
           def = function(.Object, type = c("input","output","other"),...){
               standardGeneric("getParamItems")
           })

#' @rdname Step-class
#' @return \item{getParamItems}{Get parameter name list}
#' @aliases  getParamItems
#' @export
setMethod(f = "getParamItems",
          signature = "Step",
          definition = function(.Object, type = c("input","output","other"),
                                ...){
              type <- unique(type)
              allitem <- c()
              # for(t in type){
              #     t1 <- match.arg(t,c("input","output","other"))
              #     if(t1 == "input"){
              #         allitem <- c(allitem,names(input(.Object)))
              #     }else if(t1 == "output"){
              #         allitem <- c(allitem,names(output(.Object)))
              #     }else{
              #         allitem <- c(allitem,names(param(.Object)))
              #     }
              # }
              allitem <- c()
              lapply(type, function(t){
                  t1 <- match.arg(t,c("input","output","other"))
              })
              if("input" %in% type){
                  allitem <-c(allitem, names(input(.Object)))
              }
              if("output" %in% type){
                  allitem <-c(allitem,names(output(.Object)))
              }
              if("other" %in% type){
                  allitem <-c(allitem,names(param(.Object)))
              }
              return(allitem)
          })



setGeneric(name = "isReady",
           def = function(.Object,...){
               standardGeneric("isReady")
           })

#' @rdname Step-class
#' @return \item{isReady}{Is the process ready for downstream process}
#' @aliases  isReady
#' @export
setMethod(f = "isReady",
          signature = "Step",
          definition = function(.Object,...){
              return(.Object@finish)
          })


setGeneric(name = "clearStepCache",
           def = function(.Object,...){
               standardGeneric("clearStepCache")
           })

#' @rdname Step-class
#' @return \item{clearStepCache}{Clear cache of Step object}
#' @aliases  clearStepCache
#' @export
setMethod(f = "clearStepCache",
          signature = "Step",
          definition = function(.Object,...){
              if(!unlink(getParamMD5Path(.Object))){
                  message("Chache has been cleared")
              }else{
                  message("Chache does not exist. Nothing has been done.")
              }
              message("cleaning output files ...")
              lapply(output(.Object), function(x){
                  message("removing files if exist:")
                  x <- unlist(x)
                  x <- normalizePath(x)
                  print(x)
                  unlink(x,recursive = TRUE, force = TRUE)
              })
              .Object@finish<-FALSE
              .Object
          })


setGeneric(name = "getAutoPath",
           def = function(.Object,originPath,regexSuffixName,suffix,...){
               standardGeneric("getAutoPath")
           })

#' @return \item{getAutoPath}{(For package developer)
#' Developer can use this method to generate new file name
#' based on exist input file name}
#' @rdname Step-class
#' @aliases getAutoPath
#' @export
setMethod(f = "getAutoPath",
          signature = "Step",
          definition = function(.Object,originPath,regexSuffixName,suffix,...){
              stopifnot(is.character(originPath))
              if(!startsWith(suffix,".")){
                  suffix <- paste0(".", suffix)
              }
              if(suffix == "."){
                  suffix <- ""
              }
              prefix<-getBasenamePrefix(originPath,regexSuffixName)
              return(file.path(getStepWorkDir(.Object),
                               paste0(prefix,suffix)))
          })
setGeneric(name = "paramValidation",
           def = function(.Object,...){
               standardGeneric("paramValidation")
           })
setMethod(f = "paramValidation",
          signature = "Step",
          definition = function(.Object,...){
              writeLog(.Object, "All Parameters for This Step:")
              checkAllPath(.Object)
              checkRequireParam(.Object)
              writeLog(.Object, "__________________________________________")
          })


setGeneric(name = "checkRequireParam",
           def = function(.Object,...){
               standardGeneric("checkRequireParam")
           })

#' @return \item{checkRequireParam}{(For package developer)
#' Check required inputs or parameters are filled.}
#' @rdname Step-class
#' @aliases checkRequireParam
#' @export
setMethod(f = "checkRequireParam",
          signature = "Step",
          definition = function(.Object,...){
              # override this function if necessary
              writeLog(.Object, "|Other Parameters:")
              paramValue <- param(.Object)

              lapply(names(paramValue), function(n){
                  x <- paramValue[[n]]
                  writeLog(.Object, paste0("|    ", n, ":"))
                  if(is.character(x)||
                     is.numeric(x)||
                     is.logical(x)||
                     is.factor(x)||
                     is.null(x)){
                      val <- x
                      if(length(x)>1){
                          val <- paste("a vector started with",x[1])
                      }
                      if(is.character(val)){
                          val <- paste0("\"", val, "\"")
                      }
                      writeLog(.Object, paste0("|        ", val))
                  }else{
                      writeLog(.Object, paste("|        An object of",class(x)))
                  }
              })
              return(TRUE)
          })



setGeneric(name = "checkAllPath",
           def = function(.Object,...){
               standardGeneric("checkAllPath")
           })

#' @return \item{checkRequireParam}{(For package developer)
#'  Check required inputs are filled.}
#' @rdname Step-class
#' @aliases checkAllPath
#' @export
setMethod(f = "checkAllPath",
          signature = "Step",
          definition = function(.Object,...){
              writeLog(.Object, "|Input:")
              inputValue <- input(.Object)
              for(n in names(inputValue)){
                  if(!is.character(inputValue[[n]])){
                      stop(paste("input file value of", n, "is not is not character"))
                  }
                  writeLog(.Object, paste0("|    ", n,":"))
                  lapply(inputValue[[n]], function(x){
                      if(!file.exists(x)){
                          stop(paste("input file directory", n, "is not exist:", x))
                      }
                      writeLog(.Object, paste0("|        \"", x,"\""))
                  })
              }
              writeLog(.Object, "|Output:")
              ouputValue <- output(.Object)
              for(n in names(ouputValue)){
                  if(!is.character(ouputValue[[n]])){
                      stop(paste("ouput file value of", n, "is not is not character:"))
                  }
                  writeLog(.Object, paste0("|    ", n,":"))
                  lapply(ouputValue[[n]], function(x){
                      if(!file.exists(dirname(x))){
                          stop(paste("ouput file/folder's directory", n, "is not exist:",x ))
                      }
                      writeLog(.Object, paste0("|        \"", x,"\""))
                  })
              }
          })

setGeneric(name = "getParamMD5Path",
           def = function(.Object,...){
               standardGeneric("getParamMD5Path")
           })
#' @return \item{getParamMD5Path}{The Step object storage directory}
#' @rdname Step-class
#' @aliases getParamMD5Path
#' @export
setMethod(f = "getParamMD5Path",
          signature = "Step",
          definition = function(.Object,...){
              print(Sys.time())
              paramstr <- c(stepName(.Object))
              itNames <- names(param(.Object))
              # for(n in sort(itNames)){
              #     paramstr<-c(paramstr,n)
              #     paramstr<-c(paramstr,getParam(.Object,n,type="other"))
              # }
              rs <- lapply(sort(itNames), function(n){
                   ap <- param(.Object)[[n]]
                   if(is.character(ap)){
                       ap <- lapply(ap,function(x){
                           if(startsWith(x,getJobDir())){
                               return(substring(x,2+nchar(getJobDir())))
                           }else if(startsWith(x,getRefDir())){
                               return(substring(x,2+nchar(getRefDir())))
                           }else{
                               return(x)
                           }
                       })
                       ap <- sort(unlist(ap))
                   }
                   return(c(n,ap))
              })
              print(Sys.time())
              paramstr <- c(paramstr,unlist(rs))
              ioNames <- c(names(input(.Object)),names(output(.Object)))
              # for(n in sort(ioNames)){
              #     paramstr<-c(paramstr,n)
              #     paths <- getParam(.Object,n,type = c("input","output"))
              #     if(!is.character(paths) && !is.list(paths)){
              #         paramstr <- c(paramstr,paths)
              #         next
              #     }
              #     paths <- sort(unlist(paths))
              #     paths1 <- c()
              #     breakflag <- FALSE
              #     for(path in paths){
              #         if(dir.exists(path)){
              #             paths1 <- c(paths1, sort(dir(path,recursive = TRUE)))
              #         }else if(file.exists(path)){
              #             paths1 <- c(paths1, path)
              #         }else{
              #             paramstr <- c(paramstr,runif(1))
              #             breakflag <- TRUE
              #             break;
              #         }
              #     }
              #     if(breakflag){
              #         break
              #     }
              #     paths <- paths1
              #     paths <- paths[grep("pipeFrame.obj",paths,invert = TRUE)]
              #     checkpaths <- c()
              #     for(path in paths){
              #         p <- normalizePath(path)
              #         checkpaths <- c(checkpaths,p)
              #         if(startsWith(p,getJobDir())){
              #             p <- substring(p,2+nchar(getJobDir()))
              #         }
              #         paramstr <- c(paramstr,p)
              #     }
              #     for(p in checkpaths){
              #         filesize <- file.info(p)$size
              #         paramstr <- c(paramstr,filesize)
              #     }
              # }
              cl <- NULL


              paramstr0 <- lapply(sort(ioNames), function(n){
                  paramstr0<- n
                  paths <- input(.Object)[[n]]
                  if(is.null(paths)){
                      paths <- output(.Object)[[n]]
                  }
                  if(!is.character(paths) && !is.list(paths)){
                      paramstr0 <- c(paramstr0, paths) ## stop error in check file dir
                      return(paramstr0)
                  }
                  return(paramstr0)
              })

              ios <- c(input(.Object),output(.Object))

              ios <- unlist(ios)

              paths <- tryCatch(lapply(ios,function(path){
                      if(dir.exists(path)){
                          allfiles <- dir(path,recursive = TRUE)
                          if(length(allfiles)==0){
                              return(runif(1))
                          }else{
                              return(sort(file.path(path,allfiles)))
                          }
                      }else if(file.exists(path)){
                          return(path)
                      }else{
                          stop("")
                      }
                  }), error = function(e){})


             if(is.null(paths) || length(paths) == 0){
                 md5code<-substr(digest(object = runif(1),algo = "md5"),1,8)
                 md5filepath<-file.path(getStepWorkDir(.Object),
                                        paste("pipeFrame.obj",md5code,
                                              "rds",sep = "."))
                 return(md5filepath)
             }else{
                 paths<-unlist(paths)
                 paths <- paths[grep("pipeFrame.obj",paths,invert = TRUE)]
                 if(getOption("pipeFrameConfig.ignoreCheck")){
                     paths <- file.info(paths)$size
                 }else{
                     sizes <- file.info(paths)$size
                     print(Sys.time())
                     if(max(sizes)<1000000000){
                         paths <- tools::md5sum(paths)
                         names(paths) <- NULL
                     }else{
                         threadsize <- getThreads()
                         if(length(paths) < threadsize){
                             threadsize <- length(paths)
                         }
                         cl <- makeCluster(threadsize)
                         paths <- parLapply(cl = cl, X = paths, fun = function(path){
                             return(tools::md5sum(path))
                         })
                         stopCluster(cl)
                         paths <- unlist(paths)
                         names(paths) <- NULL
                     }
                     print(Sys.time())
                 }
             }


              # threadsize <- getThreads()
              #
              # if(length(ios) < threadsize){
              #     threadsize <- length(ios)
              # }
              # print(Sys.time())
              # cl <- makeCluster(threadsize)
              #
              # print(Sys.time())
              # paramstr01 <- parLapply(cl = cl, X = ios, fun = function(paths){
              #     paths <- sort(unlist(paths))
              #     paths1 <- lapply(paths,function(path){
              #         if(dir.exists(path)){
              #             allfiles <- dir(path,recursive = TRUE)
              #             if(length(allfiles)==0){
              #                 return(runif(1))
              #             }else{
              #                 return(sort(file.path(path,allfiles)))
              #             }
              #         }else if(file.exists(path)){
              #             return(path)
              #         }else{
              #             return(runif(1))
              #         }
              #     })
              #     paths2 <- lapply(paths1,function(path){
              #         return(is.numeric(path))
              #     })
              #
              #     paths1 <- unlist(paths1)
              #     paths <- paths1
              #     paths <- paths[grep("pipeFrame.obj",paths,invert = TRUE)]
              #
              #     if(sum(unlist(paths2))==0){
              #         if(getOption("pipeFrameConfig.ignoreCheck")){
              #             paths <- lapply(paths, function(p){
              #                 file.info(p)$size
              #             })
              #             paths <- unlist(paths)
              #         }else{
              #             paths <- tools::md5sum(paths)
              #             names(paths) <- NULL
              #         }
              #
              #     }
              #
              #     return(paths)
              #     #paramstr0 <- c(paramstr0, paths)
              #
              #     # checkpaths <- c()
              #     # # for(path in paths){
              #     # #     p <- normalizePath(path)
              #     # #     checkpaths <- c(checkpaths,p)
              #     # #     if(startsWith(p,getJobDir())){
              #     # #         p <- substring(p,2+nchar(getJobDir()))
              #     # #     }
              #     # #     paramstr <- c(paramstr,p)
              #     # # }
              #     # ps <- lapply(paths, function(path){
              #     #     p <- normalizePath(path)
              #     #     checkpaths <- c(checkpaths,p)
              #     #     if(startsWith(p,getJobDir())){
              #     #         p <- substring(p,2+nchar(getJobDir()))
              #     #     }
              #     #     return(p)
              #     # })
              #     # paramstr0 <- c(paramstr0,unlist(ps))
              #     # # for(p in checkpaths){
              #     # #     filesize <- file.info(p)$size
              #     # #     paramstr <- c(paramstr,filesize)
              #     # # }
              #     # fs <- lapply(checkpaths, function(p){
              #     #     file.info(p)$size
              #     # })
              #     # paramstr0 <- c(paramstr0, unlist(fs))
              #     # return(paramstr0)
              # })
              # print(Sys.time())
              # stopCluster(cl)
              print(Sys.time())
              #paramstr <- c(paramstr,paramstr0, paramstr01)
              paramstr <- c(paramstr, paramstr0, paths)
              names(paramstr) <- NULL
              md5code<-substr(digest(object = paramstr,algo = "md5"),1,8)
              md5filepath<-file.path(getStepWorkDir(.Object),
                                     paste("pipeFrame.obj",md5code,
                                          "rds",sep = "."))
              print(Sys.time())
              return(md5filepath)
          })
setGeneric(name = "setFinish",
           def = function(.Object,...){
               standardGeneric("setFinish")
           })
setMethod(f = "setFinish",
          signature = "Step",
          definition = function(.Object,...){
              .Object@finish<-TRUE
              writeLog(.Object,as.character(Sys.time()))
              writeLog(.Object,"processing finished")
              .Object
          })


setGeneric(name = "getStepWorkDir",
           def = function(.Object, filename = NULL, ...){
               standardGeneric("getStepWorkDir")
           })
#' @return \item{getStepWorkDir}{Get the step work directory of this object}
#' @rdname Step-class
#' @aliases getStepWorkDir
#' @export
setMethod(f = "getStepWorkDir",
          signature = "Step",
          definition = function(.Object, filename = NULL, ...){
              if(is.null(filename)){
                  return(file.path(getJobDir(),
                                   paste0("Step_",
                                          sprintf("%02d",
                                                  stepID(.Object)),
                                          "_",stepName(.Object))))
              }else{
                  return(file.path(getJobDir(),
                                   paste0("Step_",
                                          sprintf("%02d",stepID(.Object)),
                                          "_",stepName(.Object)),filename))
              }

          })

setGeneric(name = "stepID",
           def = function(.Object,...){
               standardGeneric("stepID")
           })

#' @return \item{stepID}{Get the step ID}
#' @rdname Step-class
#' @aliases stepID
#' @export
setMethod(f = "stepID",
          signature = "Step",
          definition = function(.Object,...){
              return(.Object@id)
          })



setGeneric(name = "writeLog",
           def = function(.Object,msg,...,isWarnning=FALSE,
                          appendLog=TRUE,showMsg=TRUE){
               standardGeneric("writeLog")
           })
#' @return \item{writeLog}{(For package developer) write log.}
#' @rdname Step-class
#' @aliases writeLog
#' @export
setMethod(f = "writeLog",
          signature = "Step",
          definition = function(.Object,msg,...,isWarnning=FALSE,
                                appendLog=TRUE,showMsg=TRUE){
              if(isWarnning){
                  warning(msg)
                  msg<-paste0("Warning:",msg)
              }else if(showMsg){
                  message(msg)
              }
              write.table(
                  msg,file.path(getStepWorkDir(.Object),
                                "pipeFrame.obj.log"),quote = FALSE,
                  row.names = FALSE,col.names = FALSE,append = appendLog)
          })

#' @return \item{processing}{(For package developer) Run pipeline step}
#' @rdname Step-class
#' @aliases processing
#' @export
setGeneric(name = "processing",
           def = function(.Object,...){
               standardGeneric("processing")
           })

#' @return \item{genReport}{(For package developer) Generate report list}
#' @rdname Step-class
#' @aliases genReport
#' @export
setGeneric(name = "genReport",
           def = function(.Object,...){
               standardGeneric("genReport")
           })
wzthu/pipeFrame documentation built on Sept. 22, 2021, 4:36 p.m.