R/Rparutilities.R

Defines functions mpi.iparApply mpi.parApply mpi.iparCapply mpi.parCapply mpi.iparRapply mpi.parRapply mpi.iparReplicate mpi.parReplicate mpi.iparLapply mpi.parLapply .simplify mpi.iapplyLB .mpi.worker.applyLB mpi.applyLB mpi.iparMM mpi.parMM .splitIndices .docall .mpi.worker.sim mpi.parSim mpi.iapply .mpi.worker.apply mpi.apply tailslave.log mpi.close.Rslaves .mpi.worker.exec mpi.remote.exec mpi.spawn.Rslaves lamhosts slave.hostinfo mpi.hostinfo

Documented in .docall lamhosts mpi.apply mpi.applyLB mpi.close.Rslaves mpi.hostinfo mpi.iapply mpi.iapplyLB mpi.iparApply mpi.iparCapply mpi.iparLapply mpi.iparMM mpi.iparRapply mpi.iparReplicate mpi.parApply mpi.parCapply mpi.parLapply mpi.parMM mpi.parRapply mpi.parReplicate mpi.parSim mpi.remote.exec mpi.spawn.Rslaves .mpi.worker.apply .mpi.worker.applyLB .mpi.worker.exec .mpi.worker.sim .simplify slave.hostinfo .splitIndices tailslave.log

### Copyright (C) 2002 Hao Yu 

mpi.hostinfo <- function(comm=1){
    if (mpi.comm.size(comm)==0){
        err <-paste("It seems no members running on comm", comm)
        stop(err)
    }
    hostname <- mpi.get.processor.name() 
    rk <- mpi.comm.rank(comm=comm)
    size <- mpi.comm.size(comm=comm)
    cat("\tHost:",hostname,"\tRank(ID):",rk, "\tof Size:", size,
        "on comm", comm, "\n")
}

slave.hostinfo <- function(comm=1, short=TRUE){
    #if (!mpi.is.master())
 	if (mpi.comm.rank(comm)!=0)
		stop("cannot run slavehostinfo on slaves")
    size <- mpi.comm.size(comm)
    if (size==0){
        err <-paste("It seems no slaves running on comm", comm)
        stop(err)
    }
    if (size == 1)
    mpi.hostinfo(comm)
    else { 
        master <-mpi.get.processor.name() 
        slavehost <- unlist(mpi.remote.exec(mpi.get.processor.name(),comm=comm))
        slavecomm <- 1 #as.integer(mpi.remote.exec(.comm,comm=comm))
        ranks <- 1:(size-1)
        commm <- paste(comm, ")",sep="")
        if (size > 10){
        rank0 <- paste("master  (rank 0 , comm", commm)
            ranks <- c(paste(ranks[1:9]," ",sep=""), ranks[10:(size-1)])
        }
        else
        rank0 <- paste("master (rank 0, comm", commm)
        cat(rank0, "of size", size, "is running on:",master, "\n")
        slavename <- paste("slave", ranks,sep="")
        ranks <- paste("(rank ",ranks, ", comm ",slavecomm,")", sep="")
		if (short && size > 8){
          for (i in 1:3) {
            cat(slavename[i], ranks[i], "of size",size, 
          "is running on:",slavehost[i], "\n")	
		  }
		  cat("... ... ...\n")
		  for (i in (size-2):(size-1)){
		    cat(slavename[i], ranks[i], "of size",size, 
          "is running on:",slavehost[i], "\n")
		  }
		}
		else {
          for (i in 1:(size-1)){
            cat(slavename[i], ranks[i], "of size",size, 
          "is running on:",slavehost[i], "\n")
          }
		}
    }
}

lamhosts <- function(){
    hosts <- system("lamnodes C -c -n", TRUE)
    base <-character(0)
    for (host in hosts)
        base <- c(base, unlist(strsplit(host, "\\."))[1])
    nn <- 0:(length(hosts)-1)
        names(nn) <- base
    nn
}

