Nothing
#' Run the simulation engine with constrained event processing using event queues
#'
#' This function implements a discrete event simulation engine that processes events
#' in chronological order across all patients within each arm, using one shared
#' event queue per arm. All patients of an arm are loaded first (inputs evaluated
#' and initial events queued); afterwards events are popped from the queue and
#' reacted to in queue order, regardless of which patient they belong to.
#'
#' @param arm_list A vector of the names of the interventions evaluated in the simulation
#' @param common_pt_inputs A list of inputs that change across patients but are not affected by the intervention.
#'  These inputs are loaded into patient-specific environments that are children of the arm-level environment.
#' @param unique_pt_inputs A list of inputs that change across each intervention.
#'  These inputs are loaded into patient-arm specific environments.
#' @param input_list An environment holding all other inputs. The fields read directly
#'  by this function are:
#' \itemize{
#'   \item simulation, sens, n_sim, length_sensitivities: position of this run within
#'     the analysis (used for seeding and for the progress bar)
#'   \item npats: number of patients
#'   \item debug: whether debug log entries are collected
#'   \item accum_backwards: whether `<input>_lastupdate` values are exported and
#'     `ongoing_list_temp` is initialised
#'   \item init_event_list: list of initial events and event times
#'   \item uc_lists: list containing ongoing_inputs, etc.
#'   \item input_out: vector of output variable names
#' }
#'  Objects inheriting from `resource_discrete` or `shared_input` found in
#'  `input_list` are cloned/forked once per arm. Note that `input_list` is modified
#'  in place (`ongoing_inputs_lu`, `ongoing_list_temp`, `log_list`).
#' @param pb Progress bar function, called with a single message string. Must be
#'  supplied by the caller (the self-referential default cannot be evaluated).
#' @param seed Starting seed to be used for the whole analysis. Must be supplied by
#'  the caller.
#' @param .warden_ctx Context object forwarded unchanged to `.set_last_ctx()` before
#'  each stage (presumably used for error reporting; confirm against `.set_last_ctx`).
#'  Must be supplied by the caller.
#'
#' @return The value of `compute_outputs(patdata, input_list)`; when debugging is
#'  enabled the transformed debug log is attached as `log_list`.
#'
#' @importFrom stats setNames
#'
#' @details
#' The function creates a hierarchical environment structure per arm:
#' \enumerate{
#'   \item Arm-level environment (child of `input_list`, with its contents copied in)
#'   \item Patient-level environments (children of the arm-level environment)
#'   \item Patient-arm environments (children of the patient environment)
#' }
#'
#' This allows patients within an arm to modify shared inputs (like hospital resources)
#' while maintaining patient-specific inputs (like age) in isolation.
#'
#' Events are processed using a shared event queue per arm, ensuring chronological
#' ordering across all patients within the arm.
#'
#'
#' @noRd
run_engine_constrained <- function(arm_list,
                                   common_pt_inputs = NULL,
                                   unique_pt_inputs = NULL,
                                   input_list = NULL,
                                   pb = pb,
                                   seed = seed,
                                   .warden_ctx = .warden_ctx) {
  on.exit({
    # Make sure the entries collected in temp_log_pt are not lost if the engine
    # stops part-way. Note that on.exit handlers run on every exit path, not
    # only on errors.
    if (isTRUE(input_list$debug)) {
      log_sink_env <- get0("log_sink", envir = input_list, inherits = TRUE, ifnotfound = NULL)
      # temp_log_pt does not exist yet if the failure happened during set-up
      if (!is.null(log_sink_env) && exists("temp_log_pt", inherits = FALSE)) {
        log_sink_env$entries <- c(log_sink_env$entries, lapply(temp_log_pt, transform_debug))
      }
    }
  }, add = TRUE)

  # Initial set-up --------------------------
  simulation <- input_list$simulation
  sens <- input_list$sens
  length_sensitivities <- input_list$length_sensitivities
  n_sim <- input_list$n_sim
  n_arms <- length(arm_list)
  npats <- input_list$npats

  # Read the switches once and normalise them, so that a missing (NULL) flag is
  # treated as "off" instead of failing with "argument is of length zero".
  debug <- isTRUE(input_list$debug)
  accum_backwards <- isTRUE(input_list$accum_backwards)

  if (accum_backwards) {
    input_list$ongoing_inputs_lu <- paste0(input_list$uc_lists$ongoing_inputs, "_lastupdate", recycle0 = TRUE)
    input_out_v <- c(input_list$input_out, input_list$ongoing_inputs_lu)

    # Initialize ongoing_list_temp for backwards accumulation
    if (!is.null(input_list$uc_lists$ongoing_inputs)) {
      input_list$ongoing_list_temp <- setNames(
        rep(0, length(input_list$uc_lists$ongoing_inputs)),
        input_list$uc_lists$ongoing_inputs
      )
    }
  } else {
    input_out_v <- c(input_list$input_out)
  }

  # Get priority order for event queue (from init_event_list)
  if (!is.null(input_list$init_event_list)) {
    priority_order <- input_list$init_event_list[[1]]$evts
  } else {
    priority_order <- "start" # default if no events defined
  }

  # Storage for patient data, indexed by patient id and then by arm
  patdata <- vector("list", length = npats)
  temp_log_pt <- list()

  # Collect the objects in input_list that must be duplicated per arm
  list_discrete_resources <- list()
  list_shared_inputs <- list()
  for (obj in ls(input_list)) {
    obj_value <- input_list[[obj]]
    if (inherits(obj_value, "resource_discrete")) {
      list_discrete_resources[[obj]] <- obj_value
    }
    if (inherits(obj_value, "shared_input")) {
      list_shared_inputs[[obj]] <- obj_value
    }
  }

  l_disres <- length(list_discrete_resources)
  l_sharedi <- length(list_shared_inputs)

  # One clone/fork per arm for each collected object
  if (l_disres > 0) {
    names_disres <- names(list_discrete_resources)
    cloned_resources <- list()
    for (obj in seq_len(l_disres)) {
      cloned_resources[[obj]] <- discrete_resource_clone(list_discrete_resources[[obj]], n_arms)
    }
  }

  if (l_sharedi > 0) {
    names_sharedi <- names(list_shared_inputs)
    cloned_sharedi <- list()
    for (obj in seq_len(l_sharedi)) {
      cloned_sharedi[[obj]] <- list_shared_inputs[[obj]]$fork(n_arms)
    }
  }

  # Progress-bar bookkeeping does not depend on the patient, so compute it once.
  # NOTE(review): the running index only uses the patient number, not the arm,
  # so every arm hits the same ticks - confirm this is intended.
  pb_offset <- ((sens - 1) * n_sim * npats * n_arms) + ((simulation - 1) * npats * n_arms)
  pb_every <- ceiling(npats * n_arms * n_sim * length_sensitivities /
                        min(npats * n_arms * length_sensitivities * n_sim, 50))

  # 1 Loop per arm ----------------------------------------------------------
  for (arm in arm_list) {
    # Position of this arm in arm_list: selects the per-arm clones and is part
    # of the patient-arm seed below
    which_arm <- which(arm == arm_list)

    # Arm-level environment: child of input_list with a copy of its contents
    input_list_arm_base <- new.env(parent = input_list)
    list2env(as.list(input_list), input_list_arm_base)
    input_list_arm_base$arm <- arm

    # Replace shared objects by the clone that belongs to this arm
    if (l_disres > 0) {
      for (obj in seq_len(l_disres)) {
        input_list_arm_base[[names_disres[[obj]]]] <- cloned_resources[[obj]][[which_arm]]
      }
    }

    if (l_sharedi > 0) {
      for (obj in seq_len(l_sharedi)) {
        input_list_arm_base[[names_sharedi[[obj]]]] <- cloned_sharedi[[obj]][[which_arm]]
      }
    }

    # Create event queue for this arm
    event_queue <- queue_create(priority_order)

    # Storage for patient-arm environments
    patient_arm_environments <- vector("list", length = npats)

    # 2 Load all patients for this arm and initialize events ---------------
    # seq_len() so that npats == 0 runs zero iterations (1:0 would run two)
    for (i in seq_len(npats)) {
      # Seed for the common patient inputs: independent of the arm
      set.seed((simulation * 1007 + i * 53) * seed)

      # Progress update
      if ((pb_offset + i) %% pb_every == 0) {
        pb(sprintf("Simulation %g", simulation))
      }

      # Create patient environment (child of arm base environment)
      input_list_pt <- new.env(parent = input_list_arm_base)
      list2env(as.list(input_list_arm_base), input_list_pt)
      input_list_pt$i <- i

      .set_last_ctx("Error in setup:common_pt_inputs", sens = input_list$sens,
                    simulation = input_list$simulation, patient_id = i, arm = arm, .warden_ctx = .warden_ctx)

      # Load common patient inputs if they exist
      if (!is.null(common_pt_inputs)) {
        load_inputs(inputs = input_list_pt, list_uneval_inputs = common_pt_inputs)

        if (debug) {
          dump_info <- debug_inputs(input_list_arm_base, input_list_pt)
          names(dump_info) <- paste0("Analysis: ", input_list_pt$sens, " ", input_list_pt$sens_name_used,
                                     "; Sim: ", input_list_pt$simulation,
                                     "; Arm: ", input_list_pt$arm,
                                     "; Patient: ", input_list_pt$i,
                                     "; Initial Patient Conditions")
          temp_log_pt <- c(temp_log_pt, dump_info)
          log_add(dump_info)
        }
      }

      # Seed for the patient-arm inputs: additionally depends on the arm position
      set.seed(seed * (simulation * 1007 + i * 53 + which_arm))

      # Create patient-arm environment (child of patient environment)
      input_list_arm <- new.env(parent = input_list_pt)
      list2env(as.list(input_list_pt), input_list_arm)

      .set_last_ctx("Error in setup:unique_pt_inputs", sens = input_list$sens,
                    simulation = input_list$simulation, patient_id = i, arm = arm, .warden_ctx = .warden_ctx)

      # Load unique patient-arm inputs
      if (!is.null(unique_pt_inputs)) {
        load_inputs(inputs = input_list_arm, list_uneval_inputs = unique_pt_inputs)

        if (debug) {
          dump_info <- debug_inputs(input_list_pt, input_list_arm)
          names(dump_info) <- paste0("Analysis: ", input_list_arm$sens, " ", input_list_arm$sens_name_used,
                                     "; Sim: ", input_list_arm$simulation,
                                     "; Arm: ", input_list_arm$arm,
                                     "; Patient: ", input_list_arm$i,
                                     "; Initial Patient-Arm Conditions")
          temp_log_pt <- c(temp_log_pt, dump_info)
          log_add(dump_info)
        }
      }

      # Initialize events for this patient-arm (seed independent of the arm)
      set.seed(seed * (simulation * 1007 + i * 349))
      .set_last_ctx("Error in setup:initiate_evt", sens = input_list$sens,
                    simulation = input_list$simulation, patient_id = i, arm = arm, .warden_ctx = .warden_ctx)

      if (is.null(input_list_arm$init_event_list)) {
        # No events defined, add start event at time 0
        start_events <- setNames(0, "start")
        new_event(start_events, event_queue, i)
      } else {
        # Generate initial events
        evt_list <- do.call("initiate_evt", list(arm, input_list_arm))

        if (debug) {
          # Log the drawn event times against an "all events at Inf" baseline
          names_input <- names(evt_list$time_data)
          prev_value <- setNames(vector("list", length(names_input)), names_input)
          prev_value[names_input] <- evt_list$time_data[names_input]
          prev_value["cur_evtlist"] <- list(setNames(rep(Inf, length(input_list_arm$init_event_list[[1]]$evts)),
                                                     input_list_arm$init_event_list[[1]]$evts))

          dump_info <- list(
            list(
              prev_value = prev_value,
              cur_value = c(evt_list[["time_data"]], evt_list["cur_evtlist"])
            )
          )

          names(dump_info) <- paste0("Analysis: ", input_list_arm$sens, " ", input_list_arm$sens_name_used,
                                     "; Sim: ", input_list_arm$simulation,
                                     "; Patient: ", input_list_arm$i,
                                     "; Arm: ", arm,
                                     "; Initialize Time to Events for Patient-Arm")
          temp_log_pt <- c(temp_log_pt, dump_info)
          log_add(dump_info)
        }

        # Load time data into patient-arm environment
        list2env(as.list(evt_list$time_data), input_list_arm)

        # Add events to the shared event queue
        if (length(evt_list$cur_evtlist) > 0) {
          new_event(evt_list$cur_evtlist, event_queue, i)
        }
      }

      # Every patient-arm starts at time 0
      input_list_arm$curtime <- 0

      # Set up accumulators if using backwards accumulation
      if (accum_backwards && !is.null(input_list_arm$uc_lists$ongoing_inputs)) {
        input_list_arm$ongoing_list_temp <- input_list$ongoing_list_temp
      }

      # Per-patient event counter; used as the index into the patient's evtlist
      input_list_arm$n_evt <- 0

      # Store patient-arm environment
      patient_arm_environments[[i]] <- input_list_arm
    }

    # 3 Process all events for this arm in chronological order -------------
    .set_last_ctx("Error in start:event processing", sens = input_list$sens,
                  simulation = input_list$simulation, .warden_ctx = .warden_ctx)

    # Process events while queue is not empty
    while (!queue_empty(event_queue)) {
      # Stop as soon as the next event in the queue has an infinite time
      if (is.infinite(next_event(1, event_queue)$time)) {
        break
      }

      # Get next event
      next_evt <- pop_and_return_event(event_queue)
      current_patient_id <- next_evt$patient_id
      current_event <- next_evt$event_name
      current_time <- next_evt$time

      .set_last_ctx("Error in engine:event reaction", sens = input_list$sens,
                    simulation = input_list$simulation, patient_id = current_patient_id,
                    arm = arm, event = current_event, time = current_time, .warden_ctx = .warden_ctx)

      # Get the appropriate patient-arm environment
      input_list_arm <- patient_arm_environments[[current_patient_id]]

      # The patient's current time becomes the previous time of this event
      current_prevtime <- input_list_arm$curtime

      # A patient whose clock is already infinite gets no further reactions:
      # drop the event. The loop condition ends the loop if the queue is empty.
      if (is.infinite(current_prevtime)) {
        next
      }

      # Update the event queue reference for new_event, modify_event, etc.
      # Note that we are only assigning the pointer, so any changes to
      # input_list_arm$cur_evtlist will equally affect event_queue
      assign("cur_evtlist", event_queue, envir = input_list_arm)

      input_list_arm$n_evt <- input_list_arm$n_evt + 1

      input_list_arm <- react_evt(list(evt = current_event,
                                       curtime = current_prevtime,
                                       evttime = current_time),
                                  arm,
                                  input_list_arm)

      patient_arm_environments[[current_patient_id]] <- input_list_arm

      # Get extra objects to be exported (NULL values are dropped)
      if (is.null(input_out_v)) {
        extra_data <- list()
      } else {
        extra_data <- mget(input_out_v, input_list_arm)
      }
      extra_data <- extra_data[!vapply(extra_data, is.null, TRUE)]

      patdata[[current_patient_id]][[arm]]$evtlist[[input_list_arm$n_evt]] <- c(evtname = current_event,
                                                                                evttime = current_time,
                                                                                pat_id = current_patient_id,
                                                                                arm = arm,
                                                                                extra_data
      )

      if (debug) {
        evt_log <- input_list_arm$log_list
        if (!is.null(evt_log)) {
          temp_log_pt <- c(temp_log_pt, evt_log)
          log_add(evt_log)
        }
      }
    }
  }

  # Return patdata in the same structure as run_engine
  final_output <- compute_outputs(patdata, input_list)

  input_list$log_list <- lapply(temp_log_pt, transform_debug)

  # Add debugging information if enabled
  if (debug) {
    final_output$log_list <- input_list$log_list
  }

  final_output
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.