txtq: Create a message queue.

Description Usage Arguments NFS Examples

View source: R/txtq.R

Description

See the README at https://github.com/wlandau/txtq and the examples in this help file for instructions.

Usage

1
txtq(path, use_lock_file = TRUE)

Arguments

path

Character string giving the file path of the queue. The txtq() function creates a folder at this path to store the messages.

use_lock_file

Logical, whether to use a lock file for blocking operations. Should only be FALSE in specialized use cases with no parallel computing (for example, when a txtq is used as a database and accessed by only one process.)

NFS

As an interprocess communication tool, txtq relies on the filelock package to prevent race conditions. Unfortunately, filelock cannot prevent race conditions on network file systems (NFS), which means neither can txtq. In other words, on certain common kinds of clusters, txtq cannot reliably manage interprocess communication for processes on different computers. However, it can still serve as a low-tech replacement for a simple non-threadsafe database.

Examples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
  path <- tempfile() # Define a path to your queue.
  q <- txtq(path) # Create a new queue or recover an existing one.
  q$validate() # Check if the queue is corrupted.
  list.files(q$path()) # The queue lives in this folder.
  q$list() # You have not pushed any messages yet.
  # Let's say two parallel processes (A and B) are sharing this queue.
  # Process A sends Process B some messages.
  # You can only send character vectors.
  q$push(title = "Hello", message = "process B.")
  q$push(
    title = c("Calculate", "Calculate"),
    message = c("sqrt(4)", "sqrt(16)")
  )
  q$push(title = "Send back", message = "the sum.")
  # See your queued messages.
  # The `time` is a formatted character string from `Sys.time()`
  # indicating when the message was pushed.
  q$list()
  q$count() # Number of messages in the queue.
  q$total() # Number of messages that were ever queued.
  q$empty()
  # Now, let's assume process B comes online. It can consume
  # some messages, locking the queue so process A does not
  # mess up the data.
  q$pop(2) # Return and remove the first messages that were added.
  # With those messages popped, we are farther along in the queue.
  q$list()
  q$count() # Number of messages in the queue.
  q$list(1) # You can specify the number of messages to list.
  # But you still have a log of all the messages that were ever pushed.
  q$log()
  q$total() # Number of messages that were ever queued.
  # q$pop() with no arguments just pops one message.
  # Call pop(-1) to pop all the messages at once.
  q$pop()
  # There are more instructions.
  q$pop()
  # Let's say Process B follows the instructions and sends
  # the results back to Process A.
  q$push(title = "Results", message = as.character(sqrt(4) + sqrt(16)))
  # Process A now has access to the results.
  q$pop()
  # Clean out the popped messages
  # so the database file does not grow too large.
  q$push(title = "not", message = "popped")
  q$count()
  q$total()
  q$list()
  q$log()
  q$clean()
  q$count()
  q$total()
  q$list()
  q$log()
  # Optionally remove all messages from the queue.
  q$reset()
  q$count()
  q$total()
  q$list()
  q$log()
  # Destroy the queue's files altogether.
  q$destroy()
  # This whole time, the queue was locked when either Process A
  # or Process B accessed it. That way, the data stays correct
  # no matter who is accessing/modifying the queue and when.
  #
  # You can import a `txtq` into another `txtq`.
  # The unpopped messages are grouped together
  # and sorted by timestamp.
  # Same goes for the popped messages.
  q_from <- txtq(tempfile())
  q_to <- txtq(tempfile())
  q_from$push(title = "from", message = "popped")
  q_from$push(title = "from", message = "unpopped")
  q_to$push(title = "to", message = "popped")
  q_to$push(title = "to", message = "unpopped")
  q_from$pop()
  q_to$pop()
  q_to$import(q_from)
  q_to$list()
  q_to$log()

wlandau/txtq documentation built on March 29, 2021, 5:18 p.m.