mpi.spawn.Rslaves <- 
    function(Rscript=system.file("slavedaemon.R", package="Rmpi"),
    nslaves=mpi.universe.size(),
    root=0,
    intercomm=2,
    comm=1,
    hosts=NULL,
    needlog=TRUE,
    mapdrive=TRUE,
	quiet=FALSE,
	nonblock=TRUE,
	sleep=0.1) {
    if (!is.loaded("mpi_comm_spawn"))
        stop("You cannot use MPI_Comm_spawn API")   
    if (mpi.comm.size(comm) > 0){
         err <-paste("It seems there are some slaves running on comm ", comm)
         stop(err)
    }
    if (.Platform$OS=="windows"){
		#stop("Spawning is not implemented. Please use mpiexec with Rprofile.")
        workdrive <- unlist(strsplit(getwd(),":"))[1]
		workdir <- unlist(strsplit(getwd(),"/"))
		if (length(workdir) > 1)
			workdir <-paste(workdir, collapse="\\")
		else
        	workdir <- paste(workdir,"\\")
        localhost <- Sys.getenv("COMPUTERNAME")
        networkdrive <-NULL #.Call("RegQuery", as.integer(2),paste("NETWORK\\",workdrive,sep=""), 
                        #PACKAGE="Rmpi")
        remotepath <-networkdrive[which(networkdrive=="RemotePath")+1]
        mapdrive <- as.logical(mapdrive && !is.null(remotepath))
        arg <- c(Rscript, R.home(), workdrive, workdir, localhost, mapdrive, remotepath)
		if (.Platform$r_arch == "i386") 
            realns <- mpi.comm.spawn(slave = system.file("Rslaves32.cmd", 
				package = "Rmpi"), slavearg = arg, nslaves = nslaves, 
				info = 0, root = root, intercomm = intercomm, quiet = quiet)
		else 
			realns <- mpi.comm.spawn(slave = system.file("Rslaves64.cmd", 
			            package = "Rmpi"), slavearg = arg, nslaves = nslaves, 
				info = 0, root = root, intercomm = intercomm, quiet = quiet)
    }
    else{
        tmp <- paste(Sys.getpid(), "+", comm, sep="")   
        if (needlog)
            arg <- c(Rscript, tmp, "needlog", R.home())
        else
            arg <- c(Rscript, tmp , "nolog", R.home())  
        if (!is.null(hosts)){
            hosts <- as.integer(hosts)
            if (any(is.na(hosts)))
                stop("hosts argument contains non-integer object(s).")
            if (max(hosts) > mpi.universe.size() -1 ||min(hosts) < 0){
                tmp1 <- paste("hosts number should be within 0 to",
                    mpi.universe.size()-1)
                stop(tmp1)
            }
            nslaves <- length(hosts)
            tmpfile <-paste(tmp, "appschema", sep="") 
            fileobj <- file(tmpfile,"w")
            cat("c", paste(hosts, collapse=","), sep="", file=fileobj)
            cat(" ", system.file("Rslaves.sh", package="Rmpi"), file=fileobj)
            cat(" ", paste(arg, collapse=" "), file=fileobj)
            close(fileobj)
            mpi.info.create(0)
            mpi.info.set(0,"file",tmpfile)
        }
		if (length(unlist(strsplit(.Platform$pkgType,"mac"))) ==2 && .Platform$r_arch =="x86_64")
			realns<-mpi.comm.spawn(slave=system.file("MacR64slaves.sh", package="Rmpi"),
			slavearg=arg, nslaves=nslaves, info=0, root=root, intercomm=intercomm, quiet = quiet)
		else 
			realns<-mpi.comm.spawn(slave=system.file("Rslaves.sh", package="Rmpi"),
			slavearg=arg, nslaves=nslaves, info=0, root=root, intercomm=intercomm, quiet = quiet)
	}
    if (!is.null(hosts)){
        unlink(tmpfile)
        mpi.info.free(0)
    }
    if (realns==0)
        stop("It seems no single slave spawned.")
    if (mpi.intercomm.merge(intercomm,0,comm)) {
        mpi.comm.set.errhandler(comm)
        mpi.comm.disconnect(intercomm)
		mpi.bcast(nonblock,type=1, rank=0, comm=comm)
		mpi.bcast(sleep,type=2, rank=0, comm=comm)
        if (!quiet) slave.hostinfo(comm)
    }
    else
        stop("Fail to merge the comm for master and slaves.")
}   

mpi.remote.exec <- function(cmd, ...,  simplify=TRUE, comm=1, ret=TRUE){
    if (mpi.comm.size(comm) < 2)
    stop("It seems no slaves running.")
    tag <- floor(runif(1,20000,30000))
    scmd <- substitute(cmd)
    arg <-list(...)
    #if (length(arg) > 0) 
    #    deparse(arg)
    #tag.ret <- c(tag, ret, simplify)
    mpi.bcast.cmd(.mpi.worker.exec, tag=tag, ret=ret, simplify=simplify, comm = comm)
    #mpi.bcast(as.integer(tag.ret), type=1, comm=comm)
    mpi.bcast.Robj(list(scmd=scmd, arg=arg), comm=comm)

    if (ret){
        size <- mpi.comm.size(comm) 
        allcode <- mpi.allgather(integer(2), 1, integer(2*size), comm)
    type <- allcode[seq(3,2*size,2)]
    len <- allcode[seq(4,2*size,2)]
    eqlen <- all(len==len[1])
    if (all(type==1)){
        if (eqlen && simplify){
            out <- mpi.gather(integer(len[1]),1,integer(size*len[1]),0,comm)
            out <- out[(len[1]+1):(size*len[1])]
        dim(out) <- c(len[1], size-1)
        out <- data.frame(out)
        }
        else {
          out1<-mpi.gatherv(integer(1),1,integer(1+sum(len)),c(1,len),0,comm)
            uplen <- cumsum(len)+1
            lowlen <-c(2, uplen[-(size-1)]+1)
                out <- as.list(integer(size-1))
                names(out) <- paste("slave",1:(size-1), sep="")
                for (i in 1:(size-1))
            out[[i]]<- out1[lowlen[i]:uplen[i]]
        }
    }
    else if (all(type==2)){
        if (eqlen && simplify){
                out <- mpi.gather(double(len[1]),2,double(size*len[1]),0,comm)
                out <- out[(len[1]+1):(size*len[1])]
                dim(out) <- c(len[1], size-1)
                out <- data.frame(out)
            }
        else {
          out1<-mpi.gatherv(double(1),2,double(1+sum(len)),c(1,len),0,comm) 
            uplen <- cumsum(len)+1
            lowlen <-c(2, uplen[-(size-1)]+1)
                out <- as.list(integer(size-1))
                names(out) <- paste("slave",1:(size-1), sep="")
                for (i in 1:(size-1))
            out[[i]]<- out1[lowlen[i]:uplen[i]]
        }
    }
 else if (all(type==4)){
        if (eqlen && simplify){
                out <- mpi.gather(raw(len[1]),4,raw(size*len[1]),0,comm)
                out <- out[(len[1]+1):(size*len[1])]
                dim(out) <- c(len[1], size-1)
                out <- data.frame(out)
            }
        else {
          out1<-mpi.gatherv(raw(1),4,raw(1+sum(len)),c(1,len),0,comm)
            uplen <- cumsum(len)+1
            lowlen <-c(2, uplen[-(size-1)]+1)
                out <- as.list(integer(size-1))
                names(out) <- paste("slave",1:(size-1), sep="")
                for (i in 1:(size-1))
                    out[[i]]<- out1[lowlen[i]:uplen[i]]
        }
    }

    else {
            out <- as.list(integer(size-1))
            names(out) <- paste("slave",1:(size-1), sep="")
            for (i in 1:(size-1)){
        tmp<- mpi.recv.Robj(mpi.any.source(),tag,comm)
        src <- mpi.get.sourcetag()[1] 
        out[[src]]<- tmp 
        }
    }
        out
    }
}

