## async package {.smaller}

:::: {.columns}
::: {.column width="50%"}

```{R, fig.asp=1}
library(qrcode)
plot(qr_code("http://github.com/crowding/iterors"))
```

:::
::: {.column width="50%"}

Provides *generators*, *async* blocks and *streams*.

* `gen()` / `yield()`
* `async()` / `await()`
* `stream()` / `awaitNext()`

(these let you write functions that can pause and resume where they left off)

:::
::::

## Generators {.smaller}

Generators allow you to write a computation that returns multiple values incrementally.

::::: {.incremental}
:::: {.columns}
::: {.column width="50%"}

```{R echo=TRUE}
library(async)

filteredLines <- gen({
  for (line in ireadLines("/var/log/syslog")) {
    if (grepl("avahi", line)) {
      yield(line)
    }
  }
})
```

:::
::: {.column width="50%"}

* **Applications**
* Data too big to fit in memory at once
  * Scan this 100GB log file for ...
  * Compute a statistic over all 2^N permutations of...
* Data whose size you don't know ahead of time
* Data defined by a _process_

:::
::::
:::::

## Example: Hailstone sequences {.smaller}

:::: {.columns}
::: {.column width="50%"}
* Pick an arbitrary positive starting integer $N_0$.
* If $N_i$ is even, $N_{i+1} = N_i / 2$.
* If $N_i$ is odd, $N_{i+1} = 3N_i +1$.

**Collatz conjecture**: all sequences generated this way will eventually reach the loop 4, 2, 1, 4, 2, 1....
:::
::: {.column width="50%"}
::::: {.fragment}

```{R echo=TRUE}
library(async)

hailstoneSeq <- gen(function(n) {
  repeat {
    yield(n)
    if (n == 1) break
    if (n %% 2 == 0)
      n <- n / 2
    else
      n <- n * 3 + 1
  }
})
```

:::::
:::::{.fragment}

```{R echo=TRUE}
s <- hailstoneSeq(7)
nextOr(s, NULL)
nextOr(s, NULL)
nextOr(s, NULL)
as.numeric(s)
```

:::::

:::
::::

## Example: Hailstone sequences

```{R echo=TRUE}
S27 <- hailstoneSeq(27)
plot(as.numeric(S27))
```

## iterors package {.smaller}

:::: {.columns}
::: {.column width="50%"}

Which starting value between 1 and 1000 produces the longest sequence?

:::::{.fragment}

```{R echo=TRUE}
library(iterors)
hs_len <- (
  icount() |>
    iapply(hailstoneSeq) |>
    iapply(count) |>
    take(1000, mode="numeric"))
which(hs_len == max(hs_len))
max(hs_len)
```

:::::

:::
::: {.column width="50%"}
:::::{.fragment}
Package `iterors` has many tools for working with iterators as collections.

```{R fig.asp=1, fig.width=3}
plot(qr_code("http://github.com/crowding/iterors"))
```

http://github.com/crowding/iterors

:::::
:::
::::
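
As a cross-check on the question above, the same search can be sketched in base R without any iterators. This is only an illustrative sketch: `hailstone_length` is a hypothetical helper name, not part of `async` or `iterors`, and it counts the starting value as the first element of the sequence.

```{.R}
# Base R cross-check; `hailstone_length` is a hypothetical helper name.
hailstone_length <- function(n) {
  len <- 1                          # count the starting value itself
  while (n != 1) {
    n <- if (n %% 2 == 0) n / 2 else n * 3 + 1
    len <- len + 1
  }
  len
}

# Sequence length for every starting value from 1 to 1000
lens <- vapply(1:1000, hailstone_length, numeric(1))
best <- which(lens == max(lens))    # starting value(s) with the longest sequence
print(best)
print(max(lens))
```

Unlike the iterator pipeline, this version computes each length eagerly in a loop, so it cannot be paused partway through a sequence.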

## async {.smaller}

:::: {.columns}
::: {.column width="50%"}

Built on the promises package.

A promise represents a value that hasn't been computed yet. The promise API works in terms of callback functions:

```{.R}
processed_results <- promise(\(resolve, reject) {
  p_results <- async_request(url)
  then(p_results,
       onFulfilled = \(results) {
         p_processed <- async_process(results)
         then(p_processed,
              onFulfilled = \(processed) {
                resolve(processed)
              },
              onRejected = \(x) reject(x))
       },
       onRejected = \(x) reject(x))
})
```

:::
::: {.column width="50%"}
:::::{.fragment}
`async({...})` allows you to create and work with promises without writing a tangled mess of callbacks.

```{.R}
processed_results <- async({
  results <- await(async_request(url))
  await(async_process(results))
})
```

Inside of `async()`, `await(pr)` pauses until promise `pr` resolves, then continues.
:::::
:::::{.incremental}
* Parallel processing (running worker tasks)
* Interacting with databases, web APIs
* Reactive UI (Shiny apps)
:::::
:::
::::

## Example: Web spider {.smaller}

:::: {.columns}
::: {.column width="60%"}

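```{.R}
library(curl)      # curl_fetch_multi(), multi_run(), multi_list()
library(later)     # later()
library(promises)  # promise()

curl_is_active <- FALSE

curl_fetch_async <- function(url) {
  # queue the request; curl calls resolve or reject when it finishes
  pr <- promise(function(resolve, reject) {
    curl_fetch_multi(url, done=resolve, fail=reject)
  })
  # start a polling loop unless one is already running
  if (!curl_is_active) {
    curl_is_active <<- TRUE
    poll <- function() {
      multi_run(timeout = 0.01, poll = TRUE)
      if (length(multi_list()) == 0) {
        curl_is_active <<- FALSE
      } else {
        later(poll)
      }
    }
    later(poll)
  }
  pr
}
```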

```{R echo=TRUE}
start <- c("http://crowding.github.io/async")
filter <- \(url) grepl("crowding\\.github\\.io/", url)
limit <- 100
known <- new.env()
graph <- new.env()
errors <- new.env()

spider_page <- async(function(url) {
  cat("Fetching", url, "\n")
  data <- await(curl_fetch_async(url),
                error = \(x) errors[[url]] <<- x)
  cat("Processing", url, "\n")
  parse <- XML::htmlParse(rawToChar(data$content))
  links <- XML::getHTMLLinks(
    parse, baseURL=url, relative=TRUE, externalOnly=TRUE)
  for (link in links) {
    # follow a link only if it is in scope, new, and under the page limit
    if (length(known) < limit
        && filter(link)
        && !exists(link, known)) {
      known[[link]] <<- spider_page(link)
    }
  }
  data <- NULL
  graph[[url]] <<- links
})
```

:::

::: {.column width="40%"}

Using `curl` with `async`, multiple page downloads will happen concurrently.

:::
::::

## `stream` {.smaller}
:::::{.incremental}
:::: {.columns}
::: {.column width="50%"}

`channel` objects represent a _sequence_ of unrealized future values, like a combination of an Iterator and Promise.

```{.R}
stream({
  ch <- curl_fetch_channel(logfile_url)
  repeat {
    chunk <- awaitNext(ch, or=break)
    for (i in filterEvents(chunk)) yield(i)
  }
})
```

`stream({...})` constructs a `channel` object running the given code.

:::
::: {.column width="50%"}

:::
::::
:::::

## How? {.smaller}

:::: {.columns}
::: {.column width="50%"}

```{.R}
gen(repeat {
  yield(n)
  if (n == 1) break
  if (n %% 2 == 0)
    n <- n / 2
  else
    n <- n * 3 + 1
})
```

Input

:::
:::{.column width="50%"}
:::::{.fragment}

```{.R}
async:::repeat_cps(
  async:::`{_cps`(
    async:::R(yield(n)),
    async:::if_cps(
      async:::R(n == 1),
      async:::break_cps()),
    async:::R(
      if (n%%2 == 0)
        n <- n/2
      else n <- n * 3 + 1)))
```

Swap out `repeat`, `if` and other functions that have a `yield` underneath them.
:::::
:::
::::

## How? {.smaller}

::::{.columns}
:::{.column width="50%"}

```{.R}
async:::repeat_cps(
  async:::`{_cps`(
    async:::R(yield(n)),
    async:::if_cps(
      async:::R(n == 1),
      async:::break_cps()),
    async:::R(
      if (n%%2 == 0)
        n <- n/2
      else n <- n * 3 + 1)))
```

`*_cps` functions plug into each other to form a graph, constructing the mess of callbacks for you.

To pause, the state machine just saves the next function in the graph to call later.

:::
::: {.column width="50%"}

```{R include=FALSE}
graphAsync(hailstoneSeq(27), "hailstone", type="dot", handlers=TRUE)
```

```{dot}
//| file: hailstone.dot
```

:::
::::
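
The idea of pausing by saving the next function to call can be sketched by hand in base R. This is a simplified illustration, not how `async` is actually implemented: `make_hailstone`, `emit`, `advance`, `done` and `resume` are hypothetical names, and the real package builds its graph from the `*_cps` functions instead.

```{.R}
# Hand-rolled sketch of a pausable hailstone generator, base R only.
# All names here are hypothetical; none are part of the async package.
make_hailstone <- function(n) {
  done <- function() NULL        # terminal state: nothing left to yield

  emit <- function() {           # state: yield n, then pick the next state
    value <- n
    if (n == 1) {
      resume <<- done            # the sequence ends after yielding 1
    } else {
      resume <<- advance         # save the next step to call later
    }
    value
  }

  advance <- function() {        # state: compute the next n, then yield it
    n <<- if (n %% 2 == 0) n / 2 else n * 3 + 1
    emit()
  }

  resume <- emit                 # where to continue on the first call
  function() resume()            # each call runs from the saved step
}

nxt <- make_hailstone(7)
first_three <- c(nxt(), nxt(), nxt())
print(first_three)
```

Each call to `nxt()` picks up wherever `resume` points, so the first three calls return 7, 22 and 11; once 1 has been yielded, further calls return `NULL`.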

## Try it out: {.smaller}

::::{.columns}
:::{.column width="50%"}

`async` is on CRAN:

```{R, fig.asp=1, fig.width=3}
plot(qr_code("http://github.com/crowding/async"))
```

[http://github.com/crowding/async](http://github.com/crowding/async)

:::
:::{.column width="50%"}

`iterors` is on GitHub (CRAN soon):

```{R, fig.asp=1, fig.width=3}
plot(qr_code("http://github.com/crowding/iterors"))
```

http://github.com/crowding/iterors

:::
::::

Give it a try (and give me bug reports!)



crowding/generators documentation built on June 28, 2023, 6:14 a.m.