Async Iterators (design sketch)

The async package will implement a class called asyncIter, which combines promises and iterators. An asyncIter is meant to be a useful abstraction for data that arrives intermittently -- requests on a network interface, data coming from logging sensors, result sets that arrive in pages, and so on.

The programming interface for an asyncIter blends iterators and promises. Given an asyncIter C representing a connection, calls to nextElem(C) or nextOr(C, or) will return a series of promise objects. As the asyncIter produces values, the promise objects will be fulfilled in the order in which they were created.

Note that even though asyncIter implements the iterator interface, if you were to do something like as.list(conn), you would get an endless list of unfulfilled promises. (In practice, the asyncIter should complain about too many outstanding promises and throw an error.) To work with an async iterator, you should generally wait for a promise to be fulfilled before calling nextElem() again.

For example, let's say you have a stream of numbers and you want to add them all together as they come in, keeping a running total. One way to do this with an async iterator is:

numbers <- listenToNumbersStation()

runningTotal <- function(conn) {
  total <- 0
  increment <- function(val) {
    total <<- total+val
    then(nextElem(conn), increment)
  }
  then(nextElem(conn), increment)

  function() total # return an accessor function.
}

This pulls one promise using nextElem and registers a handler using then. When the first value is emitted from the connection, the handler increments the running total and then re-registers itself for the next value.

This can also be written perhaps more intuitively using an async block. In an async block you can use awaitNext() or awaitNextOr() to pull the next element and await it in one call.

runningTotal <- function(conn) {
  total <- 0
  async(repeat {
    item <- awaitNextOr(conn, break)
    total <<- item + total
  })
  function() total
}

This covers how to consume async iterators.

Creating asynchronous iterators

One way to create an async iterator is with the asyncIter constructor. The argument to the constructor is a function you provide, taking a channel object. The channel object is a mirror of the asyncIter object -- you can call emit(channel, value), which will fulfill promises held by its clients. Here, for instance, is an asyncIter that emits one random number between 1 and 100 every three to six seconds. It uses delay and then from the promises library to achieve this:

numbersStation <- function(numbers=function() floor(runif(1, 1, 101)),
                           interval=function() runif(1, 3, 6)) {
  asyncIter(lazy=FALSE, function(channel) {
    speak <- function(p) {
      n <- numbers()
      cat(n, "\n")  # print the number being emitted
      emit(channel, n)
      then(delay(interval()), speak)
    }
    then(delay(interval()), speak)
  })
}

Here, upon receiving a "channel" object, we define a callback speak and register it to be called after a delay() of a random interval(). When the callback is called, we emit a random value to the channel. Finally, we register the same callback again after another delay, repeating the process.

In addition to emit(channel, value), the channel class also has methods finish(channel) to signal the normal end of iteration, and cancel(channel, err) to signal an abnormal end.

Asynchronous iterators can also be created by the asyncGen constructor. This combines the capabilities of async and gen; in an asyncGen you can use await, awaitNext, yield, and yieldFrom, either to wait for or to emit information. Here is the asyncGen version of our numbers station:

numbersStation <- function(numbers=function() floor(runif(1, 1, 101)),
                           interval=function() runif(1, 3, 6)) {
  asyncGen({
    repeat {
      await(delay(interval()))
      yield(numbers())
    }
  })
}

Consuming and producing

Outgoing message and request queue

It's possible to call emit on a channel object several times without anything calling nextElem on the iterator end. Similarly, an asyncGen can (if lazy=FALSE) call yield several times without anything awaiting. Therefore, asyncIter maintains an outgoing queue of values emitted but not yet consumed.

Conversely, it is also possible for an asyncIter to have several promises outstanding; for instance, you can have one asyncIter handing out tasks and N workers that have each called nextElem and are each holding a promise.
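The two queues described above can be pictured with a small base-R toy. This is only a sketch under stated assumptions: toyChannel, emit, and request are hypothetical names, plain callbacks stand in for promises, and nothing here is meant to reflect how asyncIter is actually implemented.

```r
# Toy model only: hypothetical names, callbacks in place of promises.
toyChannel <- function() {
  outgoing <- list()  # values emitted but not yet consumed
  pending  <- list()  # callbacks requested but not yet fulfilled

  emit <- function(value) {
    if (length(pending) > 0) {
      # A consumer is already waiting: fulfill the oldest request.
      callback <- pending[[1]]
      pending[[1]] <<- NULL
      callback(value)
    } else {
      # Nobody is waiting: hold the value in the outgoing queue.
      outgoing[[length(outgoing) + 1]] <<- value
    }
    invisible(NULL)
  }

  request <- function(callback) {
    if (length(outgoing) > 0) {
      # A value is already queued: hand over the oldest one.
      value <- outgoing[[1]]
      outgoing[[1]] <<- NULL
      callback(value)
    } else {
      # Nothing queued yet: remember the request until a value arrives.
      pending[[length(pending) + 1]] <<- callback
    }
    invisible(NULL)
  }

  list(emit = emit, request = request,
       queued  = function() length(outgoing),
       waiting = function() length(pending))
}

ch <- toyChannel()
received <- c()
collect <- function(v) received <<- c(received, v)

ch$emit(1); ch$emit(2)   # two values queue up with no consumer
ch$request(collect)      # takes 1 from the outgoing queue
ch$request(collect)      # takes 2
ch$request(collect)      # nothing left, so this request waits
ch$request(collect)      # a second waiting request
ch$emit(3)               # fulfills the oldest waiting request
```

At the end of this run one request is still waiting, which corresponds to a worker holding an unfulfilled promise.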

awaitEach

You can turn an async iterator into an ersatz iterator with awaitEach. At "top level" this just has different methods for nextElem that will pause R until each promise is fulfilled in turn.

In an async block, iter <- awaitEach(receiver); nextElem(iter) will have the same effect as await(nextElem(receiver)).

All that happens here is that awaitEach adds an S3 class, which has different methods for nextElem and so on, plus some special handling in for_cps, nextElem_cps, etc.
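The idea of changing behavior by prepending an S3 class can be illustrated in base R. The following is a minimal sketch with hypothetical names (toyNext, toySource, toyAwaitEach); a zero-argument function stands in for a promise, and the CPS handling mentioned above is not modeled.

```r
# Toy illustration of S3 dispatch only; all names are hypothetical.
toyNext <- function(x, ...) UseMethod("toyNext")

# Base behavior: hand back a stand-in "promise" (a function to call later).
toyNext.toySource <- function(x, ...) {
  function() x$produce()
}

# Wrapped behavior: resolve the stand-in promise before returning, so the
# caller sees an ordinary value, as with a synchronous iterator.
toyNext.toyAwaitEach <- function(x, ...) {
  promise <- NextMethod()
  promise()
}

# Wrapping just prepends a class; the underlying object is unchanged.
toyAwaitEach <- function(x) {
  class(x) <- c("toyAwaitEach", class(x))
  x
}

counter <- 0
src <- structure(
  list(produce = function() { counter <<- counter + 1; counter }),
  class = "toySource")

p <- toyNext(src)                # a function standing in for a promise
v <- toyNext(toyAwaitEach(src))  # a plain value
```

Because the wrapper only adds a class, the same underlying object can be used through either interface.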

Lazy vs eager asyncIters

Note that when lazy=TRUE, the asyncIter constructor waits to construct the channel and invoke your provided inner function until someone has requested an element using nextElem. When lazy=FALSE, the inner function is run immediately, and can queue up messages before anything is listening. However, there is no difference after that first invocation.

In the case of an asyncGen the lazy parameter does more. A lazy asyncGen will do nothing after it is created until someone has requested a promise. Then it will run to either the first await or the first yield. If it reaches a yield and there are no outstanding promises, it will pause at the yield and not continue. So a lazy asyncGen will not build up an outgoing message queue.

When lazy=FALSE, an asyncGen does not pause after a yield. It will store the yielded message in the outgoing queue and only pause when it reaches an await for an unfulfilled promise.
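The difference in queueing behavior can be simulated with a base-R toy. This is a sketch under simplifying assumptions, not the asyncGen implementation: toyGen is a hypothetical name, the "generator" simply yields a fixed vector of values in order, and await is not modeled, so the eager version runs to the end as soon as it is created.

```r
# Toy simulation only; toyGen and its fields are hypothetical.
toyGen <- function(values, lazy = TRUE) {
  pos <- 0            # index of the last value yielded
  outgoing <- list()  # yielded but not yet consumed
  outstanding <- 0    # requests not yet satisfied
  delivered <- c()    # values handed to consumers, in order

  # Match queued values with outstanding requests, oldest first.
  deliver <- function() {
    while (outstanding > 0 && length(outgoing) > 0) {
      delivered <<- c(delivered, outgoing[[1]])
      outgoing[[1]] <<- NULL
      outstanding <<- outstanding - 1
    }
  }

  # Advance the generator. A lazy generator only passes a yield while a
  # request is outstanding; an eager one keeps going and queues values.
  pump <- function() {
    while (pos < length(values) && (!lazy || outstanding > 0)) {
      pos <<- pos + 1
      outgoing[[length(outgoing) + 1]] <<- values[[pos]]
      deliver()
    }
  }

  request <- function() {
    outstanding <<- outstanding + 1
    deliver()
    pump()
    invisible(NULL)
  }

  if (!lazy) pump()  # eager: start running at construction time

  list(request   = request,
       queued    = function() length(outgoing),
       produced  = function() pos,
       delivered = function() delivered)
}

eager <- toyGen(1:3, lazy = FALSE)
eagerQueuedAtStart <- eager$queued()   # all three values already queued
eager$request()

lazyGen <- toyGen(1:3, lazy = TRUE)
lazyProducedAtStart <- lazyGen$produced()  # nothing has run yet
lazyGen$request()                          # runs just far enough for one yield
```

In this toy the lazy generator's outgoing queue stays empty throughout, while the eager one fills its queue before any consumer appears.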



crowding/generators documentation built on June 28, 2023, 6:14 a.m.