.typeindex <- function (x) {
    if(is.integer(x))
            as.integer(c(1,length(x)))
    else if (is.numeric(x))
            as.integer(c(2,length(x)))
    else if (is.raw(x))
            as.integer(c(4,length(x)))

    else
            as.integer(-1)
}

.mpi.worker.exec <- function(tag, ret, simplify){
    #assign(".mpi.err", FALSE,  envir = .GlobalEnv)
    assign(".mpi.err", FALSE)
	.comm <- 1
    #tag.ret <- mpi.bcast(integer(3), type=1, comm=.comm)
    #tag <- tag.ret[1]
    #ret <- as.logical(tag.ret[2])
    #simplify <- as.logical(tag.ret[3])
    scmd.arg <- mpi.bcast.Robj(comm=.comm)

    if (ret){
    size <- mpi.comm.size(.comm)
    myerrcode <- as.integer(0)
    if (length(scmd.arg$arg)>0)
        out <- try(do.call(as.character(scmd.arg$scmd), scmd.arg$arg, envir=.GlobalEnv),TRUE)
    else
        out <- try(eval(scmd.arg$scmd, envir=sys.parent()), TRUE)
    
    if (get(".mpi.err")){
        print(geterrmessage())
        type <- integer(2)
    }
        else {
        type <- .typeindex(out)
        if (is.na(type[2]))
            type[2] <- as.integer(0)    
        }
    allcode <- mpi.allgather(type, 1, integer(2*size), .comm)
    type <- allcode[seq(3,2*size,2)]
        len <- allcode[seq(4,2*size,2)]
        eqlen <- all(len==len[1])
        if (all(type==1)) {
            if (eqlen && simplify)
                mpi.gather(out, 1, integer(1), 0, .comm)
        else
        mpi.gatherv(out, 1, integer(1), integer(1), 0 ,.comm)
    }
    else if (all(type==2)) {
            if (eqlen && simplify)
                mpi.gather(out, 2, double(1), 0, .comm)
        else
                mpi.gatherv(out, 2, double(1), integer(1), 0, .comm)
        }
     else if (all(type==4)) {
            if (eqlen && simplify)
                mpi.gather(out, 4, raw(1), 0, .comm)
        else
                mpi.gatherv(out, 4, raw(1), integer(1), 0, .comm)
        }

    else {
        mpi.send.Robj(out,0,tag,.comm)
    }       
    }
    else {
    if (length(scmd.arg$arg)>0)
            out <- try(do.call(as.character(scmd.arg$scmd), scmd.arg$arg))
        else
            out <- try(eval(scmd.arg$scmd))  
    }
}

mpi.close.Rslaves <- function(dellog=TRUE, comm=1){
    if (mpi.comm.size(comm) < 2){
    err <-paste("It seems no slaves running on comm", comm)
    stop(err)
    }
	#mpi.break=delay(do.call("break",  list(), envir=.GlobalEnv))
    mpi.bcast.cmd(cmd="kaerb", rank=0, comm=comm)
    if (.Platform$OS!="windows"){
        if (dellog && mpi.comm.size(0) < mpi.comm.size(comm)){
        tmp <- paste(Sys.getpid(),"+",comm,sep="")  
        logfile <- paste("*.",tmp,".*.log", sep="")
        if (length(system(paste("ls", logfile),TRUE,ignore.stderr=TRUE) )>=1)
            system(paste("rm", logfile))
        }
    }
#     mpi.barrier(comm)
    if (comm >0){
        #if (is.loaded("mpi_comm_disconnect"))
            #mpi.comm.disconnect(comm) 
        #else
            mpi.comm.free(comm)
    }
#   mpi.comm.set.errhandler(0)
}

tailslave.log <- function(nlines=3,comm=1){
    if (mpi.comm.size(comm)==0)
    stop ("It seems no slaves running")
    tmp <- paste(Sys.getpid(),"+",comm,sep="")  
    logfile <- paste("*.",tmp,".*.log", sep="")
    if (length(system(paste("ls", logfile),TRUE,ignore.stderr=TRUE))==0)
    stop("It seems no slave log files.")
    system(paste("tail -",nlines," ", logfile,sep=""))
}

