#' @importFrom foreach %dopar% %do% foreach
#' @importFrom doParallel registerDoParallel stopImplicitCluster
#' @importFrom itertools isplitVector
#' @importFrom data.table rbindlist
#' @importFrom iterators icount
#' @import Formula
#' @export
# Simulator: R6 class that runs every agent `simulations` times over `horizon`
# steps (serially or on a PSOCK cluster) and collects all logged steps in a
# History object. User-facing documentation lives in the roxygen block below.
Simulator <- R6::R6Class(
  "Simulator",
  class = FALSE,
  public = list(
    # Configuration and run-time state; all populated by initialize()/reset().
    agents = NULL,
    workers = NULL,
    agent_count = NULL,
    horizon = NULL,
    simulations = NULL,
    worker_max = NULL,
    internal_history = NULL,
    save_context = NULL,
    save_theta = NULL,
    do_parallel = NULL,
    sims_and_agents_list = NULL,
    t_over_sims = NULL,
    set_seed = NULL,
    progress_file = NULL,
    log_interval = NULL,
    save_interval = NULL,
    include_packages = NULL,
    outfile = NULL,
    global_seed = NULL,
    chunk_multiplier = NULL,
    policy_time_loop = NULL,
    cl = NULL,

    # Stores all arguments on the instance (coercing the counters to integer),
    # remembers the RNG state that was active before construction, and calls
    # reset() to seed the RNG and build an empty History.
    initialize = function(agents,
                          horizon = 100L,
                          simulations = 100L,
                          save_context = FALSE,
                          save_theta = FALSE,
                          do_parallel = TRUE,
                          worker_max = NULL,
                          set_seed = 0,
                          save_interval = 1,
                          progress_file = FALSE,
                          log_interval = 1000,
                          include_packages = NULL,
                          t_over_sims = FALSE,
                          chunk_multiplier = 1,
                          policy_time_loop = FALSE) {
      # save current seed
      self$global_seed <- contextual::get_global_seed()
      # a single agent is wrapped so the rest of the class can assume a list
      if (!is.list(agents)) agents <- list(agents)
      self$progress_file <- progress_file
      self$log_interval <- as.integer(log_interval)
      self$horizon <- as.integer(horizon)
      self$simulations <- as.integer(simulations)
      self$save_theta <- save_theta
      self$save_context <- save_context
      self$agents <- agents
      self$agent_count <- length(agents)
      self$worker_max <- worker_max
      self$do_parallel <- do_parallel
      self$t_over_sims <- t_over_sims
      self$set_seed <- set_seed
      self$save_interval <- as.integer(save_interval)
      self$include_packages <- include_packages
      self$chunk_multiplier <- as.integer(chunk_multiplier)
      self$policy_time_loop <- policy_time_loop
      self$reset()
    },

    # Seeds the RNG with `set_seed`, truncates the progress log files (when
    # enabled), recreates the History with its meta data, and makes agent
    # names unique by suffixing duplicates with ".<occurrence>".
    reset = function() {
      set.seed(self$set_seed)
      self$workers <- 1
      # create or clear log files
      if (self$progress_file) {
        cat(paste0(""), file = "workers_progress.log", append = FALSE)
        cat(paste0(""), file = "agents_progress.log", append = FALSE)
        cat(paste0(""), file = "parallel.log", append = FALSE)
        self$outfile <- "parallel.log"
      }
      # (re)create history data and meta data tables
      self$internal_history <- History$new()
      self$internal_history$set_meta_data("horizon", self$horizon)
      self$internal_history$set_meta_data("agents", self$agent_count)
      self$internal_history$set_meta_data("simulations", self$simulations)
      self$internal_history$set_meta_data("sim_start_time",
                                          format(Sys.time(), "%a %b %d %X %Y"))
      # unique policy name creation
      agent_name_list <- list()
      # seq_len() rather than 1:n, so an empty agent list is simply skipped
      for (agent_index in seq_len(self$agent_count)) {
        current_agent_name <- self$agents[[agent_index]]$name
        agent_name_list <- c(agent_name_list, current_agent_name)
        current_agent_name_occurrences <-
          length(agent_name_list[agent_name_list == current_agent_name])
        if (current_agent_name_occurrences > 1) {
          self$agents[[agent_index]]$name <-
            paste0(current_agent_name, '.', current_agent_name_occurrences)
        }
        agent_name <- self$agents[[agent_index]]$name
        bandit_name <- self$agents[[agent_index]]$bandit$class_name
        policy_name <- self$agents[[agent_index]]$policy$class_name
        self$internal_history$set_meta_data("bandit", bandit_name,
                                            group = "sim", agent_name = agent_name)
        self$internal_history$set_meta_data("policy", policy_name,
                                            group = "sim", agent_name = agent_name)
      }
    },

    # Runs all (simulation, agent) pairs, split into chunks that are processed
    # by foreach (serially via %do% or in parallel via %dopar%), binds the
    # per-chunk logs into the internal History, updates its statistics and
    # returns that History object.
    run = function() {
      # set parallel or serial processing
      `%fun%` <- foreach::`%do%`
      # nocov start
      if (self$do_parallel) {
        # Register cleanup first: the workers must be released even when the
        # simulation loop below aborts with an error.
        on.exit(self$stop_parallel_backend(), add = TRUE)
        self$register_parallel_backend()
        `%fun%` <- foreach::`%dopar%`
        # If Microsoft R, set MKL threads to 1
        # Due to an unresolved incompatibility between MRAN and RStudio:
        # https://github.com/rstudio/rstudio/issues/5933
        # https://social.technet.microsoft.com/Forums/en-US/2791e896-c284-4330-88f2-2dcd4acea074
        # setting MKL threads to 1 is disabled when running from RStudio.
        isRStudio <- Sys.getenv("RSTUDIO") == "1"
        # requireNamespace() avoids scanning the whole library the way
        # installed.packages() does
        if (!isRStudio && requireNamespace("RevoUtilsMath", quietly = TRUE)) {
          RevoUtilsMath::setMKLthreads(1)
        }
      }
      # nocov end
      # create a list of all sims (sims*agents), to be divided into chunks
      index <- 1
      sims_and_agents_list <- vector("list", self$simulations * self$agent_count)
      for (sim_index in seq_len(self$simulations)) {
        for (agent_index in seq_len(self$agent_count)) {
          sims_and_agents_list[[index]] <-
            list(agent_index = agent_index, sim_index = sim_index)
          index <- index + 1
        }
      }
      # fail early and clearly instead of handing an empty work list to foreach
      if (length(sims_and_agents_list) == 0L) {
        stop("Simulator requires at least one agent and one simulation.",
             call. = FALSE)
      }
      # copy variables used in parallel processing to local environment
      horizon <- self$horizon
      save_context <- self$save_context
      save_theta <- self$save_theta
      progress_file <- self$progress_file
      save_interval <- self$save_interval
      log_interval <- self$log_interval
      t_over_sims <- self$t_over_sims
      set_seed <- self$set_seed
      agents <- self$agents
      include_packages <- self$include_packages
      policy_time_loop <- self$policy_time_loop
      # calculate chunk size
      if (length(sims_and_agents_list) <= self$workers) {
        chunk_divider <- length(sims_and_agents_list)
      } else {
        chunk_divider <- self$workers * self$chunk_multiplier
      }
      # split sims vector into chunks
      sa_iterator <- itertools::isplitVector(sims_and_agents_list, chunks = chunk_divider)
      # include packages that are used in parallel processes
      par_packages <- c(c("data.table","iterators","itertools"), include_packages)
      # some info messages
      message(paste("Simulation horizon:", horizon))
      message(paste("Number of simulations:", length(sims_and_agents_list)))
      message(paste("Number of batches:", chunk_divider))
      message("Starting main loop.")
      # start running the main simulation loop
      private$start_time <- Sys.time()
      foreach_results <- foreach::foreach(
        sims_agent_list = sa_iterator,
        i = iterators::icount(),
        .inorder = TRUE,
        .export = c("History","Formula"),
        .noexport = c("sims_and_agents_list","internal_history","sa_iterator"),
        .packages = par_packages
      ) %fun% {
        index <- 1L
        sim_agent_counter <- 0
        sim_agent_total <- length(sims_agent_list)
        # TODO: Can be done smarter and cleaner?
        # largest arm count among the arm-multiplying bandits in this chunk;
        # used only to size the pre-allocated history below
        multiplier <- 1
        for (sim_agent_index in sims_agent_list) {
          sim_agent <- agents[[sim_agent_index$agent_index]]
          if (isTRUE(sim_agent$bandit$arm_multiply)) {
            if (multiplier < sim_agent$bandit$k) {
              multiplier <- sim_agent$bandit$k
            }
          }
        }
        allocate_space <-
          floor((horizon * sim_agent_total * multiplier) / save_interval) + sim_agent_total
        local_history <- History$new(allocate_space,
                                     save_context,
                                     save_theta)
        for (sim_agent_index in sims_agent_list) {
          # every simulation works on its own deep copy of the agent
          sim_agent <- agents[[sim_agent_index$agent_index]]$clone(deep = TRUE)
          sim_agent$sim_index <- sim_agent_index$sim_index
          sim_agent$agent_index <- sim_agent_index$agent_index
          sim_agent_counter <- sim_agent_counter + 1
          if (isTRUE(progress_file)) {
            sim_agent$progress_file <- TRUE
            sim_agent$log_interval <- log_interval
            cat(paste0("[",format(Sys.time(), format = "%H:%M:%OS6"),"] ",
                       "        0 > init - ",sprintf("%-20s", sim_agent$name),
                       " worker ", i,
                       " at sim ", sim_agent_counter,
                       " of ", sim_agent_total,"\n"),
                file = "workers_progress.log", append = TRUE)
          }
          simulation_index <- sim_agent$sim_index
          agent_name <- sim_agent$name
          # the seed depends on the simulation index and set_seed only, so all
          # agents see the same seed within a given simulation
          local_curent_seed <- simulation_index + set_seed * 42
          set.seed(local_curent_seed)
          sim_agent$bandit$post_initialization()
          sim_agent$policy$post_initialization()
          if (isTRUE(sim_agent$bandit$arm_multiply)) {
            if (policy_time_loop) {
              horizon_loop <- horizon
            } else {
              horizon_loop <- horizon * sim_agent$bandit$k
            }
            data_length <- horizon * sim_agent$bandit$k
          } else {
            horizon_loop <- horizon
            data_length <- horizon
          }
          set.seed(local_curent_seed + 1e+06)
          sim_agent$bandit$generate_bandit_data(n = data_length)
          if (isTRUE(t_over_sims)) {
            sim_agent$set_t(as.integer((simulation_index - 1L) * horizon_loop))
          }
          step <- list()
          loop_time <- 0L
          while (loop_time < horizon_loop) {
            step <- sim_agent$do_step()
            # with policy_time_loop the loop is driven by step[[5]] (logged as
            # policy_t below) instead of by the number of do_step() calls
            if (isTRUE(policy_time_loop)) {
              loop_time <- step[[5]]
            } else {
              loop_time <- loop_time + 1L
            }
            # log only steps that produced a reward, at t == 1 and at every
            # save_interval-th t
            if (!is.null(step[[3]]) &&
                ((step[[5]] == 1) || (step[[5]] %% save_interval == 0))) {
              local_history$insert(
                index,                                         #index
                step[[5]],                                     #policy_t
                step[[1]][["k"]],                              #k
                step[[1]][["d"]],                              #d
                step[[2]],                                     #action
                step[[3]],                                     #reward
                agent_name,                                    #agentname
                simulation_index,                              #sim
                if (save_context) step[[1]][["X"]] else NA,    #context
                if (save_theta) step[[4]] else NA              #theta
              )
              index <- index + 1L
            }
          }
        }
        # NOTE(review): final() is invoked once per chunk, on the last agent
        # clone only; earlier clones in the chunk are never finalised.
        # Confirm this is intended.
        sim_agent$bandit$final()
        # drop the pre-allocated rows that were never filled
        local_history$data[t!=0]
      }
      # bind all results
      foreach_results <- data.table::rbindlist(foreach_results)
      foreach_results[, agent := factor(agent)]
      self$internal_history$set_data_table(foreach_results[sim > 0 & t > 0],
                                           auto_stats = FALSE)
      rm(foreach_results)
      private$end_time <- Sys.time()
      gc()
      message("Finished main loop.")
      self$internal_history$set_meta_data("sim_end_time",
                                          format(Sys.time(), "%a %b %d %X %Y"))
      formatted_duration <-
        contextual::formatted_difftime(private$end_time - private$start_time)
      self$internal_history$set_meta_data("sim_total_duration", formatted_duration)
      message(paste0("Completed simulation in ", formatted_duration))
      message("Computing statistics.")
      # update statistics TODO: not always necessary, add option arg to class?
      self$internal_history$update_statistics()
      # load global seed
      # A plain `.Random.seed <- ...` here would only create a local variable,
      # so the restore goes through set_global_seed(), as in finalize().
      # A valid seed is always an integer vector; anything else (presumably a
      # placeholder from get_global_seed() when no seed existed - confirm) is
      # not written back.
      if (is.integer(self$global_seed)) {
        contextual::set_global_seed(self$global_seed)
      }
      # the parallel backend, if any, is stopped by the on.exit() handler
      self$internal_history
    },

    # Creates a PSOCK cluster and registers it with doParallel. Uses one
    # worker fewer than the detected cores when at least three cores are
    # found (otherwise the single worker set by reset()), capped at
    # `worker_max` when that is given.
    register_parallel_backend = function() {
      # nocov start
      # setup parallel backend
      message("Setting up parallel backend.")
      nr_cores <- parallel::detectCores()
      if (nr_cores >= 3) self$workers <- nr_cores - 1
      if (!is.null(self$worker_max)) {
        if (self$workers > self$worker_max) self$workers <- self$worker_max
      }
      # make sure no leftover processes
      doParallel::stopImplicitCluster()
      # worker output is redirected to `outfile` only when progress logging
      # has set one
      if (!is.null(self$outfile)) {
        self$cl <- parallel::makeCluster(self$workers, useXDR = FALSE, type = "PSOCK",
                                         methods = FALSE, setup_timeout = 30,
                                         outfile = self$outfile)
      } else {
        self$cl <- parallel::makeCluster(self$workers, useXDR = FALSE, type = "PSOCK",
                                         methods = FALSE, setup_timeout = 30)
      }
      message(paste0("Cores available: ", nr_cores))
      message(paste0("Workers assigned: ", self$workers))
      doParallel::registerDoParallel(self$cl)
      # nocov end
    },

    # Stops the cluster created by register_parallel_backend(). Safe to call
    # repeatedly: the handle is cleared after the first successful attempt.
    stop_parallel_backend = function() {
      # nocov start
      if (self$do_parallel) {
        if (!is.null(self$cl)) {
          try({
            parallel::stopCluster(self$cl)
          })
          self$cl <- NULL
        }
        doParallel::stopImplicitCluster()
      }
      # nocov end
    },

    # Called when the object is garbage collected.
    finalize = function() {
      # set global seed back to value before
      contextual::set_global_seed(self$global_seed)
      #closeAllConnections()
    }
  ),
  private = list(
    # wall-clock bounds of the main simulation loop, set by run()
    start_time = NULL,
    end_time = NULL
  ),
  active = list(
    # Read-only access to the internal History; assignment only warns.
    history = function(value) {
      if (missing(value)) {
        self$internal_history
      } else {
        warning("## history$data is read only", call. = FALSE)
      }
    }
  )
)
#' Simulator
#'
#' The entry point of any \pkg{contextual} simulation.
#'
#' A Simulator takes, at a minimum, one or more \code{\link{Agent}} instances, a horizon
#' (the length of an individual simulation, \emph{t} = \{1, \ldots, T\}) and the number of simulations
#' (How many times to repeat each simulation over \emph{t} = \{1, \ldots, T\}, with a new seed
#' on each repeat*).
#'
#' It then runs all simulations (in parallel by default), keeping a log of all \code{\link{Policy}} and
#' \code{\link{Bandit}} interactions in a \code{\link{History}} instance.
#'
#' * Note: to be able to fairly evaluate and compare each agent's performance, and to make sure that
#' simulations are replicable, for each separate agent, seeds are set equally and deterministically for
#' each agent over all \code{horizon x simulations} time steps.
#'
#' ![](1simulator.jpeg "contextual diagram: simulator")
#'
#' @name Simulator
#' @aliases run simulator
#'
#' @section Usage:
#' \preformatted{
#' simulator <- Simulator$new(agents,
#' horizon = 100L,
#' simulations = 100L,
#' save_context = FALSE,
#' save_theta = FALSE,
#' do_parallel = TRUE,
#' worker_max = NULL,
#' set_seed = 0,
#' save_interval = 1,
#' progress_file = FALSE,
#' log_interval = 1000,
#' include_packages = NULL,
#' t_over_sims = FALSE,
#' chunk_multiplier = 1,
#' policy_time_loop = FALSE)
#' }
#'
#' @section Arguments:
#'
#' \describe{
#' \item{\code{agents}}{
#' An \code{Agent} instance or a \code{list} of \code{Agent} instances.
#' }
#' \item{\code{horizon}}{
#' \code{integer}. The number of pulls or time steps to run each agent, where \emph{t} = \{1, \ldots, T\}.
#' }
#' \item{\code{simulations}}{
#' \code{integer}. How many times to repeat each agent's simulation over \emph{t} = \{1, \ldots, T\},
#' with a new seed on each repeat (itself deterministically derived from set\_seed).
#' }
#' \item{\code{save_interval}}{
#'     \code{integer}. Write data to history only every \code{save_interval} time steps. Default is 1.
#' }
#' \item{\code{save_context}}{
#' \code{logical}. Save the context matrices \code{X} to the History log during a simulation?
#' }
#' \item{\code{save_theta}}{
#' \code{logical}. Save the parameter list \code{theta} to the History log during a simulation?
#' }
#' \item{\code{do_parallel}}{
#' \code{logical}. Run \code{Simulator} processes in parallel?
#' }
#' \item{\code{worker_max}}{
#'     \code{integer}. Upper bound on the number of parallel workers to be used.
#'     If unspecified, the number of workers defaults to the number of detected cores minus one
#'     (with a minimum of one worker).
#' }
#' \item{\code{t_over_sims}}{
#' \code{logical}. Of use to, among others, offline Bandits.
#' If \code{t_over_sims} is set to \code{TRUE}, the current \code{Simulator}
#' iterates over all rows in a data set for each repeated simulation.
#'     If \code{FALSE}, it splits the data into \code{simulations} parts,
#'     and uses a different subset of the data for each repeat of an agent's simulation.
#' }
#' \item{\code{set_seed}}{
#' \code{integer}. Sets the seed of R's random number generator for the current \code{Simulator}.
#' }
#' \item{\code{progress_file}}{
#' \code{logical}. If \code{TRUE}, \code{Simulator} writes \code{workers_progress.log},
#' \code{agents_progress.log} and \code{parallel.log} files to the current working directory,
#' allowing you to keep track of respectively \code{workers}, \code{agents},
#' and potential errors when running a \code{Simulator} in parallel mode.
#' }
#' \item{\code{log_interval}}{
#' \code{integer}. Sets the log write interval. Default every 1000 time steps.
#' }
#' \item{\code{include_packages}}{
#' \code{List}. List of packages that (one of) the policies depend on. If a \code{Policy} requires an
#' R package to be loaded, this option can be used to load that package on each of the workers.
#' Ignored if \code{do_parallel} is \code{FALSE}.
#' }
#' \item{\code{chunk_multiplier}}{
#'     \code{integer}. By default, simulations are equally divided over available workers, and every
#'     worker saves its simulation results to a local history file which is then aggregated.
#'     Depending on workload, network bandwidth, memory size and other variables it can sometimes be useful to
#' break these workloads into smaller chunks. This can be done by setting the chunk_multiplier to some
#' integer value, where the number of chunks will total chunk_multiplier x number_of_workers.
#' }
#' \item{\code{policy_time_loop}}{
#'     \code{logical}. In the case of replay style bandits, a Simulator's horizon equals the number of
#' accepted plus the number of rejected data points or samples. If \code{policy_time_loop} is \code{TRUE},
#' the horizon equals the number of accepted data points or samples. That is, when \code{policy_time_loop}
#' is \code{TRUE}, a Simulator will keep running until the number of data points saved to History is
#' equal to the Simulator's horizon.
#' }
#'
#' }
#'
#' @section Methods:
#'
#' \describe{
#'
#' \item{\code{reset()}}{
#' Resets a \code{Simulator} instance to its original initialisation values.
#' }
#'
#' \item{\code{run()}}{
#' Runs a \code{Simulator} instance.
#' }
#' \item{\code{history}}{
#' Active binding, read access to Simulator's History instance.
#' }
#'
#' }
#'
#' @seealso
#'
#' Core contextual classes: \code{\link{Bandit}}, \code{\link{Policy}}, \code{\link{Simulator}},
#' \code{\link{Agent}}, \code{\link{History}}, \code{\link{Plot}}
#'
#' Bandit subclass examples: \code{\link{BasicBernoulliBandit}}, \code{\link{ContextualLogitBandit}}, \code{\link{OfflineReplayEvaluatorBandit}}
#'
#' Policy subclass examples: \code{\link{EpsilonGreedyPolicy}}, \code{\link{ContextualLinTSPolicy}}
#'
#' @examples
#' \dontrun{
#'
#' policy <- EpsilonGreedyPolicy$new(epsilon = 0.1)
#' bandit <- BasicBernoulliBandit$new(weights = c(0.6, 0.1, 0.1))
#' agent <- Agent$new(policy, bandit, name = "E.G.", sparse = 0.5)
#'
#' history <- Simulator$new(agents = agent,
#' horizon = 10,
#' simulations = 10)$run()
#'
#' summary(history)
#'
#' plot(history)
#'
#' dt <- history$get_data_table()
#'
#' df <- history$get_data_frame()
#'
#' print(history$cumulative$E.G.$cum_regret_sd)
#'
#' print(history$cumulative$E.G.$cum_regret)
#'
#' }
#'
NULL
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.