
This is an R package implementing generators, async blocks, and streams (collectively known as "coroutines").

New features in version 0.3

For more details see NEWS.md.

Generators

g <- gen({...}) is like a function that knows how to "pause." The code in a generator runs until it hits a yield() call, then returns that value. The next time you call the generator it picks up where it left off and runs until the next yield.

From the outside a generator implements the iteror interface. You extract each yielded value with nextOr(g, or), and you can use generators anywhere you can use an iteror. The iteror class is cross compatible with the iterators package.
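As a small sketch of the `or` argument in practice (the generator name `squares` is made up for this illustration): `nextOr()` returns `or` only once the generator is exhausted, and because `or` is evaluated lazily it can also be a control-flow call such as `break`.

```r
library(async)

# A generator that yields four values and then finishes.
squares <- gen({
  for (i in 1:4) yield(i^2)
})

nextOr(squares, NULL)   # first yielded value: 1

# Drain the remaining values; `break` runs only when the generator is done.
total <- 0
repeat {
  total <- total + nextOr(squares, break)
}
total   # 4 + 9 + 16 = 29
```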

Example: Collatz sequence

Consider a sequence of numbers x[i], starting with an arbitrary x[1], where each subsequent element is produced by applying the rule: x[i+1] = x[i] / 2 if x[i] is even, and x[i+1] = 3 * x[i] + 1 if x[i] is odd.

An infinite sequence of numbers will continue from each starting point x[1], but it is conjectured that all sequences will eventually reach the loop 1, 4, 2, 1, 4, 2, .... The following generator produces the Collatz sequence, starting from x, and terminating when (or if?) the sequence reaches 1.

library(async)
collatz <- gen(function(x) {
  yield(x)
  while (x > 1) {
    if (x %% 2 == 0)
      yield(x <- x / 2L)
    else yield(x <- 3L * x + 1)
  }
})

Because gen() is given a function definition here, it returns a generator function; calling collatz(12) creates a generator. You can get values one at a time with nextOr().

ctz <- collatz(12)
nextOr(ctz, NA)
nextOr(ctz, NA)
nextOr(ctz, NA)
nextOr(ctz, NA)
nextOr(ctz, NA)

You can also use any other method that applies to an iterator, like as.list.

collatz(27L) |> as.list() |> as.numeric()
#Try collatz(63728127L) |> as.list() |> as.numeric()...

For more examples, see the "Clapping Music" vignette.

Async/await

Like gen, async({...}) takes a block of sequential code, which runs until it reaches a call to await(p). The argument p should be a promise (as defined by the promises package), which represents an unfinished external computation. In turn, async() constructs and returns a promise.

An async block runs until it reaches a call to await(p) and pauses. When the promise p resolves, the async block continues. If p rejects, the rejection is raised like an error at the await call; you can wrap await(p) in a tryCatch to handle rejections. When the async block finishes, or throws an error, its promise resolves or rejects.
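A minimal sketch of handling a rejection, assuming an already-rejected promise as a stand-in for a failed external task (the variable names are illustrative only). The final loop drives the event loop by hand, which is only needed in a non-interactive script.

```r
library(async)

# A promise that is already rejected, standing in for a failed external task.
failed <- promises::promise_reject("service unavailable")

recovered <- async({
  tryCatch(
    await(failed),                        # the rejection surfaces here as an error
    error = function(e) "fallback value"  # the async block carries on with this value
  )
})

# Capture the value that the async block's promise resolves to.
outcome <- NULL
promises::then(recovered, function(value) outcome <<- value)

# Run pending event-loop callbacks until none remain.
while (!later::loop_empty()) later::run_now()
outcome
```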

Examples:

async doesn't handle running concurrent tasks by itself; it builds on existing packages like future and later. The later package lets you assign tasks to be done in the event loop, when R is idle.
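To make that division of labor concrete, here is a sketch of a timer promise built directly on `later::later()` and then awaited from an async block. The helper `timer_promise` is hypothetical and written only for this illustration; it is not part of the package and not how the package's own `delay()` is implemented.

```r
library(async)

# Hypothetical helper (not part of async): a promise that resolves with
# `value` after `secs` seconds, scheduled on the event loop by later().
timer_promise <- function(secs, value = NULL) {
  promises::promise(function(resolve, reject) {
    later::later(function() resolve(value), delay = secs)
  })
}

waited <- async({
  await(timer_promise(0.1, "done waiting"))
})

answer <- NULL
promises::then(waited, function(value) answer <<- value)

# In an interactive session the event loop runs whenever R is idle;
# in a script, drive it by hand until no callbacks remain.
while (!later::loop_empty()) later::run_now()
answer
```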

Ring a bell 5 times at 10 second intervals (subject to R being idle):

```{R, eval=FALSE}
async({
  for (i in 1:5) {
    await(delay(10))  # delay() uses later::later()
    cat("Beep", i, "\n")
    beepr::beep(2)
  }
})
```

#### Shiny apps

[cranwhales]: https://github.com/crowding/cranwhales-await
`async()` can be used in Shiny apps! For an example, here is a version
of the ["Cranwhales" demo app using async/await][cranwhales].

#### Web scraping

`async()` allows you to naturally keep track of more than one concurrent process. The [web spider vignette][spider] shows how this can improve the speed of web scraping using concurrent connections.

[spider]: https://crowding.github.io/async/articles/spider.html

#### Background processing

`async` can also work with `future` objects to run computations in parallel.
Download, parse, and summarize a dataset in background processes like this:

```{R, eval=FALSE}
library(future)
library(dplyr)
library(ggplot2)
plan(multisession, workers = 2)  # the older "multiprocess" plan is deprecated

url <- "http://analytics.globalsuperhypermegamart.com/2020/March.csv.gz"
dest <- "March.csv.gz"

dataset <- async({
  if (!file.exists(dest)) {
    await(future({
      cat("Downloading\n")
      download.file(url, dest)
    }))
  }
  data <- await(future({
    cat("Parsing\n")
    read.csv(dest) |>
      mutate(time = hms::trunc_hms(time, 60*60)) |>
      group_by(time) |>
      summarize(sales = sum(amount))
  }))
})

# When the data is ready, plot it (in the main process):
async({
  data <- await(dataset)
  p <- ggplot(data, aes(time, sales)) +
    geom_line() +
    xlab("Time") +
    ylab("Sales")
  print(p)  # plots are not printed automatically inside an async block
})
```

Streams

New in version 0.3 are asynchronous streams and channels. A channel is an interface for asynchronous iteration; stream() lets you do things with channels by writing code with await and yield. Here is an example of channels being used to "walk and chew gum concurrently:"

walk <- stream({
  for (i in 1:10)
    for (step in c("left", "right")) {
      yield(step)
      await(delay(0.5))
    }
})

chewGum <- stream(for (i in 1:12) {
  yield("chew")
  await(delay(0.8))
})

printEach <- async(function(st) {
  for (each in st) {cat(each, ", ", sep="")}
  cat("\n")
})

all <- combine(walk, chewGum) |> printEach()
async:::wait_for_it()

How does this work anyway?

A longer article will be forthcoming, but the gist is that the async package transforms your given program into a state machine.

A coroutine expression is first scanned for uses of await, yield, for, break and other control flow calls. Those calls are swapped out for implementations local to the async package. Other R calls are wrapped in functions; all these functions are linked together so that each function calls the next in sequence. The result is a graph of functions calling each other, each call corresponding to a step in the program.

As of async version 0.3 you can extract and visualize this graph with graphAsync(g). (You will need Graphviz dot installed to render these graphs.)

```{R eval=FALSE}
ctz <- collatz(23)
graphAsync(ctz, type="svg") #creates a file "ctz.svg"
```

![Graph of the Collatz generator.](ctz.svg){width=75%}

[continuation]: https://en.wikipedia.org/wiki/Continuation
Since each step in the program's execution corresponds to a function call, when execution reaches a `yield`, the program's state is just the "next function" that would have been called (that is, a [continuation][continuation]). To pause and resume execution, a generator saves that "next function" until the next time `nextOr()` is called.
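The idea of saving the "next function" can be sketched in plain R without the package. This is a hand-written illustration only, not the package's actual internals; `make_counter_gen` and its step functions are hypothetical names.

```r
# A hand-rolled generator yielding 1, 2, ..., limit.
# Each step of the "program" is a function; pausing means remembering
# which step to call next.
make_counter_gen <- function(limit) {
  i <- 0

  # Step functions: each either hands control to another step or pauses.
  check     <- function() if (i < limit) increment() else finished()
  increment <- function() { i <<- i + 1; pause(i, check) }
  finished  <- function() { resume <<- finished; list(done = TRUE) }

  # "yield": save the continuation, then return the value to the caller.
  pause <- function(value, next_step) {
    resume <<- next_step
    list(done = FALSE, value = value)
  }

  resume <- check  # where execution starts

  # Analogous to nextOr(g, or): resume from the saved step.
  function(or) {
    result <- resume()
    if (result$done) or else result$value
  }
}

next_value <- make_counter_gen(3)
next_value(NA)  # 1
next_value(NA)  # 2
next_value(NA)  # 3
next_value(NA)  # NA, the generator is finished
```

The only state carried between calls is the counter `i` and the saved `resume` function, which plays the role of the continuation.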

You can also enable single-stepping at the graph level by calling:

```{R eval=FALSE}
debugAsync(ctz, internal=TRUE)
```


crowding/generators documentation built on June 28, 2023, 6:14 a.m.