Nothing
## ----- setup
# Install whichever of the requested packages are not yet available.
#
# packages: character vector of package names.
#
# The set of available packages is taken from the first column of
# library()$results. When nothing is missing the function does nothing
# and returns NULL invisibly; otherwise it hands the missing names to
# install.packages().
ensureExists <- function(packages) {
  absent <- setdiff(packages, library()$results[, 1])
  if (length(absent) == 0) {
    return(invisible(NULL))
  }
  install.packages(absent)
}
# Shut down a cluster handle left over from an earlier run of this script.
# stopCluster() is reached through the parallel namespace because this
# file never attaches the parallel package with library().
if (exists("clust") && !is.null(clust)) {
  parallel::stopCluster(clust)
  clust <- NULL
}

# Make sure every package the benchmarks refer to is installed before any
# of them is loaded or called.
ensureExists(c(
  "knitr",
  "rmarkdown",
  "purrr",
  "dplyr",
  "magrittr",
  "tibble",
  "printr",
  "ggplot2",
  "nycflights13",
  "parallel",
  "ssh.utils",
  "tools",
  "msgpack",
  "jsonlite",
  "msgpackR",
  "rjson",
  "RJSONIO",
  "yaml",
  "broom"
))
library(knitr)
library(rmarkdown)
library(purrr)
library(dplyr)
library(magrittr)
library(tibble)
library(printr)
library(ggplot2)
## ---- dataset
library(nycflights13)
# Gather every object found in the attached nycflights13 package
# environment into one named list; this list is the benchmark payload.
dataset <- as.list(as.environment("package:nycflights13"))
# Keep only the leading rows of every data frame in a list.
#
# dataset: list of data frames.
# rate:    fraction of each data frame's rows to keep.
#
# Returns a list with the same elements, each cut down to its first
# 1 + round(nrow * rate) rows, so rate = 0 keeps exactly one row.
#
# The row count is clamped to [0, nrow(df)]. Without the clamp, rate = 1
# (or an empty data frame) asks for one row past the end, which appends
# an all-NA row to a data.frame. drop = FALSE keeps single-column inputs
# as data frames instead of collapsing them to vectors.
subsample <- function(dataset, rate) {
  lapply(dataset, function(df) {
    wanted <- 1 + round(nrow(df) * rate)
    keep <- max(0, min(nrow(df), wanted))
    df[seq_len(keep), , drop = FALSE]
  })
}
# Smallest sample of the payload: a rate of 0 makes subsample() keep a
# single leading row from each table.
onerow <- subsample(dataset, 0)
## ---- definitions
# Host name handed to the ssh helpers, the PSOCK cluster and the remote
# socket writer further down in this file.
remoteHost <- "paradoxus.local"
# TCP port used by the socketConnection() calls in the socket benchmarks.
port <- 42170
# Adapt a function of a raw vector into a function of a connection.
#
# f: function taking a single raw vector.
#
# Returns function(con, chunk = 65536L). That function reads `con` in
# pieces of at most `chunk` bytes until a read comes back empty, then
# calls f() on all bytes read and returns its value.
bufferBytes <- function(f) function(con, chunk = 65536L) {
  buf <- rawConnection(raw(0), open = "w")
  # The accumulator is a connection in its own right; close it on exit so
  # repeated calls do not exhaust R's limited pool of connections.
  on.exit(close(buf), add = TRUE)
  repeat {
    result <- readBin(con = con, what = "raw", n = chunk)
    if (length(result) == 0) break
    writeBin(result, buf)
  }
  f(rawConnectionValue(buf))
}
# Wrap `f`, a function of a raw vector, as a function of a connection.
#
# The returned function(con, chunk = 65536L) drains `con` in reads of up
# to `chunk` bytes, collecting the pieces with msgpack's internal
# catenator, and finally calls f() on the collected bytes.
bufferBytes2 <- function(f) {
  function(con, chunk = 65536L) {
    collect <- msgpack:::catenator(raw(0)) # noted by the author as faster
    while (length(piece <- readBin(con = con, what = "raw", n = chunk)) > 0) {
      collect(piece)
    }
    f(collect(action = "read"))
  }
}
# Wrap `f`, a function of a connection, so that it is fed from memory.
#
# The returned function(con, chunk = 65536L) first drains `con` in reads
# of up to `chunk` bytes (collected with msgpack's internal catenator),
# then opens a read-mode rawConnection over the collected bytes and calls
# f() on that connection. The rawConnection is not closed here.
bufferRawConn <- function(f) {
  function(con, chunk = 65536L) {
    collect <- msgpack:::catenator(raw(0)) # noted by the author as faster
    while (length(piece <- readBin(con = con, what = "raw", n = chunk)) > 0) {
      collect(piece)
    }
    f(rawConnection(collect(action = "read"), open = "r"))
  }
}
# Wrap `reader`, a function of a connection, so that it reads buffered
# text.
#
# The returned function(con) calls readLines(con) repeatedly until it
# yields no lines, collecting everything with msgpack's internal
# catenator. It then opens a textConnection over the collected lines,
# passes it to reader(), and closes it when the call finishes.
bufferLines <- function(reader) {
  function(con) {
    collect <- msgpack:::catenator(character(0))
    while (length(batch <- readLines(con)) > 0) {
      collect(batch)
    }
    text_con <- textConnection(collect(action = "read"))
    on.exit(close(text_con))
    reader(text_con)
  }
}
# Fork the R process and run one callback on each side.
#
# ifParent: function of one argument, called in the parent process.
# ifChild:  function of one argument, called in the child process.
# catch:    when TRUE, an error on either side is turned into
#           list(error =, calls =) instead of being raised.
#
# Each callback receives the process object returned by mcfork() (in the
# parent it describes the child, in the child it describes the parent).
# The callbacks were previously called with an undefined variable `x`,
# which only worked while they never touched their argument.
#
# Returns c(theirs, mine): the child's result (sent back through
# mcexit() and read with readChild()) followed by the parent's result.
forked <- function(ifParent, ifChild, catch = TRUE) {
  other <- parallel:::mcfork()
  if (inherits(other, "masterProcess")) {
    # we are child process
    if (catch) {
      tryCatch({
        result <- ifChild(other)
        parallel:::mcexit(0, send = result)
      },
      error = function(e) {
        parallel:::mcexit(1, send = list(error = e, calls = sys.calls()))
      })
    } else {
      result <- ifChild(other)
      parallel:::mcexit(0, send = result)
    }
  } else {
    # we are master process
    as_failure <- function(e) list(error = e, calls = sys.calls())
    read_child <- function() unserialize(parallel:::readChild(other))
    if (catch) {
      mine <- tryCatch(ifParent(other), error = as_failure)
      theirs <- tryCatch(read_child(), error = as_failure)
    } else {
      # No handler at all, so the original error is what propagates.
      mine <- ifParent(other)
      theirs <- read_child()
    }
    # Terminate the child if it is still listed after reporting back.
    child_pids <- vapply(parallel:::children(), function(x) x$pid, 0)
    if (other$pid %in% child_pids) {
      warning("Killing child process ", deparse(other))
      parallel:::mckill(other, tools::SIGTERM)
    }
    c(theirs, mine)
  }
}
# Global switch read by times(): while FALSE, the value that was read
# back is left out of the returned list.
return_result <- FALSE

