R/utils.R

Defines functions viewtask silent reset.job tailf more dedup file.dir file.name xml2task parse.info ssub_cmd qsub_cmd bsub_cmd

Documented in bsub_cmd dedup file.dir file.name more parse.info qsub_cmd reset.job silent ssub_cmd tailf viewtask xml2task

##############################################################################
## Marcin Imielinski
##
## Weill Cornell Medicine
## mai9037@med.cornell.edu
##
## New York Genome Center
## mimielinski@nygenome.org
##

## This program is free software: you can redistribute it and/or modify it
## under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.

## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.

## You should have received a copy of the GNU Lesser General Public License
## along with this program.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

#' @name bsub_cmd
#' @title bsub_cmd
#' @description
#'
#' Makes bsub command that wraps shell command "cmd" to send to queue "queue"
#' redirecting output / error etc streams to path prefixed by "jname",
#' optional_args: maximum memory requirements "mem", "jlabel" job label
#'
#' @param cmd string shell command to be submitted via bsub
#' @param queue string name of specified queue, '-q "queue_name" ' (default = NULL)
#' @param jname  string path prefix by 'jname' (default = NULL)
#' @param jlabel string job name for '-J "job_name" ' (default = NULL)
#' @param jgroup string job_group_name for '-g "job_group_name" ' (default = NULL)
#' @param mem  integer amount of virtual memory/RAM to use via resource requirement arg '-R "res_req"'' (default = NULL)
#' @param group  string project_name for '-P' (default = NULL)
#' @param cwd string pathname to current working directory; -cwd "current_working_directory" (default = NULL)
#' @param mc.cores integer number of cores to use (default = 1)
#' @param deadline boolean specifies if deadline initiation time used (default = FALSE)
#' @author Marcin Imielinski
#' @export
bsub_cmd = function(cmd, queue = NULL, jname = NULL, jlabel = NULL, jgroup = NULL, mem = NULL, group = NULL, cwd = NULL, mc.cores = NULL, deadline = FALSE)
{
    if (is.null(jname) & is.null(names(cmd))){
        jname = 'job'
    }

    if (length(jname) != length(cmd)){
        jname = rep(jname, length(cmd))
    }

    if (!is.null(jname)){
        names(cmd) = dedup(jname)
    }

    qjname = paste( "\"", names(cmd), "\"", sep="" )
    qjout = paste( "\"", names(cmd), ".bsub.out", "\" ", sep="" )
    qjerr = paste( "\"", names(cmd), ".bsub.err", "\" ", sep="" )
    qjrout = paste( "\"", names(cmd), ".R.out", "\" ", sep="" )
    out_cmd = paste( "bsub -o ", qjout, " -e ",  qjerr);
    out_cmd = paste(out_cmd, ifelse(is.na(queue), '', paste("-q ", queue)))
#    if (!is.null(queue)) out_cmd = ifelse(is.na(queue), '', paste("-q ", queue))
    if (!is.null(group)) out_cmd = paste(out_cmd, " -P ", group)
    if (!is.null(mem)) out_cmd = paste(out_cmd, " -R \"rusage[mem=", mem, "]\" ", sep = "")
    if (!is.null(jlabel)) out_cmd = paste(out_cmd, " -J ", jlabel )
    if (!is.null(jgroup)) out_cmd = paste(out_cmd, " -g ", sub('^\\/*', '/', jgroup))
    if (!is.null(cwd)) out_cmd = paste(out_cmd, " -cwd ", cwd )
    if (!is.null(mc.cores)) out_cmd = paste(out_cmd, sprintf(" -n %d,%d -R 'span[hosts=1]'", mc.cores, mc.cores))
    if (deadline){ out_cmd = paste(out_cmd, '-sla DEADLINEsla') }
    out_cmd = paste(out_cmd," \"",  cmd, "\"", sep = "")
    names(out_cmd)= names(cmd)
    return(out_cmd)
}




