# R/comm.fread.r
#
# Defines functions comm.rebalance.df check_sum comm.fread
#
# Documented in comm.fread

#' comm.fread
#'
#' Given a directory, \code{comm.fread()} reads all csv files contained
#' in it in parallel with available resources.
#'
#' @param dir
#' A directory containing the files desired to be read.  The directory
#' should be accessible to all readers.
#' @param pattern
#' A regular expression (as used by \code{list.files()}) selecting the files
#' desired to be read.
#' @param shcom
#' Additional shell command passed to \code{fread}'s \code{cmd} parameter as:
#' \code{fread(cmd = paste(shcom, shQuote(file)))}, where file is assigned
#' based on rank.
#' For example if \code{grep <pattern> <file>} is needed,
#' set \code{shcom = "grep <pattern>"}.
#' (Lustre note: Our Lustre file system had an odd interaction on first reads
#' of a 5 GB file with \code{fread} and worked 5x faster with
#' \code{shcom = "cat"} than without it. Second reads were 10x faster. No such
#' slowdown was observed on NFS file systems).
#' @param readers
#' The number of readers: a whole number between 1 and \code{comm.size()}.
#' @param verbose
#' Determines the verbosity level. Acceptable values are 0, 1, 2, and 3 for
#' least to most verbosity.
#' @param ...
#' Additional arguments to be passed to \code{data.table::fread()}.
#'
#' @details
#' Each MPI rank reads different but
#' entire files. Best load balance is achieved when the number of files is
#' divisible by the number of ranks and the files are approximately the same
#' size. All files are assumed to contain the same columns. See note for
#' parameter \code{shcom} if you are working with a Lustre parallel file
#' system.
#'
#' @return
#' On each rank, a \code{data.table} holding the rows of the files that rank
#' read, bound together with \code{rbindlist()}. A rank that read no files
#' returns a zero-row \code{data.table} with the columns read on rank 0.
#'
#' @examples
#' \dontrun{
#' ### Save code in a file "demo.r" and run with 2 processors by
#' ### SHELL> mpiexec -np 2 Rscript demo.r
#' library(pbdMPI)
#' library(pbdIO)
#'
#' path <- "/tmp/read"
#' comm.print(dir(path))
#' ## [1] "a.csv" "b.csv"
#'
#' X <- comm.fread(path)
#'
#' comm.print(X, all.rank=TRUE)
#' ## COMM.RANK = 0
#' ##    a b c
#' ## 1: 1 2 3
#' ## COMM.RANK = 1
#' ##    a b c
#' ## 1: 2 3 4
#'
#' finalize()
#' }
#'
#' @importFrom data.table fread rbindlist
#' @export
comm.fread <- function(dir, pattern="*.csv$", shcom = NULL, readers=comm.size(),
                       verbose=0, ...) {
    ## Validate every argument before anything (including the verbose
    ## printout below) relies on it.
    if (length(verbose) != 1 || !(verbose %in% 0:3))
        comm.stop("argument 'verbose' must be 0, 1, 2, or 3")
    if (!is.character(dir) || length(dir) != 1 || is.na(dir))
        comm.stop("argument 'dir' must be a string")
    if (!is.character(pattern) || length(pattern) != 1 || is.na(pattern))
        comm.stop("argument 'pattern' must be a string")
    ## More readers than ranks would assign files to ranks that do not
    ## exist, and those files would silently never be read.
    if (!is.numeric(readers) || length(readers) != 1 || is.na(readers) ||
        readers != round(readers) || readers < 1 || readers > comm.size())
        comm.stop("argument 'readers' must be an integer between 1 and comm.size()")

    if (verbose) print(c(as.list(environment()), list(...)))

    if (verbose) a <- deltime()
    files <- file.info(list.files(dir, pattern=pattern, full.names=TRUE))

    if (NROW(files) == 0)
        comm.stop(paste("Directory", dir,
                        "contains no files matching pattern", pattern))

    my_rank <- comm.rank()
    ## Row indices into `files` that make up this rank's share
    my_files <- comm.chunk(nrow(files), p=readers, lo.side="right",
                           form="vector")
    ## Full paths of this rank's files. Out-of-range indices yield NA names
    ## and are dropped, so fread() is never handed a missing file name.
    ## NOTE(review): this targets the old FIXME about having fewer files than
    ## ranks with readers = 1. Confirm what comm.chunk() returns for ranks
    ## that get nothing.
    my_paths <- rownames(files)[my_files]
    my_paths <- my_paths[!is.na(my_paths)]
    if (verbose) comm.cat("rank:", my_rank, "files:",
                          paste(my_files, collapse = " "),
                          all.rank=TRUE, quiet = TRUE)
    if (verbose > 1) {
        for (path in my_paths) cat(my_rank, path, "\n")
    }

    ## Read every assigned file and bind the pieces into one local data.table
    read_file <- if (is.null(shcom)) {
        function(path) fread(path, showProgress=FALSE, ...)
    } else {
        ## shQuote() keeps paths containing spaces or shell metacharacters
        ## intact inside the shell command handed to fread(cmd = ...)
        function(path) fread(cmd = paste(shcom, shQuote(path)),
                             showProgress=FALSE, ...)
    }
    l <- lapply(my_paths, function(path) suppressWarnings(read_file(path)))
    X <- rbindlist(l)

    ## Rank 0 always reads (it receives the first chunk), so it has all
    ## columns. Broadcast its zero-row template and use it on ranks whose
    ## result has no columns (i.e. they read nothing).
    X0 <- bcast(X[0])
    if (length(X) == 0) X <- X0

    if (verbose) a <- deltime(a, "T    component fread time:")

    if (verbose) {
        nrow_have <- unlist(allgather(nrow(X)))
        comm.cat("nrow_have:", nrow_have, "\n")
    }

    X
}

