
Defines functions initializeSnowOnWindows

# Copyright 2015 Hewlett-Packard Development Company, L.P.
# This program is free software; you can redistribute it 
# and/or modify it under the terms of the GNU General Public 
# License, version 2 as published by the Free Software Foundation.

# This program is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# General Public License for more details.

# You should have received a copy of the GNU General Public License 
# along with this program; if not, write to the Free Software 
# Foundation, Inc., 59 Temple Place, Suite 330, 
# Boston, MA 02111-1307 USA.

setClass("ParallelddR", contains="ddRDriver")

#' The default parallel driver
#' @examples
#' \dontrun{
#' useBackend(parallel,executors=4)
#' }
#' @export 
# Exported Driver
parallel <- new("ParallelddR",DListClass = "ParallelObj",DFrameClass = "ParallelObj",DArrayClass = "ParallelObj",backendName = "parallel")
# Driver for the parallel package. Parallel is also the default backend.
ddR.env$driver <- parallel

#Set environment and default number of cores to total no. of cores (but one on windows)
#Note that DetectCores() can return NA. If it's windows we use SNOW by default
parallel.ddR.env <- new.env(emptyenv())
parallel.ddR.env$cores <- parallel::detectCores(all.tests=TRUE, logical=FALSE) 
parallel.ddR.env$clusterType <- "FORK"
parallel.ddR.env$snowCluster <- NULL
if(is.na(parallel.ddR.env$cores)){ parallel.ddR.env$cores <- 1 }
if((.Platform$OS.type == "windows")) {parallel.ddR.env$clusterType <- "PSOCK"}

#Function to initialize SNOW on windows
  cl <- parallel::makeCluster(getOption("cl.cores", parallel.ddR.env$cores))
  parallel.ddR.env$snowCluster <- cl
  parallel.ddR.env$clusterType <- "PSOCK"

