# analysis/sample_pipeline.R
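#
# Overview: this script builds a drake plan that, for each (S, N)
# state-variable combination assembled below, draws up to `max_draws` samples
# of the feasible set (sample_fs), computes per-sample metrics (fs_di; the
# downstream summaries use skewness and Simpson's index), and samples
# differences between replicate draws (rep_diff_sampler). The plan is run
# either on the HiPerGator SLURM cluster or on local cores via clustermq, and
# the collected results are written to CSV files under analysis/.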

library(drake)
library(sadspace)
library(magrittr) # provides the %>% pipe used below (assuming it is not re-exported by the packages above)

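# Expose the sadspace functions to drake so targets are invalidated when the
# package code changes.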
expose_imports("sadspace")

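# State-variable grids from sadspace: define_statevars() presumably returns
# (S, N) combinations over the given ranges; sv_lows adds a finer grid
# (by = 0.25, logs = TRUE) at low values.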
sv <- define_statevars(s_range = c(0, 8), n_range = c(0.1, 12.1))
sv_lows <- define_statevars(s_range = c(0, 3.5), n_range = c(0.1, 10.6), logs = TRUE, by = .25)

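# Extra combinations with N close to S. Note that max(.01 * s0, 4) is a single
# value computed across the whole s0 column (pmax() would be the row-wise
# version), so every row gets the same offset; only rows with N < 1.5 * S are
# kept.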
sv2 <- sv %>%
  dplyr::select(s0) %>%
  dplyr::distinct() %>%
  dplyr::mutate(n0 = ceiling(s0 + max(.01 * s0, 4))) %>%
  dplyr::filter(n0 < 1.5 * s0)

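# Combine the grids, assign each combination the name of the p-table it should
# use (assign_ptable() is from sadspace), and drop combinations that have no
# applicable p-table.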
sv <- dplyr::bind_rows(sv, sv2, sv_lows) %>%
  assign_ptable() %>%
  dplyr::filter(p_table != "none") %>%
  dplyr::distinct()

# sv <- sv[1:5, ]  # uncomment to run on just the first few combinations (quick test)

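# Convert the p-table names to symbols so that, when spliced into the plan with
# !!, each fs target depends on the corresponding p-table target (wide, tall,
# or mamm) rather than on a plain string.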
p_table_list <- as.list(as.character(sv$p_table))
p_table_list <- lapply(p_table_list, FUN = rlang::sym)

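# The plan: wide/tall/mamm are pre-computed p-tables read from RDS files (they
# are not wrapped in file_in(), so drake does not track changes to the files
# themselves). fs uses static branching over the rows of sv; .id = c(s, n)
# yields target names like fs_<s>_<n>, and the di and diffs targets inherit
# those suffixes (di_fs_<s>_<n>, diffs_fs_<s>_<n>), which the retrieval code
# below relies on.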
max_draws <- 5000
ndraws <- 5000
set.seed(1977)
sample_plan <- drake_plan(
  wide = target(readRDS(here::here("analysis", "masterp_wide.Rds"))),
  tall = target(readRDS(here::here("analysis", "masterp_tall.Rds"))),
  mamm = target(readRDS(here::here("analysis", "masterp_mamm.Rds"))),
  fs = target(sample_fs(s, n, nsamples = !!max_draws, p_table),
              transform = map(s = !!sv$s0,
                              n = !!sv$n0,
                              p_table = !!p_table_list,
                              .id = c(s, n))),
  di = target(fs_di(fs),
              transform = map(fs)),
  diffs = target(rep_diff_sampler(fs, ndraws = !!ndraws),
                 transform = map(fs))
)


## Set up the cache and config
db <- DBI::dbConnect(RSQLite::SQLite(), here::here("analysis", "drake", "drake-cache.sqlite"))
cache <- storr::storr_dbi("datatable", "keystable", db)
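# storr_dbi() keeps all targets in the SQLite database above, so HiPerGator and
# local runs share a single cache; the analysis/drake/ directory must already
# exist for RSQLite to create the database file.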

## View the graph of the plan
if (interactive()) {
  config <- drake_config(sample_plan, cache = cache)
  sankey_drake_graph(config, build_times = "none")  # requires "networkD3" package
  vis_drake_graph(config, build_times = "none")     # requires "visNetwork" package
}

## Run the pipeline
nodename <- Sys.info()["nodename"]
if(grepl("ufhpc", nodename)) {
  print("I know I am on the HiPerGator!")
  library(clustermq)
  options(clustermq.scheduler = "slurm", clustermq.template = "slurm_clustermq.tmpl")
  ## Run the pipeline parallelized for HiPerGator
  make(sample_plan,
       force = TRUE,
       cache = cache,
       cache_log_file = here::here("analysis", "drake", "cache_log.txt"),
       verbose = 2,
       parallelism = "clustermq",
       jobs = 50,
       caching = "master", memory_strategy = "autoclean") # Important for DBI caches!
} else {
  library(clustermq)
  options(clustermq.scheduler = "multicore")
  # Run the pipeline on multiple local cores
  system.time(make(sample_plan,
                   cache = cache,
                   cache_log_file = here::here("analysis", "drake", "cache_log.txt"),
                   parallelism = "clustermq",
                   jobs = 2))
}
# Serial fallback (no clustermq):
# system.time(make(sample_plan, cache = cache, cache_log_file = here::here("analysis", "drake", "cache_log.txt")))

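# Retrieve results from the cache using the target names recorded in the plan:
# diversity targets start with "di_", difference targets with "diffs_" (matched
# below on their first three characters).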
dis <- sample_plan %>%
  dplyr::filter(substr(target, 1, 3) == "di_") %>%
  dplyr::select(target) %>%
  as.matrix() %>%
  as.list()

di_dfs <- lapply(dis, FUN = function(di_name) readd(di_name, cache = cache, character_only = TRUE))

di_dfs <- dplyr::bind_rows(di_dfs) %>%
  dplyr::mutate(max_draws = max_draws)


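# One row per (S, N) combination: nparts (presumably the number of partitions
# in the feasible set), nunique (presumably the number of unique samples
# obtained), and their logs.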
fs_size_dat <- di_dfs %>%
  dplyr::select(s0, n0, nparts, nunique, max_draws) %>%
  dplyr::distinct() %>%
  dplyr::mutate(log_nparts = log(as.numeric(nparts)),
                log_nunique = log(nunique))

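# Summarize the per-sample metrics across draws for each combination: mean, SD,
# median, range, and 2.5%/97.5% quantiles of skewness and Simpson's index.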
di_summary_df <- di_dfs %>%
  dplyr::group_by(s0, n0, nparts, nunique, max_draws) %>%
  dplyr::summarize(mean_skew = mean(skew, na.rm = T),
                   sd_skew = sd(skew, na.rm = T),
                   range_skew = max(skew, na.rm = T) - min(skew, na.rm = T),
                   lowerq_skew = quantile(skew, na.rm = T, probs = .025),
                   upperq_skew = quantile(skew, na.rm = T, probs = .975),
                   median_skew = median(skew, na.rm = T),
                   median_simpson = median(simpson, na.rm = T),
                   mean_simpson = mean(simpson, na.rm = T),
                   sd_simpson = sd(simpson, na.rm = T),
                   lowerq_simpson = quantile(simpson, na.rm = T, probs = .025),
                   upperq_simpson = quantile(simpson, na.rm = T, probs = .975),
                   range_simpson = max(simpson, na.rm = T) - min(simpson, na.rm = T)) %>%
  dplyr::ungroup() %>%
  dplyr::mutate(log_nunique = log(nunique),
                log_nparts = log(as.numeric(nparts)))


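# Same retrieval pattern for the replicate-difference targets ("diffs_...").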
diffs <- sample_plan %>%
  dplyr::filter(substr(target, 1, 3) == "dif") %>%
  dplyr::select(target) %>%
  as.matrix() %>%
  as.list()

diff_dfs <- lapply(diffs, FUN = function(di_name) readd(di_name, cache = cache, character_only = TRUE))

diff_dfs <- dplyr::bind_rows(diff_dfs)

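# Write the collected and summarized results to analysis/ and release the
# cache connection.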
write.csv(di_dfs, here::here("analysis", "di_dfs.csv"), row.names = F)
write.csv(diff_dfs, here::here("analysis", "diff_dfs.csv"), row.names = F)
write.csv(fs_size_dat, here::here("analysis", "fs_size_dat.csv"), row.names = F)
write.csv(di_summary_df, here::here("analysis", "di_summary_df.csv"), row.names = F)
DBI::dbDisconnect(db)
rm(cache)
print("Completed OK")