batchtools/workflow-from.feat.stats.R

# setup
pkg.dir <- '~/Dropbox/_phd/projects/ansimo/ansimo/'
pkg.dir <- '/fast/AG_Forslund/Morgan/ansimo/ansimo'
main.work.dir <- paste0(pkg.dir,'run/')
devtools::load_all(pkg.dir)

# load the data
feat.stats <- readRDS('feat.stats.rds')

# simulation settings - eventually grep these from main input somehow...?
algs <- c('bootstrap','gaussian','nbinom')
parallel <- 'multicore'
status.var <- 'DIABSTATUS'
replicates <- 3

# check that all metadata of interest are factors (can't simulate otherwise)
meta.vars <- ansimo::get.metadata.vars(feat.stats)
cat('Identified the following metadata variables:\n', names(meta.vars),'\n')
meta.vars <- ansimo::prune.metadata.vars(meta.vars)
cat('The following metadata variables are not factors and will be excluded from this analysis:\n',
    names(meta.vars$discard),'\n')

# create metadata table
meta.tbl <- feat.stats %>%
  magrittr::use_series(data) %>%
  magrittr::extract2(1) %>%
  dplyr::select(-c('abundance', 'SampleID', names(meta.vars$discard)))

# turn it into a string to pass to batchtools
meta.str <- ansimo::characterize.metadata(meta.tbl)

# transform into list
feat.stats <- meta.vars %>%
  magrittr::use_series(keep) %>%
  purrr::imap(~ extract.metadata.abundances(feat.stats, .y)) %>%
  magrittr::inset2('ORIGINAL',
                   (.) %>%
                     magrittr::extract2(1) %>%
                     dplyr::mutate(data = purrr::map(data, ~ dplyr::select(., 'abundance'))))

### MAKE REGISTRY
print(getwd())
reg <- batchtools::makeExperimentRegistry(file.dir = 'registry', work.dir = getwd(),
                                          conf.file = paste0(pkg.dir, 'batchtools/',
                                                             'batchtools.conf.R'),
                                          make.default = TRUE)

### WRAPPER METHODS
# dynamic problem functions
simulate.data <- function(data, job, type, n, s.var, parallel, ...) {
  ansimo::batchtools.simulate.dynamic(data, type, n, s.var, parallel)}
preserve.strata <- function(data, job, ...) {data}

# algorithm function
calculate <- function(data, job, instance, status.var, meta.str, ...) {

  labels.present <- ifelse(check.tbl.dim(instance) == 1,
                           FALSE,
                           TRUE)
  if (labels.present) {
    s.var <- instance %>%
      magrittr::use_series(data) %>%
      purrr::pluck(1) %>%
      select(-abundance) %>%
      colnames()
    reformatted <- instance %>%
      ansimo::reconstruct.abundance.tbl(s.var) %>%
      ansimo::prepare.labeled.tbl.mdc(s.var, meta.str, status.var)
  }

  else {
    reformatted <- instance %>%
      ansimo::reconstruct.abundance.tbl(s.var = NA) %>%
      ansimo::prepare.unlabeled.tbl.mdc(meta.str, status.var)
  }

  cores <- future::availableCores() - 1
  till.list <- metadeconfoundR::MetaDeconfound(reformatted$feat.abundances,
                                               reformatted$metadata,
                                               nnodes = cores,
                                               robustCutoff = 0) %>%
    purrr::map(function(x) {
      x %>%
        as.data.frame() %>%
        tibble::rownames_to_column(var = 'feature') %>%
        tibble::as_tibble()})

}


### PROBLEM + ALGORITHM PARAMETERS

# dynamic function [simulate] parameters
prob.sim.params <- feat.stats %>%
  names() %>%
  paste0(rep('.simulate', length(feat.stats))) %>%
  purrr::map(function(x) {
    tmp <- x %>%
      stringr::str_split('[.]') %>%
      purrr::pluck(1) %>%
      purrr::pluck(1)
    if (tmp == 'ORIGINAL') data.table::CJ(type = algs,
                                          n = NA,
                                          s.var = NA,
                                          parallel = parallel)
    else data.table::CJ(type = algs,
                        n = NA,
                        s.var = tmp,
                        parallel = parallel)}) %>%
  magrittr::set_names(paste0(names(feat.stats),'.simulate'))

