    library(jobqueue)
    jq <- jobqueue(workers = 1)
Callbacks are the cornerstone of asynchronous programming.
If you want to calculate 2 + 2 and show the result, the synchronous programming approach would be:
    message('Result = ', 2 + 2)
    #> Result = 4
In asynchronous programming, this task is broken apart into two discrete steps: computation and result handling. Using jobqueue, this looks like:
    job <- jq$run({ 2 + 2 })
    job$on('done', ~message('Result = ', .$result))
    #> Result = 4
The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.
Callbacks are not limited to just the job finishing. See the "State Triggers" section for a list of all events that can trigger a callback.
Important
Hooks are evaluated on the main process, not the background processes. Therefore, ensure that callback functions execute quickly so that they do not delay job handling.
The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.
This is a great place for R's new shorthand function definitions (R >= 4.1.0).
Lambda syntax can also be used (see rlang::as_function()).
    # jobqueue hooks
    hook <- function (jq) { message('jobqueue is ', jq$state) }
    hook <- \(jq) message('jobqueue is ', jq$state)
    hook <- ~message('jobqueue is ', .$state)

    # worker hooks
    hook <- function (worker) { message('worker is ', worker$state) }
    hook <- \(w) message('worker is ', w$state)
    hook <- ~message('worker is ', .$state)

    # job hooks
    hook <- function (job) { message('job is ', job$state) }
    hook <- \(j) message('job is ', j$state)
    hook <- ~message('job is ', .$state)
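The three styles are interchangeable. As a minimal sketch that runs without jobqueue, the snippet below applies each form to a plain list standing in for a job object. The stand-in `fake_job` and the hook names are hypothetical; rlang must be installed, and the `\(x)` form requires R >= 4.1.0. Here the formula is converted explicitly with rlang::as_function(), which is an assumption about how such a formula would be turned into a function, not a statement about jobqueue's internals.

```r
# A plain list standing in for a job object (hypothetical, for illustration).
fake_job <- list(state = 'done')

# 1. Classic function definition.
hook_classic <- function (job) { paste('job is', job$state) }

# 2. Shorthand function definition (R >= 4.1.0).
hook_short <- \(j) paste('job is', j$state)

# 3. Formula lambda, converted explicitly with rlang::as_function().
#    Inside the formula, `.` refers to the first argument.
hook_lambda <- rlang::as_function(~paste('job is', .$state))

hook_classic(fake_job)
hook_short(fake_job)
hook_lambda(fake_job)
#> each call returns "job is done"
```

The formula form is the most compact, while the classic form is easier to extend once a hook grows beyond one expression.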
jobqueue, worker, and job objects update their $state as described in the tables below. Each time the state changes, any callbacks registered to that state are executed. In addition, you can register state = '*' or state = '.next', which trigger regardless of the present state name.
| State   | Triggers                                        |
|---------|-------------------------------------------------|
| '*'     | Every time the state changes.                   |
| '.next' | Only one time, the next time the state changes. |
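To make the difference between a named state, '*', and '.next' concrete, here is a toy state emitter written in base R. It is a sketch of the semantics described in the table, not jobqueue's implementation; `new_emitter()`, `set_state()`, and `seen` are hypothetical names invented for this illustration.

```r
# Toy emitter illustrating '*' and '.next' (NOT jobqueue's implementation).
new_emitter <- function () {
  hooks <- list()   # each entry: list(when = <state name>, func = <function>)

  on <- function (when, func) {
    hooks[[length(hooks) + 1]] <<- list(when = when, func = func)
    invisible(NULL)
  }

  set_state <- function (new_state) {
    keep <- rep(TRUE, length(hooks))
    for (i in seq_along(hooks)) {
      h <- hooks[[i]]
      # Fire on an exact match, or on either wildcard.
      if (h$when %in% c(new_state, '*', '.next')) h$func(new_state)
      # '.next' hooks are one-shot: drop them after the first state change.
      if (h$when == '.next') keep[i] <- FALSE
    }
    hooks <<- hooks[keep]
    invisible(NULL)
  }

  list(on = on, set_state = set_state)
}

seen <- character()
em   <- new_emitter()
em$on('done',  function (s) seen <<- c(seen, paste('done hook saw', s)))
em$on('*',     function (s) seen <<- c(seen, paste('* hook saw', s)))
em$on('.next', function (s) seen <<- c(seen, paste('.next hook saw', s)))

em$set_state('running')
em$set_state('done')
writeLines(seen)
#> * hook saw running
#> .next hook saw running
#> done hook saw done
#> * hook saw done
```

The '.next' hook fires once, on the first change, and is then discarded, whereas the '*' hook fires on every change.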
jobqueue States
| State      | Triggers                                              |
|------------|-------------------------------------------------------|
| 'starting' | After initialization, before workers are started.     |
| 'idle'     | When all workers are idle. Also, after initial startup. |
| 'busy'     | At least one worker is busy.                          |
| 'stopped'  | After <jobqueue>$stop() is called.                    |
worker States
| State      | Triggers                                |
|------------|-----------------------------------------|
| 'starting' | Background process is being configured. |
| 'idle'     | Waiting on jobs to be submitted.        |
| 'busy'     | After a job starts running.             |
| 'stopped'  | After <worker>$stop() is called.        |
job States
| State       | Triggers                                          |
|-------------|---------------------------------------------------|
| 'created'   | After job object initialization.                  |
| 'submitted' | After <job>$queue is assigned.                    |
| 'queued'    | After stop_id and copy_id are resolved.           |
| 'starting'  | Before evaluation begins.                         |
| 'running'   | After <job>$worker is set and evaluation begins.  |
| 'done'      | After <job>$output is assigned.                   |
Callbacks can be attached to jobqueue, worker, or job objects.
You can add callbacks either in the constructor, or later with $on().
    hook <- ~message(.$uid, ' is ', .$state)

    jq <- jobqueue(hooks = list(q_idle = hook))
    w  <- worker_class$new(hooks = list(idle = hook))
    j  <- job_class$new(hooks = list(done = hook))

    jq$on('busy', hook)
    w$on('busy', hook)
    j$on('starting', hook)
In jobqueue(), the hooks argument can set hooks for the jobqueue, worker, and job.
The rules are:
- A name prefixed with q_, w_, or j_ attaches the hook to the jobqueue, worker, or job, respectively.
- A name without a prefix attaches the hook to jobs.
- Alternatively, a list of lists can be passed to hooks, of the format:
    jobqueue(
      'hooks' = list(
        'queue'  = list(idle = hook),
        'worker' = list(idle = hook),
        'job'    = list(done = hook)
    ))
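One way to picture how the prefixed names relate to the nested format is a small translation helper. The function below is purely hypothetical (`normalize_hooks()` is not part of jobqueue) and only sketches the naming rules, assuming that unprefixed names default to job hooks. Strings stand in for the hook functions so the result is easy to inspect.

```r
# Hypothetical helper, NOT part of jobqueue: expands prefixed hook names
# into the nested queue/worker/job layout.
normalize_hooks <- function (hooks) {
  nested  <- list(queue = list(), worker = list(), job = list())
  targets <- c(q = 'queue', w = 'worker', j = 'job')

  for (name in names(hooks)) {
    if (grepl('^[qwj]_', name)) {
      target <- targets[[substr(name, 1, 1)]]   # 'q' -> 'queue', etc.
      state  <- substring(name, 3)              # strip the 'x_' prefix
    } else {
      target <- 'job'                           # assumed default: job hooks
      state  <- name
    }
    nested[[target]][[state]] <- hooks[[name]]
  }
  nested
}

# Strings stand in for hook functions here.
flat   <- list(q_idle = 'A', w_busy = 'B', j_done = 'C', created = 'D')
nested <- normalize_hooks(flat)
str(nested)
```

In this sketch, `q_idle` ends up under `queue$idle`, while the unprefixed `created` lands under `job$created` next to `j_done`.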
Callbacks attached in the constructor cannot be removed.
When you attach a callback with $on(), the return value is a function, which, when called, will remove that callback from the object.
    job <- job_class$new(
      'expr'  = { 3.14 },
      'hooks' = list(done = ~message('ABC')) )

    off <- job$on('done', ~message('XYZ'))
    off()

    jq$submit(job)
    #> ABC
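The "returns a remover function" pattern is an ordinary closure. The base R sketch below mimics it with a toy callback registry; `callbacks`, `on()`, and `fire()` are hypothetical names for illustration and say nothing about how jobqueue stores its hooks.

```r
# Toy callback registry (NOT jobqueue's implementation).
callbacks <- list()
next_id   <- 0

# Register a callback and return a function that unregisters it.
on <- function (func) {
  next_id <<- next_id + 1
  id <- as.character(next_id)
  callbacks[[id]] <<- func
  function () {               # the "off" function remembers its own id
    callbacks[[id]] <<- NULL
    invisible(NULL)
  }
}

# Run every callback that is still registered.
fire <- function () for (f in callbacks) f()

out     <- character()
off_abc <- on(function () out <<- c(out, 'ABC'))
off_xyz <- on(function () out <<- c(out, 'XYZ'))

off_xyz()   # remove only the second callback
fire()
out
#> [1] "ABC"
```

Because each remover captures its own `id`, calling it affects only the callback it was created for.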
job Hooks
When you create a jobqueue, you can define a set of callbacks to automatically apply to all jobs created with the <jobqueue>$run() command.
    n  <- 0
    jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))
    for (i in 1:5) jq$run({ 'Hi' })
    n
    #> [1] 5
How does jobqueue() know to apply the hooks to job objects instead of the jobqueue or worker objects? Unless otherwise indicated, jobqueue() hooks are assumed to be for jobs. You can also explicitly specify that hooks are for jobs by using the formats described in the "Attaching Callbacks" section above.
    jq <- jobqueue(hooks = list(j_created = ~{ n <<- n + 1 } ))
    # or
    jq <- jobqueue(hooks = list(job = list(created = ~{ n <<- n + 1 } )))
If you set hooks in <jobqueue>$run(), those hooks will REPLACE the job hooks from jobqueue().
    n  <- 0
    jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))
    for (i in 1:3)
      jq$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
    #> Hi
    #> Hi
    #> Hi
    n
    #> [1] 0
Below, we'll set up a callback function that triggers when each job enters the 'queued' state. It will modify the job, adding a custom <job>$priority field to the job object. Then it will modify the jobqueue's internal list of jobs (<jobqueue>$jobs), sorting them according to each job's <job>$priority. Last, it will attach additional callbacks to the job to output timing information upon exit from the 'queued' state and upon entry into the 'done' state.
    library(glue)
    library(jobqueue)

    # Our callback/hook function.
    prioritize <- function (job) {

      queue      <- job$jobqueue
      queue_jobs <- job$jobqueue$jobs

      # Apply a random priority to this job.
      job$priority <- round(runif(1) * 10) - 5

      # Sort all this jobqueue's jobs by priority (including this job).
      priorities <- sapply(queue_jobs, `[[`, 'priority')
      job$jobqueue$jobs <- queue_jobs[order(priorities)]

      # Add hooks to this job to report queued/total times.
      t1    <- Sys.time()
      tdiff <- function () format(round(Sys.time() - t1, 1))
      job$on('.next', ~message(glue(
        'job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
      job$on('done', ~message(glue(
        'job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
    }

    # A single worker best illustrates processing order.
    jq <- jobqueue(
      'workers' = 1,
      'hooks'   = list(queued = prioritize) )

    for (i in 1:5) {
      job <- jq$run({ 3.14 })
      message(glue_data(job, 'Created job {uid} with priority {priority}'))
    }
    #> job J11 (priority -3) was dispatched after 0.1 secs
    #> Created job J11 with priority -3
    #> Created job J12 with priority -2
    #> Created job J13 with priority 1
    #> Created job J14 with priority 0
    #> Created job J15 with priority -1
    #> job J11 (priority -3) finished in 0.7 secs
    #> job J12 (priority -2) was dispatched after 0.6 secs
    #> job J12 (priority -2) finished in 1.1 secs
    #> job J15 (priority -1) was dispatched after 0.7 secs
    #> job J15 (priority -1) finished in 1.3 secs
    #> job J14 (priority 0) was dispatched after 1.4 secs
    #> job J14 (priority 0) finished in 2 secs
    #> job J13 (priority 1) was dispatched after 2.1 secs
    #> job J13 (priority 1) finished in 2.6 secs
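The reordering step relies on a common base R idiom: extract one field from every element with sapply(), then index the list by order() of that field. A minimal sketch, using plain lists as hypothetical stand-ins for job objects:

```r
# Plain lists standing in for job objects (hypothetical, for illustration).
jobs <- list(
  list(uid = 'J1', priority =  1),
  list(uid = 'J2', priority = -3),
  list(uid = 'J3', priority =  0),
  list(uid = 'J4', priority = -3) )

# Pull the 'priority' field out of every element, then reorder the list.
priorities <- sapply(jobs, `[[`, 'priority')
sorted     <- jobs[order(priorities)]

sapply(sorted, `[[`, 'uid')
#> [1] "J2" "J4" "J3" "J1"
```

Lower numbers come first, and elements with equal priority (J2 and J4 here) keep their original relative order. Note that the idiom assumes every element has a priority field; if one is missing, sapply() returns a list instead of a numeric vector and order() fails.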
In an actual application, you could set <job>$priority based on <job>$vars.
    prioritize <- function (job) {

      # Give priority to lower number of replications
      job$priority <- job$vars$replications

      queue_jobs <- job$jobqueue$jobs
      priorities <- sapply(queue_jobs, `[[`, 'priority')
      job$jobqueue$jobs <- queue_jobs[order(priorities)]
    }

    jq <- jobqueue(hooks = list(queued = prioritize))

    #                         vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
    for (reps in 5:1)
      job <- jq$run({ 3.14 }, vars = list(replications = reps))
Or, define the custom <job>$priority field in <jobqueue>$run().
    prioritize <- function (job) {
      queue_jobs <- job$jobqueue$jobs
      priorities <- sapply(queue_jobs, `[[`, 'priority')
      job$jobqueue$jobs <- queue_jobs[order(priorities)]
    }

    jq <- jobqueue(hooks = list(queued = prioritize))

    #                         vvvvvvvvvvvvvvv
    for (reps in 5:1)
      job <- jq$run({ 3.14 }, priority = reps)
Or, set <job>$priority in a hook that triggers before prioritize() is triggered.
    set_priority <- function (job) {
      job$priority <- job$vars$replications
    }

    prioritize <- function (job) {
      queue_jobs <- job$jobqueue$jobs
      priorities <- sapply(queue_jobs, `[[`, 'priority')
      job$jobqueue$jobs <- queue_jobs[order(priorities)]
    }

    #                           vvvvvvvvvvvvvvvvvvvvvv
    jq <- jobqueue(hooks = list(created = set_priority, queued = prioritize))

    for (reps in 5:1)
      job <- jq$run({ 3.14 }, vars = list(replications = reps))
Say you're hosting a web service, where users are allowed to submit one job every 30 seconds. jobs can take more than 30 seconds, so the solution is more complex than setting stop_id = user_id. And what if you want to stop the new job instead of the old one?
    rate_limit <- function (job) {
      job$t_start <- Sys.time()
      for (j in job$jobqueue$jobs) {
        if (j$user_id == job$user_id) {
          # Compare in explicit seconds; `-` on date-times picks its own units.
          if (difftime(job$t_start, j$t_start, units = 'secs') < 30)
            job$stop('Rate Limit Exceeded')
        }
      }
    }

    jq <- jobqueue(hooks = list(submitted = rate_limit))

    j_A1 <- jq$run({ 42 }, user_id = 'A')
    j_B1 <- jq$run({ 42 }, user_id = 'B')
    j_B2 <- jq$run({ 42 }, user_id = 'B')

    j_B1$result
    #> [1] 42
    j_B2$result
    #> <interrupt: Rate Limit Exceeded>
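When comparing an elapsed time against a threshold meant in seconds, it helps to know that subtracting two date-times in R yields a difftime whose units are chosen automatically. The base R sketch below shows how a 90-second gap can look like it is under 30 once the units silently switch to minutes, and how pinning the units avoids that.

```r
t1 <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')
t2 <- t1 + 90                      # 90 seconds later

# Plain subtraction: units are picked automatically.
elapsed <- t2 - t1
units(elapsed)                     #> [1] "mins"
elapsed < 30                       #> [1] TRUE   (1.5 mins compared to 30)

# Pin the units so that the threshold really means seconds.
elapsed_secs <- difftime(t2, t1, units = 'secs')
elapsed_secs < 30                  #> [1] FALSE
as.numeric(elapsed_secs)           #> [1] 90
```

For gaps under a minute the automatic units are already seconds, so the problem only shows up with longer intervals.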
Note that the above code won't completely solve the rate limiting task. If a user's job only takes five seconds to complete, then they could submit a job every six seconds and the jobqueue would be none the wiser.
To give the jobqueue awareness of previously completed jobs, you'll need to persistently store per-user job start times, as in the solution below.
    t_user <- list()

    rate_limit <- function (job) {
      t_start <- Sys.time()
      t_prev  <- t_user[[job$user_id]]   # NULL on a user's first job

      # Seconds elapsed since this user's last accepted job.
      if (!is.null(t_prev) && difftime(t_start, t_prev, units = 'secs') < 30) {
        job$stop('Rate Limit Exceeded')
      } else {
        t_user[[job$user_id]] <<- t_start
      }
    }

    jq <- jobqueue(hooks = list(created = rate_limit))
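The per-user bookkeeping can be exercised without jobqueue at all. The sketch below wraps the same idea in a closure and takes the current time as an argument so the behaviour is reproducible; `make_rate_limiter()` and `allow()` are hypothetical names, and returning TRUE/FALSE stands in for letting the job run or stopping it.

```r
# Hypothetical stand-alone limiter (not part of jobqueue).
make_rate_limiter <- function (min_gap_secs = 30) {
  last_seen <- list()   # user_id -> time of that user's last accepted request

  function (user_id, now = Sys.time()) {
    prev <- last_seen[[user_id]]   # NULL if this user has not been seen yet
    if (!is.null(prev) && difftime(now, prev, units = 'secs') < min_gap_secs)
      return(FALSE)                # too soon: reject, keep the old timestamp
    last_seen[[user_id]] <<- now
    TRUE
  }
}

allow <- make_rate_limiter(30)
t0    <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')

allow('A', t0)        #> [1] TRUE
allow('B', t0 + 1)    #> [1] TRUE
allow('B', t0 + 6)    #> [1] FALSE  (5 seconds after B's last accepted request)
allow('B', t0 + 40)   #> [1] TRUE   (39 seconds after B's last accepted request)
```

Rejected requests do not update the stored timestamp, so a user who keeps retrying is measured against their last accepted request rather than their last attempt.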
In the first example, we attached a hook to 'submitted' because that's when <job>$jobqueue becomes available in callbacks. In the latter example, we attached a hook to 'created' instead because we didn't need <job>$jobqueue for that solution. Check the trigger order listed in the "Callback Triggers" section above, and attach callbacks as early as possible to expedite job handling.