
## Defines functions: hash, mfetch, fetch, insert, delete, delete_queue.character, delete_queue.queue, delete_queue, peek.queue, peek, is_empty.mdb_txn, is_empty.queue, is_empty, dequeue.queue, dequeue, enqueue.queue, enqueue, print.queue, init_queue, create_queue
##
## Documented in: create_queue, delete_queue, dequeue, enqueue, init_queue, is_empty, peek

#' Create a queue
#'
#' Create a new queue along with necessary files
#'
#' @param qfile the name of the queue
#' @param mapsize size of the map for LMDB database
#' @param ... other arguments to be passed to \code{mdb_env}
#' @details The queue will be created as a subdirectory named \code{qfile}
#' under the current working directory. A new queue starts out empty: its
#' \code{"head"} and \code{"tail"} entries are both set to \code{NULL}.
#' @return an object of class \code{"queue"} (invisibly)
#' @importFrom thor mdb_env
#' @export
create_queue <- function(qfile, mapsize = 2^30, ...) {
        qdb <- mdb_env(qfile, lock = TRUE, subdir = TRUE,
                       create = TRUE, mapsize = mapsize, ...)
        ## Write both sentinel entries inside one write transaction
        txn.f <- function(txn) {
                insert(txn, "head", NULL)
                insert(txn, "tail", NULL)
        }
        qdb$with_transaction(txn.f, write = TRUE)
        invisible(structure(list(queue = qdb,
                                 path = qfile),
                            class = "queue"))
}

#' Initialize a queue
#'
#' Initialize an existing queue created by \code{create_queue}
#'
#' @param qfile the name of the queue
#' @param mapsize size of the map for LMDB database
#' @param ... other arguments to be passed to \code{mdb_env}
#' @details \code{qfile} should be a subdirectory under the current working
#' directory. The environment is opened with \code{create = FALSE}, so no
#' new database is created and the stored head/tail entries are not touched.
#' @return an object of class \code{"queue"}
#' @importFrom thor mdb_env
#' @export
init_queue <- function(qfile, mapsize = 2^30, ...) {
        qdb <- mdb_env(qfile, lock = TRUE, subdir = TRUE,
                       create = FALSE, mapsize = mapsize, ...)
        structure(list(queue = qdb, path = qfile),
                  class = "queue")
}

## Print method for "queue" objects: shows the last path component of the
## queue's directory and returns the object invisibly, as print methods
## conventionally do.
#' @export
print.queue <- function(x, ...) {
        cat(sprintf("<queue: %s>\n", basename(x$path)))
        invisible(x)
}

#' Add an Element
#'
#' Add a new element on to the back of the queue
#'
#' @param x a queue object
#' @param val an R object
#' @param ... arguments passed to other methods
#' @details Insert an R object into the queue at the tail
#' @return Nothing useful
#' @export
enqueue <- function(x, val, ...) {
        ## S3 generic: dispatch on the class of `x`
        UseMethod("enqueue")
}

## Method for "queue" objects. Each element is stored as a node
## list(value, nextkey, salt) under a key derived from hashing the node;
## the random salt is part of the hashed node, so the key reflects it too.
#' @importFrom stats runif
#' @export
enqueue.queue <- function(x, val, ...) {
        qdb <- x$queue
        node <- list(value = val,
                     nextkey = NULL,
                     salt = runif(1))
        key <- hash(node)
        txn.f <- function(txn) {
                if (is_empty(txn)) {
                        ## First element: it is the head as well as the tail
                        insert(txn, "head", key)
                } else {
                        ## Convert tail node to regular node
                        tailkey <- fetch(txn, "tail")
                        oldtail <- fetch(txn, tailkey)
                        oldtail$nextkey <- key
                        insert(txn, tailkey, oldtail)
                }
                ## Insert new node and point tail to new node
                insert(txn, key, node)
                insert(txn, "tail", key)
        }
        ## All updates happen in a single write transaction; the result
        ## carries no information, so it is returned invisibly
        invisible(qdb$with_transaction(txn.f, write = TRUE))
}

#' Get the Next Queue Element
#'
#' Return the next queue element and remove it from the queue
#'
#' @param x a queue object
#' @param ... arguments passed to other methods
#' @details Return the head of the queue while also removing the element from
#' the queue
#' @return An R object representing the head of the queue
#' @export
dequeue <- function(x, ...) {
        ## S3 generic: dispatch on the class of `x`
        UseMethod("dequeue")
}

