The async package will implement a class called asyncIter, which is a kind of combination of promises and iterators. An asyncIter is meant to be a useful abstraction for data that arrives intermittently: requests on a network interface, data coming from logging sensors, result sets that arrive in pages, and so on.
The programming interface for an asyncIter is a blend of both iterators and promises. Given an asyncIter C representing a connection, calls to nextElem(C) or nextOr(C, or) will return a series of promise objects. As the asyncIter produces values, the promise objects will be fulfilled in the order in which they were handed out.
Note that even though an asyncIter implements the iterator interface, if you were to do something like as.list(conn), you would get an endless list of unfulfilled promises. (In practice, the asyncIter should complain about too many outstanding promises and throw an error.) To work with an async iterator, you should generally wait for a promise to be fulfilled before calling nextElem() again.
For example, let's say you have a stream of numbers and you want to add them all together as they come in, keeping a running total. One way to do this with an async iterator is:
    numbers <- listenToNumbersStation()

    runningTotal <- function(conn) {
      total <- 0
      increment <- function(val) {
        total <<- total + val
        then(nextElem(conn), increment)
      }
      then(nextElem(conn), increment)
      function() total # return an accessor function.
    }
This pulls one promise using nextElem, and registers a handler using then. When the first value is emitted from the connection, the handler increments the running total and then re-registers itself for the next value.
This can also be written, perhaps more intuitively, using an async block. In an async block you can use awaitNext() or awaitNextOr() to pull the next element and await it in one call.
    runningTotal <- function(conn) {
      total <- 0
      async(repeat {
        item <- awaitNextOr(conn, break)
        total <<- item + total
      })
      function() total
    }
This covers how to consume async iterators.
One way to create an async iterator is with the asyncIter constructor. The argument to the constructor is a function you provide, taking a channel object. The channel object is a mirror of the asyncIter object: you can call the method emit(channel, value), which will fulfill promises held by its clients. Here, for instance, is an asyncIter that emits one random number between 1 and 100 every three to six seconds. It uses delay and then from the promises library to achieve this:
    numbersStation <- function(numbers = function() floor(runif(1, 1, 101)),
                               interval = function() runif(1, 3, 6)) {
      asyncIter(lazy = FALSE, function(channel) {
        speak <- function(p) {
          n <- numbers()
          cat(n, "\n")                   # print the number being emitted
          emit(channel, n)
          then(delay(interval()), speak) # re-register after another delay
        }
        then(delay(interval()), speak)
      })
    }
Here, upon receiving a channel object, we define a callback speak and then register to have that callback called after a delay() of a random interval(). When our callback is called, we emit a random value to the channel. Finally, we recursively register the same callback after another delay, to repeat the process.
In addition to emit(channel, value), the channel class also has the methods finish(channel), to signal the normal end of iteration, and cancel(channel, err), to signal an abnormal end.
Asynchronous iterators can also be created by the asyncGen constructor. This combines the capabilities of async and gen; in an asyncGen you can use await, awaitNext, yield, and yieldFrom, either to wait for or to emit information. Here is the asyncGen version of our numbers station:
    numbersStation <- function(numbers = function() floor(runif(1, 1, 101)),
                               interval = function() runif(1, 3, 6)) {
      asyncGen({
        repeat {
          await(delay(interval()))
          yield(numbers())
        }
      })
    }
It's possible to call emit on a channel object several times without anything calling nextElem on the iterator end. Similarly, an asyncGen can (if lazy=FALSE) call yield several times without anything awaiting. Therefore, asyncIter maintains an outgoing queue of values emitted but not yet consumed.
Conversely, it is also possible for an asyncIter to have several promises outstanding; for instance, you can have one asyncIter handing out tasks and N workers that have each called nextElem and are holding a promise.
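The two queues described above can be pictured with a small toy model in base R. This is only a sketch for illustration, not the package's implementation: makeToyChannel, request, queued and outstanding are hypothetical names, and plain callbacks stand in for promises.

```r
# Toy model only: callbacks stand in for promises, and all names are hypothetical.
makeToyChannel <- function() {
  values  <- list()  # values emitted but not yet consumed
  waiting <- list()  # callbacks standing in for outstanding promises

  emit <- function(value) {
    if (length(waiting) > 0) {
      # A consumer is already waiting: fulfill the oldest request first.
      callback <- waiting[[1]]
      waiting <<- waiting[-1]
      callback(value)
    } else {
      # Nobody is waiting: park the value in the outgoing queue.
      values[[length(values) + 1]] <<- value
    }
    invisible(NULL)
  }

  request <- function(callback) {
    if (length(values) > 0) {
      # A value is already queued: hand over the oldest one.
      value <- values[[1]]
      values <<- values[-1]
      callback(value)
    } else {
      # Nothing queued yet: remember the request until something is emitted.
      waiting[[length(waiting) + 1]] <<- callback
    }
    invisible(NULL)
  }

  list(emit = emit, request = request,
       queued = function() length(values),
       outstanding = function() length(waiting))
}

ch <- makeToyChannel()
received <- c()
record <- function(v) received <<- c(received, v)

ch$emit(1); ch$emit(2)                  # producer runs ahead of any consumer
queuedAfterEmits <- ch$queued()         # two values sit in the outgoing queue
ch$request(record); ch$request(record)  # drains the queue in order
ch$request(record); ch$request(record)  # consumers now run ahead
outstandingAfterRequests <- ch$outstanding()
ch$emit(3); ch$emit(4)                  # fulfills the two waiting requests
```

In this sketch, values always reach consumers in the order they were emitted, whichever side gets ahead.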
You can turn an async iterator into an ersatz iterator with awaitEach. At "top level" this just has different methods for nextElem that will pause R until each promise is fulfilled in turn.
In an async block, iter <- awaitEach(receiver); nextElem(iter) will have the same effect as await(nextElem(receiver)).
All that is happening here is that awaitEach adds an S3 class, which has different nextElem etc. methods, and some special handling in for_cps, nextElem_cps, etc.
Note that when lazy=TRUE, the asyncIter constructor waits to construct the channel and invoke your provided inner function until someone has requested an element using nextElem. When lazy=FALSE, the inner function is run immediately, and can queue up messages before anything is listening. However, there is no difference after that first invocation.
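The difference in start-up behaviour can be sketched with a toy constructor in base R. This is an illustration under stated assumptions, not the package's code: toyAsyncIter, nextValue, started and queued are hypothetical names, and nextValue returns a queued value directly where the real design would return a promise.

```r
# Toy model only: all names are hypothetical, and no promises are involved.
toyAsyncIter <- function(fn, lazy = TRUE) {
  started <- FALSE
  queue <- list()

  # The "channel" handed to the user's inner function.
  channel <- list(emit = function(value) {
    queue[[length(queue) + 1]] <<- value
    invisible(NULL)
  })

  # Invoke the inner function at most once.
  start <- function() {
    if (!started) {
      started <<- TRUE
      fn(channel)
    }
  }

  if (!lazy) start()  # eager: run the inner function immediately

  list(
    started = function() started,
    queued = function() length(queue),
    nextValue = function() {
      start()         # lazy: the first request triggers the inner function
      if (length(queue) == 0) return(NULL)
      value <- queue[[1]]
      queue <<- queue[-1]
      value
    }
  )
}

producer <- function(channel) {
  channel$emit("a")
  channel$emit("b")
}

eager <- toyAsyncIter(producer, lazy = FALSE)
eagerQueued <- eager$queued()            # messages queued before anyone listens

lazyIter <- toyAsyncIter(producer, lazy = TRUE)
lazyStartedBefore <- lazyIter$started()  # inner function has not run yet
first <- lazyIter$nextValue()            # the first request starts the producer
lazyStartedAfter <- lazyIter$started()
```

After the first request, the two toy iterators behave identically, which mirrors the point that lazy only affects when the inner function is first invoked.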
In the case of an asyncGen the lazy parameter does more. When an asyncGen is created, a lazy one will do nothing until someone has requested a value, creating an outstanding promise. Then it will run to either the first await or the first yield. If it reaches a yield and there are no outstanding promises, it will pause at the yield and not continue. So a lazy asyncGen will not build up an outgoing message queue.
When lazy=FALSE, an asyncGen does not pause after a yield. It will store the yielded message in the outgoing queue and only pause when it reaches an await for an unfulfilled promise.
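The pausing rules for yield can be simulated with a toy generator in base R. This sketch makes simplifying assumptions: the generator body is reduced to a fixed sequence of yields with no await, callbacks stand in for promises, and toyGen, request, yielded and queued are hypothetical names rather than anything in the package.

```r
# Toy model only: a "generator" that yields each element of `values` in turn.
toyGen <- function(values, lazy = TRUE) {
  pos <- 0           # number of values yielded so far
  queue <- list()    # outgoing queue (only fills when lazy = FALSE)
  waiting <- list()  # callbacks standing in for outstanding promises

  # Run the generator body until it has to pause or runs out of values.
  run <- function() {
    while (pos < length(values)) {
      if (length(waiting) > 0) {
        # Someone is waiting: yield straight to the oldest request.
        pos <<- pos + 1
        callback <- waiting[[1]]
        waiting <<- waiting[-1]
        callback(values[[pos]])
      } else if (!lazy) {
        # Eager: do not pause at yield, store the value in the queue.
        pos <<- pos + 1
        queue[[length(queue) + 1]] <<- values[[pos]]
      } else {
        break        # lazy: pause at the yield until the next request
      }
    }
  }

  request <- function(callback) {
    if (length(queue) > 0) {
      value <- queue[[1]]
      queue <<- queue[-1]
      callback(value)
    } else {
      waiting[[length(waiting) + 1]] <<- callback
      run()
    }
    invisible(NULL)
  }

  if (!lazy) run()   # eager generators start running at construction

  list(request = request,
       yielded = function() pos,
       queued = function() length(queue))
}

eagerGen <- toyGen(1:3, lazy = FALSE)
eagerYielded <- eagerGen$yielded()  # ran through every yield up front
eagerQueued <- eagerGen$queued()

lazyGen <- toyGen(1:3, lazy = TRUE)
lazyYieldedBefore <- lazyGen$yielded()  # nothing has run yet
got <- c()
lazyGen$request(function(v) got <<- c(got, v))
lazyYieldedAfter <- lazyGen$yielded()   # exactly one yield per request
```

In this model the lazy generator never puts anything in its outgoing queue, while the eager one fills it as far as it can.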