# R/parallel.R

#twUtestF("applyLB")



sfFArgsApplyLB <- function(
	### Load balanced application of F_APPLY to arguments supplied by function F_ARGS(i).
	N_APPLYCASES, 	##<< number of cases, i=1..N_APPLYCASES
	F_ARGS, 		##<< function(i) returning a list of first arguments to F_APPLY
	F_APPLY, 		##<< function to be applied
	...,			##<< further arguments passed to F_APPLY  
	SFFARGSAPPLY_ADDARGS = list(), ##<< list of further arguments passed to F_APPLY
	debugSequential=FALSE	##<< evaluate using lapply instead of dynamicClusterApply
){
	##seealso<< 
	## \code{\link{twSnowfall}}
	snowfall:::sfCheck()
	snowfall:::checkFunction(F_ARGS)
	# arguments shared by all cases are combined once, not per case
	addArgs <- c(SFFARGSAPPLY_ADDARGS, list(...))
	# full argument list for case i: case-specific args first, shared args appended
	argfun <- function(i) c(F_ARGS(i), addArgs)
	if (snowfall::sfParallel() && !debugSequential) {
		return(dynamicClusterApply(snowfall::sfGetCluster(), F_APPLY, N_APPLYCASES, argfun ))
	} else {
		# seq_len handles N_APPLYCASES == 0 correctly (1:0 would iterate over c(1, 0))
		lapply(seq_len(N_APPLYCASES), function(i){
				clArgs <- argfun(i)
				do.call( F_APPLY, clArgs)
			})
	}
	### a list of results of F_APPLY
}
#twUtestF("applyLB","test.sfFArgsApplyLB")
attr(sfFArgsApplyLB, "ex") <- function(){
	# example: paste the i-th rows of two matrices together, case by case
	#sfInit(parallel=TRUE,cpus=2)

	X <- matrix(0:9, nrow = 5, ncol = 2)
	dimnames(X) <- list(
		rows = paste("r", seq_len(nrow(X)), sep = ""),
		cols = paste("c", seq_len(ncol(X)), sep = "")
	)
	Y <- 10 * X
	# F_ARGS delivers row i of X and row i of Y as first arguments to F_APPLY
	F_ARGS <- function(i) list(arg1 = X[i, ], arg2 = Y[i, ])
	F_APPLY <- paste
	.res <- sfFArgsApplyLB(nrow(X), F_ARGS, F_APPLY, sep = "-")
}


sfSimplifyLBResult <- function(
	### Transform the list resulting from \code{\link{sfClusterApplyLB}} to vector or matrix, similar as apply. 
	resl,	##<< list of per-case results, all of identical structure
	caseNames=NULL	##<< list of column names, name corresponds to dimension name for columns, i.e as obtained from dimnames(X)[1]
){
	##seealso<< 
	## \code{\link{twSnowfall}}
	
	# assumes that all components in the list are of identical structure,
	# so the first component determines how the whole list is simplified
	res <- 
		if( length(resl[[1]]) == 0) resl[[1]]	#includes NULL
		else if( length(resl[[1]]) == 1)	structure( unlist(resl), names=as.vector(unlist(caseNames)) )
		else if ( is.list(resl[[1]]))		structure( resl, names=as.vector(unlist(caseNames)) )
		#else if( is.vector(resl[[1]]) | is.matrix(resl[[1]]) ){	#also works for matrix for which is.vector is TRUE
		else if( is.atomic(resl[[1]]) ){	#is.vector does not work for objects with attributes
			# one column per case: row names from the first component, column names from caseNames
			.dimnames <- c( list(names(resl[[1]])), caseNames ) 
			if( all(sapply(.dimnames,is.null))) .dimnames <- NULL
			matrix( unlist(resl), byrow=FALSE, nrow=length(resl[[1]]), dimnames=.dimnames  )
		}else resl
	res
	### Depending on components (result of FUN) \describe{
	### \item{scalar}{vector}
	### \item{vector}{matrix with cases in columns}
	### \item{matrix}{each column corresponds to as.vector(matrix), i.e. stacked to one vector }
	### \item{list}{list of lists. May transform to dataframe by: do.call(rbind, lapply(.res,data.frame)))}
	### \item{data.frame}{list of data.frames}}
}