#' @name qsub_cmd
#' @title qsub_cmd
#' @description
#'
#' Makes qsub command that wraps shell command "script.fn" to send to queue "queue"
#' redirecting output / error etc streams to path prefixed by "jname",
#' optional_args: maximum memory requirements "mem", "jlabel" job label
#'
#' @param script.fn string shell command to be submitted via bsub
#' @param queue string queue destination, specifies destination of the job '-q "destination" ' (default = NULL)
#' @param jname  string path prefix by 'jname' (default = NULL)
#' @param jlabel string job name for '-J "job_name" ' (default = NULL)
#' @param jgroup string job_group_name for '-g "job_group_name" ' (default = NULL)
#' @param mem  integer amount of virtual memory/RAM to use via resource requirement arg '-R "res_req"'' (default = NULL)
#' @param group  string project_name for '-P' (default = NULL)
#' @param cwd string pathname to current working directory; -cwd "current_working_directory" (default = NULL)
#' @param mc.cores integer number of cores to use (default = 1)
#' @param deadline boolean specifies if deadline initiation time used (default = FALSE)
#' @author Marcin Imielinski
#' @export
qsub_cmd = function(script.fn, queue = NULL, jname = NULL, jlabel = NULL, jgroup = NULL, mem = NULL, group = NULL, cwd = NULL, mc.cores = NULL, deadline = F, now = FALSE, touch_job_out = TRUE, qprior = 0)
{
    if (is.null(jname) & is.null(names(script.fn))){
        jname = 'job'
    }

    if (length(jname) != length(script.fn)){
        jname = rep(jname, length(script.fn))
    }

    if (!is.null(jname)){
        names(script.fn) = dedup(jname)
    }

    qjname = paste( "\"", names(script.fn), "\"", sep="" )
    qjout = paste( "", names(script.fn), ".bsub.out", " " , sep="" )
    qjerr = paste( "", names(script.fn), ".bsub.err", "", sep="" )
    qjrout = paste( "", names(script.fn), ".R.out", "", sep="" )
    out_cmd = paste0("qsub -V -j y -p ", qprior, " -o ", qjout)
    ## out_cmd = sprintf("qsub -V -j y -p %s -o %s ", qprior, qjout)
    ## out_cmd = paste("qsub -V -j y -o ", qjout);
    out_cmd = paste(out_cmd, ifelse(is.na(queue), '', paste("-q ", queue)))
    #if (!is.null(queue)) out_cmd = ifelse(is.na(queue), '', paste("-q ", queue))
    if (!is.null(group)) out_cmd = paste(out_cmd, " -P ", group)
    if (!is.null(mem)) out_cmd = paste(out_cmd, " -l h_vmem=", mem, "g", sep = "")
    if (!is.null(jgroup)) out_cmd = paste(out_cmd, " -g ", sub('^\\/*', '/', jgroup))
    if (!is.null(cwd)) {
        current_umask = Sys.umask(mode = NA)
        out_cmd = paste(out_cmd, " -wd ", cwd )
        Sys.umask(mode = "0002")
        base::file.create(trimws(paste0(cwd, "/", qjout)))
        Sys.umask(mode = current_umask)
        ## cmds = sprintf("umask 002; touch %s", paste0(cwd, "/", qjout))
        ## lapply(cmds, function(this_cmd) {system(this_cmd); return(NULL)})
    }
    if (!is.null(qjname)) out_cmd = paste(out_cmd, " -N ", jlabel)
    out_cmd = paste(out_cmd, '-now', ifelse(now, 'y', 'n'))
    if (!is.null(mc.cores)) out_cmd = paste(out_cmd, ifelse(!is.na(mc.cores), ifelse(mc.cores > 0,  paste(" -pe smp",  mc.cores), ''), ''))
    out_cmd = paste(out_cmd, script.fn)
    names(out_cmd)= names(script.fn)
    return(out_cmd)
}


