R/applyTallies.R

Defines functions rerunBatchTallies batchTallies .waitAndCollect .resubmitProblematicJobs .writeToTallyFile collectTallies .setUpJobs defineBlocks batchTallyParam mergeTallies tallyRangesToFile applyTallies

Documented in applyTallies batchTallies batchTallyParam collectTallies defineBlocks mergeTallies rerunBatchTallies tallyRangesToFile

applyTallies <- function(
  bamfiles, chrom, start, stop, q=25, ncycles = 0, max.depth=1000000,
  prepForHDF5 = TRUE, reference = NULL
  ){
  tmp = bplapply( bamfiles, 
      function(fn, chrom, start, stop, q, ncycles, max.depth){
          library(h5vc)
          tallyBAM( fn, chrom, start, stop, q, ncycles, max.depth )
      }, chrom=chrom, start=start, stop=stop, q=q, ncycles=ncycles,
      max.depth=max.depth)
  if( prepForHDF5 ){
    tmp <- bplapply(tmp, 
        function(counts, reference) {
            library(h5vc)
            prepareForHDF5(counts, reference)
        }, reference = reference)
    return( mergeTallies( tmp ) )
  }else{
    return(tmp)
  }
}

tallyRangesToFile <- function(tallyFile, study, bamfiles, ranges, reference, samples = NULL, q = 25, ncycles = 0, max.depth=1e6){
  writeToTallyFile(tallyRanges(bamfiles, ranges, reference, q, ncycles, max.depth), tallyFile, study, ranges, samples = NULL)  
}

mergeTallies <- function( tallies ){
  ret = list()
  l <- lapply( tallies, function(x) x$Counts)
  l$along = 1.5
  ret$Counts <- do.call( abind, l )
  l <- lapply( tallies, function(x) x$Coverages)
  l$along = 0
  ret$Coverages <- do.call( abind, l )
  l <- lapply( tallies, function(x) x$Deletions)
  l$along = ifelse(length(dim(l[[1]])) > 2, 1.5, 0)
  ret$Deletions <- do.call( abind, l  )
  ret$Reference <- tallies[[1]]$Reference
  return(ret)
}

batchTallyParam <- function(
  bamFiles,
  destination,
  group,
  chrom, start, stop,
  blocksize = 100000,
  registryDir = tempdir(),
  resources = list("queue" = "research-rh6", "memory"="4000", "ncpus"="4", walltime="90:00"),
  q=25, ncycles = 0, max.depth=1000000,
  reference = NULL,
  sleep = 5
  ){
  return(list(
    bamFiles = bamFiles,
    destination = destination,
    group = group,
    chrom = chrom,
    start = start,
    stop = stop,
    blocksize = blocksize,
    registryDir = registryDir,
    resources = resources,
    q = q, ncycles = ncycles, max.depth = max.depth,
    reference = reference,
    sleep = sleep
  ))
}

defineBlocks <- function( start, stop, blocksize ){
  if( stop - start < blocksize - 1){
    stop( paste("Selected blocksize (", blocksize, ") is larger than the length of the interval [", start, ", ", stop, "]. Reduce blocksize accordingly.") )
  }
  blockstarts <- seq(start, stop, blocksize)
  blockends <- seq(start + blocksize - 1, stop, blocksize)
  if( stop %% blocksize == 0){
    if(length(blockstarts) > 1)
      blockstarts <- blockstarts[-length(blockstarts)]
  }else{
    blockends <- c( blockends, stop )
  }
  return( data.frame( Start = blockstarts, End = blockends, ID = seq(length = length(blockends)) ))
}

.setUpJobs <- function( blocks, confList, sleep = 0.25 ){
  registries <- list()
  for( blockID in seq(length = nrow(blocks)) ){
    regID <- paste( "chr", gsub(pattern="\\.", replacement="_", confList$chrom), "block", blocks$ID[blockID], sep = "" )
    registries[[regID]] <- makeRegistry(id=regID, seed=123, file.dir=file.path(confList$registryDir, regID), packages = c("h5vc"))
    ref <- confList$reference[(blocks$Start[blockID] - confList$start + 1):(blocks$End[blockID] - confList$start + 1)]
    batchMap(registries[[regID]], tallyBAM, confList$bamFiles, more.args = list( "chr" = confList$chrom, "start" = blocks$Start[blockID], "stop" = blocks$End[blockID], "reference" = ref ) )
    submitJobs( registries[[regID]], resources = confList$resources )
    print(paste( "Block #", blockID, " submitted." ))
    Sys.sleep(sleep)# sleep a while
  }
  return(registries)
}