sfApplyMatrixLB <- function(
	### Load balanced parallel application of FUN to rows or columns of matrix X 
	X,			##<< matrix with rows corresponding to cases supplied to first argument of FUN 
	MARGIN=1,	##<< 1 indicates rows, 2 indicates columns
	FUN,		##<< function to be applied on each row (or column)
	...,		##<< further arguments passed to FUN
	debugSequential = FALSE 	##<< use simple apply instead of the cluster
){
	##seealso<< 
	## \code{\link{twSnowfall}}
	
	if( !is.matrix(X) )
		stop("X must be matrix.")
	if( !(MARGIN %in% 1:2))
		stop("MARGIN must be 1 or 2.")
	# number of cases along the requested margin (rows for MARGIN 1, columns for MARGIN 2)
	nCases <- if(MARGIN==1) nrow(X) else ncol(X)
	# guard against zero cases along the chosen margin; previously only nrow(X)
	# was checked, so a zero-column matrix with MARGIN=2 produced bogus
	# iterations over 1:0 in the parallel path
	if( nCases == 0)
		return( list() )
	##details<< 
	## if debugSequential is TRUE, simple apply is used 
	res <- if( debugSequential ){
			apply(X, MARGIN, FUN, ... )
		}else{
			caseNames <- dimnames(X)[MARGIN]
			F_ARGS <- if(MARGIN==1) function(i) list(X[i,]) else function(i) list(X[,i])
			resl <- sfFArgsApplyLB( nCases, F_ARGS, FUN, ...)
			sfSimplifyLBResult(resl, caseNames)
		}#!debugSequential
	res
	### a list with result of FUN for each row or columns of X respectively
}
#twUtestF("","test.sfFArgsApplyDep")