#' @name ssub_cmd
#' @title ssub_cmd
#' @description
#'
#' Makes ssub command that wraps shell command "script.fn" to send to queue "queue"
#' redirecting output / error etc streams to path prefixed by "jname",
#' optional_args: maximum memory requirements "mem", "jlabel" job label
#'
#' @param script.fn string shell command to be submitted via bsub
#' @param queue string queue destination, specifies destination of the job '-q "destination" ' (default = NULL)
#' @param jname  string path prefix by 'jname' (default = NULL)
#' @param jlabel string job name for '-J "job_name" ' (default = NULL)
#' @param jgroup string job_group_name for '-g "job_group_name" ' (default = NULL)
#' @param mem  integer amount of virtual memory/RAM to use via resource requirement arg '-R "res_req"'' (default = NULL)
#' @param group  string project_name for '-P' (default = NULL)
#' @param cwd string pathname to current working directory; -cwd "current_working_directory" (default = NULL)
#' @param mc.cores integer number of cores to use (default = 1)
#' @param deadline boolean specifies if deadline initiation time used (default = FALSE)
#' @param now
#' @param time
#' @param qprior used for setting the "nice" value for SLURM
#' @author Zoran Gajic
#' @export
ssub_cmd = function(script.fn, queue, qos = NULL, jname = NULL, jlabel = NULL, jgroup = NULL, mem=NULL, group = NULL, cwd = NULL, mc.cores = NULL, deadline = F, now = FALSE, time = "00", qprior = NULL, gres = NULL)
{
        if (is.null(jname) & is.null(names(script.fn)))
            jname = 'job'
        
        if (length(jname) != length(script.fn))
            jname = rep(jname, length(script.fn))

        if (is.null(qos))
            qos = as.character(NA)

        if (is.null(gres))
            gres = as.character(NA)
        
        if (!is.null(jname))
            names(script.fn) = dedup(jname)    
        qjname = paste( "\"", names(script.fn), "\"", sep="" )
        qjout = paste( "", names(script.fn), ".bsub.out", " " , sep="" )
        qjerr = paste( "", names(script.fn), ".bsub.err", "", sep="" )
        qjrout = paste( "", names(script.fn), ".R.out", "", sep="" )                    
        out_cmd = paste("sbatch --export=ALL --output=", qjout, sep = '');
        out_cmd = paste(out_cmd, ifelse(is.na(queue), '', paste0("-p ", queue)))
        out_cmd = paste(out_cmd, ifelse(is.na(qos), '', paste0("-q ", qos)))
        out_cmd = paste(out_cmd, ifelse(is.na(gres), "", paste0("--gres=", gres)))
        out_cmd = paste(out_cmd, paste0('--time=', time, ':00:00 '))
        if (!is.null(mem)) out_cmd = paste(out_cmd, " --mem=", mem, "G", sep = "");
        #if (!is.null(jgroup)) out_cmd = paste(out_cmd, " -g ", sub('^\\/*', '/', jgroup))
        ## if (!is.null(cwd)) out_cmd = paste(out_cmd, " --workdir=", cwd , sep = '')
        if (!is.null(cwd)) {
            current_umask = Sys.umask(mode = NA)
            out_cmd = paste0(out_cmd, " --chdir=", cwd)
            Sys.umask(mode = "0002")
            base::file.create(trimws(paste0(cwd, "/", qjout)))
            Sys.umask(mode = current_umask)
            ## cmds = sprintf("umask 002; touch %s", paste0(cwd, "/", qjout))
            ## lapply(cmds, function(this_cmd) {system(this_cmd); return(NULL)})
        }
        if (!is.null(qjname)) out_cmd = paste(out_cmd, " --job-name=", jlabel, sep = '')
        ## out_cmd = paste(out_cmd, '-now', ifelse(now, 'y', 'n'))
        ## khadi Friday, Sep 25, 2020, Week 39, 08:59:16 AM
        ## wtf... slurm defaults to 2 cores per task if you don't set? changing >1 to >0
        if (!is.null(mc.cores)) out_cmd = paste(out_cmd, ifelse(!is.na(mc.cores), ifelse(mc.cores>0,  paste(" --cpus-per-task=",  mc.cores, sep = ""), ''), ''))
        if (!is.null(qprior)) out_cmd =  paste(out_cmd, ifelse(!is.na(qprior), ifelse(qprior>=0,  paste(" --nice=",  qprior, sep = ""), ''), '')) # set the "nice" value for SLURM
        out_cmd = paste(out_cmd, script.fn)
        names(out_cmd)= names(script.fn)
        return(out_cmd)
    }



