Parallel iteration over an indeterminate number of data chunks

Description

bpiterate iterates over an indeterminate number of data chunks (e.g., records in a file). Each chunk is processed by parallel workers in an asynchronous fashion; as each worker finishes it receives a new chunk. Data are traversed a single time.

When provided with a vector-like argument ITER = X, bpiterate uses bpiterateAlong to produce the sequence of elements X[[1]], X[[2]], etc.

Usage


bpiterate(
    ITER, FUN, ...,
    BPREDO = list(), BPPARAM=bpparam(), BPOPTIONS = bpoptions()
)

## S4 method for signature 'ANY,ANY,missing'
bpiterate(
    ITER, FUN, ...,
    BPREDO = list(), BPPARAM=bpparam(), BPOPTIONS = bpoptions())

## S4 method for signature 'ANY,ANY,BatchtoolsParam'
bpiterate(
    ITER, FUN, ..., REDUCE, init, reduce.in.order=FALSE,
    BPREDO = list(), BPPARAM=bpparam(), BPOPTIONS = bpoptions()
)

bpiterateAlong(X)

Arguments

X

An object (e.g., vector or list) with 'length()' and '[[' methods available.

ITER

A function with no arguments that returns an object to process, generally a chunk of data from a file. When no objects are left (i.e., end of file) it should return NULL and continue to return NULL regardless of the number of times it is invoked after reaching the end of file. This function is run on the master.

FUN

A function to process the object returned by ITER; run on parallel workers separate from the master. When BPPARAM is a MulticoreParam, FUN is 'decorated' with additional arguments and therefore must have ... in the signature.
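A minimal sketch of a FUN that satisfies this requirement (the body is illustrative only):

## The trailing '...' absorbs any extra arguments the back-end adds.
FUN <- function(chunk, ...) {
    length(chunk)
}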

BPPARAM

An optional BiocParallelParam instance determining the parallel back-end to be used during evaluation, or a list of BiocParallelParam instances, to be applied in sequence for nested calls to bpiterate.

REDUCE

Optional function that combines (reduces) output from FUN. As each worker returns, the data are combined with the REDUCE function. REDUCE takes 2 arguments; one is the current result and the other is the output of FUN from a worker that just finished.

init

Optional initial value for REDUCE; must be of the same type as the object returned from FUN. When supplied, reduce.in.order is set to TRUE.

reduce.in.order

Logical. When TRUE, REDUCE is applied to the results from the workers in the same order the tasks were sent out.
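A minimal sketch of how REDUCE, init and reduce.in.order combine per-chunk results into a running sum (assuming the registered back-end supports on-the-fly reduction, as SnowParam and MulticoreParam do):

## Square each chunk on a worker, then fold the results into a running
## sum starting from 'init'; reduce.in.order = TRUE folds them in the
## order the chunks were dispatched.
square <- function(i, ...) i^2
res <- bpiterate(
    bpiterateAlong(1:4), square,
    REDUCE = `+`, init = 0, reduce.in.order = TRUE
)
res  ## 30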

BPREDO

An output from bpiterate with one or more failed elements. This argument cannot be used with BatchtoolsParam.

...

Arguments to other methods, and named arguments for FUN.

BPOPTIONS

Additional options to control the behavior of the parallel evaluation, see bpoptions.

Details

Supported for SnowParam, MulticoreParam and BatchtoolsParam.

bpiterate iterates through an unknown number of data chunks, dispatching chunks to parallel workers as they become available. In contrast, other bp*apply functions such as bplapply or bpmapply require the number of data chunks to be specified ahead of time. This quality makes bpiterate useful for iterating through files of unknown length.

ITER serves up chunks of data until the end of the file is reached, at which point it returns NULL. Note that ITER should continue to return NULL regardless of the number of times it is invoked after reaching the end of the file. FUN is applied to each object (data chunk) returned by ITER.
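For illustration, a minimal in-memory sketch of this contract (the helper chunkIterator() is not part of the package):

## Serve 'chunkSize' elements of 'x' per call; return NULL (repeatedly)
## once the data are exhausted.
chunkIterator <- function(x, chunkSize = 2L) {
    i <- 0L
    function() {
        if (i >= length(x))
            return(NULL)
        idx <- seq(i + 1L, min(i + chunkSize, length(x)))
        i <<- i + length(idx)
        x[idx]
    }
}
ITER <- chunkIterator(1:5)
ITER()  ## 1 2
ITER()  ## 3 4
ITER()  ## 5
ITER()  ## NULL, and NULL on every later call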

bpiterateAlong() provides an iterator for a vector or other object with length() and [[ methods defined. It is used in place of the first argument ITER.
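For example (a minimal sketch of its behaviour):

ITER <- bpiterateAlong(letters[1:3])
ITER()  ## "a"
ITER()  ## "b"
ITER()  ## "c"
ITER()  ## NULL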

Value

By default, a list with one element for each chunk returned by ITER(). When REDUCE is used, the return value is consistent with application of the reduction. When errors occur, they are attached to the result as an attribute named 'errors'.

Author(s)

Valerie Obenchain <vobencha@fhcrc.org>.

See Also

  • bpvec for parallel, vectorized calculations.

  • bplapply for parallel, lapply-like calculations.

  • BiocParallelParam for details of BPPARAM.

  • BatchtoolsParam for details of BatchtoolsParam.

Examples

## A simple iterator
ITER <- bpiterateAlong(1:10)
result <- bpiterate(ITER, sqrt)
## alternatively, result <- bpiterate(1:10, sqrt)
unlist(result)

## Not run: 
if (require(Rsamtools) && require(RNAseqData.HNRNPC.bam.chr14) &&
    require(GenomicAlignments) && require(ShortRead)) {

  ## ----------------------------------------------------------------------
  ## Iterate through a BAM file
  ## ----------------------------------------------------------------------

  ## Select a single file and set 'yieldSize' in the BamFile object.
  fl <- RNAseqData.HNRNPC.bam.chr14_BAMFILES[[1]]
  bf <- BamFile(fl, yieldSize = 300000)

  ## bamIterator() is initialized with a BAM file and returns a function.
  ## The return function requires no arguments and iterates through the
  ## file returning data chunks the size of yieldSize.
  bamIterator <- function(bf) {
      done <- FALSE
      if (!isOpen(bf))
          open(bf)

      function() {
          if (done)
              return(NULL)
          yld <- readGAlignments(bf)
          if (length(yld) == 0L) {
              close(bf)
              done <<- TRUE
              NULL
          } else yld
      }
  }

  ## FUN counts reads in a region of interest.
  roi <- GRanges("chr14", IRanges(seq(19e6, 107e6, by = 10e6), width = 10e6))
  counter <- function(reads, roi, ...) {
      countOverlaps(query = roi, subject = reads)
  }

  ## Initialize the iterator.
  ITER <- bamIterator(bf)

  ## The number of chunks returned by ITER() determines the result length.
  bpparam <- MulticoreParam(workers = 3)
  ## bpparam <- BatchtoolsParam(workers = 3), see ?BatchtoolsParam
  bpiterate(ITER, counter, roi = roi, BPPARAM = bpparam)

  ## Re-initialize the iterator and combine on the fly with REDUCE:
  ITER <- bamIterator(bf)
  bpparam <- MulticoreParam(workers = 3)
  bpiterate(ITER, counter, REDUCE = sum, roi = roi, BPPARAM = bpparam)

  ## ----------------------------------------------------------------------
  ## Iterate through a FASTQ file
  ## ----------------------------------------------------------------------

  ## Set data chunk size with 'n' in the FastqStreamer object.
  sp <- SolexaPath(system.file('extdata', package = 'ShortRead'))
  fl <- file.path(analysisPath(sp), "s_1_sequence.txt")

  ## Create an iterator that returns data chunks the size of 'n'.
  fastqIterator <- function(fqs) {
      done <- FALSE
      if (!isOpen(fqs))
          open(fqs)

      function() {
          if (done)
              return(NULL)
          yld <- yield(fqs)
          if (length(yld) == 0L) {
              close(fqs)
              done <<- TRUE
              NULL
          } else yld
      }
  }

  ## The process function summarizes the number of times each sequence occurs.
  summary <- function(reads, ...) {
       ShortRead::tables(reads, n = 0)$distribution
  }

  ## Create a param.
  bpparam <- SnowParam(workers = 2)

  ## Initialize the streamer and iterator.
  fqs <- FastqStreamer(fl, n = 100)
  ITER <- fastqIterator(fqs)
  bpiterate(ITER, summary, BPPARAM = bpparam)

  ## Results from the workers are combined on the fly when REDUCE is used.
  ## Collapsing the data in this way can substantially reduce memory
  ## requirements.
  fqs <- FastqStreamer(fl, n = 100)
  ITER <- fastqIterator(fqs)
  bpiterate(ITER, summary, REDUCE = merge, all = TRUE, BPPARAM = bpparam)

  }

## End(Not run)