twDynamicClusterApplyDep <- function (
	### Modification of \code{dynamicClusterApply} (snow-internal) to provide result of previous step.
	cl, fun, n, argfun, ##<< cluster, worker function, number of jobs, and a function returning a list of arguments passed to fun
	dependsStep,		##<< dependencies in 1..n: job i is submitted only after job i-dependsStep completed
	initVal=vector("list",dependsStep)	##<<  results presented to argfun for the first 1..dependsStep results
	,freeMasterNode=FALSE	##<< if set to TRUE, no job is submitted to node 1, so that this node can dispatch jobs without waiting
){
	##seealso<< 
	## \code{\link{twSnowfall}}
	
	##details<< 
	## \code{argfun=function(i,prevRes) list( <args(i,prevRes)> )} 
	## provides arguments to fun where \describe{
	## \item{i}{index for which to generate arguments}
	## \item{prevRes}{result of FUN for case i-dependsStep.}}
	## can be assume that prevResList[[i-useNCpus]] has been evaluted already
	
	##details<< 
	## \code{dependsStep} 
	checkCluster(cl)	# snow-internal sanity check of the cluster object
	useNCpus <- length(cl)
	# if there are no jobs or no nodes, nothing is done and NULL is returned
	if (n > 0 && useNCpus > 0) {
		# val[[i]] collects the result of job i
		val <- vector("list", n)
		# one state per job; the extra dependsStep trailing entries allow
		# "jobState[tag+dependsStep] <- waiting" below without bounds checks.
		# states: depending (predecessor not finished), waiting (ready to submit),
		# processing (submitted to a node), completed (result received)
		jobState <- rep( as.factor(c("depending","waiting","processing","completed"))[1], n+dependsStep)
		jobState[1:min(dependsStep,n)]<-"waiting"	#first dependsStep jobs are waiting
		# submit job to the given node; argfun is handed the result of job-dependsStep,
		# or the corresponding initVal entry for the first dependsStep jobs.
		# NOTE: does not update jobState itself (local copy only) — callers must
		# set jobState[job] <- "processing" after calling submit
		submit <- function(node, job){
	#cat(paste("s",job,"n",node,":",sep=""))
			#jobState[job] <- "processing" #only locally visible
			# do it outside and do not forget to call it
			dependVal <- if( job > dependsStep) 
					val[[job-dependsStep]] 
				else 
					initVal[[job]]
			args <- argfun(job, dependVal)
	#str(args)
			sendCall(cl[[node]], fun, args, tag = job)
	#cat(paste("--",sep=""))
	#when args was too big, sendCall did not return
		}
		#submit 2 jobs to each node, so that if one is finished there is already one in the queue
		#node0 <- if(!freeMasterNode || n<2) 1 else 2	# start with two in order to keep first node free for dispatching
		# NOTE(review): with freeMasterNode=TRUE only useNCpus-1 workers are used, but the
		# modulo cycling below still starts at node 1 — so it is the LAST node, not node 1,
		# that receives no initial job; confirm against the documented intent
		nWorkers <- if(!freeMasterNode || useNCpus==1) useNCpus else useNCpus-1
		# initial submissions are also capped at dependsStep: later jobs still
		# depend on results that do not exist yet
		for (i in 1:min(n, 2*nWorkers, dependsStep)){
			node = (i-1)%%nWorkers+1
			submit(node, i)
			jobState[i] <- "processing"
		}
		# receive exactly n results; each completion may unlock job tag+dependsStep
		for (i in 1:n) {
			d <- recvOneResult(cl)
			val[d$tag] <- list(d$value)
		#cat(paste("r",d$tag,sep=""))
			jobState[d$tag] <- "completed"
			jobState[d$tag+dependsStep] <- "waiting"
			# get next waiting job j (its dependency j-dependsStep has completed)
			# and hand it to the node that just became free
			j <- match("waiting",jobState)
		#cat(paste("_j",j,",",sep=""))
			# j may point into the trailing dummy entries (> n); those are never submitted
			if (j <= n){ 
				submit(d$node, j)
				jobState[j] <- "processing"
			}
		}
		#if( !all(jobState[1:n]=="completed")) recover()
		# raises an error if any worker returned a try-error (snow-internal helper)
		checkForRemoteErrors(val)
	}
	### same (snow-internal) \code{dynamicClusterApply}
}