#' @name parse.info
#' @title parse.info
#' @description
#'
#' parses outputs of completed Job by subdirectory
#'
#' @param jname string 'jobname' pathanme to completed Job subdirectory
#' @param detailed boolean "verbose", outputs 'max.swap', 'max.processes', and 'max.threads' (default = FALSE)
#' @param force boolean force if *out/*err not found (default = FALSE)
#' @param mc.cores integer number of cores to use (default = 1)
#' @author Marcin Imielinski
#' @export
parse.info = function(jname, detailed = FALSE, force = FALSE, mc.cores = 1)
{
    dir = file.dir(jname)
    jname = file.name(jname)

    input.jname = jname
    jname = gsub('\\.bsub\\.out$', '', gsub('\\.bsub\\.err$', '', jname))
    names(input.jname) = jname

    if (length(jname)==0){
        outs = data.frame(jname = NA,
            out.file = NA,
            err.file = NA,
            exit_flag = NA, term_flag = NA, started = NA, reported = NA, hours_elapsed = NA, max_mem = NA, cpu_time = NA,
            success = NA,
            stringsAsFactors = F)
    }
    else{
        outs = data.frame(jname = gsub('\\.R$', '', jname),
            out.file = paste(dir,'/', jname, '.bsub.out', sep = ''),
            err.file = paste(dir, '/', jname, '.bsub.err', sep = ''),
            exit_flag = NA, term_flag = NA, started = NA, reported = NA, hours_elapsed = NA, max_mem = NA, cpu_time = NA,
            success = NA,
            job_type = NA,
            stringsAsFactors = F);
    }

    fn = paste(dir, jname, '.bsub.out', sep = '')
    fn.err = paste(dir, jname, '.bsub.err', sep = '')
    fn.report = paste(dir, jname, '.bsub.report', sep = '')
    fn.report.sge = paste(dir, jname, '.bsub.report', sep = '')

    mtime = data.table(out = file.info(fn)$mtime, err = file.info(fn.err)$mtime, report = file.info(fn.report)$mtime, report.sge = file.info(fn.report.sge)$mtime)
    mtime[, report := pmax(report, report.sge, na.rm = TRUE)]

    ## we can use the report if the report exists and is younger than both the err and out
    ## or if (somehow) the err and out don't exist but the report does
    fn.rep.ex = mtime[ ,ifelse(!is.na(report), ifelse(!is.na(err) | is.na(out), pmin(report>err, report>out, na.rm = TRUE), FALSE), FALSE)] & !force

    if (any(fn.rep.ex)){
        outs[fn.rep.ex, ] = do.call(rbind, lapply(fn.report[fn.rep.ex], read.delim, strings = FALSE))[, names(outs)]
    }

    ## fn.ex these are the ones we need to parse again
    fn.ex = (file.exists(fn) | file.exists(fn.err)) & !fn.rep.ex;

    if (!any(fn.ex)){
        return(outs)
    }

    tmp = matrix(unlist(mclapply(which(fn.ex),
        function(i){
            p = pipe(paste('tail -n 100', fn[i]))
            y = readLines(p);
            close(p)
            p = pipe(paste('head -n 100', fn[i]))
            sge = grep('FLOW', readLines(p), value = TRUE)
            close(p)
            if (any(grepl('^Sender.*LSF System', y))){   ## LSF job
                y = split(y, cumsum(grepl('^Sender', y)))
                y = y[[length(y)]]  ## picks "last" dump from lsf to this out file
                return(c('lsf',
                    c(grep('^Exited with', y, value = T), grep('^Successfully completed', y, value = T), '')[1],
                    c(grep('^TERM', y, value = T), '')[1],
                    c(gsub('Started at ', '', grep('^Started at', y, value = T)), '')[1],
                    c(gsub('Results reported ((at)|(on)) ', '', grep('^Results reported ((at)|(on))', y, value = T), ''))[1],
                    c(gsub('[ ]+CPU time[ ]+\\:[ ]+(.*)[ ]+\\S+', '\\1', grep('^[ ]+CPU time', y, value = T)), '')[1],
                    c(gsub('[ ]+Max Memory[ ]+\\:[ ]+(.*)', '\\1', grep('^[ ]+Max Memory', y, value = T)), '')[1],
                    c(gsub('[ ]+Max Swap[ ]+\\:[ ]+(.*)', '\\1', grep('^[ ]+Max Swap', y, value = T)), '')[1],
                    c(gsub('[ ]+Max Processes[ ]+\\:[ ]+(.*)\\S*', '\\1', grep('^[ ]+Max Processes', y, value = T)), '')[1],
                    c(gsub('[ ]+Max Threads[ ]+\\:[ ]+(.*)\\S*', '\\1', grep('^[ ]+Max Threads', y, value = T)), '')[1]
                ))
            }
            else if (length(sge)>0){
                fn.report.sge = paste(fn.report[i], '.sge', sep = '')
                jobnum = gsub('^FLOW.SGE.JOBID=(.*)', '\\1',  sge[1])
                p = pipe(paste('qacct -j', jobnum[length(jobnum)]))
                tmp = readLines(p)
                close(p)
                vals = structure(str_trim(gsub('^\\S+\\s+(.*)', '\\1', tmp, perl = TRUE)), names = gsub('(^\\S+) .*', '\\1', tmp, perl = TRUE))
                if (length(tmp)>0){
                    write.table(as.data.frame(as.list(vals)), fn.report.sge, sep = '\t', quote = FALSE, row.names = FALSE)
                }
                ## read from file if exists
                else if (file.exists(fn.report.sge[i])){
                    vals = unlist(read.delim(fn.report.sge, stringsAsFactors = FALSE))
                }

                cpuu= gsub('[^a-zA-Z]', '', vals['cpu'])
                memu= gsub('[^a-zA-Z]', '', vals['mem'])
                vmemu= gsub('[^a-zA-Z]', '', vals['maxvmem'])

                return(c('sge',
                    ifelse(vals['exit_status']=='0', 'Successfully completed.', vals['exit_status']),
                    vals['failed'],
                    vals['start_time'],
                    vals['end_time'],
                    as.numeric(gsub('[a-zA-Z]', '', vals['cpu']))*ifelse(grepl('[hH]', vmemu), 3600, ifelse(grepl('m', vmemu), 60, 1)),
                    as.numeric(gsub('[a-zA-Z]', '',  vals['mem']))/ifelse(grepl('MB', vmemu), 1000, ifelse(grepl('KB', vmemu), 1e6, 1)),
                    as.numeric(gsub('[a-zA-Z]', '', vals['maxvmem']))/ifelse(grepl('MB', vmemu), 1000, ifelse(grepl('KB', vmemu), 1e6, 1)),
                    vals['slots'],
                    vals['slots']
                    ))
            }
            ## interpret job as locally run with a /usr/bin/time -v output
            else{
                y = tryCatch(readLines(fn.err[i]), error = function(e) NULL)
                if (is.null(y)){
                    y = readLines(fn[i])
                }
                ix = grep('Command being timed', y)
                if (length(ix)==0){
                    ## fail
                    return(rep(as.character(NA), 10))
                }
                ix = ix[length(ix)] ### only get the last instance
                y = grep('\t.*', y[ix:length(y)][-1], value = TRUE)
                tmp = strsplit(y, '[\\:\t]')
                keyval = structure(str_trim(sapply(tmp, function(x) if (length(x)>2) x[[3]] else NA)),
                    names = sapply(tmp, function(x) if (length(x)>1) x[[2]] else NA))
                etime = file.info(fn.err[i])$mtime
                stime = etime - as.numeric(keyval['User time (seconds)'])
                exit.status = ifelse(keyval['Exit status']==0, 'Successfully completed.', keyval['Exit status'])
                return(c('local', exit.status, NA, as.character(stime), as.character(etime),
                    keyval['User time (seconds)'], keyval['Maximum resident set size (kbytes)'], NA,
                    as.numeric(gsub('\\%', '', keyval['Percent of CPU this job got']))/100, NA))
            }
    }, mc.cores = mc.cores)), ncol = 10, byrow = T)

    colnames(tmp) = c('job.type', 'exit.flag', 'term.flag', 'started', 'reported', 'cpu.time', 'max.memory', 'max.swap', 'max.cpu', 'max.thr')

    .parse.mem = function(mem){
        ix = !is.na(mem)
        out.mem = rep(NA, length(mem))
        if (any(ix)){
            mem = mem[ix]
            tmp = strsplit(mem, '[ ]+')
            tmp.mem = suppressWarnings(as.numeric(sapply(tmp, function(x) x[1])))
            tmp.mem.units = sapply(tmp, function(x) x[2])
            out.mem[ix] = ifelse(is.na(tmp.mem.units), tmp.mem/1e6/4, ## assume time output, which is in kbytes * 4
                                   ifelse(tmp.mem.units == 'MB', tmp.mem/1e3,
                                          ifelse(tmp.mem.units == 'KB', tmp.mem/1e6,
                                                 tmp.mem)))
          }
        return(out.mem)
    }

    ## normalize to GB
    TIME.FORMAT1 = '%a %b %d %H:%M:%S %Y';
    TIME.FORMAT2 = '%Y-%m-%d %H:%M:%S';

    outs$job_type[fn.ex] = tmp[, 'job.type']
    outs$exit_flag[fn.ex] = tmp[, 'exit.flag']
    outs$term_flag[fn.ex] = tmp[, 'term.flag']
    outs$started[fn.ex] = ifelse( outs$job_type[fn.ex]%in% c('lsf','sge'),
        as.character(as.POSIXct(strptime(tmp[, 'started'], TIME.FORMAT1))),
        as.character(as.POSIXct(strptime(tmp[, 'started'], TIME.FORMAT2))))
    outs$reported[fn.ex] = ifelse(outs$job_type[fn.ex]%in% c('lsf','sge'),
        as.character(as.POSIXct(strptime(tmp[, 'reported'], TIME.FORMAT1))),
        as.character(as.POSIXct(strptime(tmp[, 'reported'], TIME.FORMAT2))))
    outs$hours_elapsed = as.numeric(as.POSIXct(outs$reported)-as.POSIXct(outs$started), units = 'hours')
    outs$cpu_time[fn.ex] = suppressWarnings(as.numeric(tmp[, 'cpu.time']))
    outs$max_mem[fn.ex] = ifelse(outs$job_type[fn.ex] == 'sge', as.numeric(tmp[, 'max.memory']), .parse.mem(tmp[, 'max.memory']))

    if (detailed){
        outs$max_swap[fn.ex] = tmp[, 'max.swap']
        outs$max_processes[fn.ex] = tmp[, 'max.processes']
        outs$max_threads[fn.ex] = tmp[, 'max.threads']
    }

    outs$success = ifelse(!is.na(outs$exit_flag), grepl('Success', outs$exit_flag), NA)
    rownames(outs) = dedup(outs$jname)

    ## cache row slices of this report table in the output directories
    ## for easier downstream access
    for (i in which(fn.ex)){
        write.table(outs[i, ], fn.report[i], sep = '\t', quote = F, row.names = FALSE)
    }

    outs = as.data.table(outs)

    if (!is.null(input.jname)){
      outs = outs[, key := input.jname[jname]]
    }
    else{
      outs = outs[, key := jname]
    }

    setkey(outs, 'key')

    return(outs)
}