## Method for "queue" objects: removes the head node and returns its value.
## Signals an error ("queue is empty") when there is nothing to remove.
#' @export
dequeue.queue <- function(x, ...) {
        qdb <- x$queue
        txn.f <- function(txn) {
                if (is_empty(txn)) {
                        stop("queue is empty")
                }
                h <- fetch(txn, "head")
                node <- fetch(txn, h)
                ## Advance head to the successor (NULL when this was the
                ## last node, which marks the queue as empty again)
                insert(txn, "head", node$nextkey)
                delete(txn, h)
                node$value
        }
        ## NOTE(review): relies on with_transaction() returning the value
        ## of txn.f -- confirm against the thor documentation
        qdb$with_transaction(txn.f, write = TRUE)
}

#' Check if Queue is Empty
#'
#' Check to see if the queue is empty
#'
#' @param x a queue object
#' @param ... arguments passed to other methods
#' @return \code{TRUE} or \code{FALSE} depending on whether the queue is empty
#' or not
#' @export
is_empty <- function(x, ...) {
        ## S3 generic: dispatch on the class of `x`
        UseMethod("is_empty")
}

## Method for "queue" objects: the queue is empty when the value stored
## under "head" is NULL.
#' @export
is_empty.queue <- function(x, ...) {
        qdb <- x$queue
        val <- fetch(qdb, "head")
        is.null(val)
}

## Method for an open transaction (class "mdb_txn"), used by the queue
## methods from inside their own transactions: same test as is_empty.queue,
## but reading through the transaction object `x`.
is_empty.mdb_txn <- function(x, ...) {
        val <- fetch(x, "head")
        is.null(val)
}

#' Get the next element of the queue
#'
#' Return the next element of the queue
#'
#' @param x a queue object
#' @param ... arguments passed to other methods
#' @return the value of the head of the queue
#' @note For \code{job_queue} objects this returns the next element of the input
#' queue
#' @export
peek <- function(x, ...) {
        ## S3 generic: dispatch on the class of `x`
        UseMethod("peek")
}

## Method for "queue" objects: returns the value of the head node without
## modifying the queue (read-only transaction).
#' @export
peek.queue <- function(x, ...) {
        qdb <- x$queue
        txn.f <- function(txn) {
                key <- fetch(txn, "head")
                node <- fetch(txn, key)
                node$value
        }
        ## Any failure while reading the head is reported with one
        ## uniform message
        tryCatch({
                qdb$with_transaction(txn.f, write = FALSE)
        }, error = function(e) {
                stop("problem retrieving head value; queue is likely empty",
                     call. = FALSE)
        })
}

#' Delete a Queue
#'
#' Delete a queue and the files that back it
#'
#' @param x a queue object, or a character string giving the path of the
#' queue directory
#' @param ... other options passed to methods
#' @export
delete_queue <- function(x, ...) {
        ## S3 generic: dispatch on the class of `x`
        UseMethod("delete_queue")
}

## Method for "queue" objects: deletes the queue stored at `x$path` by
## delegating to the method for paths.
## NOTE(review): the original body was lost in extraction; this delegation
## is a reconstruction -- confirm against the upstream source (it may also
## need to close `x$queue` before the files are removed).
#' @export
delete_queue.queue <- function(x, ...) {
        delete_queue(x$path, ...)
}

## Method for paths: recursively removes the directory (or file) `x`.
## Returns the status code of unlink().
#' @export
delete_queue.character <- function(x, ...) {
        unlink(x, recursive = TRUE)
}

## Helper functions

## Remove the entry stored under `key` from `obj` (a database environment
## or an open transaction).
## NOTE(review): the original body was lost in extraction; `$del()` is the
## reconstructed call -- confirm against the thor API.
delete <- function(obj, key) {
        obj$del(key)
}

## Store an arbitrary R object under `key`: the value is serialized to a
## raw vector and handed to `obj$put()`. NULL is a valid value and is
## stored in serialized form.
insert <- function(obj, key, value) {
        value_raw <- serialize(value, NULL)
        obj$put(key, value_raw)
}

## Read the object stored under `key`: the raw vector returned by
## `obj$get()` is unserialized back into the original R object. If the
## backend hands back NULL instead of a raw vector, NULL is returned
## unchanged.
fetch <- function(obj, key) {
        value_raw <- obj$get(key, as_raw = TRUE)
        if (is.null(value_raw)) {
                return(NULL)
        }
        unserialize(value_raw)
}

## Read several keys at once via `obj$mget()`. Returns a list with one
## unserialized object per key, named by the keys.
mfetch <- function(obj, key) {
        values_raw <- obj$mget(key, as_raw = TRUE)
        r <- lapply(values_raw, unserialize)
        names(r) <- key
        r
}

## Compute the SHA-1 based key for an R object (used for queue nodes).
## `digits` and `zapsmall` are passed through to digest::sha1().
#' @importFrom digest sha1
hash <- function(x) {
        sha1(x, digits = 14L, zapsmall = 7L)
}
## rdpeng/queue documentation built on June 9, 2022, 11:27 a.m.