library(DBI)
knitr::opts_chunk$set(
  error = (Sys.getenv("IN_PKGDOWN") != "true"),
  collapse = TRUE,
  comment = "#>",
  eval = RPostgres::postgresHasDefault()
)
con <- NULL
rp <- NULL
rs <- NULL
Imagine you have an R process that is relatively intensive and driven by user input.
To keep things as fast as possible, you may want to use several servers to process incoming requests. However, to do this you need to coordinate between all of your servers (or workers). How do you decide which server works on what? What if one server dies midway? To answer these questions, we need a work queue, also known as a job queue or task queue. This document will show you how to build a work queue system using R and PostgreSQL, something that would ordinarily require an external tool like RabbitMQ.
In this example, our work will be generating square roots. We'll keep track of the results in a table:
library(DBI)

con <- dbConnect(RPostgres::Postgres())

dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
dbExecute(con, "
    CREATE TABLE sqroot_vignette_example (
        in_val INTEGER PRIMARY KEY,
        out_val DOUBLE PRECISION NULL
    )
")
When a client wants a square root value, it can insert a new row into this table, filling in_val.
We'll then have a bunch of workers that will calculate the results for the client, and fill in out_val.
To manage these workers, we will combine 2 PostgreSQL concepts: LISTEN / NOTIFY and SKIP LOCKED.
if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL }
if (!is.null(con)) { dbDisconnect(con) ; con <- NULL }
if (!is.null(rp)) { rp$wait() ; rp <- NULL }
The Postgres LISTEN and NOTIFY commands allow you to send and receive messages between clients connected to a PostgreSQL database.
This is known as a publish/subscribe architecture.
We tell Postgres that we are interested in receiving messages using LISTEN. For example:
con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "LISTEN grapevine")
In this case, the channel name "grapevine" is arbitrary; we don't need to create channels ahead of time.
To make sure we have something to receive, we can start a separate R process using callr.
Ordinarily this would be part of another R script, maybe on another computer.
This will wait a bit, use NOTIFY to send a message, then finish:
rp <- callr::r_bg(function() {
    library(DBI)
    Sys.sleep(0.3)
    db_notify <- dbConnect(RPostgres::Postgres())
    dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
    dbDisconnect(db_notify)
})
Finally, we should wait for any incoming messages. To do this, use postgresWaitForNotify.
The payload will contain the message from the other R process:
# Sleep until we get the message
n <- NULL
while (is.null(n)) {
    n <- RPostgres::postgresWaitForNotify(con)
}
n$payload
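The loop above waits forever if no message ever arrives. As a minimal sketch, assuming you would rather give up after a fixed time, the wait can be bounded with a deadline. The helper name `wait_for_notify` and its `total_timeout`, `poll`, and `wait_fn` arguments are hypothetical and not part of RPostgres; `wait_fn` exists only so the helper can be exercised without a database.

```r
# Hypothetical helper: poll for a notification until a deadline passes.
# Returns the notification list, or NULL if nothing arrived in time.
wait_for_notify <- function(con, total_timeout = 10, poll = 1,
                            wait_fn = RPostgres::postgresWaitForNotify) {
  deadline <- Sys.time() + total_timeout
  while (Sys.time() < deadline) {
    # wait_fn returns NULL when no message arrived within `poll` seconds
    n <- wait_fn(con, poll)
    if (!is.null(n)) {
      return(n)
    }
  }
  NULL
}
```

With a listening connection, `n <- wait_for_notify(con, total_timeout = 5)` would then yield either the notification or `NULL`, which the caller has to check before reading `n$payload`.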
We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work?
This is done using SKIP LOCKED.
We notify all workers that the input 99 is ready for processing.
After receiving this, they all do the following:
rs <- dbSendQuery(con, "
    SELECT in_val
      FROM sqroot_vignette_example
     WHERE in_val = $1
       FOR UPDATE
      SKIP LOCKED
", params = list(99))
One lucky worker will get a row back and, thanks to FOR UPDATE, the row is now locked.
Any other worker will skip over the locked row (SKIP LOCKED) and find something else to do.
If there are no other jobs available, then nothing will be returned.
Using SKIP LOCKED is discussed in more detail in this article.
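To make the locking behaviour concrete, here is a rough sketch that plays both workers from a single R session using two connections. It assumes the sqroot_vignette_example table created earlier exists and that default connection settings work; the function name `demo_skip_locked` is made up for illustration. Row locks last only as long as the surrounding transaction, so each connection opens one explicitly.

```r
# Hypothetical demo: two connections compete for the same row.
demo_skip_locked <- function() {
  con_a <- DBI::dbConnect(RPostgres::Postgres())
  con_b <- DBI::dbConnect(RPostgres::Postgres())
  on.exit({
    DBI::dbDisconnect(con_a)
    DBI::dbDisconnect(con_b)
  })

  # Make sure there is a row for input 99 to fight over
  DBI::dbExecute(con_a, "
    INSERT INTO sqroot_vignette_example (in_val) VALUES (99)
    ON CONFLICT DO NOTHING
  ")

  query <- "
    SELECT in_val
      FROM sqroot_vignette_example
     WHERE in_val = $1
       FOR UPDATE SKIP LOCKED
  "

  # First worker takes the row lock inside its transaction
  DBI::dbBegin(con_a)
  got_a <- DBI::dbGetQuery(con_a, query, params = list(99))

  # Second worker runs the same query while the lock is still held
  DBI::dbBegin(con_b)
  got_b <- DBI::dbGetQuery(con_b, query, params = list(99))

  # Release the locks and remove the demo row
  DBI::dbRollback(con_b)
  DBI::dbRollback(con_a)
  DBI::dbExecute(con_a, "DELETE FROM sqroot_vignette_example WHERE in_val = 99")

  list(rows_first = nrow(got_a), rows_second = nrow(got_b))
}
```

If the locking works as described, the first connection should see one row and the second should see none, rather than blocking until the first transaction ends.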
if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL }
if (!is.null(con)) { dbDisconnect(con) ; con <- NULL }
if (!is.null(rp)) { rp$wait() ; rp <- NULL }
Now we can put the concepts together. The following implements our worker as a function (again, this would be running as a script on several servers):
worker <- function() {
    library(DBI)
    db_worker <- dbConnect(RPostgres::Postgres())
    on.exit(dbDisconnect(db_worker))
    dbExecute(db_worker, "LISTEN sqroot")
    dbExecute(db_worker, "LISTEN sqroot_shutdown")

    while (TRUE) {
        # Wait for new work to do
        n <- RPostgres::postgresWaitForNotify(db_worker, 60)
        if (is.null(n)) {
            # If nothing to do, send notifications of any not up-to-date work
            dbExecute(db_worker, "
                SELECT pg_notify('sqroot', in_val::TEXT)
                  FROM sqroot_vignette_example
                 WHERE out_val IS NULL
            ")
            next
        }

        # If we've been told to shut down, stop right away
        if (n$channel == 'sqroot_shutdown') {
            writeLines("Shutting down.")
            break
        }

        in_val <- strtoi(n$payload)
        tryCatch(
            {
                dbWithTransaction(db_worker, {
                    # Try and fetch the item we got notified about
                    rs <- dbSendQuery(db_worker, "
                        SELECT in_val
                          FROM sqroot_vignette_example
                         WHERE out_val IS NULL -- if another worker already finished, don't reprocess
                           AND in_val = $1
                           FOR UPDATE SKIP LOCKED -- Don't let another worker work on this at the same time
                    ", params = list(in_val))
                    in_val <- dbFetch(rs)[1, 1]
                    dbClearResult(rs)

                    if (!is.na(in_val)) {
                        # Actually do the sqrt
                        writeLines(paste("Sqroot-ing", in_val, "... "))
                        Sys.sleep(in_val * 0.1)
                        out_val <- sqrt(in_val)

                        # Update the database with the result
                        dbExecute(db_worker, "
                            UPDATE sqroot_vignette_example
                               SET out_val = $1
                             WHERE in_val = $2
                        ", params = list(out_val, in_val))
                    } else {
                        writeLines(paste("Not sqroot-ing as another worker got there first"))
                    }
                })
            },
            error = function(e) {
                # Something went wrong. Report error and carry on
                writeLines(paste("Failed to sqroot:", e$message))
            })
    }
}
The worker connects to the database, starts listening and loops indefinitely.
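Each pass through the loop makes one decision based on what the wait returned: rescan for unfinished work on a timeout, stop on the shutdown channel, or process the input named in the payload. As a small illustration, that decision can be pulled out into a pure function that is easy to check without a database. The name `classify_notification` and the extra "ignore" case for non-numeric payloads are assumptions for this sketch and are not part of the worker shown here.

```r
# Hypothetical helper: decide what a worker should do with the value
# returned by postgresWaitForNotify() (NULL on timeout, otherwise a list).
classify_notification <- function(n) {
  if (is.null(n)) {
    # Timed out: look for rows whose out_val is still NULL
    return(list(action = "rescan"))
  }
  if (identical(n$channel, "sqroot_shutdown")) {
    return(list(action = "shutdown"))
  }
  # Payloads arrive as text; strtoi() gives NA for non-numeric input
  in_val <- strtoi(n$payload, base = 10L)
  if (is.na(in_val)) {
    return(list(action = "ignore"))
  }
  list(action = "process", in_val = in_val)
}
```

For example, `classify_notification(list(channel = "sqroot", payload = "9"))` gives an action of "process" with `in_val` equal to 9L.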
Let's use callr again to start 2 workers:
stdout_1 <- tempfile()
stdout_2 <- tempfile()
rp <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1)
rp <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2)
Sys.sleep(1)  # Give workers a chance to set themselves up
Now our client can add some values to our table and notify the workers that there's something to do:
con <- dbConnect(RPostgres::Postgres())

add_sqroot <- function(in_val) {
    dbExecute(con, "
        INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
    ", params = list(in_val))
    dbExecute(con, "
        SELECT pg_notify('sqroot', $1)
    ", params = list(in_val))
}

add_sqroot(7)
add_sqroot(8)
add_sqroot(9)
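One possible variation, offered as a sketch rather than as the approach used here, is to wrap the insert and the notification in a single transaction. PostgreSQL only delivers notifications sent inside a transaction once that transaction commits, so workers would not hear about a row whose insert was rolled back. The function name `add_sqroot_tx` is hypothetical, and it takes the connection as an argument instead of relying on a global `con`.

```r
# Hypothetical variant of add_sqroot(): insert and notify atomically.
add_sqroot_tx <- function(con, in_val) {
  DBI::dbWithTransaction(con, {
    DBI::dbExecute(con, "
      INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
    ", params = list(in_val))
    # Queued now, delivered to listeners only when the transaction commits
    DBI::dbExecute(con, "
      SELECT pg_notify('sqroot', $1)
    ", params = list(as.character(in_val)))
  })
}
```

It would be called as `add_sqroot_tx(con, 10)`. If the insert fails, for instance because the primary key already exists, the error propagates and no notification is sent.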
After a short wait, the answers should have been populated by the workers for us:
Sys.sleep(3)
rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val")
dbFetch(rs)
dbClearResult(rs) ; rs <- NULL
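A fixed sleep is fine for a demonstration, but it is a guess about how long the workers need. As a sketch of one alternative, a client could poll until no rows are left with an empty out_val, giving up after a timeout. The helper `wait_until_done` and its arguments are hypothetical; `pending_fn` is injectable only so the loop can be tried without a database.

```r
# Hypothetical helper: poll until no unprocessed rows remain.
# Returns TRUE when everything is done, FALSE if the timeout is reached first.
wait_until_done <- function(con, timeout = 10, interval = 0.2,
                            pending_fn = function(con) {
                              DBI::dbGetQuery(con, "
                                SELECT count(*)::int AS n
                                  FROM sqroot_vignette_example
                                 WHERE out_val IS NULL
                              ")$n
                            }) {
  deadline <- Sys.time() + timeout
  repeat {
    if (pending_fn(con) == 0) {
      return(TRUE)
    }
    if (Sys.time() >= deadline) {
      return(FALSE)
    }
    Sys.sleep(interval)
  }
}
```

Calling `wait_until_done(con)` in place of the fixed sleep would return as soon as the workers have filled in every row.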
Finally, we can use NOTIFY to stop all the workers:
dbExecute(con, "NOTIFY sqroot_shutdown, ''")
And see what messages were printed as they ran:
# We can't control which worker will process the first entry,
# so we sort the results so the vignette output stays the same.
outputs <- sort(c(
    paste(readLines(con = stdout_1), collapse = "\n"),
    paste(readLines(con = stdout_2), collapse = "\n")))

writeLines(outputs[[1]])
writeLines(outputs[[2]])
Notice that the work has been shared between the 2 workers. If these 2 weren't enough, we could happily add more to keep the system going.
dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
dbDisconnect(con)
rp$wait()