This is an R package implementing generators, async blocks, and streams (collectively known as "coroutines.")
debugAsync(obj, TRUE) to pause before each call at R level. You can also use debugAsync(obj, internal=TRUE) to step through at the coroutine implementation level.switch supports goto() to transfer to a different branch.on.exit().gen(function(x, y) ...) returns a function that constructs generators.run(...) will execute a generator expression immediately and collect the results in a list.stream() coroutine backed by a channel class (asynchronous iterator).graphAsync(gen) (this requires you have Graphviz dot command installed on your system.)For more details see NEWS.md.
g <- gen({...}) is like a function that knows how to "pause." The
code in a generator runs until it hits a yield() call, then returns
that value. The next time you call the generator it picks up where it
left off and runs until the next yield.
From the outside a generator implements the iteror interface. You
extract each yielded value with nextOr(g, or), and you can use
generators anywhere you can use an iteror. The iteror class is
cross compatible with the iterators package.
Consider a sequence of numbers x[i], starting with an arbitrary x[1], where
each subsequent element is produced by applying the rule:
x[i] is even, then the next value will be x[i+1] = x[i]/2.x[i] is odd, the next value will be x[i+1] = 3*x[i]+1.An infinite sequence of numbers will continue form each staring point
x[1], but it is conjectured that all sequences will
eventually reach the loop 1, 4, 2, 1, 4, 2, .... The following
generator produces the Collatz sequence, starting from x, and
terminating when (or if?) the sequence reaches 1.
library(async) collatz <- gen(function(x) { yield(x) while (x > 1) { if (x %% 2 == 0) yield(x <- x / 2L) else yield(x <- 3L * x + 1) } })
The call to gen produces a generator. You can get values one at a
time with nextOr().
ctz <- collatz(12) nextOr(ctz, NA) nextOr(ctz, NA) nextOr(ctz, NA) nextOr(ctz, NA) nextOr(ctz, NA)
You can also use any other method that applies to an iterator, like as.list.
collatz(27L) |> as.list() |> as.numeric() #Try collatz(63728127L) |> as.list() |> as.numeric()...
For more examples, see the "Clapping Music" vignette.
Like gen, async({...}) takes a block of sequential code, which
runs until it reaches a call to await(p). The argument p should be
a promise, (as defined by the promises package, which
represents an unfinished external computation.) In turn, async()
constructs and returns a promise.
An async block runs until it reaches a call to await(p) and
pauses. When the promise p resolves, the async block continues.
If p rejects, that is evaluated like an error; you can use
await(p) into a tryCatch to handle rejections. When the async
block finishes, or throws an error, its promise resolves or rejects.
async doesn't handle running concurrent tasks by itself; it builds
on existing packages like future and later. The later package
lets you assign tasks to be done in the event loop, when R is idle.
Ring a bell 5 times at 10 second intervals (subject to R being idle):
```{R, eval=FALSE} async({ for (i in 1:5) { await(delay(10)) #delay() uses later::later() cat("Beep", i, "\n") beepr::beep(2) } })
#### Shiny apps [cranwhales]: https://github.com/crowding/cranwhales-await `async()` can be used in Shiny apps! For an example, here is a version of the ["Cranwhales" demo app using async/await.][cranwhales]. #### Web scraping `async()` allows you to naturally keep track of more than one concurrent process. The [web spider vignette][spider] shows how this can improve the speed of web scraping using concurrent connections. [spider]: https://crowding.github.io/async/articles/spider.html #### Background processing `async` can also work with `future` objects to run computations in parallel. Download, parse, and summarize a dataset in background processes like this: ```{R, eval=FALSE} library(future) library(dplyr) plan(multiprocess(workers=2)) url <- "http://analytics.globalsuperhypermegamart.com/2020/March.csv.gz" dest <- "March.csv.gz" dataset <- async({ if(!file.exists(dest)) { await(future({ cat("Downloading\n") download.file(url, dest) })) } data <- await(future({ cat("Parsing\n") read.csv(dest) |> mutate(time = hms::trunc_hms(time, 60*60)) |> group_by(time) |> summarize(sales=sum(amount)) })) }) # When the data is ready, plot it (in the main process:) async({ await(dataset) |> ggplot(aes(time, n)) + xlab("Time") + ylab("Sales") })
New in version 0.3 are asynchronous streams and channels. A channel is
an interface for asynchronous iteration; stream() lets you do things
with channels by writing code with await and yield. Here is an
example of channels being used to "walk and chew gum concurrently:"
walk <- stream({ for (i in 1:10) for (step in c("left", "right")) { yield(step) await(delay(0.5)) } }) chewGum <- stream(for (i in 1:12) { yield("chew") await(delay(0.8)) }) printEach <- async(function(st) { for (each in st) {cat(each, ", ", sep="")} cat("\n") }) all <- combine(walk, chewGum) |> printEach()
async:::wait_for_it()
A longer article will be forthcoming, but the basic gist is the async package transforms your given program into a state machine.
A coroutine expression is first scanned for uses of await, yield, for, break and other control flow calls. Those calls are swapped out for implementations local to the async package. Other R calls are wrapped in functions; all these functions are linked together in so that each function calls the next in sequence. The result is a graph of functions calling each other, each call corresponding to a step in the program.
As of async version 0.3 you can extract and visualize this graph with graphAsync(g). (You will need Graphviz dot installed to render these graphs.
```{R eval=FALSE} ctz <- collatz(23) graphAsync(ctz, type="svg") #creates a file "ctz.svg"
{width=75%}
[continuation]: https://en.wikipedia.org/wiki/Continuation
Since each step in the program's execution corresponds to a function call, when execution reaches a `yield`, the program's state is just the "next function" that would have been called (that is, a [continuation][continuation].) To pause and resume execution, a generator saves that "next function" until the next time `nextOr()` is called.
You can also enable single-stepping at the graph level by calling:
```{R eval=FALSE}
debugAsync(ctz, internal=TRUE)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.