R/deamon.r

Defines functions deamon

# Cluster Demaon Deamon Dispatcher
deamon <- function(n = 5,
                   wdir = q.wd(),
                   timer = -1,
                   wait=TRUE)
{

  q.setwd(wdir = wdir)

  if(file.exists(.pkg.log$scheduler.shutdown))
    file.remove(.pkg.log$scheduler.shutdown)

  require(parallel)
  print( sprintf("QWD:%s", q.wd() ))

  # Cluster computing deamon process, load balancing manager
  cl     <- makeCluster(n)
  START  <- Sys.time()

  while (!is.scheduler.shutdown())
  {
    # time watch
    if (timer != -1)
      if (as.integer(Sys.time() - START) > timer)
        break

    # find queues status
    INBOX     <-   q.ls('inbox')
    AGENT     <-   dir(sprintf('%s/agent',q.wd()))

    N.INBOX   <-   nrow(INBOX)
    N.RUNNING <-   length(AGENT)

    if (is.null(N.INBOX))
      N.INBOX = 0
    if (is.null(N.RUNNING))
      N.RUNNING = 0

    message(sprintf("INBOX:%s\tRUNNING:%s",
                    N.INBOX,N.RUNNING))

    if(N.RUNNING < n)
    {
      # estimate available task handler
      N.TO.RUN = n - N.RUNNING
      N.TO.RUN = min(N.TO.RUN, N.INBOX)

      # check avialabity and run job
      if (N.INBOX > 0 && N.RUNNING < n)
      {
        for (i in 1:N.TO.RUN)
        {
          print(INBOX$jid[i])
          JOB  <- INBOX$jid[i]

          message(sprintf("Folk JID: %s", JOB))

          jobs <- mcparallel({
            require(qrmarkdown)
            q.dispatcher(jid  = JOB,
                         wdir = wdir,
                         wait = TRUE)
          })

          # if(wait)
          #   res <- mccollect(jobs)
        }

      } # not other thing to do now.

    } else message("WAITING !!! ")


    Sys.sleep(.pkg.log$n.sleep)

  }

  stopCluster(cl)
} # session
okux/qrmarkdown documentation built on Dec. 22, 2021, 4:17 a.m.