R/messages.R

Defines functions remove_failed_messages requeue_failed_messages list_failed_messages list_messages print.liteq_message nack ack try_consume consume publish message_lock_file make_message

Documented in ack consume list_failed_messages list_messages make_message nack publish remove_failed_messages requeue_failed_messages try_consume

#' Make a message object
#'
#' It creates the lock for the message as well.
#'
#' The message object contains the connection to the message lock. If the
#' worker crashes, then there will be no reference to the connection, and
#' the lock will be released. This is how we detect crashed workers.
#'
#' @param id Message id, integer, auto-generated.
#' @param title Title of message.
#' @param message The message itself.
#' @param db Main DB file.
#' @param queue Name of the queue.
#' @param lockdir Directory to create the message lock in.
#' @return message object
#'
#' @keywords internal

make_message <- function(id, title, message, db, queue, lockdir) {
  if (is.null(id)) return(NULL)
  dir.create(lockdir, recursive = TRUE, showWarnings = FALSE)
  lock <- message_lock_file(lockdir, queue, id)
  con <- db_connect(lock)
  db_execute(con, "CREATE TABLE foo (id INT)")
  db_execute(con, "BEGIN EXCLUSIVE")

  structure(
    list(
      id = id,
      title = title,
      message = message,
      db = db, queue = queue,
      lock = con
    ),
    class = "liteq_message"
  )
}

message_lock_file <- function(lockdir, queue, id) {
  file.path(lockdir, paste0(queue, "-", id, ".lock"))
}

#' Publish a message in a queue
#'
#' @param queue The queue object.
#' @param title The title of the message. It can be the empty string.
#' @param message The body of the message. It can be the empty string.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

publish <- function(queue, title = "", message = "") {
  assert_that(is_queue(queue))
  assert_that(is_string(title))
  assert_that(is_string(message))
  db_publish(queue$db, queue$name, title, message)
}

#' Consume a message from a queue
#'
#' Blocks and waits for a message if there isn't one to work on currently.
#'
#' @param queue The queue object.
#' @return A message.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

consume <- function(queue) {
  assert_that(is_queue(queue))
  msg <- db_consume(queue$db, queue$name)
  make_message(msg$msg$id, msg$msg$title, msg$msg$message, msg$db,
               msg$queue, msg$lockdir)
}

#' Consume a message if there is one available
#'
#' @param queue The queue object.
#' @return A message, or `NULL` if there is not message to work on.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

try_consume <- function(queue) {
  assert_that(is_queue(queue))
  msg <- db_try_consume(queue$db, queue$name)
  make_message(msg$msg$id, msg$msg$title, msg$msg$message, msg$db,
               msg$queue, msg$lockdir)
}

#' Acknowledge that the work on a message has finished successfully
#'
#' @param message The message object.
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

ack <- function(message) {
  assert_that(is_message(message))
  db_ack(message$db, message$queue, message$id, message$lock, TRUE)
}

#' Report that the work on a message has failed
#'
#' @param message The message object.
#' @seealso [liteq] for examples
#' @export

nack <- function(message) {
  assert_that(is_message(message))
  db_ack(message$db, message$queue, message$id, message$lock, FALSE)
}

#' @export

print.liteq_message <- function(x, ...) {
  cat("liteq message from queue ", sQuote(x$queue), ":\n", sep = "")
  msg_bytes <- nchar(x$message, type = "bytes")
  cat("  ", x$title, " (", msg_bytes, " B)\n", sep = "")
  invisible(x)
}

#' List all messages in a queue
#'
#' @param queue The queue object.
#' @return Data frame with columns: `id`, `title`, `status`.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

list_messages <- function(queue) {
  assert_that(is_queue(queue))
  db_list_messages(queue$db, queue$name)
}

#' List failed messages in a queue
#'
#' @param queue The queue object.
#' @return Data frame with columns: `id`, `title`, `status`.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

list_failed_messages <- function(queue) {
  assert_that(is_queue(queue))
  db_list_messages(queue$db, queue$name, failed = TRUE)
}

#' Requeue failed messages
#'
#' @param queue The queue object.
#' @param id Ids of the messages to requeue. If it is `NULL`, then all
#'   failed messages will be requeued.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

requeue_failed_messages <- function(queue, id = NULL) {
  assert_that(is_queue(queue))
  assert_that(is_message_ids_or_null(id))
  db_requeue_failed_messages(queue$db, queue$name, id)
}

#' Remove failed messages from the queue
#'
#' @param queue The queue object.
#' @param id Ids of the messages to requeue. If it is `NULL`, then all
#'   failed messages will be removed.
#'
#' @family liteq messages
#' @seealso [liteq] for examples
#' @export

remove_failed_messages <- function(queue, id = NULL) {
  assert_that(is_queue(queue))
  assert_that(is_message_ids_or_null(id))
  db_remove_failed_messages(queue$db, queue$name, id)
}
r-lib/liteq documentation built on Dec. 18, 2017, 9:12 a.m.