mpi.apply <- function(X, FUN, ...,  comm=1){
    n <- length(X)
    nslaves <- mpi.comm.size(comm)-1
     if (nslaves < n)
        stop("data length must be at most total slave size")
    if (!is.function(FUN))
        stop("FUN is not a function")
    length(list(...)) #test for any non existing R objects
    tag <- floor(runif(1,1,1000))    
    mpi.bcast.cmd(.mpi.worker.apply, n=n, tag=tag, comm=comm)
    #mpi.bcast(as.integer(c(tag,n)),type=1,comm=comm)
    mpi.bcast.Robj(list(FUN=FUN,dot.arg=list(...)),rank=0,comm=comm)
    if (n < nslaves)
        X=c(X,as.list(integer( nslaves-n)))
    mpi.scatter.Robj(c(list("master"),as.list(X)),root=0,comm=comm)

    out <- as.list(integer(n))
    for (i in 1:n){
       tmp<- mpi.recv.Robj(mpi.any.source(),tag,comm)
       src <- mpi.get.sourcetag()[1]
       out[[src]]<- tmp
    }
    out
}

.mpi.worker.apply <- function(n, tag){
    #assign(".mpi.err", FALSE,  envir = .GlobalEnv)
	.comm <- 1
    #tag.n <- mpi.bcast(integer(2), type=1, comm=.comm)
    #tag <- tag.n[1]
    #n <- tag.n[2]
    tmpfunarg <- mpi.bcast.Robj(rank=0, comm=.comm)
    .tmpfun <- tmpfunarg$FUN
    dotarg <- tmpfunarg$dot.arg
    tmpdata.arg <- list(mpi.scatter.Robj(root=0,comm=.comm))
    if (mpi.comm.rank(.comm) <= n){
        out <- try(do.call(".tmpfun", c(tmpdata.arg, dotarg)),TRUE)
        mpi.send.Robj(out,0,tag,.comm)
    }
}

mpi.iapply <- function(X, FUN, ...,  comm=1, sleep=0.01){
    n <- length(X)
    nslaves <- mpi.comm.size(comm)-1
     if (nslaves < n)
        stop("data length must be at most total slave size")
    if (!is.function(FUN))
        stop("FUN is not a function")
    length(list(...)) #test for any non existing R objects
    tag <- floor(runif(1,1,1000))    
    mpi.bcast.cmd(.mpi.worker.apply, n=n, tag=tag,comm=comm)
    #mpi.bcast(as.integer(c(tag,n)),type=1,comm=comm)
    mpi.bcast.Robj(list(FUN=FUN,dot.arg=list(...)),rank=0,comm=comm)
    if (n < nslaves)
        X=c(X,as.list(integer( nslaves-n)))
    mpi.scatter.Robj(c(list("master"),as.list(X)),root=0,comm=comm)

    out <- as.list(integer(n))
    done=0
	anysource=mpi.any.source()
    repeat {
       if (mpi.iprobe(anysource,tag,comm)){ 
       srctag <- mpi.get.sourcetag()
       charlen <- mpi.get.count(type=4)
           tmp <- unserialize(mpi.recv(x = raw(charlen), type = 4, srctag[1], 
            srctag[2], comm))
           out[[srctag[1]]]<- tmp
       done=done+1
       }
       if (done < n)
       Sys.sleep(sleep)
       else break
    }
	gc()
    out
}

mpi.parSim <- function(n=100,rand.gen=rnorm, rand.arg=NULL, 
            statistic, nsim=100, run=1, slaveinfo=FALSE, sim.seq=NULL,
            simplify=TRUE, comm=1, ...){
	sim.seq=NULL
    if (mpi.comm.size(comm) < 2)
        stop("It seems no slaves running.")
    if (!is.function(rand.gen))
        stop("rand.gen is not a function")
    if (!is.function(statistic))
        stop("statistic is not a function")
    if (!is.null(rand.arg))
        if (!is.list(rand.arg))
            stop("rand.arg is not a list")
    if (length(list(...))>0)
        deparse(list(...))
        
    slave.num <- mpi.comm.size(comm)-1
    if (!is.null(sim.seq))
        if (!is.integer(sim.seq))
            stop("sim.seq is not an integer vector")
        else if (min(sim.seq)<1 && max(sim.seq)>slave.num && 
                length(sim.seq)!=slave.num*run)
            stop("sim.seq is not in right order")

    mpi.bcast.cmd(.mpi.worker.sim, n=n, nsim=nsim, run=run, comm=comm)  
    mpi.bcast.Robj(list(rand.gen=rand.gen, rand.arg=rand.arg,
                        stat=statistic, stat.arg=list(...)), comm=comm)

    #nnr <- c(n,nsim,run)
    #mpi.bcast(as.integer(nnr),type=1, comm=comm)
    result <- as.list(integer(slave.num*run))

    if (!is.null(sim.seq)){
        for ( i in 1:(slave.num*run)){
            result[[i]] <- mpi.recv.Robj(source=sim.seq[i], tag=8, comm=comm)
            mpi.send(as.integer(i), type=1, dest=sim.seq[i], tag=88, comm=comm)
        }
        return(.simplify(slave.num*run, result, simplify, nsim))
    }

    i <- 0
    anysrc <- mpi.any.source()
    anytag <- mpi.any.tag()
    mpi.parSim.tmp <- integer(slave.num*run)
    while (i < slave.num*run){
        i <- i+1
        result[[i]] <- mpi.recv.Robj(source=anysrc, tag=8, comm=comm)
        src <- mpi.get.sourcetag()[1]
        mpi.send(as.integer(i), type=1, dest=src, tag=88, comm=comm)
        mpi.parSim.tmp[i] <- src
    }
    if (slaveinfo){
        slavename <- paste("slave",1:slave.num, sep="")
        cat("Finished slave jobs summary:\n")
        for (i in 1:slave.num){
            if (i < 10)
                cat(slavename[i], " finished",sum(mpi.parSim==i), "job(s)\n")
            else
                cat(slavename[i], "finished",sum(mpi.parSim==i), "job(s)\n")
        }
    }
	#assign(".mpi.parSim", mpi.parSim.tmp,  envir = .GlobalEnv)
    .simplify(slave.num*run, result, simplify, nsim)
}