# dynamic function [preserve strata] parameters
prob.strata.params <- feat.stats %>%
  names() %>%
  paste0(rep('.strata', length(feat.stats))) %>%
  purrr::map(function(x) {
    data.table::data.table()}) %>%
  magrittr::set_names(paste0(names(feat.stats), '.strata')) %>%
  magrittr::extract(-length(.))

# algorithm parameters
alg.params <- list(
  metadeconfoundR = data.table::CJ(status.var = status.var,
                                   meta.str = meta.str))


### ADD PROBLEMS, ALGORITHMS, THEN EXPERIMENTS
# algorithm for simulated + preserved data
batchtools::addAlgorithm('metadeconfoundR', fun = calculate)

# simulation problems
purrr::iwalk(feat.stats, function(.x, .y) {

  # simulate without stratification
  if (.y == 'ORIGINAL') s.var <- NA

  batchtools::addProblem(name = paste0(.y, '.simulate'),
                         data = .x,
                         fun = simulate.data)
})

# replicate simulation problems
batchtools::addExperiments(prob.designs = prob.sim.params,
                           algo.designs = alg.params,
                           repls = replicates)

# preserve strata problems
purrr::iwalk(feat.stats, function(.x, .y) {

  # no need to preserve unlabeled data
  if (.y != 'ORIGINAL') {
    batchtools::addProblem(name = paste0(.y, '.strata'),
                           data = .x,
                           fun = preserve.strata)}
})

# preserve strata problems not replicated! run once
batchtools::addExperiments(prob.designs = prob.strata.params,
                           algo.designs = alg.params)

### TESTING
batchtools::summarizeExperiments()

# test
jobs <- batchtools::getJobPars() %>%
  batchtools::unwrap() %>%
  tibble::as_tibble() %>%
  dplyr::group_by(problem, type) %>%
  tidyr::nest() %>%
  dplyr::mutate(data = purrr::map(data, ~ as_tibble(.))) %>%
  dplyr::mutate(job.id = purrr::map_int(data, function(x) {
    x %>%
      magrittr::use_series(job.id) %>%
      purrr::pluck(1)}))

# function to apply on returned results
reduce.results <- function(result.list) {
  result.list %>%
    purrr::map(~ mutate_if(., ~ !is.character(.), ~ as.character(.))) %>%
    dplyr::bind_rows(.id = 'id') %>%
    tidyr::gather(key = 's.var', value = 'value', -c(id, feature)) %>%
    tidyr::spread(key = 'id', value = 'value') %>%
    dplyr::mutate_at(c('Ds','Ps','Qs'), ~ as.double(.))
}

### SUBMIT [TEST] JOBS
batchtools::submitJobs(jobs$job.id)
status <- batchtools::waitForJobs(jobs$job.id)

### COLLECT RESULTS
if (status) {
  results <- batchtools::findDone() %>%
    magrittr::use_series(job.id) %>%
    batchtools::reduceResultsList(reduce.results) %>%
    magrittr::set_names(batchtools::findDone()$job.id) %>%
    dplyr::bind_rows(.id = 'id') %>%
    dplyr::mutate(id = as.integer(id)) %>%
    dplyr::mutate_if(~ ((is.character(.) & (length(unique(.)) < 6))),
                     ~ as.factor(.))

  saveme <- dplyr::nest_join(jobs, results, by = c('job.id' = 'id'))

  saveRDS(saveme, file = '/fast/AG_Forslund/Morgan/ansimo/proc-data/test.jobs.rds')}

## follow up with issues from test run ^
sxmorgan/ansimo documentation built on June 26, 2020, 7:59 p.m.