#' @name xml2task
#' @title Makes a best attempt to convert a firehose xml task configuration into a Task object or .task file
#' @description
#'
#' Takes a path to an xml file of a FH task configuration, module path, and outputs a Task object or writes
#' to a .task file
#'
#' @param path string of pathname to firehose xml task
#' @param module string name of Module (default = NULL)
#' @param out.file string pathname to *.task file (default = NULL)
#' @author Marcin Imielinski
#' @export
xml2task = function(path, module = NULL, out.file = NULL)
{
    if (!requireNamespace("XML", quietly = TRUE)) {
        stop('In order to read xml files you must install the package "XML".')
    }

    tasks = XML::xmlToList(XML::xmlParse(path))
    tasks = tasks[which(names(tasks)=='pipeline-configuration')]

    if (length(tasks)==0){
        stop('No pipeline configurations found in this xml file .. check file')
    }

    out = lapply(tasks, function(task.config){
        if (!is.null(module)){
            if (!is(module, 'Module')){
                module = Module(module)
            }
        }

        out = NULL
        if (!is.null(task.config$outputs)){
            if (length(task.config$outputs)>0){
                outputs = as.data.table(do.call('rrbind', lapply(task.config$outputs, function(x) as.data.frame(rbind(unlist(x))))))

                setnames(outputs, gsub('\\-', '_', names(outputs)))

                outs = outputs[, {list(list(FlowOutput(name = target_annotation_type_name, pattern= paste(expression, '.*', extension, "$", sep = ''))))},
                    keyby = target_annotation_type_name]
            }
        }

        arg = NULL

        if (!is.null(task.config$parameter)){
            if (length(task.config$parameter)>0){
                params = as.data.table(do.call('rrbind', lapply(task.config$"parameter", function(x) as.data.frame(rbind(unlist(x))))))
                setnames(params, gsub('\\-', '_', names(params)))

                if (is.null(params$"default_value")){
                    params$"default_value" = NA
                }

                arg = params[, {if (mode == 'FlowLiteral'){
                                    list(list(FlowLiteral(name = name, arg = expression, path = file.exists(expression))))
                                }
                                else{
                                    list(list(FlowAnnotation(name = name, arg = expression, path = file.exists(expression), default = default_value)))
                                }
                            }, keyby = name]
            }
        }

        if (is(module, 'Module')){
            if (!is.null(arg)){
                if (!is.null(outs)){
                    return(do.call(Task, c(structure(arg[[2]], names = arg[[1]]), list(outputs = structure(outs[[2]], names = outs[[1]]), module = module))))
                }
                else{
                    return(do.call(Task, c(structure(arg[[2]], names = arg[[1]]), list(module = module))))
                }
            }
            else{
                if (!is.null(outs)){
                    return(do.call(Task, list(outputs = structure(outs[[2]]), names = outs[[1]], module = module)))
                }
                else{
                    stop('No inputs or outputs in this task config, malformed task.config file?')
                }
            }
        }
        else{
            warning('Warning: No module provided as input so just dumping mock task .task file to stdout')
            out = paste('#', task.config$"name", task.config$"task-id")
            out = c(out, '/path/to/module/directory')

            if (length(arg)>0){
                out = c(out, paste('input\t', arg[[1]],'\t',
                    ifelse(sapply(arg[[2]], is, 'FlowLiteral'), '"', ''),
                    sapply(arg[[2]], function(x) x@arg),
                    ifelse(sapply(arg[[2]], is, 'FlowLiteral'), '"', ''),
                    '\t',
                    ifelse(sapply(arg[[2]], function(x) x@path), 'path', 'value'),
                    '\t',
                    ifelse(sapply(sapply(arg[[2]], default), is.null), '', sapply(arg[[2]], default)),
                    sep = ''))
            }
            if (length(outs)>0){
                out = c(out,paste('output\t',
                    sapply(outs[[2]], function(x) x@name), '\t',
                    sapply(outs[[2]], function(x) x@pattern)))
            }

            out = c(out, '')
            if (is.null(out.file)){
                writeLines(out)
            }
            else{
                writeLines(out, out.file)
            }

            return(NULL)
        }
    })

    if (all(sapply(out, is.null))){
        cat('')
    }
    else{
        if (length(out)==1){
            out = out[[1]]
        }
        return(out)
    }

}