.mpi.worker.sim <- function(n, nsim, run){
	.comm <- 1
    tmpdata <- mpi.bcast.Robj(comm=.comm)
    rand.arg <- tmpdata$rand.arg
    stat.arg <- tmpdata$stat.arg
    
    .tmp.rand.gen <- tmpdata$rand.gen
    .tmp.statistic <- tmpdata$stat
    
    #nnr <- mpi.bcast(integer(3), type=1, comm=.comm)
    #n <- nnr[1];  nsim <- nnr[2];  run <- nnr[3]

    i <- 0
    slave.num <- mpi.comm.size(.comm)-1
    
    while( i < slave.num*(run-1)+1){
        out <- replicate(nsim, do.call(".tmp.statistic", c(list(do.call(".tmp.rand.gen", 
                            c(list(n),rand.arg))), stat.arg)))
       
        mpi.send.Robj(obj=out, dest=0, tag=8, comm=.comm)
        i <- mpi.recv(integer(1), type=1, source=0, tag=88, comm=.comm)
    }
}

#from snow
.docall <- function(fun, args) {
    if ((is.character(fun) && length(fun) == 1) || is.name(fun))
        fun <- get(as.character(fun), envir = .GlobalEnv, mode = "function")
    enquote <- function(x) as.call(list(as.name("quote"), x))
    do.call("fun", lapply(args, enquote))
}

.splitIndices <- function(nx, ncl) {
    #i <- 1:nx;
    #structure(split(i, cut(i, ncl)), names=NULL)
    x <- 1:nx
    r <- nx/ncl
    ii <- 0:(ncl - 1) * r
    if (nx < ncl)
        intv <- 0:ncl
    else
        intv <- c(x[round(1 + ii)]-1,nx)
    structure(split(x, cut(x, intv)), names = NULL)
}

mpi.parMM <- function(A, B, job.num=mpi.comm.size(comm)-1, comm=1){
    splitRows <- function(x, ncl)
        lapply(.splitIndices(nrow(x), ncl), function(i) x[i,, drop=FALSE])    
    .docall(rbind, mpi.applyLB(splitRows(A, job.num), 
        get("%*%"), B, comm=comm))
}
    
mpi.iparMM <- function(A, B, comm=1, sleep=0.01){
    splitRows <- function(x, ncl)
        lapply(.splitIndices(nrow(x), ncl), function(i) x[i,, drop=FALSE])    
    .docall(rbind, mpi.iapply(splitRows(A, mpi.comm.size(comm)-1), 
        get("%*%"), B, comm=comm, sleep=sleep))
}    

mpi.applyLB <- function(X, FUN, ...,  apply.seq=NULL, comm=1){
	apply.seq=NULL
    n <- length(X)
    slave.num <- mpi.comm.size(comm)-1
    if (slave.num < 1)
        stop("There are no slaves running")
    if (n <= slave.num) {
        if (exists(".mpi.applyLB")) 
			rm(".mpi.applyLB",  envir=.GlobalEnv)
        return (mpi.apply(X,FUN,...,comm=comm))
    }    
    if (!is.function(FUN))
        stop("FUN is not a function")
    length(list(...))
    if (!is.null(apply.seq))
        if (!is.integer(apply.seq))
            stop("apply.seq is not an integer vector")
        else if (min(apply.seq)<1 && max(apply.seq)>slave.num && 
                length(apply.seq)!=n)
            stop("apply.seq is not in right order")
            
    mpi.bcast.cmd(.mpi.worker.applyLB, n=n, comm=comm)
    #mpi.bcast(as.integer(n),type=1,comm=comm)
    mpi.bcast.Robj(list(FUN=FUN,dot.arg=list(...)),rank=0,comm=comm)
    out <- as.list(integer(n))
    mpi.anysource <- mpi.any.source()
    mpi.anytag <- mpi.any.tag()
    for (i in 1:slave.num)
        mpi.send.Robj(list(data.arg=list(X[[i]])), dest=i,tag=i, comm=comm)
  
    if (!is.null(apply.seq)){
        for ( i in 1:n){
            tmp <- mpi.recv.Robj(source=apply.seq[i], tag=mpi.anytag, comm=comm)
            tag <- mpi.get.sourcetag()[2]
            out[[tag]]<- tmp
            j <- i+slave.num
            if (j <= n)
                mpi.send.Robj(list(data.arg=list(X[[j]])), dest=apply.seq[i],tag=j, comm=comm)
            else
                mpi.send.Robj(list(data.arg=list(n)),dest=apply.seq[i],tag=j,comm=comm)
        }
        return(out)
    }
    #.mpi.applyLB <<- integer(n)
	mpi.seq.tmp <- integer(n)
    for (i in 1:n){
       tmp<- mpi.recv.Robj(mpi.anysource,mpi.anytag,comm)
       srctag <- mpi.get.sourcetag()
       out[[srctag[2]]]<- tmp
       mpi.seq.tmp[i] <- srctag[1]
       j <- i+slave.num
       if (j <= n)
            mpi.send.Robj(list(data.arg=list(X[[j]])), dest=srctag[1],tag=j, comm=comm)
       else
            mpi.send.Robj(list(data.arg=list(n)),dest=srctag[1],tag=j,comm=comm)
    }
 	#assign(".mpi.applyLB",mpi.seq.tmp, envir = .GlobalEnv)
	out
}

