async
package {.smaller}:::: {.columns} ::: {.column width="50%"}
```{R, fig.asp=1} library(qrcode) plot(qr_code("http://github.com/crowding/iterors"))
::: {.column width="50%"} Provides *generators*, *async* blocks and *streams*. * `gen()` / `yield()` * `async()`/ `await()` * `stream()` / `awaitNext()` (these let you write functions that can pause and resume where they left off) ::: :::: ## Generators {.smaller} Generators allow you to write a computation that returns multiple values incrementally. ::::: {.incremental} :::: {.columns} ::: {.column width="50%"} ```{R echo=TRUE} library(async) filteredLines <- gen({ for (line in ireadLines("/var/log/syslog")) { if (grepl("avahi", line)) { yield(line) } } })
gen
: yield(x)
returns x and pauses.gen
: nextElem(filteredLines)
resumes and runs the generator until the next yield
.
:::
::: {.column width="50%"}
::::::{.fragment}
```{R echo=TRUE}
nextOr(filteredLines)
nextOr(filteredLines)* **Applications** * Data too big to fit in memory at once * Scan this 100GB log file for ... * Compute a statistic over all 2^N permutations of... * Data whose size you don't know ahead of time * Data defined by a _process_ ::: :::: ::::: ## Example: Hailstone sequences {.smaller} :::: {.columns} ::: {.column width="50%"} * Pick an arbitrary starting integer $N_0$. * If $N_i$ is even, $N_{i+1} = N_i / 2$. * If $N_i$ is odd, $N_{i+1} = 3N_i +1$. **Collatz conjecture**: all sequences generated this way will eventually reach the loop 4, 2, 1, 4, 2, 1.... ::: ::: {.column width="50%"} ::::: {.fragment} ```{R echo=TRUE} library(async) hailstoneSeq <- gen(function(n) { repeat { yield(n) if (n == 1) break if (n %% 2 == 0) n <- n / 2 else n <- n * 3 + 1 } })
::::: :::::{.fragment} ```{R echo=TRUE} s <- hailstoneSeq(7) nextOr(s, NULL) nextOr(s, NULL) nextOr(s, NULL) as.numeric(s)
::: :::: ## Example: Hailstone sequences ```{R echo=TRUE} S27 <- hailstoneSeq(27) plot(as.numeric(S27))
iterors
package {.smaller}:::: {.columns} ::: {.column width="50%"}
Which starting value between 1 and 1000 produces the longest sequence?
:::::{.fragment} ```{R echo=TRUE} library(iterors) hs_len <- ( icount() |> iapply(hailstoneSeq) |> iapply(count) |> take(1000, mode="numeric")) which(hs_len == max(hs_len)) max(hs_len)
::: ::: {.column width="50%"} :::::{.fragment} Package `iterors` has many tools for working with iterators as collections. ```{R fig.asp=1, fig.width=3} plot(qr_code("http://github.com/crowding/iterors"))
http://github.com/crowding/iterors ::::: ::: ::::
async
{.smaller}:::: {.columns} ::: {.column width="50%"}
Built on the promises
package.
A promise
represents a value that hasn't been computed yet. The
promise
API works in terms of callback functions:
processed_results <- promise(\(resolve, reject) { p_results <- async_request(url) then(p_results, onFulfilled = \(results) { p_processed <- async_process(results) then(p_processed, onFulfilled= \(processed) { resolve(processed) }, onRejected = \(x) reject(x)) }, onRejected = \(x) reject(x) }
:::
::: {.column width="50%"}
:::::{.fragment}
async({...})
allows you to create and work with promises without writing a tangled mess of callbacks.
processed_results <- async({ results <- await(async_request(url)) await(async_process(results)) })
Inside of async()
, await(pr)
pauses until promise pr
resolves, then continues.
:::::
:::::{.incremental}
Parallel processing (running worker tasks)
Interacting with databases, web APIs
* Reactive UI (Shiny apps)
:::::
:::
::::
:::: {.columns} ::: {.column width="60%"}
curl_is_active <- FALSE curl_fetch_async <- function(url) { pr <- promise(function(resolve, reject) { curl_fetch_multi(url, done=resolve, fail=reject) }) if (!curl_is_active) { curl_is_active <<- TRUE poll <- function() { multi_run(timeout = 0.01, poll = TRUE) if (length(multi_list()) == 0) { curl_is_active <<- FALSE } else { later(poll) } } later(poll) } pr }
```{R echo=TRUE} start <- c("http://crowding.github.io/async") filter <- (url) grepl("crowding\.github\.io/", url) limit <- 100 known <- new.env() graph <- new.env() errors <- new.env()
spider_page <- async(function(url) { cat("Fetching", url, "\n") data <- await(curl_fetch_async(url), error = (x) errors[[url]] <<- x) cat("Processing", url, "\n") parse <- XML::htmlParse(rawToChar(data$content)) links <- XML::getHTMLLinks( parse, baseURL=url, relative=TRUE, externalOnly=TRUE) for (link in links) { if (length(promises) < limit && filter(link) && !exists(link, known)) { known[[link]] <<- fetch_page(link) } } data <- NULL graph[[url]] <<- links })
::: {.column width="40%"} Using `curl` with `async`, multiple page downloads will happen concurrently. ::: :::: ## `stream` {.smaller} :::::{.incremental} :::: {.columns} ::: {.column width="50%"} `channel` objects represent a _sequence_ of unrealized future values, like a combination of an Iterator and Promise. ```{.R} stream({ logfile <- curl_fetch_channel(logfile_url) repeat { chunk <- awaitNext(ch, or=break) events <- filterEvents(ch) for (i in filterEvents(chunk)) yield(i) } })
stream({...})
constructs a channel
object running the given code.
awaitNext(ch)
fetches the next value from another channel.yield(val)
emits a value.::: ::: {.column width="50%"}
::: :::: :::::
:::: {.columns} ::: {.column width="50%"}
gen(repeat { yield(n) if (n == 1) break if (n %% 2 == 0) n <- n / 2 else n <- n * 3 + 1 })
Input
::: :::{.column width="50%} :::::{.fragment}
async:::repeat_cps( async:::`{_cps`( async:::R(yield(n)), async::if_cps( async:::R(n == 1), async:::break_cps()), async:::R( if (n%%2 == 0) n <- n/2 else n <- n * 3 + 1)))
Swap out repeat
, if
and other functions that have a yield
underneath them
:::::
:::
::::
::::{.columns} :::{.column width="50%"}
async:::repeat_cps( async:::`{_cps`( async:::R(yield(n)), async::if_cps( async:::R(n == 1), async:::break_cps()), async:::R( if (n%%2 == 0) n <- n/2 else n <- n * 3 + 1)))
*_cps
functions plug into each other to form a graph,
constructing the mess of callbacks for you
To pause, the state machine just saves the next function in the graph to call later.
::: ::: {.column width="50%"}
```{R include=FALSE} graphAsync(hailstoneSeq(27), "hailstone", type="dot", handlers=TRUE)
```{dot} //| file: hailstone.dot
::: ::::
::::{.columns} :::{.column width="50%"}
async
is on CRAN:
```{R, fig.asp=1, fig.width=3} plot(qr_code("http://github.com/crowding/async"))
[http://github.com/crowding/async](http://github.com/crowding/async) ::: :::{.column width="50%"} `iterors` is on GitHub (CRAN soon): ```{R, fig.asp=1, fig.width=3} plot(qr_code("http://github.com/crowding/iterors"))
http://github.com/crowding/iterors
::: ::::
Give it a try (and give me bug reports!)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.