Nothing
#' @title Create a message queue.
#' @description See the README at
#' [https://github.com/wlandau/txtq](https://github.com/wlandau/txtq)
#' and the examples in this help file for instructions.
#' @export
#' @param path Character string giving the file path of the queue.
#' The `txtq()` function creates a folder at this path to store
#' the messages.
#' @param use_lock_file Logical, whether to use a lock file
#' for blocking operations. Should only be `FALSE` in specialized
#' use cases with no parallel computing (for example, when a
#' `txtq` is used as a database and accessed by only one process.)
#' @section NFS:
#' As an interprocess communication tool,
#' `txtq` relies on the [`filelock`](https://github.com/r-lib/filelock)
#' package to prevent race conditions.
#' Unfortunately, `filelock` cannot prevent race conditions
#' on network file systems (NFS), which means neither can `txtq`.
#' In other words, on certain common kinds of clusters,
#' `txtq` cannot reliably manage interprocess communication
#' for processes on different computers.
#' However, it can still serve as a low-tech replacement
#' for a simple non-threadsafe database.
#' @examples
#' path <- tempfile() # Define a path to your queue.
#' q <- txtq(path) # Create a new queue or recover an existing one.
#' q$validate() # Check if the queue is corrupted.
#' list.files(q$path()) # The queue lives in this folder.
#' q$list() # You have not pushed any messages yet.
#' # Let's say two parallel processes (A and B) are sharing this queue.
#' # Process A sends Process B some messages.
#' # You can only send character vectors.
#' q$push(title = "Hello", message = "process B.")
#' q$push(
#' title = c("Calculate", "Calculate"),
#' message = c("sqrt(4)", "sqrt(16)")
#' )
#' q$push(title = "Send back", message = "the sum.")
#' # See your queued messages.
#' # The `time` is a formatted character string from `Sys.time()`
#' # indicating when the message was pushed.
#' q$list()
#' q$count() # Number of messages in the queue.
#' q$total() # Number of messages that were ever queued.
#' q$empty()
#' # Now, let's assume process B comes online. It can consume
#' # some messages, locking the queue so process A does not
#' # mess up the data.
#' q$pop(2) # Return and remove the first messages that were added.
#' # With those messages popped, we are farther along in the queue.
#' q$list()
#' q$count() # Number of messages in the queue.
#' q$list(1) # You can specify the number of messages to list.
#' # But you still have a log of all the messages that were ever pushed.
#' q$log()
#' q$total() # Number of messages that were ever queued.
#' # q$pop() with no arguments just pops one message.
#' # Call pop(-1) to pop all the messages at once.
#' q$pop()
#' # There are more instructions.
#' q$pop()
#' # Let's say Process B follows the instructions and sends
#' # the results back to Process A.
#' q$push(title = "Results", message = as.character(sqrt(4) + sqrt(16)))
#' # Process A now has access to the results.
#' q$pop()
#' # Clean out the popped messages
#' # so the database file does not grow too large.
#' q$push(title = "not", message = "popped")
#' q$count()
#' q$total()
#' q$list()
#' q$log()
#' q$clean()
#' q$count()
#' q$total()
#' q$list()
#' q$log()
#' # Optionally remove all messages from the queue.
#' q$reset()
#' q$count()
#' q$total()
#' q$list()
#' q$log()
#' # Destroy the queue's files altogether.
#' q$destroy()
#' # This whole time, the queue was locked when either Process A
#' # or Process B accessed it. That way, the data stays correct
#' # no matter who is accessing/modifying the queue and when.
#' #
#' # You can import a `txtq` into another `txtq`.
#' # The unpopped messages are grouped together
#' # and sorted by timestamp.
#' # Same goes for the popped messages.
#' q_from <- txtq(tempfile())
#' q_to <- txtq(tempfile())
#' q_from$push(title = "from", message = "popped")
#' q_from$push(title = "from", message = "unpopped")
#' q_to$push(title = "to", message = "popped")
#' q_to$push(title = "to", message = "unpopped")
#' q_from$pop()
#' q_to$pop()
#' q_to$import(q_from)
#' q_to$list()
#' q_to$log()
txtq <- function(path, use_lock_file = TRUE) {
R6_txtq$new(path = path, use_lock_file = use_lock_file)
}
#' @title R6 class for `txtq` objects
#' @description See the [txtq()] function for full documentation and usage.
#' @seealso txtq
#' @export
R6_txtq <- R6::R6Class(
classname = "R6_txtq",
private = list(
path_dir = character(0),
db_file = character(0),
head_file = character(0),
lock_file = character(0),
total_file = character(0),
use_lock_file = logical(0),
txtq_establish = function(path, use_lock_file) {
dir_create(path)
private$path_dir <- path
private$db_file <- file.path(private$path_dir, "db")
private$head_file <- file.path(private$path_dir, "head")
private$total_file <- file.path(private$path_dir, "total")
private$lock_file <- file.path(private$path_dir, "lock")
private$use_lock_file <- use_lock_file
private$txtq_exclusive({
file_create(private$db_file)
if (!file.exists(private$head_file)) {
private$txtq_set_head(0)
}
if (!file.exists(private$total_file)) {
private$txtq_set_total(0)
}
})
private$txtq_validate()
},
txtq_exclusive = function(code) {
if (private$use_lock_file) {
on.exit(filelock::unlock(lock))
lock <- filelock::lock(private$lock_file)
}
force(code)
},
txtq_get_head = function() {
scan(private$head_file, quiet = TRUE, what = integer())
},
txtq_set_head = function(n) {
write(x = as.integer(n), file = private$head_file, append = FALSE)
},
txtq_get_total = function() {
scan(private$total_file, quiet = TRUE, what = integer())
},
txtq_set_total = function(n) {
write(x = as.integer(n), file = private$total_file, append = FALSE)
},
# Faster than txtq_get_total and txtq_set_total
# because it uses fewer connections:
txtq_inc_total = function(n) {
con <- file(private$total_file, "r+")
on.exit(close(con))
old <- scan(con, quiet = TRUE, what = integer())
out <- old + as.integer(n)
write(x = out, file = con, append = FALSE)
},
txtq_count = function() {
as.integer(
private$txtq_get_total() - private$txtq_get_head()
)
},
txtq_pop = function(n) {
out <- private$txtq_list(n = n)
new_head <- private$txtq_get_head() + nrow(out)
private$txtq_set_head(new_head)
out
},
txtq_push = function(title, message) {
time <- base64_urlencode(microtime())
out <- paste(
base64_urlencode(as.character(title)),
base64_urlencode(as.character(message)),
time = rep(time, length(title)),
sep = "|"
)
if (length(out) > 0) {
private$txtq_inc_total(length(out))
out <- paste(out, collapse = "\n")
write(x = out, file = private$db_file, append = TRUE)
}
},
txtq_reset = function() {
unlink(private$db_file, force = TRUE)
file_create(private$db_file)
private$txtq_set_head(0)
private$txtq_set_total(0)
},
txtq_clean = function() {
keep <- private$txtq_pop(n = private$txtq_count())
private$txtq_reset()
private$txtq_push(title = keep$title, message = keep$message)
},
txtq_log = function() {
if (length(scan(private$db_file, quiet = TRUE, what = character())) < 1) {
return(null_log)
}
read_db_table(
dbfile = private$db_file,
skip = 0,
n = -1
)
},
txtq_list = function(n) {
if (n == 0L || private$txtq_count() < 1L) {
return(null_log)
}
read_db_table(
dbfile = private$db_file,
skip = private$txtq_get_head(),
n = n
)
},
txtq_validate = function() {
assert_dir(private$path_dir)
assert_file(private$db_file)
assert_file_scalar(private$head_file)
assert_file_scalar(private$total_file)
},
txtq_import = function(queue) {
stopifnot(inherits(queue, "R6_txtq"))
ext_log <- queue$log()
ext_total <- queue$total()
ext_count <- queue$count()
ext_head <- ext_total - ext_count
ext_popped <- seq_len(ext_total) <= ext_head
this_log <- private$txtq_log()
this_total <- private$txtq_get_total()
this_head <- private$txtq_get_head()
this_popped <- seq_len(this_total) <= this_head
# nolint start
new_popped <- rbind(
ext_log[ext_popped,, drop = FALSE],
this_log[this_popped,, drop = FALSE]
)
new_unpopped <- rbind(
ext_log[!ext_popped,, drop = FALSE],
this_log[!this_popped,, drop = FALSE]
)
new_popped <- new_popped[order(new_popped$time),, drop = FALSE]
new_unpopped <- new_unpopped[order(new_unpopped$time),, drop = FALSE]
# nolint end
new_log <- rbind(new_popped, new_unpopped)
unlink(private$db_file)
file_create(private$db_file)
private$txtq_push(title = new_log$title, message = new_log$message)
new_head <- this_head + ext_total - ext_count
private$txtq_set_head(new_head)
}
),
public = list(
#' @description Initialize a txtq.
#' @param path Character string giving the file path of the queue.
#' The `txtq()` function creates a folder at this path to store
#' the messages.
#' @param use_lock_file Logical, whether to use a lock file
#' for blocking operations. Should only be `FALSE` in specialized
#' use cases with no parallel computing (for example, when a
#' `txtq` is used as a database and accessed by only one process.)
initialize = function(path, use_lock_file = TRUE) {
private$txtq_establish(path, use_lock_file)
},
#' @description Get the txtq path.
path = function() {
private$path_dir
},
#' @description Get the number of messages in the queue.
count = function() {
private$txtq_exclusive(private$txtq_count())
},
#' @description Get the number of messages in the database.
total = function() {
private$txtq_exclusive(private$txtq_get_total())
},
#' @description Detect whether the txtq is empty.
empty = function() {
private$txtq_exclusive(private$txtq_count()) < 1
},
#' @description List the whole database.
log = function() {
private$txtq_exclusive(private$txtq_log())
},
#' @description List messages.
#' @param n Number of messages.
list = function(n = -1) {
private$txtq_exclusive(private$txtq_list(n = n))
},
#' @description Pop messages.
#' @param n Number of messages.
pop = function(n = 1) {
private$txtq_exclusive(private$txtq_pop(n = n))
},
#' @description Push messages.
#' @param title Titles of the messages.
#' @param message Contents of the messages.
push = function(title, message) {
private$txtq_exclusive(
private$txtq_push(title = title, message = message)
)
invisible()
},
#' @description Reset the txtq.
reset = function() {
private$txtq_exclusive(private$txtq_reset())
invisible()
},
#' @description Clean the txtq.
clean = function() {
private$txtq_exclusive(private$txtq_clean())
invisible()
},
#' @description Destroy the txtq.
destroy = function() {
unlink(private$path_dir, recursive = TRUE, force = TRUE)
invisible()
},
#' @description Validate the txtq.
validate = function() {
private$txtq_validate()
invisible()
},
#' @description Import another txtq.
#' @param queue External txtq to import.
import = function(queue) {
private$txtq_exclusive(private$txtq_import(queue = queue))
invisible()
}
)
)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.