.mpi.worker.applyLB <- function(n){
    #assign(".mpi.err", FALSE,  envir = .GlobalEnv)
	.comm <- 1
    #n <- mpi.bcast(integer(1), type=1, comm=.comm)
    tmpfunarg <- mpi.bcast.Robj(rank=0, comm=.comm)
    .tmpfun <- tmpfunarg$FUN
    dotarg <- tmpfunarg$dot.arg
    mpi.anytag <- mpi.any.tag()
    repeat {
        tmpdata.arg <- mpi.recv.Robj(source=0,tag=mpi.anytag, comm=.comm)$data.arg
        tag <- mpi.get.sourcetag()[2]
        if (tag > n)
            break
        out <- try(do.call(".tmpfun", c(tmpdata.arg, dotarg)),TRUE)
        #if (.mpi.err)
        #    print(geterrmessage())
        mpi.send.Robj(out,0,tag,.comm)
    }
}

mpi.iapplyLB <- function(X, FUN, ...,  apply.seq=NULL, comm=1, sleep=0.01){
	apply.seq=NULL
    n <- length(X)
    slave.num <- mpi.comm.size(comm)-1
    if (slave.num < 1)
        stop("There are no slaves running")
    if (n <= slave.num) {
        if (exists(".mpi.applyLB"))
            rm(".mpi.applyLB",  envir =.GlobalEnv)
        return (mpi.iapply(X,FUN,...,comm=comm,sleep=sleep))
    }
    if (!is.function(FUN))
        stop("FUN is not a function")
    if (slave.num > 2000)
        stop("Total slaves are more than nonblock send/receive can handle")
    length(list(...))
    if (!is.null(apply.seq))
        if (!is.integer(apply.seq))
            stop("apply.seq is not an integer vector")
        else if (min(apply.seq)<1 && max(apply.seq)>slave.num &&
                length(apply.seq)!=n)
            stop("apply.seq is not in right order")

    mpi.bcast.cmd(.mpi.worker.applyLB, n=n, comm=comm)
    #mpi.bcast(as.integer(n),type=1,comm=comm)
    mpi.bcast.Robj(list(FUN=FUN,dot.arg=list(...)),rank=0,comm=comm)
    out <- as.list(integer(n))
    mpi.anysource <- mpi.any.source()
    mpi.anytag <- mpi.any.tag()
    for (i in 1:slave.num)
        mpi.send.Robj(list(data.arg=list(X[[i]])), dest=i,tag=i,comm=comm)
    #for (i in 1:slave.num)
    #    mpi.waitany(slave.num)

    if (!is.null(apply.seq)){
       i=0
       repeat {
        if (mpi.iprobe(apply.seq[i+1],mpi.anytag,comm)){
            i=i+1
            j <- i+slave.num
            if ( j <= n)
                mpi.send.Robj(list(data.arg=list(X[[j]])), dest=apply.seq[i],tag=j, comm=comm) 
            else
                mpi.send.Robj(as.integer(0),dest=apply.seq[i],tag=j,comm=comm)  
            charlen <- mpi.get.count(type=4)
            tag <- mpi.get.sourcetag()[2]
            tmp <- unserialize(mpi.recv(x = raw(charlen), type = 4, apply.seq[i], tag, comm))
            out[[tag]]<- tmp
            #mpi.wait(0)
        }
      if (i < n)
         Sys.sleep(sleep)
      else break
      }
      return(out)
    }
    mpi.seq.tmp <- integer(n)
    i=0
    repeat {
        if (mpi.iprobe(mpi.anysource,mpi.anytag,comm)){
            i=i+1
            srctag <- mpi.get.sourcetag()
            src <- srctag[1]
            tag <- srctag[2]
            j <- i+slave.num
            if ( j <= n)
                mpi.send.Robj(list(data.arg=list(X[[j]])), dest=src,tag=j, comm=comm)
            else
                mpi.send.Robj(as.integer(0),dest=src,tag=j,comm=comm)
            charlen <- mpi.get.count(type=4)
            tmp <- unserialize(mpi.recv(x = raw(charlen), type = 4, src, tag, comm))
            out[[tag]]<- tmp
            mpi.seq.tmp[i] <- src
            #mpi.wait(src-1)
        }
        if (i < n)
            Sys.sleep(sleep)
        else
            break
    }
  	#assign(".mpi.applyLB",mpi.seq.tmp, envir = .GlobalEnv)
	gc()
    out
}

#.mpi.worker.iapplyLB <- function(){
#    assign(".mpi.err", FALSE,  envir = .GlobalEnv)
#    n <- mpi.bcast(integer(1), type=1, comm=.comm)
#    tmpfunarg <- mpi.bcast.Robj(rank=0, comm=.comm)
#    .tmpfun <- tmpfunarg$fun
#    dotarg <- tmpfunarg$dot.arg
#    mpi.anytag <- mpi.any.tag()
#    repeat {
#        tmpdata.arg <- mpi.recv.Robj(source=0,tag=mpi.anytag, comm=.comm)$data.arg
#        tag <- mpi.get.sourcetag()[2]
#        if (tag > n)
#            break
#        out <- try(do.call(".tmpfun", c(tmpdata.arg, dotarg)),TRUE)
#        #if (.mpi.err)
#        #    print(geterrmessage())
#        mpi.wait(0)
#        mpi.isend.Robj(out,0,tag,.comm)
#    }
#    mpi.wait(0)
#}