#' @name file.name
#' @title file.name
#' @description
#'
#' grabs filenames from list of paths
#'
#' @param paths vector of pathnames
#' @author Marcin Imielinski
#' @export
file.name = function(paths)
{
    return(gsub('(^|(.*\\/))?([^\\/]*)', '\\3', paths))
}




#' @name file.dir
#' @title file.dir
#' @description
#'
#' grabs file.dirs from list of paths
#'
#' @param paths vector of pathnames
#' @author Marcin Imielinski
#' @export
file.dir = function(paths)
{
    return(gsub('(^|(.*\\/))?([^\\/]*)$', '\\2', paths))
}




#' @name dedup
#' @title dedup
#' @description
#'
#' relabels duplicates in a character vector with .1, .2, .3
#' (where "." can be replaced by any user specified suffix)
#'
#' @param x vector of files
#' @param suffix string suffix (default = '.')
#' @author Marcin Imielinski
#' @export
dedup = function(x, suffix = '.')
{
  dup = duplicated(x);
  udup = setdiff(unique(x[dup]), NA)
  udup.ix = lapply(udup, function(y) which(x==y))
  udup.suffices = lapply(udup.ix, function(y) c('', paste(suffix, 2:length(y), sep = '')))
  out = x;
  out[unlist(udup.ix)] = paste(out[unlist(udup.ix)], unlist(udup.suffices), sep = '');
  return(out)
}