# Turn pairs of proc.time() readings into one flat list of timings.
#
# start.write, end.write:   readings around the write step.
# start.read, end.read:     readings around the read step.
# bytes:                    size of the encoded data (optional).
# result:                   value read back (optional; only kept when
#                           return_result is TRUE).
# start.parent, end.parent: readings around the whole transfer
#                           (optional).
# ...:                      anything else; stored under `extra`.
#
# Returns a list with `extra`, then (only when both end.read and
# end.write were supplied) read.*, write.* and total.* for user, sys and
# elapsed time, then `bytes`, `result` and parent.user / parent.sys /
# parent.elapsed for whichever of those inputs were supplied.
# total.elapsed is the larger of the read and write elapsed times.
times <- function(start.write, end.write,
                  start.read, end.read,
                  bytes, result,
                  start.parent, end.parent, ...) {
  # Difference of one proc.time() field between two readings.
  delta <- function(from, to, field) to[[field]] - from[[field]]

  extras <- list(extra = list(list(...)))

  io <- if (missing(end.read) || missing(end.write)) {
    list()
  } else {
    c(read.user = delta(start.read, end.read, "user.self"),
      read.sys = delta(start.read, end.read, "sys.self"),
      read.elapsed = delta(start.read, end.read, "elapsed"),
      write.user = delta(start.write, end.write, "user.self"),
      write.sys = delta(start.write, end.write, "sys.self"),
      write.elapsed = delta(start.write, end.write, "elapsed"),
      total.user =
        (end.read[["user.self"]] + end.write[["user.self"]]
          - start.write[["user.self"]] - start.read[["user.self"]]),
      total.sys =
        (end.read[["sys.self"]] + end.write[["sys.self"]]
          - start.write[["sys.self"]] - start.read[["sys.self"]]),
      total.elapsed = max(delta(start.read, end.read, "elapsed"),
                          delta(start.write, end.write, "elapsed")))
  }

  size <- if (missing(bytes)) list() else bytes

  payload <- if (missing(result) || !return_result) {
    list()
  } else {
    list(result)
  }

  parent_cpu <- if (missing(start.parent)) {
    list()
  } else {
    c(user = delta(start.parent, end.parent, "user.self"),
      sys = delta(start.parent, end.parent, "sys.self"),
      elapsed = delta(start.parent, end.parent, "elapsed"))
  }

  c(extras, io, bytes = size, result = payload, parent = parent_cpu)
}
# Size of an encoded payload.
#
# x: a raw vector or a character vector.
#
# For a raw vector this is its length. For a character vector it is the
# total number of characters plus one per element. Any other mode yields
# NULL (invisibly).
bytes <- function(x) {
  kind <- mode(x)
  if (kind == "raw") {
    length(x)
  } else if (kind == "character") {
    length(x) + sum(nchar(x))
  } else {
    invisible(NULL)
  }
}
# Time an in-memory encode followed by a decode.
#
# data: object to convert.
# from: decoder, applied to the encoder's output.
# to:   encoder, applied to `data`.
# wrap, ...: accepted but not used by this function.
#
# Returns the list built by times(); the moment encoding finishes serves
# as both the end of the write and the start of the read. The byte count
# comes from bytes() applied to the encoded value.
timeConvert <- function(data,
                        from = unserialize,
                        to = function(data) serialize(data,NULL),
                        wrap = identity, ...) {
  force(data)
  encode_began <- proc.time()
  encoded <- to(data)
  encode_done <- proc.time()
  decoded <- from(encoded)
  decode_done <- proc.time()
  times(encode_began, encode_done,
        encode_done, decode_done,
        bytes(encoded), decoded)
}
# Dispatch to the raw or the text in-memory connection benchmark.
#
# ...: forwarded unchanged to the selected benchmark.
# raw: TRUE selects timeRawConnection(), FALSE timeTextConnection().
timeConnection <- function(..., raw = TRUE) {
  benchmark <- if (raw) timeRawConnection else timeTextConnection
  benchmark(...)
}
# Time writing `data` to an in-memory raw connection and reading it back.
#
# data:   object to transfer.
# reader: function(conn) that reads the object back.
# writer: function(data, conn) that writes the object.
# wrap:   function applied to each connection before it is used.
# ...:    accepted but not used.
#
# Returns the list built by times(), with the number of bytes written.
#
# The read interval now starts at start.read, taken just before the
# reader runs. It used to start at end.write, which charged the copy out
# of the write buffer and the opening of the second connection to the
# read time and left start.read unused.
timeRawConnection <- function(data,
                              reader = unserialize,
                              writer = serialize,
                              wrap = identity, ...) {
  force(data)
  conn <- wrap(rawConnection(raw(0), open = "wb"))
  on.exit(close(conn), add = TRUE)
  start.write <- proc.time()
  writer(data, conn)
  end.write <- proc.time()
  payload <- rawConnectionValue(conn)
  conn2 <- wrap(rawConnection(payload, open = "rb"))
  on.exit(close(conn2), add = TRUE)
  start.read <- proc.time()
  as.read <- reader(conn2)
  end.read <- proc.time()
  times(start.write, end.write,
        start.read, end.read,
        length(payload), as.read)
}
# Time writing `data` to an in-memory text connection and reading it back.
#
# data:   object to transfer.
# reader: function(conn) that reads the object back.
# writer: function(x, conn) that writes the object as text.
# wrap:   function applied to the read-side connection before use.
# ...:    accepted but not used.
#
# Returns the list built by times(); the byte count is bytes() of the
# lines that were written.
#
# The read interval now starts at start.read, taken just before the
# reader runs. It used to start at end.write, which charged collecting
# the text and opening the second connection to the read time and left
# start.read unused.
timeTextConnection <- function(data,
                               reader = function(x) source(x, TRUE),
                               writer = function(x, conn) dump("x", conn),
                               wrap = identity, ...) {
  force(data)
  conn <- textConnection(NULL, open = "w")
  on.exit(close(conn), add = TRUE)
  start.write <- proc.time()
  writer(data, conn)
  end.write <- proc.time()
  theText <- textConnectionValue(conn)
  nbytes <- bytes(theText)
  conn2 <- wrap(textConnection(theText, open = "r"))
  on.exit(close(conn2), add = TRUE)
  start.read <- proc.time()
  as.read <- reader(conn2)
  end.read <- proc.time()
  times(start.write, end.write,
        start.read, end.read,
        nbytes, as.read)
}
# Time writing `data` to a temporary file and reading it back.
#
# data:   object to transfer.
# reader: function(con) that reads the object back.
# writer: function(data, con) that writes the object.
# raw:    TRUE opens the file in binary mode ("wb"/"rb") and as a raw
#         file connection; FALSE opens it in text mode.
# wrap:   function applied to the read-side connection before use.
# ...:    accepted but not used.
#
# Returns the list built by times(). The byte count is the file position
# reported by seek() after writing; closing the file is part of the
# write interval. The temporary file is removed on exit.
timeFileIO <- function(data,
                       reader = unserialize,
                       writer = serialize,
                       raw = TRUE,
                       wrap = identity, ...) {
  force(data)
  path <- tempfile()
  on.exit(unlink(path, force=TRUE))
  binary_flag <- if (raw) "b" else ""

  sink_con <- file(path, open = paste0("w", binary_flag), raw = raw)
  write_began <- proc.time()
  writer(data, sink_con)
  written <- seek(sink_con)
  close(sink_con)
  write_done <- proc.time()

  source_con <- wrap(file(path, open = paste0("r", binary_flag), raw = raw))
  on.exit(close(source_con), add = TRUE)
  read_began <- proc.time()
  read_back <- reader(source_con)
  read_done <- proc.time()

  times(write_began, write_done,
        read_began, read_done,
        written, read_back)
}
# Time sending `data` over a local socket between a forked child (the
# writer) and the parent (the reader).
#
# data:   object to transfer.
# reader: function(conn) run in the parent on the server socket.
# writer: function(data, conn) run in the child on the client socket.
# wrap:   function applied to both sockets before use.
# raw:    TRUE opens the sockets in binary mode, FALSE in text mode.
# catch:  passed on to forked().
# ...:    accepted but not used.
#
# Both sides use the global `port`. The child sleeps for half a second
# before connecting (presumably so the parent is already listening).
# Returns the list built by times() from both sides' readings plus the
# readings taken around the whole fork; the byte count is reported as NA.
timeSocketTransfer <- function(data,
                               reader = unserialize,
                               writer = serialize,
                               wrap = identity,
                               raw = TRUE,
                               catch = FALSE, ...) {
  force(data)

  # "r"/"w" plus a "b" suffix when the transfer is binary.
  open_mode <- function(direction) paste0(direction, if (raw) "b" else "")

  read_side <- function(other) {
    listener <- wrap(socketConnection(port = port,
                                      server = TRUE,
                                      blocking = TRUE,
                                      open = open_mode("r")))
    on.exit(close(listener))
    began <- proc.time()
    reader(listener)
    ended <- proc.time()
    list(start.read = began, end.read = ended)
  }

  write_side <- function(other) {
    Sys.sleep(0.5)
    client <- wrap(socketConnection(port = port,
                                    server = FALSE,
                                    blocking = TRUE,
                                    open = open_mode("w")))
    began <- proc.time()
    on.exit(close(client))
    writer(data, client)
    flush(client)
    ended <- proc.time()
    list(start.write = began, end.write = ended, bytes = NA)
  }

  start.parent <- proc.time()
  results <- forked(ifChild = write_side, ifParent = read_side, catch = catch)
  end.parent <- proc.time()
  overall <- list(start.parent = start.parent, end.parent = end.parent)
  do.call(times, c(results, overall), quote = TRUE)
}
# Time sending `data` through a named pipe between a forked child (the
# writer) and the parent (the reader).
#
# data:   object to transfer.
# reader: function(conn) run in the parent on the read end.
# writer: function(data, conn) run in the child on the write end.
# wrap:   function applied to the read-side connection before use.
# catch:  passed on to forked().
# ...:    accepted but not used.
#
# The pipe is created with the `mkfifo` command at a tempfile() path and
# removed on exit. The parent sleeps for half a second before opening its
# end. Returns the list built by times() from both sides' readings plus
# the readings taken around the whole fork; the byte count is reported
# as NA.
timeFifoTransfer <- function(data,
                             reader = unserialize,
                             writer = serialize,
                             wrap = identity,
                             catch = FALSE, ...) {
  force(data)
  fnam <- tempfile()
  on.exit(unlink(fnam, force = TRUE))
  # Quote the path: a temporary directory containing spaces or shell
  # metacharacters would otherwise break the command line.
  system(paste("mkfifo", shQuote(fnam)))
  doRead <- function(other) {
    Sys.sleep(0.5)
    conn <- wrap(fifo(fnam, open = "rb", blocking = TRUE))
    on.exit({
      close(conn)
    })
    start.read <- proc.time()
    as.read <- reader(conn)
    end.read <- proc.time()
    list(start.read = start.read,
         end.read = end.read)
  }
  doWrite <- function(other) {
    conn <- fifo(fnam, open = "wb", blocking = TRUE)
    on.exit({
      close(conn)
    })
    start.write <- proc.time()
    writer(data, conn)
    flush(conn)
    end.write <- proc.time()
    list(start.write = start.write,
         end.write = end.write,
         bytes = NA)
  }
  start.parent <- proc.time()
  results <- forked(ifChild = doWrite, ifParent = doRead, catch = catch)
  end.parent <- proc.time()
  do.call(times, c(results,
                   list(start.parent = start.parent,
                        end.parent = end.parent)),
          quote = TRUE)
}
# Handle to the remote worker cluster; NULL until startRemote() has run.
clust <<- NULL

