#internal function: prepare MPI connection and slaves

#note: this function reads and writes to non-local variables (i.e. variables declared in the calling function, usually optim_p*)
#although poor style, this method was chosen to avoid passing large arrays of arguments and results, which is time-intensive

#tag-codes for master-slave communications:
#0: task from master
#2: results returning from slave
#3: slave sends good-bye message
#4: execution error from slave
#5: object request from slave or subsequent object transfer from master to slave
#6: object pushed from slave to master
#7: command slaves to abort everything

globvars$mpi_mode="loop" #"loop"    ("bcast" is probably obsolete, as it is less stable and slower)

prepare_mpi_cluster=function(nslaves, working_dir_list=NULL, verbose_slave=FALSE, ...)
  if (!is.loaded("mpi_initialize")) {         
  	if (!require("Rmpi")) stop("Package Rmpi not found. Install it or use serial version of this call (optim_pso or optim_dds).")
    globvars$mpi_initialized_before = FALSE 
  } else
  globvars$mpi_initialized_before = TRUE #indicate that MPI was already initialized by the user (and should not be terminated later)
  if (!is.null(working_dir_list)) 
    if  (!is.data.frame(working_dir_list))
      stop("'working_dir_list' must be a dataframe.")
    if  (length(setdiff(names(working_dir_list), c("host", "wd"))) > 0  )
      stop("'working_dir_list' must be a dataframe with columns 'host' and 'wd'.")
  if (nslaves == -1) nslaves=mpi.universe.size() else  # Spawn as many slaves as possible
  if (nslaves > mpi.universe.size()) warning("Number of specified slaves exceeds number of available slaves.")

  if (mpi.comm.size()>0)
	  print(paste(nslaves,"running slaves detected, no spawning."))
  } else {
 	  if (verbose_master) {print("spawning slaves..."); flush.console()}

  	print(paste(mpi.comm.size(),"slaves spawned."))
  globvars$is_mpi = TRUE
  while(mpi.iprobe(mpi.any.source(),mpi.any.tag()))                #empty MPI queue if there is still something in there
    slave_message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())

     .Last <- function(){
      if (is.loaded("mpi_initialize")){
          if (mpi.comm.size(1) > 0){
              #print("Please use mpi.close.Rslaves() to close slaves.")
          #print("Please use mpi.quit() to quit R")
   options(error=.Last)     #close rmpi on errors

  perform_task = function(params,slave_id) {
    # put all arguments into a list
    argl <- additional_args
    argl[[length(argl)+1]] <- params

      if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": received message for slave",slave_id))

      if (!(mpi.comm.rank() %in% slave_id)) return(1) #only the adressed slaves should attend to the task
      if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": task received"))

      if (tryCall)
        if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": calling objective function..."))
        #results=try(objective_function(params),silent=TRUE)  # call the objective function with the respective parameters, and create results (with error handling, slower)
        results=try(do.call(what = objective_function, args=argl),silent=TRUE)  # call the objective function with the respective parameters, and create results (with error handling, slower)
        if (verbose_slave)
    			print(paste(Sys.time(),"slave",mpi.comm.rank(),":  ...objective function evaluation completed"))

        if (is.numeric(results))
          if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": returning results to master..."))
          mpi.send.Robj(results,0,2)      # Send the results back as a task_done slave_message            
        } else                             #an error occured during execution
          if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": error during call of objective function, returning message to master. Error message:",as.character(results)))
          mpi.send.Robj(paste("(",Sys.info()["nodename"],"):",as.character(results)),0,4)    #return the error message, tagged as "error" (4)
      } else        #non-tryCall option
        if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": calling objective function..."))
        results=do.call(what = objective_function, args=argl)  # call the objective function with the respective parameters, and create results (without error handling, faster)
        if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": ...objective function evaluation completed"))   
        if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": returning results to master..."))
        mpi.send.Robj(results,0,2)      # Send the results back as a task_done slave_message            #ii isend doesn't work - why?

      if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": ...results returned, back to idle mode."))    
  } #end function "perform_task"
  additional_args <- list(...) #get list of arguments hidden in ...
  mpi.bcast.Robj2slave(additional_args)                   #send any additional arguments to slaves

  mpi.bcast.Robj2slave(verbose_slave)                   #send verbose-flags to slaves
  if (verbose_slave)
    mpi.bcast.cmd(sink(paste("slave",mpi.comm.rank(),sep="")))               #put output of slaves into files, if desired

  mpi.bcast.Robj2slave(tryCall)                     #tryCall-flag
  mpi.bcast.Robj2slave(objective_function)         #send objective function to slaves
  mpi.bcast.Robj2slave(perform_task)               #send activation function to slaves

  if (!is.null(working_dir_list))    #change working directories of slaves, if specified
    slave_hosts = capture.output(slave.hostinfo(short = FALSE))
    slave_hosts = sub(" ","",sub("[^:]*: ","",slave_hosts[-1]))  #extract hostnames of slaves, discard master

    wd_to_be_set = array("",length(slave_hosts))                                       #this will be a list with a default directory to each slave
    default_dir=working_dir_list[which(working_dir_list[,"host"]=="default"),"wd"]  #get name of default_dir
    if (length(default_dir)==0) default_dir="./"                                    #if default dir not specified, use "./"
    for (i in 1:length(slave_hosts))
      if (length(entry)==0)
        wd_to_be_set[i]=default_dir                                                 #set default dir
        wd_to_be_set[i]=working_dir_list[entry[1],"wd"]                             #set specified dir
      if (length(entry)>1) working_dir_list=working_dir_list[-entry[1],]            #the current host was listed more than once in the list, remove the first entry  
    mpi.bcast.Robj2slave(wd_to_be_set)            #sent the directory list to all slaves
    #failed_dirchanges= (as.character(mpi.remote.exec(getwd()))!=wd_to_be_set)     #check which of the dirchanges did not succeed
    for (i in 1:length(res))
      if (!is.null(attr(res[[i]],"class")))
    if (any(failed_dirchanges))
      warning(paste("The following slave(s) could not change into the respective directory and remain unused:\n",
        paste(t(cbind(slave_hosts," : ",wd_to_be_set,"\n ")[failed_dirchanges,]),collapse="")))
      globvars$closed_slaves=length(failed_dirchanges)         #count the slaves with failed dirchange as "closed"
      globvars$idle_slaves=globvars$idle_slaves[-failed_dirchanges]     #remove the slaves from list of ready slaves

  if (globvars$mpi_mode=="loop")     #alternative slave-MPI-mode
    if (verbose_master) print(paste(Sys.time(),"initiating slaves in loop-mode."))      
    mpi_receive_loop = function()
      while (tag !=7 )
          messge = mpi.recv.Robj(source=0, tag=mpi.any.tag())
          messge_info = mpi.get.sourcetag()
          tag      = messge_info[2]

          if ((tag == 0) )     #task message
            perform_task(params=messge, slave_id=mpi.comm.rank())        #do task
          if (((tag == 7) && (messge == "kill")) ||
              !is.null(globvars$kill_msg))     #kill-message
            mpi.send.Robj(obj="bye", dest=0, tag=3) #tag 3 demarks good-bye

      if (verbose_slave) print(paste(Sys.time(),"slave",mpi.comm.rank(),": leaving loop, good bye."))    

    mpi.bcast.Robj2slave(mpi_receive_loop)               #transfer mpi-loop function to slaves
    mpi.bcast.cmd(mpi_receive_loop())                    #call     mpi-loop function on slaves

send_task = function(params, slave_id=slave_id)        #submit job to slave
#this function is used to transfer a task to a slave
#two different modi (globvars$mpi_mode="bcast" or "loop") for running the slaves are implemented, so there are two branches
  if (globvars$mpi_mode=="bcast")
   mpi.remote.exec(cmd=perform_task, params=params, slave_id=slave_id, ret=FALSE)        #submit job to slave by broadcasting to all
   mpi.send.Robj(obj=params, dest=slave_id, tag=0) #send params to specified slave with tag=0 (this is a task)