sfFArgsApplyDep <- function (
	### Load balanced application of F_APPLY with using result from a previous run to construct arguments.
	N_APPLYCASES,	##<< number of cases to calculate
	F_ARGS, 		##<< function(i,prevRes) returning the list of arguments for case i, see details
	F_APPLY, 		##<< function to calculate
	SFFARGSAPPLY_initVal,	##<< initial values: results presented to F_ARGS for the first 1..dependsStep cases	
	SFFARGSAPPLY_dependsStep=length(SFFARGSAPPLY_initVal),	##<< dependencies in 1..n, 
	...,							##<< further arguments passed to F_APPLY  
	SFFARGSAPPLY_ADDARGS=list(),	##<< list of further arguments passed to F_APPLY
	debugSequential=FALSE	##<< evaluate sequentially by a simple loop instead of using the cluster
	,freeMasterNode=FALSE	##<< if set to TRUE, no job is submitted to first node, so that this node can dispatch jobs without waiting
){
	if( !is.vector(SFFARGSAPPLY_initVal) || !(length(SFFARGSAPPLY_initVal)>=SFFARGSAPPLY_dependsStep) )
		stop("SFFARGSAPPLY_initVal must be a vector with length >= SFFARGSAPPLY_dependsStep")
	if( mode(SFFARGSAPPLY_initVal)!= "list" )
		SFFARGSAPPLY_initVal <- as.vector(SFFARGSAPPLY_initVal, mode="list")
	ds <- SFFARGSAPPLY_dependsStep	#avoid long name, which is necessary to prevent partial argument matching
	snowfall:::sfCheck()
	snowfall:::checkFunction(F_ARGS)
	# arguments shared by all cases are appended to the case-specific ones
	addArgs <- c(SFFARGSAPPLY_ADDARGS, list(...))
	argfun <- function(i,prevRes) c(F_ARGS(i,prevRes), addArgs)
	#argfun(1,SFFARGSAPPLY_initVal[[1]])
	# namespace-qualified snowfall calls for consistency with sfFArgsApplyLB
	if (snowfall::sfParallel() && !debugSequential) 
		return(twDynamicClusterApplyDep(snowfall::sfGetCluster(), F_APPLY, N_APPLYCASES, argfun,
				dependsStep=ds, initVal=SFFARGSAPPLY_initVal, freeMasterNode=freeMasterNode))
	else {
		# sequential fallback: prepending the ds initial values lets case i read
		# the result of case i-ds at position val[[i]]
		val <- c(SFFARGSAPPLY_initVal[1:ds], vector("list",N_APPLYCASES) )
		# seq_len handles N_APPLYCASES == 0 correctly (1:0 would iterate c(1, 0))
		for( i in seq_len(N_APPLYCASES) ){
			val[[i+ds]] <- do.call(F_APPLY, argfun(i,val[[i]]), quote=TRUE)
		}
		val[(1:ds)] <- NULL		#remove the initial values
		val
	}
	### a list of results of F_APPLY
	
	##seealso<< 
	## \code{\link{twSnowfall}}
	## \code{\link{twDynamicClusterApplyDep}}
}
#twUtestF("applyLB","test.sfFArgsApplyLB")
attr(sfFArgsApplyDep, "ex") <- function(){
	# example: build up strings column by column, each case depending on the
	# result of the corresponding case one column earlier
	#sfInit(parallel=TRUE,cpus=2)

	# using as many cpus as rows in Z
	(Z <- matrix(letters[1:12], nrow = 3))
	F_ARGS <- function(i, prevRes) list(x = prevRes, z = Z[i])
	F_APPLY <- function(x, z) paste(x, z, sep = "")
	.res0 <- rep("_", nrow(Z))	# dependsStep will be the length of .res0
	resSeq <- sfFArgsApplyDep(length(Z), F_ARGS, F_APPLY, .res0)
	(res <- matrix(sfSimplifyLBResult(resSeq), nrow = nrow(Z)))

	# Gives the same results as having one parallel call per column.
	# However, in this implementation, the finished nodes do not need to
	# wait for possibly slower finishing of other rows, and can tackle
	# already further columns.
}

.df.iproc <- function(
	### subsetting a dataframe to assign to 1 out of nproc parts
	df, 	##<< data frame to subset
	iproc, 	##<< ith subset (1..nproc)
	nproc	##<< total number of subsets
){
	# each part gets at most ni consecutive rows
	ni <- ceiling(nrow(df)/nproc)
	iStart <- (iproc - 1)*ni + 1
	iEnd <- min(iproc*ni, nrow(df))
	# when nrow(df) is not an exact multiple of ni, trailing parts may be empty;
	# the original iStart:iEnd then counted backwards (e.g. 5:4) and returned an
	# NA row plus a row belonging to another part
	if( iStart > iEnd )
		df[0, , drop=FALSE]
	else
		df[iStart:iEnd, ]
	### data frame with the rows of the i-th part (possibly zero rows)
}

.tmp.f <- function(
	### interactive development code
){
	# scratch code for stepping through .df.iproc with a small data frame
	#mtrace(.df.iproc)
	#mtrace.off()
	d <- data.frame(a = 1:7, b = "bla")
	.df.iproc(d, 1, 3)	# rows 1,2,3
	.df.iproc(d, 2, 3)	# rows 4,5,6
	.df.iproc(d, 3, 3)	# row 7
}

# Try the twSnowfall package in your browser
#
# Any scripts or data that you put into this service are public.
#
# twSnowfall documentation built on May 2, 2019, 4:47 p.m.