Nothing
params <-
list(EVAL = TRUE)
## ---- SETTINGS-knitr, include=FALSE-----------------------------------------------------
stopifnot(require(knitr))
options(width = 90)
opts_chunk$set(
comment = NA,
message = FALSE,
warning = FALSE,
eval = if (isTRUE(exists("params"))) params$EVAL else FALSE,
dev = "jpeg",
dpi = 100,
fig.asp = 0.8,
fig.width = 5,
out.width = "60%",
fig.align = "center"
)
library(ggplot2)
library(brms)
theme_set(theme_default())
## ---- fake-data-sim, include=FALSE, eval=TRUE-------------------------------------------
set.seed(54647)
# number of observations
N <- 1E4
# number of group levels
G <- round(N / 10)
# number of predictors
P <- 3
# regression coefficients
beta <- rnorm(P)
# sampled covariates, group means and fake data
fake <- matrix(rnorm(N * P), ncol = P)
dimnames(fake) <- list(NULL, paste0("x", 1:P))
# fixed effect part and sampled group membership
fake <- transform(
as.data.frame(fake),
theta = fake %*% beta,
g = sample.int(G, N, replace=TRUE)
)
# add random intercept by group
fake <- merge(fake, data.frame(g = 1:G, eta = rnorm(G)), by = "g")
# linear predictor
fake <- transform(fake, mu = theta + eta)
# sample Poisson data
fake <- transform(fake, y = rpois(N, exp(mu)))
# shuffle order of data rows to ensure even distribution of computational effort
fake <- fake[sample.int(N, N),]
# drop not needed row names
rownames(fake) <- NULL
## ---- model-poisson, include=FALSE------------------------------------------------------
model_poisson <- brm(
y ~ 1 + x1 + x2 + (1 | g),
data = fake,
family = poisson(),
iter = 500, # short sampling to speedup example
chains = 2,
prior = prior(normal(0,1), class = b) +
prior(constant(1), class = sd, group = g),
backend = "cmdstanr",
threads = threading(4),
save_pars = save_pars(all = TRUE)
)
## ---- benchmark, include=FALSE----------------------------------------------------------
# Benchmarks given model with cross-product of tuning parameters CPU
# cores, grainsize and iterations. Models are run with either static
# or non-static scheduler and initial values are set by default to 0 on the
# unconstrained scale. Function returns a data-frame with the
# cross-product of the tuning parameters and as result column the
# respective runtime.
benchmark_threading <- function(model, cores = 1, grainsize = 1, iter = 100,
static = FALSE) {
winfo <- extract_warmup_info(model)
sims <- rstan::extract(model$fit)
init <- list(extract_draw(sims, 1))
scaling_model <- update(
model, refresh = 0,
threads = threading(1, grainsize = grainsize[1], static = static),
chains = 1, iter = 2, backend = "cmdstanr"
)
run_benchmark <- function(cores, size, iter) {
bench_fit <- update(
scaling_model, warmup=0, iter = iter,
chains = 1, seed = 1234, init = init, refresh = 0, save_warmup=TRUE,
threads = threading(cores, grainsize = size, static = static),
inv_metric=winfo$inv_metric[[1]],
step_size=winfo$step_size[[1]],
adapt_engaged=FALSE
)
lf <- sum(subset(nuts_params(bench_fit, inc_warmup=TRUE), Parameter=="n_leapfrog__")$Value)
elapsed <- sum(colSums(rstan::get_elapsed_time(bench_fit$fit)))
c(num_leapfrog=lf, runtime=elapsed)
}
cases <- expand.grid(cores = cores, grainsize = grainsize, iter = iter)
res <- with(cases, mapply(run_benchmark, cores, grainsize, iter))
cbind(cases, as.data.frame(t(res)))
}
benchmark_reference <- function(model, iter=100, init=0) {
winfo <- extract_warmup_info(model)
sims <- rstan::extract(model$fit)
init <- list(extract_draw(sims, 1))
ref_model <- update(
model, refresh = 0, threads = NULL,
chains = 1, iter = 2, backend = "cmdstanr"
)
run_benchmark_ref <- function(iter_bench) {
bench_fit <- update(
ref_model, warmup=0, iter = iter_bench,
chains = 1, seed = 1234, init = init, refresh = 0,
inv_metric=winfo$inv_metric[[1]],
step_size=winfo$step_size[[1]],
adapt_engaged=FALSE
)
lf <- sum(subset(nuts_params(bench_fit, inc_warmup=TRUE), Parameter=="n_leapfrog__")$Value)
elapsed <- sum(colSums(rstan::get_elapsed_time(bench_fit$fit)))
c(num_leapfrog=lf, runtime=elapsed)
}
ref <- sapply(iter, run_benchmark_ref)
ref <- cbind(as.data.frame(t(ref)), iter=iter)
ref
}
extract_warmup_info <- function(bfit) {
adapt <- lapply(rstan::get_adaptation_info(bfit$fit), strsplit, split="\\n")
step_size <- lapply(adapt, function(a) as.numeric(strsplit(a[[1]][[1]], " = ")[[1]][2]))
inv_metric <- lapply(adapt, function(a) as.numeric(strsplit(sub("^# ", "", a[[1]][[3]]), ", ")[[1]]))
list(step_size=step_size, inv_metric=inv_metric)
}
extract_draw <- function(sims, draw) {
lapply(sims, brms:::slice, dim = 1, i = draw, drop = TRUE)
}
## ---- eval=FALSE------------------------------------------------------------------------
# fit_serial <- brm(
# count ~ zAge + zBase * Trt + (1|patient),
# data = epilepsy, family = poisson(),
# chains = 4, cores = 4, backend = "cmdstanr"
# )
## ---- eval=FALSE------------------------------------------------------------------------
# fit_parallel <- update(
# fit_serial, chains = 2, cores = 2,
# backend = "cmdstanr", threads = threading(2)
# )
## ---------------------------------------------------------------------------------------
kable(head(fake, 10), digits = 3)
## ---- eval=FALSE------------------------------------------------------------------------
# model_poisson <- brm(
# y ~ 1 + x1 + x2 + (1 | g),
# data = fake,
# family = poisson(),
# iter = 500, # short sampling to speedup example
# chains = 2,
# prior = prior(normal(0,1), class = b) +
# prior(constant(1), class = sd, group = g),
# backend = "cmdstanr",
# threads = threading(4),
# save_pars = save_pars(all = TRUE)
# )
## ---- chunking-scale, message=FALSE, warning=FALSE, results='hide'----------------------
chunking_bench <- transform(
data.frame(chunks = 4^(0:3)),
grainsize = ceiling(N / chunks)
)
iter_test <- c(10, 20, 40) # very short test runs
scaling_chunking <- benchmark_threading(
model_poisson,
cores = 1,
grainsize = chunking_bench$grainsize, # test various grainsizes
iter = iter_test,
static = TRUE # with static partitioner
)
# run as reference the model *without* reduce_sum
ref <- benchmark_reference(model_poisson, iter_test)
# for additional data munging please refer to the appendix
## ---- munge-chunking-scaling, include=FALSE---------------------------------------------
scaling_chunking <- merge(scaling_chunking, chunking_bench, by = "grainsize")
single_chunk <- transform(
subset(scaling_chunking, chunks == 1),
num_leapfrog_single = num_leapfrog, num_leapfrog = NULL,
runtime_single = runtime, runtime = NULL,
grainsize = NULL, chunks=NULL
)
scaling_chunking <- transform(
merge(scaling_chunking, single_chunk),
slowdown = runtime/runtime_single,
iter = factor(iter),
runtime_single = NULL
)
ref <- transform(ref, iter=factor(iter))
## ---------------------------------------------------------------------------------------
ggplot(scaling_chunking) +
aes(chunks, slowdown, colour = iter, shape = iter) +
geom_line() + geom_point() +
scale_x_log10(breaks = scaling_chunking$chunks) +
scale_y_log10(breaks=seq(0.8, 2.5, by=0.1)) +
ggtitle("Slowdown with increasing number of chunks")
ggplot(scaling_chunking) +
aes(chunks, 1E3 * runtime/num_leapfrog, colour = iter, shape=iter) +
geom_line() + geom_point() +
scale_x_log10(breaks = scaling_chunking$chunks) +
scale_y_log10(breaks=seq(0.1, 2.0, by=0.1)) +
geom_hline(data=ref, aes(yintercept=1E3 * runtime/num_leapfrog, colour=iter), linetype=I(2)) +
ggtitle("Time per leapfrog step vs number of chunks",
"Dashed line is reference model without reduce_sum") +
ylab("Time per leapfrog step [ms]")
## ---- speedup-scale, message=FALSE, warning=FALSE, results='hide'-----------------------
num_cpu <- parallel::detectCores(logical = FALSE)
num_cpu_logical <- parallel::detectCores(logical = TRUE)
grainsize_default <- ceiling(N / (2 * num_cpu))
cores <- c(2^seq(0, floor(log2(num_cpu_logical))), num_cpu, num_cpu_logical)
cores <- sort(unique(cores))
grainsize <- c(grainsize_default, grainsize_default/2, grainsize_default/4)
grainsize <- round(grainsize)
iter_scaling <- 20
scaling_cores <- benchmark_threading(
model_poisson,
cores = cores,
grainsize = grainsize,
iter = iter_scaling,
static = FALSE
)
single_core <- transform(
subset(scaling_cores, cores == 1),
runtime_single = runtime,
num_leapfrog=NULL, runtime=NULL, cores = NULL
)
scaling_cores <- transform(
merge(scaling_cores, single_core),
speedup = runtime_single/runtime,
grainsize = factor(grainsize)
)
## ---------------------------------------------------------------------------------------
ggplot(scaling_cores) +
aes(cores, runtime, shape = grainsize, color = grainsize) +
geom_vline(xintercept = num_cpu, linetype = 3) +
geom_line() + geom_point() +
scale_x_log10(breaks = scaling_cores$cores) +
scale_y_log10(breaks=seq(0.1, 1.4, by=0.1)) +
theme(legend.position = c(0.85, 0.8)) +
geom_hline(data=subset(ref, iter==iter_scaling), aes(yintercept=runtime), linetype=I(2)) +
ggtitle("Runtime with varying number of cores",
"Dashed line is reference model without reduce_sum")
ggplot(scaling_cores) +
aes(cores, speedup, shape = grainsize, color = grainsize) +
geom_abline(slope = 1, intercept = 0, linetype = 2) +
geom_vline(xintercept = num_cpu, linetype = 3) +
geom_line() + geom_point() +
scale_x_log10(breaks=scaling_cores$cores) +
scale_y_log10(breaks=scaling_cores$cores) +
theme(aspect.ratio = 1) +
coord_fixed(xlim = c(1, num_cpu_logical), ylim = c(1, num_cpu_logical)) +
ggtitle("Relative speedup vs 1 core")
## ---------------------------------------------------------------------------------------
kable(scaling_cores, digits = 2)
## ---- eval=FALSE------------------------------------------------------------------------
# set.seed(54647)
# # number of observations
# N <- 1E4
# # number of group levels
# G <- round(N / 10)
# # number of predictors
# P <- 3
# # regression coefficients
# beta <- rnorm(P)
#
# # sampled covariates, group means and fake data
# fake <- matrix(rnorm(N * P), ncol = P)
# dimnames(fake) <- list(NULL, paste0("x", 1:P))
#
# # fixed effect part and sampled group membership
# fake <- transform(
# as.data.frame(fake),
# theta = fake %*% beta,
# g = sample.int(G, N, replace=TRUE)
# )
#
# # add random intercept by group
# fake <- merge(fake, data.frame(g = 1:G, eta = rnorm(G)), by = "g")
#
# # linear predictor
# fake <- transform(fake, mu = theta + eta)
#
# # sample Poisson data
# fake <- transform(fake, y = rpois(N, exp(mu)))
#
# # shuffle order of data rows to ensure even distribution of computational effort
# fake <- fake[sample.int(N, N),]
#
# # drop not needed row names
# rownames(fake) <- NULL
## ---- eval=FALSE------------------------------------------------------------------------
# model_poisson <- brm(
# y ~ 1 + x1 + x2 + (1 | g),
# data = fake,
# family = poisson(),
# iter = 500, # short sampling to speedup example
# chains = 2,
# prior = prior(normal(0,1), class = b) +
# prior(constant(1), class = sd, group = g),
# backend = "cmdstanr",
# threads = threading(4),
# save_pars = save_pars(all = TRUE)
# )
## ---- eval=FALSE------------------------------------------------------------------------
# # Benchmarks given model with cross-product of tuning parameters CPU
# # cores, grainsize and iterations. Models are run with either static
# # or non-static scheduler and initial values are set by default to 0 on the
# # unconstrained scale. Function returns a data-frame with the
# # cross-product of the tuning parameters and as result column the
# # respective runtime.
# benchmark_threading <- function(model, cores = 1, grainsize = 1, iter = 100,
# static = FALSE) {
#
# winfo <- extract_warmup_info(model)
# sims <- rstan::extract(model$fit)
# init <- list(extract_draw(sims, 1))
#
# scaling_model <- update(
# model, refresh = 0,
# threads = threading(1, grainsize = grainsize[1], static = static),
# chains = 1, iter = 2, backend = "cmdstanr"
# )
#
# run_benchmark <- function(cores, size, iter) {
# bench_fit <- update(
# scaling_model, warmup=0, iter = iter,
# chains = 1, seed = 1234, init = init, refresh = 0, save_warmup=TRUE,
# threads = threading(cores, grainsize = size, static = static),
# inv_metric=winfo$inv_metric[[1]],
# step_size=winfo$step_size[[1]],
# adapt_engaged=FALSE
# )
# lf <- sum(subset(nuts_params(bench_fit, inc_warmup=TRUE), Parameter=="n_leapfrog__")$Value)
# elapsed <- sum(colSums(rstan::get_elapsed_time(bench_fit$fit)))
#
# c(num_leapfrog=lf, runtime=elapsed)
# }
#
# cases <- expand.grid(cores = cores, grainsize = grainsize, iter = iter)
# res <- with(cases, mapply(run_benchmark, cores, grainsize, iter))
# cbind(cases, as.data.frame(t(res)))
# }
#
# benchmark_reference <- function(model, iter=100, init=0) {
# winfo <- extract_warmup_info(model)
# sims <- rstan::extract(model$fit)
# init <- list(extract_draw(sims, 1))
#
# ref_model <- update(
# model, refresh = 0, threads = NULL,
# chains = 1, iter = 2, backend = "cmdstanr"
# )
#
# run_benchmark_ref <- function(iter_bench) {
# bench_fit <- update(
# ref_model, warmup=0, iter = iter_bench,
# chains = 1, seed = 1234, init = init, refresh = 0,
# inv_metric=winfo$inv_metric[[1]],
# step_size=winfo$step_size[[1]],
# adapt_engaged=FALSE
# )
#
# lf <- sum(subset(nuts_params(bench_fit, inc_warmup=TRUE), Parameter=="n_leapfrog__")$Value)
# elapsed <- sum(colSums(rstan::get_elapsed_time(bench_fit$fit)))
#
# c(num_leapfrog=lf, runtime=elapsed)
# }
#
# ref <- sapply(iter, run_benchmark_ref)
# ref <- cbind(as.data.frame(t(ref)), iter=iter)
# ref
# }
#
# extract_warmup_info <- function(bfit) {
# adapt <- lapply(rstan::get_adaptation_info(bfit$fit), strsplit, split="\\n")
# step_size <- lapply(adapt, function(a) as.numeric(strsplit(a[[1]][[1]], " = ")[[1]][2]))
# inv_metric <- lapply(adapt, function(a) as.numeric(strsplit(sub("^# ", "", a[[1]][[3]]), ", ")[[1]]))
# list(step_size=step_size, inv_metric=inv_metric)
# }
#
# extract_draw <- function(sims, draw) {
# lapply(sims, brms:::slice, dim = 1, i = draw, drop = TRUE)
# }
## ---- eval=FALSE------------------------------------------------------------------------
# scaling_chunking <- merge(scaling_chunking, chunking_bench, by = "grainsize")
#
# single_chunk <- transform(
# subset(scaling_chunking, chunks == 1),
# num_leapfrog_single = num_leapfrog, num_leapfrog = NULL,
# runtime_single = runtime, runtime = NULL,
# grainsize = NULL, chunks=NULL
# )
#
# scaling_chunking <- transform(
# merge(scaling_chunking, single_chunk),
# slowdown = runtime/runtime_single,
# iter = factor(iter),
# runtime_single = NULL
# )
#
# ref <- transform(ref, iter=factor(iter))
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.