inst/scripts/connectir_bigmats_diff_worker.R

suppressPackageStartupMessages(library("optparse"))


# Make option list
option_list <- list(
    make_option(c("-a", "--amat"), type="character", default=NULL, help="First file-backed big matrix such as the subject distances (use the *.desc extension) (required)", metavar="file"),
    make_option(c("-b", "--bmat"), type="character", default=NULL, help="Second file-backed big matrix (use the *desc extension) (required)", metavar="file"),
    make_option(c("-p", "--forks"), type="integer", default=1, help="Number of computer processors to use in parallel by forking the complete processing stream [default: %default]", metavar="number"),
    make_option(c("-t", "--threads"), type="integer", default=1, help="Number of computer processors to use in parallel by multi-threading matrix algebra operations [default: %default]", metavar="number"),
    make_option("--blocksize", type="integer", default=0, dest="blocksize", help="How many sets of columns or voxels should used in each iteration of computing the element-wise average (0 = auto) [default: %default]", metavar="number"),
    make_option("--memlimit", type="double", default=4, dest="memlimit", help="If blocksize is set to auto (--blocksize=0), then will set the blocksize to use a maximum of RAM specified by this option  [default: %default]", metavar="number"),
    make_option("--overwrite", action="store_true", default=FALSE, help="Overwrite output if it already exists (default is not to overwrite already existing output)"),
    make_option(c("-d", "--debug"), action="store_true", default=FALSE, help="Like verbose but will also print more helpful error messages when --forks is >1"), 
    make_option(c("-v", "--verbose"), action="store_true", default=TRUE, help="Print extra output [default]"),
    make_option(c("-q", "--quiet"), action="store_false", dest="verbose", help="Print little output")
)

# Make class/usage
parser <- OptionParser(usage = "%prog [options] outprefix (extension should not exist for outprefix)\nWill do a-b=c where c is the output", 
                       option_list=option_list, 
                       add_help_option=TRUE)

# Parse
parser_out <- parse_args(parser, positional_arguments = TRUE)
args <- parser_out$args
opts <- parser_out$options

# Check options/arguments
if (length(args) != 1) {
    print_help(parser)
    quit(save="no", status=1)
}

saved_opts <- list(args=args, opts=opts)

