R/mclapply.R

Defines functions mclapply

# -*- ess-indent-offset: 4 -*-

## https://raw.githubusercontent.com/wch/r-source/trunk/src/library/parallel/R/unix/mclapply.R
#  File src/library/parallel/R/unix/mclapply.R
#  Part of the R package, https://www.R-project.org
#
#  Copyright (C) 1995-2019 The R Core Team
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  A copy of the GNU General Public License is available at
#  https://www.R-project.org/Licenses/

### Derived from multicore version 0.1-6 by Simon Urbanek

##  Modified by David C. Norris, for progress reporting
##
##  Fork-based parallel lapply(), derived from parallel::mclapply (see the
##  upstream header above), with optional progress reporting.
##
##  Arguments X, FUN, ..., mc.preschedule, mc.set.seed, mc.silent, mc.cores,
##  mc.cleanup, mc.allow.recursive and affinity.list are as for
##  parallel::mclapply. Additional arguments:
##    progreport  NULL (no reporting) or a function called in the master
##                process once per delivered result, with the running total
##                of progress so far as its single argument.
##    progmetric  function applied to each delivered result to measure the
##                progress it represents (default: length).
##    proginit    starting value of the running progress total (default 0L).
##
##  Progress is reported only on the non-prescheduled path; when `progreport`
##  is given and `mc.preschedule` is not supplied, prescheduling is turned
##  off. On Windows this is plain lapply() and no progress is reported.
##  Returns a list the same length as X (names taken from X).
#' @importFrom utils getFromNamespace
mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
                     mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
                     mc.cleanup = TRUE, mc.allow.recursive = TRUE,
                     progreport = NULL,
                     progmetric = length,
                     proginit = 0L,
                     affinity.list = NULL)
{
    if (.Platform$OS.type == "windows") # On Windows, which lacks fork(),
        return(lapply(X, FUN, ...))     # mclapply is just lapply.

    ## Unexported 'parallel' internals used by the upstream implementation.
    .check_ncores <- getFromNamespace('.check_ncores', 'parallel')
    isChild <- getFromNamespace('isChild', 'parallel')
    mcaffinity <- parallel::mcaffinity
    prepareCleanup <- getFromNamespace('prepareCleanup', 'parallel')
    processID <- getFromNamespace('processID', 'parallel')
    mcfork <- getFromNamespace('mcfork', 'parallel')
    mcexit <- getFromNamespace('mcexit', 'parallel')
    sendMaster <- getFromNamespace('sendMaster', 'parallel')
    selectChildren <- getFromNamespace('selectChildren', 'parallel')
    readChild <- getFromNamespace('readChild', 'parallel')
    cleanup <- getFromNamespace('cleanup', 'parallel')
    mcparallel <- parallel::mcparallel
    mccollect <- parallel::mccollect
    mc.reset.stream <- parallel::mc.reset.stream
    mc.advance.stream <- getFromNamespace('mc.advance.stream', 'parallel')
    mc.set.stream <- getFromNamespace('mc.set.stream', 'parallel')
    closeStdout <- getFromNamespace('closeStdout', 'parallel')

    ## As a convenience, allow client code to omit `mc.preschedule = FALSE`
    ## when requesting progress reporting. (Typically, we will be requesting
    ## progress reports only in those circumstances where the workload lacks
    ## the uniformity & predictability needed for efficient prescheduling.)
    if (missing(mc.preschedule) && !is.null(progreport))
        mc.preschedule <- FALSE

    cores <- as.integer(mc.cores)
    if ((is.na(cores) || cores < 1L) && is.null(affinity.list))
        stop("'mc.cores' must be >= 1")
    .check_ncores(cores)

    if (isChild() && !isTRUE(mc.allow.recursive))
        return(lapply(X = X, FUN = FUN, ...))

    ## Follow lapply
    if (!is.vector(X) || is.object(X)) X <- as.list(X)
    if (!is.null(affinity.list) && length(affinity.list) < length(X))
        stop("affinity.list and X must have the same length")

    if (mc.set.seed) mc.reset.stream()
    if (length(X) < 2) {
        ## Run in this process. Apply the requested affinity (if any) and
        ## restore the previous one on exit, even if FUN signals an error.
        if (length(affinity.list) > 0L) {
            old.aff <- mcaffinity()
            on.exit(mcaffinity(old.aff), add = TRUE)
            mcaffinity(affinity.list[[1]])
        }
        return(lapply(X = X, FUN = FUN, ...))
    }

    if (length(X) < cores) cores <- length(X)
    if (cores < 2L && is.null(affinity.list))
        return(lapply(X = X, FUN = FUN, ...))

    jobs <- list()
    ## all processes created from now on will be terminated by cleanup
    prepareCleanup()
    on.exit(cleanup(mc.cleanup), add = TRUE)
    if (!mc.preschedule) {              # sequential (non-scheduled)
        FUN <- match.fun(FUN)
        ## One-shot dispatch collects all children together, so it cannot
        ## report progress as results arrive; when progress is requested we
        ## use the selective scheduler below, which also handles
        ## length(X) <= cores (each core then simply gets a single job).
        if (length(X) <= cores && is.null(affinity.list) && is.null(progreport)) {
            jobs <- lapply(seq_along(X),
                           function(i) mcparallel(FUN(X[[i]], ...),
                                                  name = names(X)[i],
                                                  mc.set.seed = mc.set.seed,
                                                  silent = mc.silent))
            res <- mccollect(jobs)
            if (length(res) == length(X)) names(res) <- names(X)
            has.errors <- sum(vapply(res, inherits, logical(1),
                                     what = "try-error"))
        } else { # more complicated, we have to wait for jobs selectively
            sx <- seq_along(X)
            res <- vector("list", length(sx))
            names(res) <- names(X)
            fin <- rep(FALSE, length(X)) # values finished
            if (!is.null(affinity.list)) {
                ## build matrix for job mapping with affinity.list
                ## entry i,j is true if item i is allowed to run on core j
                cores <- max(unlist(x = affinity.list, recursive = TRUE))
                d0 <- logical(cores)
                cpu.map <- lapply(sx, function(i) {
                    data <- d0
                    data[as.vector(affinity.list[[i]])] <- TRUE
                    data
                })
                ava <- do.call(rbind, cpu.map)
            } else {
                ## build matrix for job mapping without affinity.list
                ## all entries true
                ava <- matrix(TRUE, nrow = length(X), ncol = cores)
            }
            jobid <- integer(cores)
            ## choose first job for each core to start
            for (i in seq_len(cores)) {
                jobid[i] <- match(TRUE, ava[, i]) # = which(ava[, i])[1]
                ava[jobid[i], ] <- FALSE
            }
            ## remove unused cores from matrix
            if (anyNA(jobid)) {
                unused <- which(is.na(jobid))
                jobid <- jobid[-unused]
                ava   <- ava[, -unused, drop = FALSE]
            }
            ## NOTE: we have to wrap the result in list() since readChild()
            ## doesn't serialize raw vectors so if the result is a raw
            ## it won't be serialized and we can't tell (see #17779)
            ## This also allows us to distinguish try() in user code vs
            ## our own (eventually)
            jobs <- lapply(jobid,
                           function(i) mcparallel(list(FUN(X[[i]], ...)),
                                                  mc.set.seed = mc.set.seed,
                                                  silent = mc.silent,
                                                  mc.affinity = affinity.list[[i]]))
            jobsp <- processID(jobs)
            has.errors <- 0L
            delivered.result <- 0L
            prog.sofar <- proginit
            while (!all(fin)) {
                s <- selectChildren(jobs[!is.na(jobsp)], -1)
                if (is.null(s)) break   # no children -> no hope (should not happen)
                if (is.integer(s))
                    for (ch in s) {
                        ji <- match(TRUE, jobsp == ch)
                        ci <- jobid[ji]
                        r <- readChild(ch)
                        if (is.raw(r)) {
                            child.res <- unserialize(r)
                            ## an unwrapped try-error means FUN itself failed
                            if (inherits(child.res, "try-error"))
                                has.errors <- has.errors + 1L
                            ## unwrap the result
                            if (is.list(child.res)) child.res <- child.res[[1]]
                            ## we can't just assign it since a NULL
                            ## assignment would remove it from the list
                            if (!is.null(child.res)) res[[ci]] <- child.res
                            delivered.result <- delivered.result + 1L
                            if (!is.null(progreport)) {
                                ## report cumulative progress in the master
                                prog <- progmetric(child.res)
                                prog.sofar <- prog.sofar + prog
                                progreport(prog.sofar)
                            }
                        } else {
                            fin[ci] <- TRUE
                            ## the job has finished, so we must not run
                            ## select on its fds again
                            jobsp[ji] <- jobid[ji] <- NA
                            if (any(ava)) { # still something to do,
                                ## look for first job which is allowed to
                                ## run on the now idling core and spawn it.
                                ## match() yields NA when no pending job may
                                ## run on this core; which.max() would yield 1
                                ## and wrongly re-run job 1.
                                nexti <- match(TRUE, ava[, ji])
                                if (!is.na(nexti)) {
                                    jobid[ji] <- nexti
                                    jobs[[ji]] <- mcparallel(list(FUN(X[[nexti]], ...)),
                                                             mc.set.seed = mc.set.seed,
                                                             silent = mc.silent,
                                                             mc.affinity = affinity.list[[nexti]])
                                    jobsp[ji] <- processID(jobs[[ji]])
                                    ava[nexti, ] <- FALSE
                                }
                            }
                        }
                    }
            }
            nores <- length(X) - delivered.result
            if (nores > 0)
                warning(sprintf(ngettext(nores,
                                         "%d parallel function call did not deliver a result",
                                         "%d parallel function calls did not deliver results"),
                                nores),
                        domain = NA)
        }
        if (has.errors)
            warning(gettextf("%d function calls resulted in an error",
                             has.errors), domain = NA)
        return(res)
    }

    ## mc.preschedule = TRUE from here on.
    if (!is.null(progreport))
        warning("'mc.preschedule' must be false if progress reporting is used")
    if (!is.null(affinity.list))
        warning("'mc.preschedule' must be false if 'affinity.list' is used")
    ## element i of X goes to core ((i - 1) %% cores) + 1
    sindex <- lapply(seq_len(cores),
                     function(i) seq(i, length(X), by = cores))
    schedule <- lapply(seq_len(cores),
                       function(i) X[seq(i, length(X), by = cores)])
    ch <- list()
    res <- vector("list", length(X))
    names(res) <- names(X)
    cp <- rep(0L, cores)       # child PID per core
    fin <- rep(FALSE, cores)   # child has exited
    dr <- rep(FALSE, cores)    # child delivered a result
    inner.do <- function(core) {
        S <- schedule[[core]]
        f <- mcfork()
        if (isTRUE(mc.set.seed)) mc.advance.stream()
        if (inherits(f, "masterProcess")) { # this is the child process
            on.exit(mcexit(1L, structure("fatal error in wrapper code", class="try-error")))
            if (isTRUE(mc.set.seed)) mc.set.stream()
            if (isTRUE(mc.silent)) closeStdout(TRUE)
            sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
            mcexit(0L)
        }
        jobs[[core]] <<- ch[[core]] <<- f
        cp[core] <<- processID(f)
        NULL
    }
    job.res <- lapply(seq_len(cores), inner.do)
    ac <- cp[cp > 0]
    has.errors <- integer(0)
    while (!all(fin)) {
        s <- selectChildren(ac[!fin], -1)
        if (is.null(s)) break # no children -> no hope we get anything (should not happen)
        if (is.integer(s))
            for (ch in s) {
                a <- readChild(ch)
                if (is.integer(a)) {
                    core <- which(cp == a)
                    fin[core] <- TRUE
                } else if (is.raw(a)) {
                    core <- which(cp == attr(a, "pid"))
                    job.res[[core]] <- ijr <- unserialize(a)
                    if (inherits(ijr, "try-error"))
                        has.errors <- c(has.errors, core)
                    dr[core] <- TRUE
                } else if (is.null(a)) {
                    # the child no longer exists (should not happen)
                    core <- which(cp == ch)
                    fin[core] <- TRUE
                }
            }
    }
    for (i in seq_len(cores)) {
        this <- job.res[[i]]
        if (inherits(this, "try-error")) { ## length-1 result
            for (j in sindex[[i]]) res[[j]] <- this
        } else
            ## we can't just assign it since a NULL
            ## assignment would remove it from the list
            if (!is.null(this)) res[sindex[[i]]] <- this
    }
    nores <- cores - sum(dr)
    if (nores > 0)
        warning(sprintf(ngettext(nores,
                                 "scheduled core %s did not deliver a result, all values of the job will be affected",
                                 "scheduled cores %s did not deliver results, all values of the jobs will be affected"),
                        paste(which(!dr), collapse = ", ")),
                domain = NA)
    if (length(has.errors)) {
        if (length(has.errors) == cores)
            warning("all scheduled cores encountered errors in user code")
        else
            warning(sprintf(ngettext(length(has.errors),
                                     "scheduled core %s encountered error in user code, all values of the job will be affected",
                                     "scheduled cores %s encountered errors in user code, all values of the jobs will be affected"),
                            paste(has.errors, collapse = ", ")),
                    domain = NA)
    }
    res
}
dcnorris/precautionary documentation built on Feb. 5, 2022, 12:13 p.m.