#' @name more
#' @title more
#'
#' @description
#' "more" +/- grep vector of files
#'
#' @param x vector of files
#' @param grep string to grep in files (default = NULL)
#' @param pipe boolean grep piped input 'x' (default = NULL)
#' @author Marcin Imielinski
#' @export
more = function(x, grep = NULL, pipe = FALSE)
{
    if (is.null(grep)){
        x = paste('more', paste(x, collapse = ' '))
    }
    else{
        x = paste('grep -H', grep, paste(x, collapse = ' '), ' | more')
    }
    if (pipe){
        p = pipe(x)
        out = readLines(p)
        close(p)
        return(out)
    }
    else{
        system(x)
    }
}




#' @name tailf
#' @title tailf
#'
#' @description
#' "tail -f" +/- grep vector of files
#'
#' @param x vector of files
#' @param n integer specific number of lines in file (default = NULL)
#' @param grep string to grep in files (default = NULL)
#' @author Marcin Imielinski
#' @export
tailf = function(x, n = NULL, grep = NULL)
{
    ## create command
    if (is.null(grep)){
        if (is.null(n)){
            x = paste('tail -f', paste(x, collapse = ' '))
        }
        else{
            x = paste('tail -n', n, paste(x, collapse = ' '))
        }
    }
    else{
      x = paste('grep -H', grep, paste(x, collapse = ' '), ' | more')
    }

    ## execute command
    system(x)
}