.simplify <- function(n, answer, simplify, len=1, recursive=FALSE){
    if (simplify && length(answer)&&length(common.len <- unique(unlist(lapply(answer, 
        length)))) == 1 ) {
        if (common.len == len) 
            unlist(answer, recursive = recursive)
        else if (common.len > len) 
            array(unlist(answer, recursive = recursive),
                  dim = c(common.len/len, n*len),
                  dimnames = list(names(answer[[1]]), names(answer)))
        else answer
    }
    else answer
}

mpi.parLapply <- function(X, FUN, ..., job.num=mpi.comm.size(comm)-1, apply.seq=NULL, comm=1){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitList <- function(x, ncl)
        lapply(.splitIndices(length(X), ncl), function(i) X[i])
    .docall(c, mpi.applyLB(splitList(X, job.num), 
        lapply, FUN, ..., apply.seq=apply.seq, comm=comm))
}

mpi.iparLapply <- function(X, FUN, ..., job.num=mpi.comm.size(comm)-1, apply.seq=NULL, comm=1, sleep=0.01){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitList <- function(X, ncl)
        lapply(.splitIndices(length(X), ncl), function(i) X[i])
    .docall(c, mpi.iapplyLB(splitList(X, job.num), 
        lapply, FUN, ..., apply.seq=apply.seq, comm=comm, sleep=sleep))
}

mpi.parSapply <- function (X, FUN, ..., job.num=mpi.comm.size(comm)-1, apply.seq=NULL, 
                simplify = TRUE, USE.NAMES = TRUE, comm=1) 
{
    FUN <- match.fun(FUN)
    answer <- mpi.parLapply(as.list(X),FUN,...,job.num=job.num,apply.seq=apply.seq,comm=comm)
    if (USE.NAMES && is.character(X) && is.null(names(answer))) 
        names(answer) <- X
    .simplify(length(X),answer, simplify)
}

mpi.iparSapply <- function (X, FUN, ..., job.num=mpi.comm.size(comm)-1, apply.seq=NULL, 
                simplify = TRUE, USE.NAMES = TRUE, comm=1,sleep=0.01) 
{
    FUN <- match.fun(FUN)
    answer <- mpi.iparLapply(as.list(X),FUN,...,job.num=job.num,apply.seq=apply.seq,comm=comm,sleep=sleep)
    if (USE.NAMES && is.character(X) && is.null(names(answer))) 
        names(answer) <- X
    .simplify(length(X),answer, simplify)
}

mpi.parReplicate <- function(n,  expr, job.num=mpi.comm.size(comm)-1, apply.seq=NULL,
                                simplify = TRUE, comm=1){
    mpi.parSapply(integer(n), eval.parent(substitute(function(...) expr)), 
    job.num=job.num, apply.seq=apply.seq, simplify = simplify, comm=comm)
}

mpi.iparReplicate <- function(n,  expr, job.num=mpi.comm.size(comm)-1, apply.seq=NULL,
                                simplify = TRUE, comm=1,sleep=0.01){
    mpi.iparSapply(integer(n), eval.parent(substitute(function(...) expr)), 
    job.num=job.num, apply.seq=apply.seq, simplify = simplify, comm=comm,sleep=sleep)
}

mpi.parRapply <- function(X,FUN,...,job.num=mpi.comm.size(comm)-1,apply.seq=NULL,comm=1){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitRows <- function(X, ncl)
        lapply(.splitIndices(nrow(X), ncl), function(i) X[i,, drop=FALSE])
    .docall(c, mpi.applyLB(splitRows(X,job.num), apply, 1, FUN, ..., 
            apply.seq=apply.seq, comm=comm))
}

mpi.iparRapply <- function(X,FUN,...,job.num=mpi.comm.size(comm)-1,apply.seq=NULL,comm=1,sleep=0.01){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitRows <- function(X, ncl)
        lapply(.splitIndices(nrow(X), ncl), function(i) X[i,, drop=FALSE])
    .docall(c, mpi.iapplyLB(splitRows(X,job.num), apply, 1, FUN, ..., 
            apply.seq=apply.seq, comm=comm,sleep=sleep))
}

mpi.parCapply <- function(X,FUN,...,job.num=mpi.comm.size(comm)-1,apply.seq=NULL,comm=1){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitCols <- function(X, ncl)
        lapply(.splitIndices(ncol(X), ncl), function(i) X[,i, drop=FALSE])
    .docall(c, mpi.applyLB(splitCols(X,job.num), apply, 2, FUN, ..., 
            apply.seq=apply.seq, comm=comm))
}

mpi.iparCapply <- function(X,FUN,...,job.num=mpi.comm.size(comm)-1,apply.seq=NULL,comm=1,sleep=0.01){
    if (job.num < 2)
        stop("job.num is at least 2.")
    splitCols <- function(X, ncl)
        lapply(.splitIndices(ncol(X), ncl), function(i) X[,i, drop=FALSE])
    .docall(c, mpi.iapplyLB(splitCols(X,job.num), apply, 2, FUN, ..., 
            apply.seq=apply.seq, comm=comm,sleep=sleep))
}

