seqParallel: Apply Functions in Parallel

View source: R/Utilities.R

Description

Applies a user-defined function in parallel.

Usage

seqParallel(cl=seqGetParallel(), gdsfile, FUN,
    split=c("by.variant", "by.sample", "none"), .combine="unlist",
    .selection.flag=FALSE, .initialize=NULL, .finalize=NULL, .initparam=NULL,
    .balancing=FALSE, .bl_size=10000L, .bl_progress=FALSE, ...)
seqParApply(cl=seqGetParallel(), x, FUN, load.balancing=TRUE, ...)

Arguments

cl

NULL or FALSE: serial processing; TRUE: multicore processing (the maximum number of cores minus one); a numeric value: the number of cores to be used; a cluster object for parallel processing, created by functions in the package parallel such as makeCluster; or a BiocParallelParam object from the BiocParallel package. See Details.

gdsfile

a SeqVarGDSClass object, or NULL

FUN

the function to be applied; its signature should be FUN(gdsfile, ...) or FUN(...)

split

how the dataset is split across the processes: by variant, by sample, or "none" for no split; split="by.variant" by default

.combine

defines how results from different processes are combined: "unlist" (the default) produces a vector containing all the atomic components via unlist(..., recursive=FALSE); "list" returns a list of the results created by the child processes; "none" returns nothing; or a function with one or two arguments, like "+"

.selection.flag

if TRUE, a logical vector of the selection is passed as the second argument of FUN, i.e. FUN(gdsfile, selection, ...)

.initialize

a user-defined function for initializing workers, should have two arguments (process_id, param)

.finalize

a user-defined function for finalizing workers, should have two arguments (process_id, param)

.initparam

parameters passed to .initialize and .finalize

.balancing

load balancing if TRUE

.bl_size

chunk size, the increment for load balancing, 10000 for variants

.bl_progress

if TRUE and .balancing=TRUE, show progress information

x

a vector (atomic or list), passed to FUN

load.balancing

if TRUE, call clusterApplyLB instead of clusterApply

...

optional arguments to FUN
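
The load.balancing switch of seqParApply selects between two functions of the parallel package. The sketch below contrasts them using the parallel package alone, without SeqArray; the task function slow_square and its sleep times are hypothetical and only serve to make the tasks uneven.

```r
library(parallel)

# hypothetical task: sleeps for a task-dependent time, then returns x^2
slow_square <- function(x) {
    Sys.sleep(0.05 * (x %% 3))
    x^2
}

cl <- makeCluster(2)

# static scheduling: tasks are handed to the nodes in a fixed order
res_static <- clusterApply(cl, 1:6, slow_square)

# load balancing: a node receives the next task as soon as it is free
res_lb <- clusterApplyLB(cl, 1:6, slow_square)

stopCluster(cl)

# both calls return a list ordered like the input
identical(res_static, res_lb)
```

Load balancing tends to pay off when task run times differ a lot; with uniform tasks the static schedule has less communication overhead.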

Details

When cl is TRUE or a numeric value, forking is used to create child processes as copies of the current R process; see ?parallel::mcfork. Forking is not available on Windows, so makeCluster is called there instead to create a cluster, which is deallocated after FUN has been called.
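
The platform-dependent behaviour can be approximated with the parallel package alone. The following is a minimal sketch of the general pattern, not SeqArray's actual implementation; run_parallel is a hypothetical helper introduced only for illustration.

```r
library(parallel)

# hypothetical helper: fork where possible, otherwise use a temporary cluster
run_parallel <- function(X, FUN, ncores = 2L) {
    if (.Platform$OS.type == "windows") {
        # forking is unavailable: create a cluster and release it on exit
        cl <- makeCluster(ncores)
        on.exit(stopCluster(cl))
        parLapply(cl, X, FUN)
    } else {
        # child processes are forked copies of the current R process
        mclapply(X, FUN, mc.cores = ncores)
    }
}

res <- run_parallel(1:4, function(i) i * 2)
unlist(res)
```

Creating and destroying a cluster on every call is the cost that a persistent cluster avoids.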

It is strongly suggested to use seqParallel together with seqParallelSetup. seqParallelSetup works around the lack of forking on Windows without allocating clusters repeatedly.

The user-defined function can use two predefined variables, SeqArray:::process_count and SeqArray:::process_index, to obtain the total number of cluster nodes and the index of the node currently being used.

seqParallel(, gdsfile=NULL, FUN=..., split="none") can be used to set up multiple streams of pseudo-random numbers; see nextRNGStream or nextRNGSubStream in the package parallel.
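
As a rough illustration of independent random-number streams, the sketch below derives one seed per stream with nextRNGStream from the parallel package. It runs in a single R session and does not call seqParallel; the number of streams and the helper draw are assumptions made for the example. In a parallel run, each worker would install the seed that corresponds to its own process.

```r
library(parallel)

# L'Ecuyer-CMRG is the generator that supports multiple streams
RNGkind("L'Ecuyer-CMRG")
set.seed(1000)

# derive one seed per stream, each advanced from the previous one
n_streams <- 3L
seeds <- vector("list", n_streams)
seeds[[1]] <- .Random.seed
for (i in seq_len(n_streams - 1L)) {
    seeds[[i + 1L]] <- nextRNGStream(seeds[[i]])
}

# hypothetical helper: install a stream's seed, then draw from it
draw <- function(seed, n = 3) {
    assign(".Random.seed", seed, envir = .GlobalEnv)
    runif(n)
}

x <- lapply(seeds, draw)
```

Re-installing the same seed reproduces the same draws, which makes results independent of how the work is scheduled.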

Value

A vector or list of values.
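
The shape of the returned value depends on .combine. The base R sketch below mimics the documented options on made-up per-process results; parts is hypothetical data, and using Reduce for a two-argument function is an assumption about how the fold proceeds, not a statement about SeqArray's internals.

```r
# hypothetical per-process results, one element per worker
parts <- list(c(1, 2), c(3, 4), c(5, 6))

# .combine="unlist": one flat vector of all atomic components
as_vector <- unlist(parts, recursive = FALSE)

# .combine="list": the per-process results are kept separate
as_list <- parts

# .combine="+": a two-argument function folded over the results
as_sum <- Reduce(`+`, parts)

as_vector
as_sum
```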

Author(s)

Xiuwen Zheng

See Also

seqSetFilter, seqGetData, seqApply, seqParallelSetup, seqGetParallel

Examples

library(SeqArray)
library(parallel)

# choose an appropriate cluster size or number of cores
seqParallelSetup(2)


# the GDS file
(gds.fn <- seqExampleFileName("gds"))

# display
(gdsfile <- seqOpen(gds.fn))

# the uniprocessor version (cl=FALSE requests serial processing)
afreq1 <- seqParallel(FALSE, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq1)
summary(afreq1)


# run in parallel
afreq2 <- seqParallel(, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq2)
summary(afreq2)


# check
length(afreq1)  # 1348
all(afreq1 == afreq2)

################################################################
# check -- variant splits

seqParallel(, gdsfile, FUN = function(f) {
        v <- seqGetFilter(f)
        sum(v$variant.sel)
    }, split="by.variant")
# [1] 674 674


################################################################

seqParallel(, NULL, FUN = function() {
        paste(SeqArray:::process_index, SeqArray:::process_count, sep=" / ")
    }, split="none")

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine=function(i) print(i))

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine="+")


################################################################


# close the GDS file
seqClose(gdsfile)

# clear the parallel cluster
seqParallelSetup(FALSE)

SeqArray documentation built on Nov. 8, 2020, 5:08 p.m.