tryCatch({
  
  # load connectir
  suppressWarnings(suppressPackageStartupMessages(library("connectir")))

  # parallel processing setup
  set_parallel_procs(opts$forks, opts$threads, opts$verbose)  
  # use foreach parallelization and shared memory?
  parallel_forks <- ifelse(opts$forks == 1, FALSE, TRUE)
  
  ###
  # Check Inputs
  ###
  vcat(opts$verbose, "Checking options")
  
  # check variables
  if (is.null(opts$amat) || is.null(opts$bmat))
      stop("must specify --amat or --bmat")
  if (opts$overwrite)
      stop("Right now the overwrite function isn't implemented")  
  
  ###
  # Setup inputs
  ###
  vcat(opts$verbose, "Setting/checking inputs")
  
  # check/set input
  opts$amat <- abspath(opts$amat)
  opts$bmat <- abspath(opts$bmat)
  if (!file.exists(opts$amat))
      vstop("-a/--amat '%s' does not exist", opts$amat)
  if (!file.exists(opts$bmat))
      vstop("-b/--bmat '%s' does not exist", opts$bmat)
  
  # check/set output
  outprefix <- abspath(args[1])
  if (!file.exists(dirname(args[1]))) {
      vcat(opts$verbose, "Output directory '%s' does not exist, creating", 
           dirname(args[1]))
      dir.create(dirname(args[1]))
  }
  
  if (opts$debug) {
      verbosity <- 2
  } else if (opts$verbose) {
      verbosity <- 1
  } else {
      verbosity <- 0
  }
  
  ###
  # Read inputs
  ###
  
  vcat(opts$verbose, "Reading in inputs")
  
  amat <- attach.big.matrix(opts$amat)
  bmat <- attach.big.matrix(opts$bmat)
  
  # check dimensions
  nr <- nrow(amat); nc <- ncol(amat); type <- typeof(amat)
  if ((nr != nrow(bmat)) || (nc != ncol(bmat))) {
      vstop("Dimensions for -a/--amat (%i,%i) and -b/--bmat (%i,%i) do not match", nr, nc, nrow(bmat), ncol(bmat))
  }
  if (type != typeof(bmat)) {
      vstop("-a/--amat (%s) and -b/--bmat (%s) do not have the same data types", type, typeof(bmat))
  }
  
  ###
  # Setup outputs
  ###
  
  vcat(opts$verbose, "Setup outputs")
  cmat <- big.matrix(nr, nc, type=type, init=NA, 
                     backingfile=paste(basename(outprefix), "_subdist.bin", sep=""), 
                     descriptorfile=paste(basename(outprefix), "_subdist.desc", sep=""),
                     backingpath=dirname(outprefix))  
  gmat <- big.matrix(nr, nc, type=type, init=NA, 
                     backingfile=paste(basename(outprefix), "_subdist_gower.bin", sep=""), 
                     descriptorfile=paste(basename(outprefix), "_subdist_gower.desc", sep=""),
                     backingpath=dirname(outprefix))
  
  ###
  # Subtract / Save
  ###
  vcat(opts$verbose, "Subtracting")
  
  # TODO: automatically determine this based on the memory
  blocksize	<- ifelse(nc < 2500, nc, 2500)
  blocks	<- niftir.split.indices(1, nc, by=blocksize)
  
  for (i in 1:blocks$n) {
      vcat(opts$verbose, "block %i/%i", i, blocks$n)
      
      firstCol <- blocks$start[i]; lastCol <- blocks$ends[i]
	  
	  # Get subsets of inputs and outputs
	  vcat(opts$verbose, "...subset of big matrices")
      sub_amat <- sub.big.matrix(amat, firstCol=firstCol, lastCol=lastCol, 
                                 backingpath=dirname(opts$amat))
      sub_bmat <- sub.big.matrix(bmat, firstCol=firstCol, lastCol=lastCol, 
                                 backingpath=dirname(opts$bmat))
      sub_cmat <- sub.big.matrix(cmat, firstCol=firstCol, lastCol=lastCol, 
                                 backingpath=dirname(outprefix))
	  
	  # Do the subtraction
	  vcat(opts$verbose, "...subtracting a minus b")
	  sub_cmat[,] <- sub_amat[,] - sub_bmat[,]
	  
      # Gower centered matrices
      vcat(opts$verbose, "...gower centering")
      sub_gmat	<- sub.big.matrix(gmat, firstCol=firstCol, lastCol=lastCol, 
                                  backingpath=dirname(outprefix))
      gower.subdist2(sub_cmat, outmat=sub_gmat, verbose=verbosity, parallel=parallel_forks)
	  
	  # Ensure everything was written to the disk
	  vcat(opts$verbose, "...flushing to disk (just in case)")
	  flush(sub_cmat); flush(cmat)
      flush(sub_gmat); flush(gmat)
	  
	  # Free up the memory
	  vcat(opts$verbose, "...freeing memory")
	  rm(sub_amat); rm(sub_bmat); rm(sub_cmat); rm(sub_gmat); gc(FALSE, TRUE)
      amat <- free.memory(amat, dirname(opts$amat))
      bmat <- free.memory(bmat, dirname(opts$bmat))
      cmat <- free.memory(cmat, dirname(outprefix))
      gmat <- free.memory(gmat, dirname(outprefix))
  }  
}, warning = function(ex) {
  cat("\nA warning was detected: \n")
  cat(ex$message, "\n\n")
  cat("Called by: \n")
  print(ex$call)
  cat("\nSaving options...\n")
  save(saved_opts, file="called_options.rda")
}, error = function(ex) {
  cat("\nAn error was detected: \n")
  cat(ex$message, "\n\n")
  cat("Called by: \n")
  print(ex$call)
  cat("\nSaving options...\n")
  save(saved_opts, file="called_options.rda")
}, interrupt = function(ex) {
    cat("\nSaving options...\n")
    save(saved_opts, file="called_options.rda")
    cat("\nKill signal sent. Trying to clean up...\n")
    rm(list=ls())
    gc(FALSE)
    cat("...success\n")
}, finally = {
  cat("\nRemoving everything from memory\n")
  rm(list=ls())
  gc(FALSE)
  cat("...sucesss\n")
})
czarrar/connectir documentation built on Nov. 22, 2020, 12:13 p.m.