#' @importFrom R6 R6Class
event_loop <- R6Class(
"event_loop",
public = list(
initialize = function()
el_init(self, private),
add_http = function(handle, callback, file = NULL, progress = NULL,
data = NULL)
el_add_http(self, private, handle, callback, file, progress, data),
http_setopt = function(total_con = NULL, host_con = NULL, multiplex = NULL)
el_http_setopt(self, private, total_con, host_con, multiplex),
add_process = function(conns, callback, data)
el_add_process(self, private, conns, callback, data),
add_r_process = function(conns, callback, data)
el_add_r_process(self, private, conns, callback, data),
add_pool_task = function(callback, data)
el_add_pool_task(self, private, callback, data),
add_delayed = function(delay, func, callback, rep = FALSE)
el_add_delayed(self, private, delay, func, callback, rep),
add_next_tick = function(func, callback, data = NULL)
el_add_next_tick(self, private, func, callback, data),
cancel = function(id)
el_cancel(self, private, id),
cancel_all = function()
el_cancel_all(self, private),
run = function(mode = c("default", "nowait", "once"))
el_run(self, private, mode = match.arg(mode)),
suspend = function()
el_suspend(self, private),
wakeup = function()
el_wakeup(self, private)
),
private = list(
create_task = function(callback, ..., id = NULL, type = "foobar")
el__create_task(self, private, callback, ..., id = id, type = type),
ensure_pool = function()
el__ensure_pool(self, private),
get_poll_timeout = function()
el__get_poll_timeout(self, private),
run_pending = function()
el__run_pending(self, private),
run_timers = function()
el__run_timers(self, private),
is_alive = function()
el__is_alive(self, private),
update_time = function()
el__update_time(self, private),
io_poll = function(timeout)
el__io_poll(self, private, timeout),
update_curl_data = function()
el__update_curl_data(self, private),
id = NULL,
time = Sys.time(),
stop_flag = FALSE,
tasks = list(),
timers = Sys.time()[numeric()],
pool = NULL,
curl_fdset = NULL, # return value of multi_fdset()
curl_poll = TRUE, # should we poll for curl sockets?
curl_timer = NULL, # call multi_run() before this
next_ticks = character(),
worker_pool = NULL,
http_opts = NULL
)
)
el_init <- function(self, private) {
private$id <- new_event_loop_id()
invisible(self)
}
el_add_http <- function(self, private, handle, callback, progress, file,
data) {
self; private; handle; callback; progress; outfile <- file; data
id <- private$create_task(callback, list(handle = handle, data = data),
type = "http")
private$ensure_pool()
if (!is.null(outfile)) cat("", file = outfile)
content <- NULL
curl::multi_add(
handle = handle,
pool = private$pool,
done = function(response) {
task <- private$tasks[[id]]
task$data$data$event_emitter$emit("end")
private$tasks[[id]] <- NULL
response$content <- do.call(c, as.list(content))
response$file <- outfile
task$callback(NULL, response)
},
data = function(bytes, ...) {
task <- private$tasks[[id]]
task$data$data$event_emitter$emit("data", bytes)
if (!is.null(outfile)) {
## R runs out of connections very quickly, especially because they
## are not removed until a gc(). However, calling gc() is
## expensive, so we only do it if we have to. This is a temporary
## solution until we can use our own connections, that are not
## so limited in their numbers.
con <- tryCatch(
file(outfile, open = "ab"),
error = function(e) { gc(); file(outfile, open = "ab") } # nocov
)
writeBin(bytes, con)
close(con)
} else {
content <<- c(content, list(bytes))
}
},
fail = function(error) {
task <- private$tasks[[id]]
private$tasks[[id]] <- NULL
error <- make_error(message = error)
class(error) <- unique(c("async_rejected", "async_http_error",
class(error)))
task$callback(error, NULL)
}
)
id
}
el_add_process <- function(self, private, conns, callback, data) {
self; private; conns; callback; data
data$conns <- conns
private$create_task(callback, data, type = "process")
}
el_add_r_process <- function(self, private, conns, callback, data) {
self; private; conns; callback; data
data$conns <- conns
private$create_task(callback, data, type = "r-process")
}
el_add_pool_task <- function(self, private, callback, data) {
self; private; callback; data
id <- private$create_task(callback, data, type = "pool-task")
if (is.null(async_env$worker_pool)) {
async_env$worker_pool <- worker_pool$new()
}
async_env$worker_pool$add_task(data$func, data$args, id, private$id)
id
}
el_add_delayed <- function(self, private, delay, func, callback, rep) {
force(self); force(private); force(delay); force(func); force(callback)
force(rep)
id <- private$create_task(
callback,
data = list(delay = delay, func = func, rep = rep),
type = "delayed"
)
# This has to be real time, because our event loop time might
# be very much in the past when his is called.
private$timers[id] <- Sys.time() + as.difftime(delay, units = "secs")
id
}
el_add_next_tick <- function(self, private, func, callback, data) {
force(self) ; force(private) ; force(callback); force(data)
data$func <- func
id <- private$create_task(callback, data = data, type = "nexttick")
private$next_ticks <- c(private$next_ticks, id)
}
el_cancel <- function(self, private, id) {
private$next_ticks <- setdiff(private$next_ticks, id)
private$timers <- private$timers[setdiff(names(private$timers), id)]
if (id %in% names(private$tasks) && private$tasks[[id]]$type == "http") {
curl::multi_cancel(private$tasks[[id]]$data$handle)
} else if (id %in% names(private$tasks) &&
private$tasks[[id]]$type %in% c("process", "r-process")) {
private$tasks[[id]]$data$process$kill()
} else if (id %in% names(private$tasks) &&
private$tasks[[id]]$type == "pool-task") {
async_env$worker_pool$cancel_task(id)
}
private$tasks[[id]] <- NULL
invisible(self)
}
el_cancel_all <- function(self, private) {
http <- curl::multi_list(pool = private$pool)
lapply(http, curl::multi_cancel)
private$next_ticks <- character()
private$timers <- Sys.time()[numeric()]
## Need to cancel pool tasks, these are interrupts for the workers
types <- vcapply(private$tasks, "[[", "type")
ids <- vcapply(private$tasks, "[[", "id")
for (id in ids[types == "pool-task"]) {
self$cancel(id)
}
private$tasks <- list()
invisible(self)
}
el_run <- function(self, private, mode) {
## This is closely modeled after the libuv event loop, on purpose,
## because some time we might switch to that.
alive <- private$is_alive()
if (! alive) private$update_time()
while (alive && !private$stop_flag) {
private$update_time()
private$update_curl_data()
private$run_timers()
ran_pending <- private$run_pending()
## private$run_idle()
## private$run_prepare()
timeout <- 0
if ((mode == "once" && !ran_pending) || mode == "default") {
timeout <- private$get_poll_timeout()
}
private$io_poll(timeout)
## private$run_check()
## private$run_closing_handles()
if (mode == "once") {
## If io_poll returned without doing anything, that means that
## we have some timers that are due, so run those.
## At this point we have surely made progress
private$update_time()
private$run_timers()
}
alive <- private$is_alive()
if (mode == "once" || mode == "nowait") break
}
private$stop_flag <- FALSE
alive
}
el_suspend <- function(self, private) {
## TODO
}
el_wakeup <- function(self, private) {
## TODO
}
el__run_pending <- function(self, private) {
next_ticks <- private$next_ticks
private$next_ticks <- character()
for (id in next_ticks) {
task <- private$tasks[[id]]
private$tasks[[id]] <- NULL
call_with_callback(task$data$func, task$callback,
info = task$data$error_info)
}
## Check for workers from the pool finished before, while another
## event loop was active
finished_pool <- FALSE
pool <- async_env$worker_pool
if (!is.null(pool)) {
done_pool <- pool$list_tasks(event_loop = private$id, status = "done")
finished_pool <- nrow(done_pool) > 0
for (tid in done_pool$id) {
task <- private$tasks[[tid]]
private$tasks[[tid]] <- NULL
res <- pool$get_result(tid)
err <- res$error
res <- res[c("result", "stdout", "stderr")]
task$callback(err, res)
}
}
length(next_ticks) > 0 || finished_pool
}
el__io_poll <- function(self, private, timeout) {
types <- vcapply(private$tasks, "[[", "type")
## The things we need to poll, and their types
## We put the result here as well
pollables <- data.frame(
stringsAsFactors = FALSE,
id = character(),
pollable = I(list()),
type = character(),
ready = character()
)
## HTTP.
if (private$curl_poll) {
curl_pollables <- data.frame(
stringsAsFactors = FALSE,
id = "curl",
pollable = I(list(processx::curl_fds(private$curl_fdset))),
type = "curl",
ready = "silent")
pollables <- rbind(pollables, curl_pollables)
}
## Processes
proc <- types %in% c("process", "r-process")
if (sum(proc)) {
conns <- unlist(lapply(
private$tasks[proc], function(t) t$data$conns),
recursive = FALSE)
proc_pollables <- data.frame(
stringsAsFactors = FALSE,
id = names(private$tasks)[proc],
pollable = I(conns),
type = types[proc],
ready = rep("silent", sum(proc)))
pollables <- rbind(pollables, proc_pollables)
}
## Pool
px_pool <- if (!is.null(async_env$worker_pool)) {
async_env$worker_pool$get_poll_connections()
}
if (length(px_pool)) {
pool_pollables <- data.frame(
stringsAsFactors = FALSE,
id = names(px_pool),
pollable = I(px_pool),
type = rep("pool", length(px_pool)),
ready = rep("silent", length(px_pool)))
pollables <- rbind(pollables, pool_pollables)
}
if (!is.null(private$curl_timer) && private$curl_timer <= private$time) {
curl::multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
private$curl_timer <- NULL
}
if (nrow(pollables)) {
## OK, ready to poll
pollables$ready <- unlist(processx::poll(pollables$pollable, timeout))
## Any HTTP?
if (private$curl_poll &&
pollables$ready[match("curl", pollables$type)] == "event") {
curl::multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
}
## Any processes
proc_ready <- pollables$type %in% c("process", "r-process") &
pollables$ready == "ready"
for (id in pollables$id[proc_ready]) {
p <- private$tasks[[id]]
private$tasks[[id]] <- NULL
## TODO: this should be async
p$data$process$wait(1000)
p$data$process$kill()
res <- list(
status = p$data$process$get_exit_status(),
stdout = read_all(p$data$stdout, p$data$encoding),
stderr = read_all(p$data$stderr, p$data$encoding),
timeout = FALSE
)
error <- FALSE
if (p$type == "r-process") {
res$result <- tryCatch({
p$data$process$get_result()
}, error = function(e) { error <<- TRUE; e })
}
unlink(c(p$data$stdout, p$data$stderr))
if (p$data$error_on_status && (error || res$status != 0)) {
err <- make_error("process exited with non-zero status")
err$data <- res
res <- NULL
} else {
err <- NULL
}
p$callback(err, res)
}
## Worker pool
pool_ready <- pollables$type == "pool" & pollables$ready == "ready"
if (sum(pool_ready)) {
pool <- async_env$worker_pool
done <- pool$notify_event(as.integer(pollables$id[pool_ready]),
event_loop = private$id)
mine <- intersect(done, names(private$tasks))
for (tid in mine) {
task <- private$tasks[[tid]]
private$tasks[[tid]] <- NULL
res <- pool$get_result(tid)
err <- res$error
res <- res[c("result", "stdout", "stderr")]
task$callback(err, res)
}
}
} else if (length(private$timers) || !is.null(private$curl_timer)) {
Sys.sleep(timeout / 1000)
}
}
el__create_task <- function(self, private, callback, data, ..., id, type) {
id <- id %||% get_uuid()
private$tasks[[id]] <- list(
type = type,
id = id,
callback = callback,
data = data,
error = NULL,
result = NULL
)
id
}
el__ensure_pool <- function(self, private) {
getopt <- function(nm) {
anm <- paste0("async_http_", nm)
if (!is.null(v <- getOption(anm))) return(v)
if (!is.na(v <- Sys.getenv(toupper(anm), NA_character_))) return(v)
NULL
}
if (is.null(private$pool)) {
private$http_opts <- list(
total_con = getopt("total_con") %||% 100,
host_con = getopt("host_con") %||% 6,
multiplex = getopt("multiplex") %||% TRUE
)
private$pool <- curl::new_pool(
total_con = private$http_opts$total_con,
host_con = private$http_opts$host_con,
multiplex = private$http_opts$multiplex
)
}
}
el_http_setopt <- function(self, private, total_con, host_con, multiplex) {
private$ensure_pool()
if (!is.null(total_con)) private$http_opts$total_con <- total_con
if (!is.null(host_con)) private$http_opts$host_con <- host_con
if (!is.null(multiplex)) private$http_opts$multiplex <- multiplex
curl::multi_set(
pool = private$pool,
total_con = private$http_opts$total_con,
host_con = private$http_opts$host_con,
multiplex = private$http_opts$multiplex
)
}
el__get_poll_timeout <- function(self, private) {
t <- if (length(private$next_ticks)) {
## TODO: can this happen at all? Probably not, but it does not hurt...
0 # nocov
} else {
max(0, min(Inf, private$timers - private$time))
}
if (!is.null(private$curl_timer)) {
t <- min(t, private$curl_timer - private$time)
}
t <- max(t, 0)
if (is.finite(t)) as.integer(t * 1000) else -1L
}
el__run_timers <- function(self, private) {
expired <- names(private$timers)[private$timers <= private$time]
expired <- expired[order(private$timers[expired])]
for (id in expired) {
task <- private$tasks[[id]]
if (private$tasks[[id]]$data$rep) {
## If it is repeated, then re-init
private$timers[id] <-
private$time + as.difftime(task$data$delay, units = "secs")
} else {
## Otherwise remove
private$tasks[[id]] <- NULL
private$timers <- private$timers[setdiff(names(private$timers), id)]
}
call_with_callback(task$data$func, task$callback)
}
}
el__is_alive <- function(self, private) {
length(private$tasks) > 0 ||
length(private$timers) > 0 ||
length(private$next_ticks) > 0
}
el__update_time <- function(self, private) {
private$time <- Sys.time()
}
el__update_curl_data <- function(self, private) {
private$curl_fdset <- curl::multi_fdset(private$pool)
num_fds <- length(unique(unlist(private$curl_fdset[1:3])))
private$curl_poll <- num_fds > 0
private$curl_timer <- if ((t <- private$curl_fdset$timeout) != -1) {
private$time + as.difftime(t / 1000.0, units = "secs")
}
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.