# Start a single R worker on `remoteHost` and load the benchmark code
# into it.
#
# Steps: kill any R already running on the remote host, start a one-node
# PSOCK cluster there, copy ../inst/benchmarking.R into the worker's
# temporary directory, give the worker the local `repos` option, source
# the copied file on the worker, and publish the cluster handle in the
# global `clust`.
#
# The cluster functions are called through the parallel namespace
# because this file never attaches the parallel package with library().
startRemote <- function() {
  message("starting remote")
  ssh.utils::run.remote("killall R", remoteHost)
  clust <- parallel::makePSOCKcluster(nnodes = 1, c(remoteHost),
                                      rscript = "Rscript")
  rtmp <- parallel::clusterCall(clust, tempdir)
  message("rtmp is ", rtmp)
  message("getwd is ", getwd())
  ssh.utils::cp.remote("", "../inst/benchmarking.R",
                       remoteHost, paste0(rtmp, "/benchmarking.R"))
  parallel::clusterCall(clust, options, repos = getOption("repos"))
  parallel::clusterCall(clust, source, paste0(rtmp, "/benchmarking.R"))
  clust <<- clust
  message("started remote")
}
# Client half of the remote transfer: connect to `host`:`port` and write
# `data`.
#
# data:   object to send.
# host:   host name to connect to.
# port:   TCP port to connect to.
# wrap:   function applied to the socket before use.
# raw:    TRUE opens the socket in binary mode, FALSE in text mode.
# writer: function(data, conn) that performs the write.
#
# Waits one second before connecting. Returns
# list(start.write, end.write, bytes = NA), or list(error, calls) if
# anything fails.
doRemoteWrite <- function(data, host, port, wrap, raw, writer) {
  on_failure <- function(e) list(error=e, calls=sys.calls())
  tryCatch({
    Sys.sleep(1)
    open_mode <- paste0("w", if (raw) "b" else "")
    sock <- wrap(socketConnection(host = host,
                                  port = port,
                                  server = FALSE,
                                  blocking = TRUE,
                                  open = open_mode))
    began <- proc.time()
    on.exit(close(sock))
    writer(data, sock)
    flush(sock)
    ended <- proc.time()
    list(start.write = began, end.write = ended, bytes = NA)
  },
  error = on_failure)
}
# Server half of the remote transfer: listen on `port` and read whatever
# arrives.
#
# port:   TCP port to listen on.
# wrap:   function applied to the socket before use.
# raw:    TRUE opens the socket in binary mode, FALSE in text mode.
# reader: function(conn) that performs the read.
#
# The socket is opened with a 60 second timeout. Returns
# list(start.read, end.read), or list(error, calls) if anything fails.
doRemoteRead <- function(port, wrap, raw, reader) {
  # place logging statements in here...
  on_failure <- function(e) list(error=e, calls=sys.calls())
  tryCatch({
    open_mode <- paste0("r", if (raw) "b" else "")
    listener <- socketConnection(port = port,
                                 server = TRUE,
                                 blocking = TRUE,
                                 open = open_mode,
                                 timeout = 60)
    wrapped <- wrap(listener)
    on.exit(close(wrapped))
    began <- proc.time()
    reader(wrapped)
    ended <- proc.time()
    list(start.read = began, end.read = ended)
  },
  error = on_failure)
}
# Time sending `data` from this process to the remote worker over a
# socket.
#
# data:   object to transfer.
# reader: function(conn) run on the worker.
# writer: function(data, conn) run locally.
# wrap:   function applied to the sockets on both sides.
# raw:    TRUE for binary sockets, FALSE for text sockets.
# ...:    accepted but not used.
#
# Starts the worker through startRemote() when the global `clust` is
# still NULL. The read is dispatched to the first cluster node, the
# write is then done locally, and finally the worker's result is
# collected. Returns the list built by times() from both sides'
# readings (or their error entries).
timeRemoteTransfer <- function(data,
                               reader = bufferBytes(unserialize),
                               writer = serialize,
                               wrap = identity,
                               raw = TRUE,
                               ...) {
  force(data)
  if (is.null(clust)) startRemote()
  node <- clust[[1]]

  parallel:::sendCall(node, doRemoteRead, list(port, wrap, raw, reader))
  written <- tryCatch(
    doRemoteWrite(data, remoteHost, port, wrap, raw, writer),
    error = function(e) list(error = e, calls = sys.calls()))
  received <- parallel:::recvResult(node)

  do.call(times, c(written, received), quote = TRUE)
}
# Run a timing method on progressively larger samples of a dataset.
#
# dataset: list of data frames, sampled with subsample().
# method:  function(data, ...) returning a list of timings.
# timeout: stop once a run reports total.elapsed above this value.
# start:   first sampling rate.
# max:     largest sampling rate; always the last one tried.
# ...:     forwarded to `method`.
#
# The rate grows by a factor of sqrt(2) per step and is capped at `max`.
# The loop also stops after the first run whose result has no
# total.elapsed entry. Returns a tibble with one row per run: the rate in
# `size` followed by that run's timings. When `method` is missing, a
# message is printed and an empty data.frame is returned.
timeCurve <- function(dataset,
                      method,
                      timeout = 60,
                      start = 0.01,
                      max = 1, ...
                      ) {
  if (missing(method)) {
    message("method missing???")
    return(data.frame())
  }
  # tibble() replaces the deprecated data_frame() constructor.
  results <- tibble()
  current <- start
  while (current <= max) {
    message(paste0("size = ", current))
    data <- subsample(dataset, current)
    result <- method(data, ...)
    results <- bind_rows(results, as_tibble(c(size = current, result)))
    if (!("total.elapsed" %in% names(result))) break
    if (result$total.elapsed > timeout) break
    if (current == max) break
    current <- min(current * sqrt(2), max)
  }
  results
}
# Expand a nested option list into a data frame of test cases.
#
# tests: named list; each element is itself a named list, indexed below
#        as tests[[column]][[label]].
#
# The names of every element are crossed with cross_df(), and each
# resulting row is passed to the pmap_dfr() callback, which keeps the
# labels as columns and adds the looked-up option values in a list
# column called "args".
arg_df <- function(tests) (tests
# produces an arg data frame: the labels in columns and the argument
# data structure in column "args"
%>% map(names)
%>% cross_df()
%>% pmap_dfr(function(...) {
# `labels` holds the chosen label for each column of this row
labels <- list(...)
# look up the option values selected by each (column, label) pair
arglist <- pmap(list(labels, names(labels)),
function(label, column) list(tests[[column]][[label]]))
c(labels, args = list(list(arglist)))
})
# flatten two levels of nesting; only the second pass keeps names
%>% mutate(args = (args # unlist the args twice
%>% map(.
%>% unlist(recursive = FALSE, use.names = FALSE)
%>% unlist(recursive = FALSE, use.names = TRUE))))
)
# Rebinds the `%*%` operator to intersect in this environment, so
# `a %*% b` evaluated here yields a set intersection rather than a
# matrix product.
# NOTE(review): no use of `%*%` is visible in this part of the file;
# confirm it is still needed.
`%*%` <- intersect
# Run every test case in an argument data frame and stack the results.
#
# arg_df: data frame with one row per test case: label columns plus a
#         list column `args` holding the named arguments, one of which,
#         `strategy`, is the function to call with the others.
#
# Returns one data frame in which each strategy result is prefixed with
# the labels of the row that produced it. The labels are printed with
# message() before each case runs.
run_tests <- function(arg_df) {
  pmap_dfr(arg_df, function(..., args) {
    labels <- list(...)
    message(paste0(collapse = "\n", deparse(labels, control=c())))
    # what I want is a bind syntax like
    ## bind({
    ##   list(strategy = ?fun, ??args) <- args
    ##   fun(!!args)
    ## })
    fun <- args$strategy
    args <- args[names(args) != "strategy"]
    the_env <- list2env(args, parent = environment())
    # this dance is to avoid calling do.call with a giant unnamed dataset
    # (which causes R to spin several minutes on writing a traceback)
    call <- as.call(c(quote(fun),
                      structure(map(names(args), as.name),
                                names = names(args))))
    results <- eval(call, the_env)
    # as_tibble() replaces the deprecated as_data_frame() alias.
    as_tibble(c(labels, results))
  })
}
# Merge new benchmark rows into an existing table, replacing old rows
# that share the same key values.
#
# data:    data frame of new results.
# dataset: data frame of existing results.
#
# The keys are the character columns the two tables have in common. Rows
# of `dataset` that match `data` on all keys are dropped and `data` is
# appended. With no common character column, `data` alone is returned.
#
# The existing keys are taken from `dataset` itself. They used to be read
# from a global `benchmarks`, which gave wrong keys (or failed) whenever
# the table passed in was not that global.
store <- function(data, dataset) {
  # key on all character columns the two tables have in common
  existing.keys <- names(dataset)[map_lgl(dataset, is.character)]
  new.keys <- names(data)[map_lgl(data, is.character)]
  keys <- intersect(existing.keys, new.keys)
  message(paste("Replacing on keys:", paste0("\"", keys, "\"", collapse=", ")))
  if (length(keys) > 0) {
    dataset <- (dataset
      %>% anti_join(data, by = keys)
      %>% bind_rows(data)
    )
  } else {
    dataset <- data
  }
  dataset
}
# Option axes shared by the test grids below. Each top-level name is an
# axis, each name under it a label, and each label maps to the named
# arguments that label contributes.
common.options <- list(
strategy = list(
timeCurve = list(strategy = timeCurve, timeout = 10, start = 0.001)),
dataset = list(
nycflights13 = list(data = dataset)))
# "method" axis with the in-memory conversion benchmark only.
conversion.methods <- list(
method = list(
convert = list(method = timeConvert))
)
# "method" axis with the benchmarks that write and then read in a single
# process.
synchronous.methods <- list(
method = list(
conn = list(method = timeConnection),
file = list(method = timeFileIO)
))
# "method" axis with the benchmarks that use a second process (remote
# worker or forked child).
concurrent.methods <- list(
method = list(
remote = list(method = timeRemoteTransfer),
socket = list(method = timeSocketTransfer),
fifo = list(method = timeFifoTransfer))
)
# "method" axis combining the concurrent and synchronous benchmarks.
connection.methods <- list(
method = c(concurrent.methods$method,
synchronous.methods$method)
)
# Test grid for the in-memory conversion benchmark.
convert.common.options <- c(
  common.options,
  conversion.methods
)

