# Nothing
#' Assigns Jobs to Workers (R6 Class)
#'
#' @description
#'
#' Jobs go in. Results come out.
#'
#'
#' @param globals A named list of variables that all `<job>$expr`s will have
#' access to. Alternatively, an object that can be coerced to a named
#' list with `as.list()`, e.g. named vector, data.frame, or environment.
#'
#' @param packages Character vector of package names to load on
#' [`workers`][worker_class].
#'
#' @param namespace The name of a package to attach to the
#' [`worker's`][worker_class] environment.
#'
#' @param init A call or R expression wrapped in curly braces to evaluate on
#' each [`worker`][worker_class] just once, immediately after start-up.
#' Will have access to variables defined by `globals` and assets from
#' `packages` and `namespace`. Returned value is ignored.
#'
#' @param expr A call or R expression wrapped in curly braces to evaluate on a
#' [`worker`][worker_class]. Will have access to any variables defined
#' by `vars`, as well as the [`jobqueue's`][jobqueue_class] `globals`,
#' `packages`, and `init` configuration. See `vignette('eval')`.
#'
#' @param vars A named list of variables to make available to `expr` during
#' evaluation. Alternatively, an object that can be coerced to a named
#' list with `as.list()`, e.g. named vector, data.frame, or environment.
#' Or a `function (job)` that returns such an object.
#'
#' @param timeout A named numeric vector indicating the maximum number of
#' seconds allowed for each state the [`job`][job_class] passes through,
#' or 'total' to apply a single timeout from 'submitted' to 'done'. Can
#' also limit the 'starting' state for [`workers`][worker_class]. A
#' `function (job)` can be used in place of a number.
#' Example: `timeout = c(total = 2.5, running = 1)`.
#' See `vignette('stops')`.
#'
#' @param hooks A named list of functions to run when the [`job`][job_class]
#' state changes, of the form
#' `hooks = list(created = function (job) {...})`. Or a
#' `function (job)` that returns the same. Names of
#' [`job`][job_class] hooks are typically `'created'`,
#' `'submitted'`, `'queued'`, `'dispatched'`, `'starting'`, `'running'`,
#' `'done'`, or `'*'` (duplicates okay). See `vignette('hooks')`.
#'
#' @param reformat Set `reformat = function (job)` to define what
#' `<job>$result` should return. The default, `reformat = NULL` passes
#' `<job>$output` to `<job>$result` unchanged.
#' See `vignette('results')`.
#'
#' @param signal Should calling `<job>$result` signal on condition objects?
#' When `FALSE`, `<job>$result` will return the object without
#' taking additional action. Setting to `TRUE` or a character vector of
#' condition classes, e.g. `c('interrupt', 'error', 'warning')`, will
#' cause the equivalent of `stop(<condition>)` to be called when those
#' conditions are produced. Alternatively, a `function (job)` that
#' returns `TRUE` or `FALSE`. See `vignette('results')`.
#'
#' @param cpus How many CPU cores to reserve for this [`job`][job_class]. Or a
#' `function (job)` that returns the same. Used to limit the number of
#' [`jobs`][job_class] running simultaneously to respect
#' `<jobqueue>$max_cpus`. Does not prevent a [`job`][job_class] from
#' using more CPUs than reserved.
#'
#' @param max_cpus Total number of CPU cores that can be reserved by all
#' running [`jobs`][job_class] (`sum(<job>$cpus)`). Does not enforce
#' limits on actual CPU utilization.
#'
#' @param workers How many background [`worker`][worker_class] processes to
#' start. Set to more than `max_cpus` to enable standby
#' [`workers`][worker_class] to quickly swap out with
#' [`workers`][worker_class] that need to restart.
#'
#' @param job A [`job`][job_class] object, as created by `job_class$new()`.
#'
#' @param stop_id If an existing [`job`][job_class] in the
#' [`jobqueue`][jobqueue_class] has the same `stop_id`, that
#' [`job`][job_class] will be stopped and return an 'interrupt'
#' condition object as its result. `stop_id` can also be a
#' `function (job)` that returns the `stop_id` to assign to a given
#' [`job`][job_class]. A `stop_id` of `NULL` disables this feature.
#' See `vignette('stops')`.
#'
#' @param copy_id If an existing [`job`][job_class] in the
#' [`jobqueue`][jobqueue_class] has the same `copy_id`, the newly
#' submitted [`job`][job_class] will become a "proxy" for that earlier
#' [`job`][job_class], returning whatever result the earlier
#' [`job`][job_class] returns. `copy_id` can also be a `function (job)`
#' that returns the `copy_id` to assign to a given [`job`][job_class].
#' A `copy_id` of `NULL` disables this feature.
#' See `vignette('stops')`.
#'
#' @param reason Passed to `<job>$stop()` for any [`jobs`][job_class]
#' currently managed by this [`jobqueue`][jobqueue_class].
#'
#' @param cls Passed to `<job>$stop()` for any [`jobs`][job_class] currently
#' managed by this [`jobqueue`][jobqueue_class].
#'
#' @param state
#' The name of a [`jobqueue`][jobqueue_class] state. Typically one of:
#'
#' * `'*'` - Every time the state changes.
#' * `'.next'` - Only one time, the next time the state changes.
#' * `'starting'` - [`workers`][worker_class] are starting.
#' * `'idle'` - All [`workers`][worker_class] are ready/idle.
#' * `'busy'` - At least one [`worker`][worker_class] is busy.
#' * `'stopped'` - Shutdown is complete.
#'
#' @param func A function that accepts a [`jobqueue`][jobqueue_class] object
#' as input. Return value is ignored.
#'
#' @param ... Arbitrary named values to add to the returned [`job`][job_class]
#' object.
#'
#' @export
jobqueue_class <- R6Class(
  classname = "jobqueue",
  cloneable = FALSE,
  lock_objects = FALSE,
  public = list(
    #' @description
    #' Creates a pool of background processes for handling `$run()` and
    #' `$submit()` calls. These [`workers`][worker_class] are initialized
    #' according to the `globals`, `packages`, and `init` arguments.
    #'
    #' @param timeout,hooks,reformat,signal,cpus,stop_id,copy_id
    #' Defaults for this [`jobqueue's`][jobqueue_class] `$run()` method.
    #' Here only, `stop_id` and `copy_id` must be either a
    #' `function (job)` or `NULL`. `hooks` can set
    #' [`jobqueue`][jobqueue_class], [`worker`][worker_class], and/or
    #' [`job`][job_class] hooks - see the "Attaching" section in
    #' `vignette('hooks')`.
    #'
    #' @return A [`jobqueue`][jobqueue_class] object.
    initialize = function (
      globals = NULL,
      packages = NULL,
      namespace = NULL,
      init = NULL,
      max_cpus = availableCores(),
      workers = ceiling(max_cpus * 1.2),
      timeout = NULL,
      hooks = NULL,
      reformat = NULL,
      signal = FALSE,
      cpus = 1L,
      stop_id = NULL,
      copy_id = NULL ) {
      # All of the work is delegated to q_initialize(), which captures the
      # unevaluated `init` expression from this frame via substitute().
      q_initialize(
        self, private,
        globals, packages, namespace, init, max_cpus, workers,
        timeout, hooks, reformat, signal, cpus,
        stop_id, copy_id )
    },
    #' @description
    #' Print method for a [`jobqueue`][jobqueue_class].
    #' @param ... Arguments are not used currently.
    print = function (...) q_print(self, private),
    #' @description
    #' Creates a [`job`][job_class] object and submits it to the
    #' [`jobqueue`][jobqueue_class] for running. Any `NA` arguments will be
    #' replaced with their value from `jobqueue_class$new()`.
    #'
    #' @return The new [`job`][job_class] object.
    run = function (
      expr,
      vars = list(),
      timeout = NA,
      hooks = NA,
      reformat = NA,
      signal = NA,
      cpus = NA,
      stop_id = NA,
      copy_id = NA,
      ... ) {
      # `NA` is the "use the jobqueue's default" sentinel; q_run() resolves it
      # against the defaults stored in `private$j_conf`.
      q_run(
        self, private,
        expr, vars,
        timeout, hooks, reformat, signal,
        cpus, stop_id, copy_id, ... )
    },
    #' @description
    #' Adds a [`job`][job_class] to the [`jobqueue`][jobqueue_class] for
    #' running on a background process.
    #' @return This [`jobqueue`][jobqueue_class], invisibly.
    submit = function (job)
      q_submit(self, private, job),
    #' @description
    #' Blocks until the [`jobqueue`][jobqueue_class] enters the given state.
    #' @param timeout Stop the [`jobqueue`][jobqueue_class] if it takes longer
    #' than this number of seconds, or `NULL`.
    #' @param signal Raise an error if encountered (will also be recorded in
    #' `<jobqueue>$cnd`).
    #' @return This [`jobqueue`][jobqueue_class], invisibly.
    wait = function (state = 'idle', timeout = NULL, signal = TRUE)
      u_wait(self, private, state, timeout, signal),
    #' @description
    #' Attach a callback function to execute when the
    #' [`jobqueue`][jobqueue_class] enters `state`.
    #' @return A function that when called removes this callback from the
    #' [`jobqueue`][jobqueue_class].
    on = function (state, func)
      u_on(self, private, 'QH', state, func),
    #' @description
    #' Stop all [`jobs`][job_class] and [`workers`][worker_class].
    #' @return This [`jobqueue`][jobqueue_class], invisibly.
    stop = function (reason = 'jobqueue shut down by user', cls = NULL) {
      q_stop(self, private, reason, cls)
    }
  ),
  private = list(
    # Clean-up routine; also called explicitly by q_stop(). Asks every worker
    # and job to stop (via `fmap(..., 'stop', reason, cls)`), then attempts to
    # delete the jobqueue's directory, ignoring any error from the deletion.
    finalize = function (reason = 'jobqueue was garbage collected', cls = NULL) {
      fmap(private$.workers, 'stop', reason, cls)
      fmap(private$.jobs, 'stop', reason, cls)
      try(silent = TRUE, dir_delete(private$q_dir))
      return (invisible(NULL))
    },
    # Backing storage for the active bindings defined below.
    .hooks = list(),
    .jobs = list(),
    .workers = list(),
    .state = 'initializing',
    .cnd = NULL,        # Set by q_stop() to the condition built from `reason`.
    .is_done = FALSE,   # Set to TRUE by q_stop().
    n_workers = NULL,   # Validated `workers` argument; set in q_initialize().
    up_since = NULL,    # Sys.time() recorded at the start of q_initialize().
    total_runs = 0L,    # Incremented by q_submit() for each job it accepts.
    is_ready = FALSE,   # TRUE once all workers are idle; FALSE after q_stop().
    j_conf = list(),    # Default job settings used by q_run() for `NA` arguments.
    w_conf = list(),    # Worker settings ('hooks', 'timeout') from q_initialize().
    max_cpus = NULL,    # Validated `max_cpus` argument; set in q_initialize().
    q_dir = NULL,       # Directory created in q_initialize(); exposed as `$tmp`.
    set_state = function (state) u__set_state(self, private, state),
    # Accepts and ignores `...` so that it can be registered as a hook
    # callback (q_initialize() attaches it as the workers' 'idle' hook).
    dispatch = function (...) q__dispatch(self, private)
  ),
  active = list(
    #' @field hooks
    #' A named list of currently registered callback hooks.
    hooks = function () private$.hooks,
    #' @field jobs
    #' Get or set - List of [jobs][job_class] currently managed by this
    #' [`jobqueue`][jobqueue_class].
    jobs = function (value) q_jobs(private, value),
    #' @field state
    #' The [`jobqueue's`][jobqueue_class] state: `'starting'`, `'idle'`,
    #' `'busy'`, `'stopped'`, or `'error'`.
    state = function () private$.state,
    #' @field uid
    #' A short string, e.g. `'Q1'`, that uniquely identifies this
    #' [`jobqueue`][jobqueue_class].
    uid = function () basename(private$q_dir),
    #' @field tmp
    #' The [`jobqueue`][jobqueue_class]'s temporary directory.
    tmp = function () private$q_dir,
    #' @field workers
    #' Get or set - List of [`workers`][worker_class] used for processing
    #' [`jobs`][job_class].
    workers = function (value) q_workers(private, value),
    #' @field cnd
    #' The error that caused the [`jobqueue`][jobqueue_class] to stop.
    cnd = function () private$.cnd
  )
)
# Implementation of `jobqueue_class$new()`.
#
# Validates every argument, records the queue / worker / job configuration on
# `private`, writes the worker start-up configuration to 'config.rds' in the
# queue's directory, and then blocks until `n_workers` workers are idle.
# Signals an error if any worker reports the 'stopped' state during start-up.
#
# Must be called directly from the `initialize` method: the unevaluated `init`
# expression is recovered from the calling frame.
#
# Returns `self`, invisibly.
q_initialize <- function (
    self, private,
    globals, packages, namespace, init, max_cpus, workers,
    timeout, hooks, reformat, signal, cpus,
    stop_id, copy_id ) {

  # Grab the expression the caller supplied for `init`, unevaluated.
  init_subst <- substitute(init, env = parent.frame())

  private$up_since <- Sys.time()

  timeout <- validate_timeout(timeout, func_ok = TRUE)

  # Hooks may arrive pre-grouped under 'queue' / 'worker' / 'job'. Otherwise,
  # sort them into those groups according to their q_ and w_ name prefixes.
  hooks          <- validate_list(hooks)
  hook_names     <- names(hooks)
  is_pre_grouped <- all(hook_names %in% c('queue', 'worker', 'job'))
  if (!is_pre_grouped) {
    hooks <- list(
      job    = hooks[grep('^[qwj]_', hook_names, invert = TRUE)],
      queue  = hooks[grep('^q_', hook_names)],
      worker = hooks[grep('^w_', hook_names)] )
  }

  # Whenever a worker becomes idle, try to hand it a queued job.
  hooks[['worker']] <- c(list('idle' = private$dispatch), hooks[['worker']])


  # ---- Queue configuration ----
  private$q_dir     <- dir_create(ENV$jq_dir, 'Q')
  private$.hooks    <- validate_hooks(hooks[['queue']], 'QH')
  private$max_cpus  <- validate_positive_integer(max_cpus, if_null = availableCores())
  private$n_workers <- validate_positive_integer(workers, if_null = ceiling(private$max_cpus * 1.2))


  # ---- Worker configuration ----
  private$w_conf[['hooks']]   <- validate_hooks(hooks[['worker']], 'WH')
  private$w_conf[['timeout']] <- timeout[['starting']]

  worker_config <- list(
    'globals'   = validate_list(globals),
    'packages'  = validate_character_vector(packages),
    'namespace' = validate_string(namespace, null_ok = TRUE),
    'init'      = validate_expression(init, init_subst) )

  saveRDS(
    object = worker_config,
    file   = file.path(private$q_dir, 'config.rds') )


  # ---- Job configuration defaults ----
  job_timeout_names <- setdiff(names(timeout), 'starting')
  private$j_conf[['timeout']]  <- timeout[job_timeout_names]
  private$j_conf[['hooks']]    <- validate_hooks(hooks[['job']], 'JH', func_ok = TRUE)
  private$j_conf[['signal']]   <- validate_character_vector(signal, func_ok = TRUE, bool_ok = TRUE)
  private$j_conf[['cpus']]     <- validate_positive_integer(cpus, func_ok = TRUE, if_null = 1L)
  private$j_conf[['reformat']] <- validate_function(reformat)
  private$j_conf[['stop_id']]  <- validate_function(stop_id)
  private$j_conf[['copy_id']]  <- validate_function(copy_id)

  private$set_state('starting')


  # ---- Start-up loop ----
  # Creates `n_workers` workers, with at most `max_cpus` of them in a
  # non-idle state at any one time.
  repeat {

    pool   <- self$workers
    states <- map(pool, 'state')

    # A worker stopped during start-up: shut everything down and fail.
    stopped <- which(states == 'stopped')
    if (length(stopped)) {
      cnd <- as_error(pool[[stopped[[1]]]]$cnd)
      self$stop(cnd)
      cnd_signal(cnd)
      stop(cnd$message)
    }

    # Every requested worker is up and idle: the queue is ready for jobs.
    if (sum(states == 'idle') == private$n_workers) {
      private$is_ready <- TRUE
      private$set_state('idle')
      break
    }

    # Otherwise launch as many additional workers as the limits permit.
    n_missing <- private$n_workers - length(states)
    n_allowed <- private$max_cpus - sum(states != 'idle')

    for (k in seq_len(min(n_missing, n_allowed))) {
      new_worker <- worker_class$new(
        hooks   = private$w_conf[['hooks']],
        globals = self,
        wait    = FALSE,
        timeout = private$w_conf[['timeout']] )
      self$workers <- c(self$workers, new_worker)
    }

    # Give the `later` event loop a turn before checking worker states again.
    later::run_now(timeoutSecs = 0.6)
  }

  invisible(self)
}
# Implementation of `<jobqueue>$print()`.
#
# Writes a short cli-formatted summary: the queue's uid, class, and state; the
# number of jobs and workers; reserved vs. maximum CPUs; any workers in a state
# other than 'idle'/'busy'; and the total number of submitted jobs alongside
# the time elapsed since the queue was created.
#
# Returns `self`, invisibly.
#
# NOTE: the local variable names below (`js`, `ws`, `cpus`, `uptime`, `i`) are
# referenced from inside the cli template strings, so renaming a local means
# updating the matching template as well.
q_print <- function (self, private) {
  # States of all managed jobs and workers.
  js <- map(self$jobs, 'state')
  ws <- map(self$workers, 'state')
  # CPUs reserved by the jobs that are currently running.
  cpus <- sum(map(self$jobs[js == 'running'], 'cpus'))
  # Time since `private$up_since`, rounded and formatted as text.
  uptime <- format(round(Sys.time() - private$up_since))
  # Cap the output width at 50 columns; the option is set via
  # rlang::local_options(), which scopes it to this call.
  width <- min(50L, getOption("width"))
  rlang::local_options(cli.width = width)
  # Colour the state label; any state without its own entry is shown in red.
  state <- switch(
    private$.state,
    starting = col_yellow('{.emph starting...}'),
    idle = col_green('idle'),
    busy = col_green('busy'),
    col_red('{private$.state}') )
  cli_rule(left = '{self$uid} {.cls {class(self)}}', right = state)
  cli_text("\n")
  # Indent the bullet list by applying a temporary theme.
  div <- cli_div(theme = list('div.bullet-*' = list('margin-left' = 4)))
  cli_bullets(c('*' = '{.val {length(js)}} job{?s} - {.val {sum(js == "running")}} {?is/are} running'))
  cli_bullets(c('*' = '{.val {length(ws)}} worker{?s} - {.val {sum(ws == "busy")}} {?is/are} busy'))
  cli_bullets(c('*' = '{.val {cpus}} of {.val {private$max_cpus}} CPU{?s} {?is/are} currently in use'))
  # One warning bullet per unexpected worker state (anything not idle/busy).
  for (i in setdiff(unique(ws), c('idle', 'busy')))
    cli_bullets(c('!' = '{.val {sum(ws == i)}} worker{?s} {?is/are} {col_red(i)}'))
  cli_end(div)
  cli_text("\n")
  cli_rule(center = col_grey('{private$total_runs} job{?s} run in {uptime}'))
  return (invisible(self))
}
# Implementation of `<jobqueue>$run()`.
#
# Builds a new job from `expr` and the supplied settings, then submits it to
# this jobqueue. Any setting passed as `NA` is replaced by the corresponding
# default stored in `private$j_conf`. A formula given for `stop_id` or
# `copy_id` is first converted with `as_function()`.
#
# Must be called directly from the `run` method: both the capture of the
# unevaluated `expr` and the recorded trace depend on the call stack.
#
# Returns the new job, invisibly.
q_run <- function (
    self, private,
    expr, vars,
    timeout, hooks, reformat, signal,
    cpus, stop_id, copy_id, ...) {

  # Recover the caller's unevaluated expression before `expr` is overwritten.
  expr_subst <- substitute(expr, env = parent.frame())
  expr       <- validate_expression(expr, expr_subst, null_ok = FALSE)

  if (is_formula(stop_id)) stop_id <- as_function(stop_id)
  if (is_formula(copy_id)) copy_id <- as_function(copy_id)

  # `NA` means "use the default given to jobqueue_class$new()".
  defaults   <- private$j_conf
  or_default <- function (value, key) {
    if (is_na(value)) defaults[[key]] else value
  }

  job <- job_class$new(
    expr     = expr,
    vars     = vars,
    queue    = self,
    timeout  = or_default(timeout,  'timeout'),
    hooks    = or_default(hooks,    'hooks'),
    reformat = or_default(reformat, 'reformat'),
    signal   = or_default(signal,   'signal'),
    cpus     = or_default(cpus,     'cpus'),
    stop_id  = or_default(stop_id,  'stop_id'),
    copy_id  = or_default(copy_id,  'copy_id'),
    ... )

  # Record where the job was requested from, then queue it.
  job$.trace <- trace_back(bottom = 2L)
  self$submit(job)

  invisible(job)
}
# Implementation of `<jobqueue>$submit()`.
#
# Moves `job` into the 'submitted' state, resolves its `stop_id` and `copy_id`
# (each may be a formula or function that is evaluated against the job), and
# applies them:
#   * jobs already in the queue with the same `stop_id` are stopped;
#   * if a job already in the queue has the same `copy_id`, the new job
#     becomes a proxy for the first such job.
# If the job is still 'submitted' after all of that, it is appended to the
# queue, marked 'queued', and a dispatch is triggered.
#
# Aborts if `job` does not inherit from 'job'. A job that is already done is
# returned untouched. Returns `job`, invisibly.
q_submit <- function (self, private, job) {

  if (!inherits(job, 'job'))
    cli_abort('`job` must be a job object, not {.type {job}}.')

  if (job$is_done) return (invisible(job))

  # Remember where the submission came from, unless already recorded.
  if (is_null(job$.call))  job$.call  <- caller_env(n = 2L)
  if (is_null(job$.trace)) job$.trace <- trace_back(bottom = job$.call)

  job$queue          <- self
  private$total_runs <- private$total_runs + 1L

  # The state may have changed again by the time the assignment returns;
  # if so, there is nothing further to do here.
  job$state <- 'submitted'
  if (!(job$state == 'submitted')) return (invisible(job))

  # Matching `stop_id` => stop the other job(s).
  if (is_formula(job$stop_id))  job$stop_id <- as_function(job$stop_id)
  if (is_function(job$stop_id)) job$stop_id <- job$stop_id(job)
  if (!is_null(id <- job$stop_id)) {
    stop_jobs <- get_eq(self$jobs, 'stop_id', id)
    if (nz(stop_jobs)) {
      fmap(stop_jobs, 'stop', 'duplicated stop_id', 'superseded')
    }
  }

  # Matching `copy_id` => become a proxy for the first matching job.
  if (is_formula(job$copy_id))  job$copy_id <- as_function(job$copy_id)
  if (is_function(job$copy_id)) job$copy_id <- job$copy_id(job)
  if (!is_null(id <- job$copy_id)) {
    copy_jobs <- get_eq(self$jobs, 'copy_id', id)
    if (nz(copy_jobs)) {
      job$proxy <- copy_jobs[[1]]
    }
  }

  # Only queue the job if nothing above moved it out of 'submitted'.
  if (job$state != 'submitted') return (invisible(job))

  self$jobs <- c(self$jobs, list(job))
  job$state <- 'queued'
  private$dispatch()

  invisible(job)
}
# Implementation of `<jobqueue>$stop()`.
#
# Marks the queue as not ready and done, records the stop condition (built
# from `reason` with classes `cls` plus 'interrupt') in `private$.cnd`, runs
# the finalizer with that condition, switches the state to 'stopped', and
# finally empties the worker and job lists.
#
# Returns `self`, invisibly.
q_stop <- function (self, private, reason, cls) {

  private$is_ready <- FALSE
  private$.is_done <- TRUE

  stop_cnd     <- as_cnd(reason, c(cls, 'interrupt'))
  private$.cnd <- stop_cnd

  private$finalize(reason = stop_cnd)
  private$set_state('stopped')

  self$workers <- list()
  self$jobs    <- list()

  invisible(self)
}
# Use any idle workers to run queued jobs.
#
# Steps:
#   1. Drop finished jobs from the queue's job list.
#   2. Do nothing more unless the queue is ready and at least one CPU is
#      unreserved by the jobs that are currently running.
#   3. Take queued jobs, in order, for as long as their cumulative `cpus`
#      fits within the free CPUs, and pair them one-to-one with idle workers.
#   4. Set the queue's state to 'busy' if any worker is busy, else 'idle'.
#
# Returns `NULL`, invisibly.
q__dispatch <- function (self, private) {

  # Purge jobs that are done.
  self$jobs <- get_eq(self$jobs, 'is_done', FALSE)

  # Only start jobs if this jobqueue is ready.
  if (!private$is_ready) return (invisible(NULL))

  # See how many remaining CPUs are available.
  active_jobs <- get_eq(self$jobs, 'state', 'running')
  cpus_in_use <- sum(map(active_jobs, 'cpus'))
  free_cpus   <- private$max_cpus - cpus_in_use
  if (free_cpus < 1) return (invisible(NULL))

  # Connect queued jobs to idle workers.
  idle_workers <- get_eq(self$workers, 'state', 'idle')
  queued_jobs  <- get_eq(self$jobs, 'state', 'queued')
  fits         <- cumsum(map(queued_jobs, 'cpus')) <= free_cpus
  queued_jobs  <- queued_jobs[fits]

  n_pairs <- min(length(idle_workers), length(queued_jobs))
  for (i in seq_len(n_pairs)) {
    # A failure to start one job must not prevent the others from starting.
    try(idle_workers[[i]]$run(queued_jobs[[i]]))
  }

  # Update `<jobqueue>$state` and trigger callback hooks.
  busy_workers <- get_eq(self$workers, 'state', 'busy')
  queue_state  <- if (length(busy_workers) > 0) 'busy' else 'idle'
  private$set_state(state = queue_state)

  invisible(NULL)
}
# Active bindings to validate new values.

# Backs the `jobs` active binding. With no `value`, returns the current job
# list; otherwise stores `value` after validating it as an unnamed list of
# 'job' objects.
q_jobs <- function (private, value) {
  if (!missing(value)) {
    private$.jobs <- validate_list(value, of_type = 'job', named = FALSE)
  } else {
    private$.jobs
  }
}
# Backs the `workers` active binding. With no `value`, returns the current
# worker list; otherwise stores `value` after validating it as an unnamed list
# of 'worker' objects.
q_workers <- function (private, value) {
  if (!missing(value)) {
    private$.workers <- validate_list(value, of_type = 'worker', named = FALSE)
  } else {
    private$.workers
  }
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.