## Column sums used to sanity-check a distributed data frame.
##
## X: this rank's local rows. Indexed with data.table's `with=FALSE`, so a
##    data.table is expected.
## Returns this rank's sums (NA removed) of the columns that are numeric on
## every rank. Callers combine the per-rank results themselves (e.g. with
## allreduce()). Uses allreduce(), so every rank must call it.
check_sum <- function(X) {
    ## vapply() keeps the result logical even for a zero-column X
    ## (sapply() would return list() there and break allreduce/which)
    my_numeric <- vapply(X, is.numeric, logical(1))
    ## Keep only columns that are numeric on all ranks
    Xnumeric <- which(allreduce(my_numeric, op="land"))
    colSums(X[, Xnumeric, with=FALSE], na.rm=TRUE)
}

## comm.rebalance.df: move rows between ranks so that each rank ends up with
## the row count assigned to it by comm.chunk(N, form="number", ...).
##
## Arguments:
##   X       - this rank's local rows. X is indexed with X[rows, ] and combined
##             with rbind(), so presumably a data.frame or data.table with the
##             same columns on every rank. TODO confirm at call sites.
##   verbose - > 1 prints the row-count vectors. > 2 also compares sums of
##             numeric columns (via check_sum()) before and after the exchange.
##   ...     - passed through to comm.chunk() to choose the target layout.
## Returns the rebalanced local X. Uses allgather/isend/recv, so every rank
## must call it.
##
## TODO document (roxygen) and export this function
comm.rebalance.df <- function(X, verbose=0, ...) {
    ## Data frame X has unequal number of rows across ranks. This function
    ##   balances the rows by sending rows from ranks that have too many to
    ##   ranks that have too few.
    ##
    ## TODO makes copies with rbind(). Should this go into copyless C?
    ##
    my_rank <- comm.rank()
    ## Row counts of all ranks (element i belongs to rank i - 1); the same
    ## vector on every rank after allgather()
    nrow_have <- unlist(allgather(nrow(X)))
    N <- sum(nrow_have)

    ## TODO Three nrow_ vectors can be one with a bit more logic
    ## Target row count of every rank (all.rank=TRUE: one entry per rank)
    nrow_want <- comm.chunk(N, form="number", # type="equal",
                             all.rank=TRUE, ...)
    ## Per-rank surplus to ship out and deficit to fill. Because of pmax(),
    ## a rank has a surplus or a deficit (or neither), never both.
    nrow_send <- pmax(nrow_have - nrow_want, 0)
    nrow_recv <- pmax(nrow_want - nrow_have, 0)
    if(verbose > 1) {
        comm.cat("nrow_have:", nrow_have, "\n")
        comm.cat("nrow_want:", nrow_want, "\n")
        comm.cat("nrow_send:", nrow_send, "\n")
        comm.cat("nrow_recv:", nrow_recv, "\n")
    }

    ## get global numeric column sums for error checking
    if(verbose > 2) before_sums <- check_sum(X)

    ## Every rank runs this schedule on its own identical copy of nrow_send
    ## and nrow_recv. Both are updated below on all ranks, so every rank
    ## agrees on each (sender, receiver, count) step without extra
    ## communication.
    ## Each pass walks the senders in rank order. For each sender it
    ## recomputes the ranks still short of rows and, while recv_i has not run
    ## past that list, pairs the sender with its recv_i-th entry.
    ## Passes repeat until no rank has a surplus left.
    while(sum(nrow_send)) {
        recv_i <- 0
        senders <- (1:comm.size())[nrow_send > 0]
        for(proc_send in senders) {
            ## senders and receivers start from 1. Do -1 for rank!
            receivers <- (1:comm.size())[nrow_recv > 0]
            if(recv_i < length(receivers)) {
                recv_i <- recv_i + 1
                count_s <- nrow_send[proc_send]
                count_r <- nrow_recv[receivers[recv_i]]
                ## Move as many rows as both sides can use in this step
                count <- min(count_s, count_r)
                if(my_rank + 1 == receivers[recv_i]) {
                    ## receivers and senders are disjoint sets
                    ## NOTE(review): the preallocated buffer is a logical NA
                    ##   matrix while the sender ships a slice of X. X takes
                    ##   whatever recv() returns. Confirm pbdMPI's recv()
                    ##   dispatch for this buffer type.
                    buffer <- matrix(NA, count, ncol(X))
                    buffer <- recv(buffer, rank.source=proc_send - 1)
                    ## can not use irecv because rbind follows!!
                    ## Received rows are appended after the local rows
                    X <- rbind(X, buffer)
                }
                if(my_rank + 1 == proc_send) {
                    ## but two senders can be sending to same receiver
                    ## Ship the first `count` rows, then drop them locally.
                    ## NOTE(review): the isend() request is never wait()ed
                    ##   on. Confirm the send buffer stays alive until done.
                    isend(X[1:count, ], rank.dest=receivers[recv_i] - 1)
                    X <- X[-(1:count), ]
                }
                ## Bookkeeping, done identically on every rank
                nrow_recv[receivers[recv_i]] <- count_r - count
                nrow_send[proc_send] <- count_s - count
            }
        }
    }

    ## check if global column sums have not changed
    if(verbose > 2) {
        ## before_sums/after_sums hold per-rank sums; allreduce() combines
        ## them across ranks (default op) before comparing
        after_sums <- check_sum(X)
        equal <- all.equal(allreduce(before_sums), allreduce(after_sums))
        comm.cat("checksum equal:", equal, "on", ncol(X), "columns\n")
    } else if(verbose > 1) {
        nrow_have <- unlist(allgather(nrow(X)))
        comm.cat("nrow_have:", nrow_have, "\n")
    }

    X
}
# go-ski/pbdIO documentation built on July 28, 2023, 11:37 a.m.