#' @name reset.job
#' @title reset.job
#'
#' Reset a job with different params
#'
#' @return A Flow job object
#' @author Kevin Hadi
#' @export reset.job
reset.job = function(x, ..., i = NULL, rootdir = x@rootdir, jb.mem = x@runinfo$mem, jb.cores = x@runinfo$cores, jb.time = x@runinfo$time, update_cores = 1, task = NULL) {
    if (!inherits(x, "Job")) stop ("x must be a Flow Job object")
    if (is.null(task))
        usetask = Flow::task(x@task)
    else if (is.character(task) || inherits(task, "Task"))
        usetask = task
    args = list(...)
    new.ent = copy(entities(x))
    if (!is.null(i)) {
        jb.mem = replace(x@runinfo$mem, i, jb.mem)
        jb.cores = replace(x@runinfo$cores, i, jb.cores)
    }
    tsk = viewtask(usetask)
    ## if (!all(names(args) %in% colnames(new.ent)))
    if (!all(names(args) %in% colnames(new.ent)) && !names(args) %in% viewtask(usetask)$V2)
        stop("adding additional column to entities... this function is just for resetting with new arguments")
    for (j in seq_along(args))
    {
        data.table::set(new.ent, i = i, j = names(args)[j], value = args[[j]])
    }
    these.forms = formals(body(findMethods("initialize")$Job@.Data)[[2]][[3]])
    if ("time" %in% names(these.forms)) {
        if ("update_cores" %in% names(these.forms))
            jb = Job(usetask, new.ent, rootdir = rootdir, mem = jb.mem, time = jb.time, cores = jb.cores, update_cores = update_cores)
        else
            jb = Job(usetask, new.ent, rootdir = rootdir, mem = jb.mem, time = jb.time, cores = jb.cores)
    } else {
        if ("update_cores" %in% names(these.forms))
            jb = Job(usetask, new.ent, rootdir = rootdir, mem = jb.mem, cores = jb.cores, update_cores = update_cores)
        else
            jb = Job(usetask, new.ent, rootdir = rootdir, mem = jb.mem, cores = jb.cores)
    }
    return(jb)
}


#' @name silent
#' @title run expression without any printed output
#'
#' execute expression without any output to console.
#' silent({var = function_that_has_explicit_print(...)})
#' 
#'
#' @author Kevin Hadi
#' @param ... an expression
#' @return NULL
#' @export
silent = function(this_expr, this_env = parent.frame()) {
    eval(expr = {capture.output(
            capture.output(... = this_expr,
                           file = "/dev/null",
                           type = c("output")),
            file = "/dev/null",
            type = "message")
    }, envir = this_env)
    invisible()
}

#' or Flow Task object
#'
#'
#' @author Kevin Hadi
#' @return A Flow job object
#' @export
viewtask = function(jb, arglst = c("name", "arg", "default")) {
    lst.emptychar2na = function(x) {
        x[!nzchar(x)] = NA_character_
        x
    }
    lst.zerochar2empty = function(x) {
        x[x == "character(0)"] = list("")
        x
    }
    ifun = function(x, arglst = arglst) {
        
        unlist(lst.emptychar2na(lst.zerochar2empty(lapply(arglst, function(y)
            tryCatch((slot(x, y)), error = function(e) NA_character_)))))
    }
    if (inherits(jb, "Job"))
        obj = jb@task
    else if (inherits(jb, "Task"))
        obj = jb
    else if (inherits(jb, "character"))
        obj = Task(jb)
    as.data.table(data.table::transpose(lapply(obj@args, ifun, arglst = arglst)))
}
mskilab/Flow documentation built on Jan. 12, 2023, 8:31 a.m.