mpi.parApply <- function(X, MARGIN, FUN, ..., job.num = mpi.comm.size(comm)-1,
                    apply.seq=NULL, comm=1)
{
    FUN <- match.fun(FUN)
    d <- dim(X)
    dl <- length(d)
    if(dl == 0)
    stop("dim(X) must have a positive length")
    ds <- 1:dl

    if(length(oldClass(X)) > 0)
    X <- if(dl == 2) as.matrix(X) else as.array(X)
    dn <- dimnames(X)

    s.call <- ds[-MARGIN]
    s.ans  <- ds[MARGIN]
    d.call <- d[-MARGIN]
    d.ans  <- d[MARGIN]
    dn.call<- dn[-MARGIN]
    dn.ans <- dn[MARGIN]

    d2 <- prod(d.ans)
    if(d2 == 0) {
        newX <- array(vector(typeof(X), 1), dim = c(prod(d.call), 1))
        ans <- FUN(if(length(d.call) < 2) newX[,1] else
                   array(newX[,1], d.call, dn.call), ...)
        return(if(is.null(ans)) ans else if(length(d.call) < 2) ans[1][-1]
               else array(ans, d.ans, dn.ans))
    }
    newX <- aperm(X, c(s.call, s.ans))
    dim(newX) <- c(prod(d.call), d2)
    if(length(d.call) < 2) {
        if (length(dn.call)) dimnames(newX) <- c(dn.call, list(NULL))
        ans <- mpi.parLapply(1:d2, function(i, ...) FUN(newX[,i], ...), 
                ..., job.num = job.num, apply.seq=apply.seq, comm=comm )
    } 
    else
        ans <- mpi.parLapply(1:d2,
            function(i, ...) FUN(array(newX[,i], d.call, dn.call), ...),
                ..., job.num = job.num, apply.seq=apply.seq, comm=comm)
                    
    ans.list <- is.recursive(ans[[1]])
    l.ans <- length(ans[[1]])

    ans.names <- names(ans[[1]])
    if(!ans.list)
        ans.list <- any(unlist(lapply(ans, length)) != l.ans)
    if(!ans.list && length(ans.names)) {
        all.same <- sapply(ans, function(X) identical(names(X), ans.names))
        if (!all(all.same)) ans.names <- NULL
    }
    len.a <- if(ans.list) d2 else length(ans <- unlist(ans, recursive = FALSE))
    if(length(MARGIN) == 1 && len.a == d2) {
        names(ans) <- if(length(dn.ans[[1]])) dn.ans[[1]]
        return(ans)
    }
    if(len.a == d2)
        return(array(ans, d.ans, dn.ans))
    if(len.a > 0 && len.a %% d2 == 0)
        return(array(ans, c(len.a %/% d2, d.ans),
                     if(is.null(dn.ans)) {
                         if(!is.null(ans.names)) list(ans.names,NULL)
                     } else c(list(ans.names), dn.ans)))
    return(ans)
}

mpi.iparApply <- function(X, MARGIN, FUN, ..., job.num = mpi.comm.size(comm)-1,
                    apply.seq=NULL, comm=1,sleep=0.01)
{
    FUN <- match.fun(FUN)
    d <- dim(X)
    dl <- length(d)
    if(dl == 0)
    stop("dim(X) must have a positive length")
    ds <- 1:dl

    if(length(oldClass(X)) > 0)
    X <- if(dl == 2) as.matrix(X) else as.array(X)
    dn <- dimnames(X)

    s.call <- ds[-MARGIN]
    s.ans  <- ds[MARGIN]
    d.call <- d[-MARGIN]
    d.ans  <- d[MARGIN]
    dn.call<- dn[-MARGIN]
    dn.ans <- dn[MARGIN]

    d2 <- prod(d.ans)
    if(d2 == 0) {
        newX <- array(vector(typeof(X), 1), dim = c(prod(d.call), 1))
        ans <- FUN(if(length(d.call) < 2) newX[,1] else
                   array(newX[,1], d.call, dn.call), ...)
        return(if(is.null(ans)) ans else if(length(d.call) < 2) ans[1][-1]
               else array(ans, d.ans, dn.ans))
    }
    newX <- aperm(X, c(s.call, s.ans))
    dim(newX) <- c(prod(d.call), d2)
    if(length(d.call) < 2) {
        if (length(dn.call)) dimnames(newX) <- c(dn.call, list(NULL))
        ans <- mpi.iparLapply(1:d2, function(i, ...) FUN(newX[,i], ...), 
                ..., job.num = job.num, apply.seq=apply.seq, comm=comm, sleep=sleep )
    } 
    else
        ans <- mpi.iparLapply(1:d2,
            function(i, ...) FUN(array(newX[,i], d.call, dn.call), ...),
                ..., job.num = job.num, apply.seq=apply.seq, comm=comm, sleep=sleep)
                    
    ans.list <- is.recursive(ans[[1]])
    l.ans <- length(ans[[1]])

    ans.names <- names(ans[[1]])
    if(!ans.list)
        ans.list <- any(unlist(lapply(ans, length)) != l.ans)
    if(!ans.list && length(ans.names)) {
        all.same <- sapply(ans, function(X) identical(names(X), ans.names))
        if (!all(all.same)) ans.names <- NULL
    }
    len.a <- if(ans.list) d2 else length(ans <- unlist(ans, recursive = FALSE))
    if(length(MARGIN) == 1 && len.a == d2) {
        names(ans) <- if(length(dn.ans[[1]])) dn.ans[[1]]
        return(ans)
    }
    if(len.a == d2)
        return(array(ans, d.ans, dn.ans))
    if(len.a > 0 && len.a %% d2 == 0)
        return(array(ans, c(len.a %/% d2, d.ans),
                     if(is.null(dn.ans)) {
                         if(!is.null(ans.names)) list(ans.names,NULL)
                     } else c(list(ans.names), dn.ans)))
    return(ans)
}

#mpi.parallel.sim <- mpi.parSim

Try the Rmpi package in your browser

Any scripts or data that you put into this service are public.

Rmpi documentation built on April 1, 2023, 12:20 a.m.