# Initialize the no. of cores in parallel backend
# By default we use the FORK method of parallel which works only on UNIx environments. The "PSOCK" method requires SNOW but works on all OSes.
#' @param executors Number of cores to run with.
#' @param type If "FORK", will use UNIX fork() method. If "PSOCK", will use SNOW method.
#' @describeIn init Initialization for parallel
  function(x, executors=NULL, type= "FORK", ...){
    if(!((is.numeric(executors) || is.integer(executors)) && floor(executors)==executors && executors>=0)) stop("Argument 'executors' should be a non-negative integral value")
        parallel.ddR.env$cores <- executors

  #On windows parallel can use only a single core. We need to use socket based SNOW for more number of cores.
  if((.Platform$OS.type == "windows" &&  parallel.ddR.env$cores!=1 ) || type =="PSOCK") {
     message("Using socket based parallel (SNOW) backend.")
  } else{
     if(.Platform$OS.type == "windows" && parallel.ddR.env$cores>1) { stop("On windows, multi-process execution with FORK is not supported for more than one core. Use backend with type = 'PSOCK'")}
     parallel.ddR.env$clusterType <- "FORK"
  return (parallel.ddR.env$cores)

#' @describeIn shutdown Shutdown for parallel
  function(x) {
    if(!is.null(parallel.ddR.env$snowCluster) && parallel.ddR.env$clusterType == "PSOCK") {
        message("Switching out of using 'parallel (SNOW)'. Shutting down SNOW cluster...")
        parallel.ddR.env$clusterType <- "FORK"
	parallel.ddR.env$snowCluster <- NULL

#This function calls mclapply internally when using parallel with "FORK" or 
#snow's staticClusterApply when used with "PSOCK" option. 
#On windows only "PSOCK" provides true parallelism.

#' @rdname do_dmapply
          signature(driver="ParallelddR", func="function"), 
          function(driver, func, ..., MoreArgs = list(),
                   output.type =
                       c("dlist", "dframe", "darray", "sparse_darray"),
                   nparts = NULL,
                   combine = c("default","c", "rbind", "cbind"))
  output.type <- match.arg(output.type)
  if (!is.null(nparts)) {
                length(nparts) == 1L || length(nparts) == 2L)
  combine <- match.arg(combine)

  dots <- list(...)

  for(num in 1:dlen){
      #If this is a DObject, we need to extract the backend object and reassemble it (i.e., use collect)
      #By converting DObject into a normal R object, we let mcmapply handle how iterations on differnt data types
      dots[[num]] <- collect(dots[[num]])
      #There are two cases for a list (1) parts(dobj) or (2) list of parts(dobj)

      if(is(dots[[num]],"list") && any(rapply(dots[[num]], function(x) is(x,"ParallelObj"),how="unlist"))){    
        tmp <- rapply(dots[[num]],function(argument){
	              else return(argument)}, how="replace")

       #(iR): This is bit of a hack. rapply increases the depth of the list, but at the level that the replacement occured. 
       #Simple ulist() does not work. If this was just parts(A,..), we can call unlist. If this is a list of parts, then 
       #we unwrap the second layer of the list. Unwrapping the second layer is incorrect if parts(A) was embedded deeper than that.
       if(is(dots[[num]][[1]], "DObject")){
       	  dots[[num]] <- unlist(tmp, recursive=FALSE)
       } else {
          for(index in seq_along(tmp)){
			tmp[[index]] <- unlist(tmp[[index]], recursive=FALSE)
			tmp[[index]] <- tmp[[index]]
          dots[[num]] <- tmp

   #Check if MoreArgs contains a distributed object. If yes, convert it into a regular object via collect
   for(index in seq_along(MoreArgs)){
         if(is(MoreArgs[[index]], "DObject"))
      	   MoreArgs[[index]] <- collect(MoreArgs[[index]])

   answer <- NULL
   #Now iterate in parallel
   if(parallel.ddR.env$clusterType == "PSOCK"){
    # We are using the SNOW backend, i.e., clusterMap. Check code with print(clusterMap)

    #Wrap the input arguments and use do.call()
    dots <- c(list(cl = parallel.ddR.env$snowCluster, fun = func, MoreArgs = MoreArgs, RECYCLE = FALSE, SIMPLIFY = FALSE), dots)
    answer <- do.call(parallel::clusterMap, dots)

   } else {
   #Wrap inputs in a list to call mcmapply via do.call()

   dots <- c(list(FUN = func, MoreArgs = MoreArgs, SIMPLIFY = FALSE, mc.cores = parallel.ddR.env$cores), dots)
   answer <- do.call(parallel::mcmapply, dots)


   #Perform a cheap check on whether there was an error since man pages say that an error on one core will result in error messages on all. TODO: Sometimes the class of the error is "character"
   if(inherits(answer[[1]], "try-error")) {stop(answer[[1]])}
   if(class(answer[[1]]) == "character" && grepl("Error", answer[[1]])) {stop(answer[[1]])}

   #Create the output object, since we store partitions
   totalParts <- prod(nparts)
   outputObj<-vector("list", totalParts)

   #Create the output based on the number of partitions expected in the output
   #We create partitions with near equal sizes, i.e. if the #elements is not purely divisible by totalparts, we spread the remainder
   #to the 1:remainder elements
   if(totalParts > lenAnswer) stop("Number of elements generated by dmapply is less than those supplied in argument 'nparts'.")

   remainder<-lenAnswer %% totalParts
   elemInEachPart<- rep(floor(lenAnswer/totalParts), totalParts)
   if(remainder>0) {elemInEachPart[1:remainder]<-elemInEachPart[1:remainder]+1}

   #Decide how we may need to combine entries from the answer into partitions
   #We handle the "flatten" case in the while loop since simplify2array has to be called with parameter "higher=FALSE"
   combineFunc <- list

   if(output.type !="dlist"){
       #Setup the partition types that we will use later to check if partitions conform to output.type.   
       if(output.type == "darray") ptype<-"matrix"
       if(output.type == "dframe") ptype<-"data.frame"
       if(output.type == "sparse_darray") ptype<-c("dsCMatrix", "dgCMatrix")

	if(combine == "rbind"){
	   if(ddR.env$RminorVersion > 2) #If R >3.2, use new rbind
	   	   combineFunc <- rbind
	   	   combineFunc <- Matrix::rBind
	else if(combine == "cbind"){
	   if(ddR.env$RminorVersion > 2) #If R >3.2, use new cbind
	   	   combineFunc <- cbind
	   	   combineFunc <- Matrix::cBind
   psizes<-array(0L,dim=c(totalParts,2)) #Stores partition sizes
   while(index <= totalParts){
   	     if(output.type == "dlist"){
	        if(combine == "c")
			     outputObj[[index]] <- unlist(answer[(elemInEachPart[index]+1):elemInEachPart[index+1]], recursive=FALSE)
			     outputObj[[index]] <- answer[(elemInEachPart[index]+1):elemInEachPart[index+1]]
             }else {
	        if(combine == "c" || combine =="default"){
	        	     outputObj[[index]] <- simplify2array(answer[(elemInEachPart[index]+1):elemInEachPart[index+1]], higher=FALSE)
			     outputObj[[index]] <- do.call(combineFunc, answer[(elemInEachPart[index]+1):elemInEachPart[index+1]])
	        if(!(class(outputObj[[index]]) %in% ptype)) {stop("Each partition of the result should be of type = ", ptype, ", to match with output.type =", output.type)}

	     psizes[index,] <-(if(is.null(d)){c(length(outputObj[[index]]),0L)} else {d})


   #Check if partions conform and can be stitched together. 
   #Check partitions in each logical row have the same number of rows/height. Similary for columns
   rowseq<-seq(1, totalParts, by=nparts[2])
   for (index in rowseq){
	     if(any(psizes[index:(index+nparts[2]-1),1]!=psizes[index,1])) stop("Adjacent partitions have different number of rows, should be ", psizes[index,1])
   for (index in 1:nparts[2]){
	    if(any(psizes[(rowseq+(index-1)),2]!=psizes[index,2])) stop("Adjacent partitions have different number of columns, should be ", psizes[index,2])

  numcols<-sum(psizes[1:nparts[2],2]) #add cols of all partitions in the first row
  numrows<-sum(psizes[seq(1, totalParts, by=nparts[2]), 1]) #add all partitions in the first column group
  dims<-c(numrows, numcols)

  #Use single dimension if we know it's a list

   new("ParallelObj",pObj = outputObj, splits = 1:length(outputObj), psize = psizes, dim = as.integer(dims), nparts = nparts)

