R/worker.R

Defines functions worker

Documented in worker

#' @importFrom R6 R6Class
#' @import redux
#' @import glue
#' @import rzmq
NULL

#' Worker object
#'
#' This object listens for tasks sent by some Rworker instance.
#' After running, the status of the execution is sent back 
#' to the Rworker instance.
#'
#' @section Usage:
#' ```
#' w <- worker()
#' ```
#' 
#' @section Details:
#' `$listen()` listens for tasks sent by the Rworker instance
#' `$report()` returns the task execution state to Rworker instance 
#' @rdname Worker
#' @name Worker
#' @examples
#' \dontrun{
#' w <- worker()
#' w$listen()
#' }
NULL

#' @export
Worker <- R6::R6Class(
    'Worker',
    public = list(

        initialize = function() {
            private$context = rzmq::init.context()
            private$psock = rzmq::init.socket(private$context, 'ZMQ_PULL')
            private$ssock = rzmq::init.socket(private$context, 'ZMQ_PUSH')
            rzmq::connect.socket(private$psock, "ipc:///tmp/rworkerp.sock")
            rzmq::connect.socket(private$ssock, "ipc:///tmp/rworkers.sock")
        },

        listen = function() { 
            while(TRUE) {
                tereq = receive.socket(private$psock)
                private$inject_progress(tereq)

                tryCatch({
                    result <- NULL
                    suppressWarnings({
                        withCallingHandlers({
                            result <- do.call(tereq$task, tereq$kwargs)
                        }, warning=function(w) {
                            tereq$warns=c(tereq$warnings,
                                            gsub('\n', ';', as.character(w)))
                        })
                    })
                },
                    error=function(e) {tereq$errors=gsub('\n', ';', as.character(e))},
                    finally= {
                        self$report(tereq, result)
                        gc()
                    }
                )
            }
        },

        report = function(tereq, result=NULL) {
            if (length(tereq$errors) > 0) {
                tereq$status = 'FAILURE'
            } else {
                tereq$status = 'SUCCESS'
            }
            # If a result was provided, add this to the task object
            if(!is.null(result)) {
                tereq$result <- result
            }

            send.socket(private$ssock, data=tereq)
        }
    ),

    private = list(
        context = NULL,
        psock = NULL,
        ssock = NULL,

        inject_progress = function(tereq) {
            expr <- substitute({
                task_progress <- function(pg) {
                    rzmq::send.socket(socket,
                                      data=list(status='PROGRESS',
                                                progress=pg,
                                                errors=NULL,
                                                warnings=NULL,
                                                task_id=id))
                }
            }, list(id=tereq$task_id, socket=private$ssock))
            body(tereq$task) <- as.call(append(as.list(body(tereq$task)), expr, 1))
        }
    ),
)

#' @rdname Worker
#' @export
worker = function() {
    return(Worker$new())
}
lecardozo/rworker documentation built on May 12, 2021, 5:37 p.m.