collectTallies <- function(blocks, confList, registries ){
  for( regID in names(registries)){
    print(paste("collecting data from registry:", regID))
    results <- loadResults( registries[[regID]] )
    blockID <- as.numeric(strsplit(regID, split = "block")[[1]][2])
    print("Merging samples")
    results <- mergeTallies(results)
    print(paste("Writing to", confList$destination, "-", confList$group))
    startpos <- blocks$Start[blocks$ID == blockID]
    endpos <- blocks$End[blocks$ID == blockID]
    h5write( results$Counts, confList$destination, paste( confList$group, "Counts", sep = "/" ), index = list( NULL, NULL, NULL, startpos:endpos ) )
    h5write( results$Coverages, confList$destination, paste( confList$group, "Coverages", sep = "/" ), index = list( NULL, NULL, startpos:endpos ) )
    h5write( results$Deletions, confList$destination, paste( confList$group, "Deletions", sep = "/" ), index = list( NULL, NULL, startpos:endpos ) )
    h5write( results$Reference, confList$destination, paste( confList$group, "Reference", sep = "/" ), index = list( startpos:endpos ) )
    print("done ... cleaning up")
    unlink(registries[[regID]]$file.dir, recursive=TRUE) #remove registry files completely
  }
}

.writeToTallyFile <- function( registry, regID, blocks, confList ){
  results <- loadResults( registry )
  blockID <- as.numeric(strsplit(regID, split = "block")[[1]][2])
  results <- mergeTallies(results)
  startpos <- blocks$Start[blocks$ID == blockID]
  endpos <- blocks$End[blocks$ID == blockID]
  h5write( results$Counts, confList$destination, paste( confList$group, "Counts", sep = "/" ), index = list( NULL, NULL, NULL, startpos:endpos ) )
  h5write( results$Coverages, confList$destination, paste( confList$group, "Coverages", sep = "/" ), index = list( NULL, NULL, startpos:endpos ) )
  h5write( results$Deletions, confList$destination, paste( confList$group, "Deletions", sep = "/" ), index = list( NULL, NULL, startpos:endpos ) )
  h5write( results$Reference, confList$destination, paste( confList$group, "Reference", sep = "/" ), index = list( startpos:endpos ) )
}

.resubmitProblematicJobs <- function(reg, confList, verbose = TRUE){
  resubmitters <- unique( c( findExpired(reg), findErrors(reg) ) )
  if(length(resubmitters) > 0){
    if(verbose){
      print("Resubmitting Jobs from Registry:")
      showStatus(reg)
    }
    submitJobs(reg, resubmitters, resources = confList$resources)
    return(TRUE)  
  }else{
    return(FALSE)
  }
}

.waitAndCollect <- function(blocks, confList, registries, sleep = 5, nTries = 5){
  expired <- rep( nTries, length(registries) )
  names(expired) <- names(registries)
  while( length(registries) > 0 ){
    Sys.sleep(sleep)
    print(paste("Still", length(registries), "blocks remaining"))
    for( regID in names(registries) ){
      if( length(findDone(registries[[regID]])) == length(confList$bamFiles)){
        #print(paste("Processing:", regID))
        .writeToTallyFile(registries[[regID]], regID, blocks, confList)
        unlink(registries[[regID]]$file.dir, recursive=TRUE) #remove registry files completely
        registries[[regID]] <- NULL
        break;
      }else{
        if(.resubmitProblematicJobs( registries[[regID]], confList )){
          break
        }
      }
    }
  }
}

batchTallies <- function(
  confList = batchTallyParam()
  ){
  if( confList$start <= 0 ){
    confList$start = 1
  }
  blocks <- defineBlocks( confList$start, confList$stop, confList$blocksize )
  registries <- .setUpJobs( blocks, confList )
  .waitAndCollect(blocks, confList, registries)
}

rerunBatchTallies <- function( confList, tryCollect = TRUE ){
  if( confList$start <= 0 ){
    confList$start = 1
  }
  blocks <- defineBlocks( confList$start, confList$stop, confList$blocksize )
  blocksToReRun <- grep( paste("chr", confList$chrom, "block", sep = ""), list.files(confList$registryDir), value = TRUE )
  blocks$regID <- paste( "chr", gsub("\\.", "_", confList$chrom), "block", blocks$ID, sep = "" )
  blocks <- subset(blocks, regID %in% blocksToReRun)
  if( tryCollect ){
    print(paste(nrow(blocks), "unfinished ... trying to collect"))
    registries <- lapply( blocksToReRun, function(regID) loadRegistry(file.path( confList$registryDir, regID ) ) )
    names(registries) <- blocksToReRun
    .waitAndCollect( blocks, confList, registries, nTries = 0, sleep = confList$sleep )  
  }
  blocks <- defineBlocks( confList$start, confList$stop, confList$blocksize )
  blocksToReRun <- grep( paste("chr", confList$chrom, "block", sep = ""), list.files(confList$registryDir), value = TRUE )
  blocks$regID <- paste( "chr", confList$chrom, "block", blocks$ID, sep = "" )
  blocks <- subset(blocks, regID %in% blocksToReRun)
  print(paste( "Still", nrow(blocks), "unfinished ... re-starting"))
  unlink( file.path( confList$registryDir, blocksToReRun ), recursive = TRUE )
  registries <- .setUpJobs( blocks, confList )
  .waitAndCollect( blocks, confList, registries, sleep = confList$sleep )}

Try the h5vc package in your browser

Any scripts or data that you put into this service are public.

h5vc documentation built on Nov. 8, 2020, 4:56 p.m.