# Same grid with an extra "raw" axis fixed to TRUE.
#
# The axis is wrapped in list(raw = ...) before being combined. Writing
# c(..., raw = list("TRUE" = ...)) instead makes c() splice the inner
# list and name the element "raw.TRUE", so no axis called "raw" exists.
raw.common.options <- c(
  common.options,
  conversion.methods,
  list(raw = list("TRUE" = list(raw = TRUE)))
)

# Same grid with the "raw" axis fixed to FALSE (built the same way).
text.common.options <- c(
  common.options,
  conversion.methods,
  list(raw = list("FALSE" = list(raw = FALSE)))
)

# Grid covering every connection benchmark plus the conversion benchmark.
all.common.options <- c(
  common.options,
  list(
    method = c(connection.methods$method,
               conversion.methods$method))
)
# Build a test grid in which the streaming benchmarks read through a
# buffering wrapper.
#
# reader: reader function to benchmark.
# raw:    value for the "raw" axis; also used as that axis' label.
# buffer: wrapper applied to `reader` for the fifo, socket and remote
#         benchmarks; defaults to bufferBytes2 when `raw` is TRUE and to
#         bufferLines otherwise.
#
# Returns common.options extended with a "raw" axis and a "method" axis.
# The conn and file benchmarks receive `reader` unchanged.
buffer_read_options <- function(reader = identity,
                                raw = FALSE,
                                buffer = (if (raw) bufferBytes2 else bufferLines)) {
  raw_axis <- list(list(raw = raw))
  names(raw_axis) <- as.character(raw)

  method_axis <- list(
    conn = list(method = timeConnection, reader = reader),
    file = list(method = timeFileIO, reader = reader),
    fifo = list(method = timeFifoTransfer, reader = buffer(reader)),
    socket = list(method = timeSocketTransfer, reader = buffer(reader)),
    remote = list(method = timeRemoteTransfer, reader = buffer(reader))
  )

  c(common.options,
    list(raw = raw_axis),
    list(method = method_axis))
}
# Merge option lists by name.
#
# x:   named list to extend.
# ...: further named lists.
#
# For every name in each further list, its value is appended with c() to
# what `x` already holds under that name; a name that is new to `x` is
# simply added. Returns the extended `x`.
combine_opts <- function(x, ...) {
  for (extra in list(...)) {
    for (key in names(extra)) {
      x[[key]] <- c(x[[key]], extra[[key]])
    }
  }
  x
}
# `l %but% r`: a copy of `l` in which every element named in `r` is
# replaced by (or, if absent, added from) the matching element of `r`.
`%but%` <- function(l, r) {
  replace(l, names(r), r)
}
# Render a summary table of one set of timings.
#
# timings: list or data frame with total.elapsed, read.user, read.sys,
#          read.elapsed, write.user, write.sys, write.elapsed and bytes.
# label:   column header for the table.
# ...:     accepted but not used.
#
# CPU time is user plus system time, and the size is bytes divided by
# 0x100000. The table is produced by kable() with two digits.
showTimings <- function(timings, label, ...) {
  stats <- with(timings, {
    read_cpu <- read.user + read.sys
    write_cpu <- write.user + write.sys
    megabytes <- bytes / 0x100000
    c("total.elapsed (s)" = total.elapsed,
      "read.cpu (s)" = read_cpu,
      "read.elapsed (s)" = read.elapsed,
      "write.cpu (s)" = write_cpu,
      "write.elapsed (s)" = write.elapsed,
      "data size (MB)" = megabytes)
  })
  kable(stats, col.names = label, digits = 2)
}
## list(x = ?first, ??rest) -> list()
## f <- function(a, call) bind({
## list(?first, ??rest) <- callList
## })
## Local Variables:
## ess-r-package-info: ("msgpack" . "~/msgpack/")
## End:
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.