batchtools/workflow-from.raw.data.R

library(tidyverse)
library(magrittr)
library(phyloseq)

# setup
pkg.dir <- '~/Dropbox/_phd/projects/ansimo/ansimo/'
# pkg.dir <- '/fast/AG_Forslund/Morgan/ansimo/'
devtools::load_all(pkg.dir)
file.dir <- '~/Dropbox/_phd/projects/ansimo/raw-data/metacardis/'
# file.dir <- '/fast/AG_Forslund/Morgan/'

# load the species-level metacardis count data
# unrarefied library size range: 7042253521 317791411043
species.raw <- file.dir %>%
  paste0('hub.cellcount.motu.Species.mgsamples.v2.tsv') %>%
  read_tsv() %>%
  rename(SampleID = 'MGSampleID') %>%
  mutate_at(vars(SampleID), ~ stringr::str_split(., '_') %>% map_chr(~ tail(., n = 1))) %>%
  rename_all(~ str_replace_all(., '_', ' ')) %>%
  rename_all(~ str_trim(., 'left'))

species.otu <- species.raw %>%
  pivot_longer(cols = -SampleID, names_to = 'feature', values_to = 'abundance') %>%
  pivot_wider(names_from = 'SampleID', values_from = 'abundance') %>%
  column_to_rownames(var = 'feature') %>%
  phyloseq::otu_table(., taxa_are_rows = TRUE)

# carried out on the cluster
species.rar <- species.otu %>%
  phyloseq::phyloseq() %>%
  phyloseq::rarefy_even_depth(., sample.size = 1000000) %>%
  as.data.frame() %>%
  rownames_to_column(var = 'SampleID') %>%
  as_tibble() %>%
  rename(feature = 'SampleID') %>%
  gather('SampleID','ab',-feature) %>%
  spread('feature','ab')
  # -> mc_species_rar.rds in proc-data/

species <- '~/Dropbox/_phd/projects/ansimo/proc-data/mc_species_rar.rds' %>%
  readRDS() %>%
  prevalence.filter(0.05) %>%
  remove.zero.samples()

# selected metadata variables for toy analysis/first benchmark
meta.select <- c('DIABSTATUS_C','COROSTATUS_CC','HTASTATUS_C',
                 'NRDIABTT_C','NRHTATT_C','NRLIPIDTT_C',
                 'METFORMIN_C','STATINE_C','INSULIN_C')

# rename, recode Yes/No to 1/0 then to numeric rather than factor
meta <- file.dir %>%
  paste0('hub.metadata.samples.v10.tsv') %>%
  read_tsv() %>%
  select(c('SampleID', meta.select)) %>%
  rename_all(~ stringr::str_split(., '_') %>% map_chr(~ head(., n = 1))) %>%
  # mutate_at(c(2:5,10:12), ~ fct_recode(., `1` = 'Yes', `0` = 'No')) %>%
  mutate_at(c(2:4,8:10), ~ fct_recode(., `1` = 'Yes', `0` = 'No')) %>%
  mutate_if(is.factor, ~ as.numeric(as.character(.)))

meta.binary <- meta %>%
  select(c('SampleID', map_chr(str_split(meta.select, '_'), ~ head(., n = 1)))) %>%
  # select(-c('METSYNDROM','NRABCURES')) %>%
  summarize_all(~ length(unique(.))) %>%
  gather('var','nlevels') %>%
  filter(nlevels == 2 | var == 'SampleID') %>%
  use_series(var) %>%
  select(meta, .)

# create nested data frame for simulations
feat.stats <- species %>%
  feat.summary.stats() %>%
  join.selected.metadata(meta.binary) %>%
  ungroup() %>%
  # this removes the actual stat calculation
  select(c('feature','data'))

## MAIN SCRIPT: METACARDIS -------------------------------------------------------------------------------

# simulation/pkg settings
dags <- c('null','confounder','biomarker')
types <- c('bootstrap','nbinom','gaussian')
parallel <- 'multicore'
# status.var <- 'DIABSTATUS'
y.var <- 'DIABSTATUS'
z.var <- 'METFORMIN'
replicates <- 3
sample.size <- 1982
n <- 10

# turn metadata into a string to pass to batchtools
meta.str <- ansimo::characterize.metadata(meta.binary)

# transform into list
ansimo.feat.stats <- meta.binary %>%
  select(-SampleID) %>%
  colnames() %>%
  set_names(., .) %>%
  purrr::imap(~ extract.metadata.abundances(feat.stats, .y, y.var)) %>%
  extract(-1)

# preserve: ground truth
# simulate: null, confounded, biomarker
# each static df needs list-column with sample.size x 3 tibbles (abundance, y.var, z.var)

### MAKE REGISTRY
print(getwd())
reg <- batchtools::makeExperimentRegistry(file.dir = 'registry', work.dir = getwd(),
                                          conf.file = paste0(pkg.dir, 'batchtools/',
                                                             'batchtools.conf.R'),
                                          make.default = TRUE)

### WRAPPER METHODS
# dynamic problem functions
simulate.data <- function(data, job, type, n, y.var, z.var, parallel, ...) {
  ansimo::batchtools.simulate.dynamic(data, dag, type, y.var, z.var,
                                      n, sample.size, parallel)}
preserve.ground.truth <- function(data, job, ...) {data}

# algorithm function to run metadeconfoundR
calculate <- function(data, job, instance, y.var, z.var, meta.str, ...) {

  meta.mat <- meta.str %>%
    reconstruct.metadata() %>%
    dplyr::arrange(SampleID) %>%
    tibble::column_to_rownames(SampleID)

  feat.mat <- instance %>%
    ansimo::reconstruct.abundance.tbl(y.var, z.var) %>%
    dplyr::select(-c(y.var, z.var)) %>%
    tibble::add_column(SampleID = rownames(meta.mat)) %>%
    dplyr::arrange(SampleID) %>%
    tibble::column_to_rownames(SampleID)

  cores <- future::availableCores() - 1
  till.list <- metadeconfoundR::MetaDeconfound(feat.mat,
                                               meta.mat,
                                               nnodes = cores,
                                               robustCutoff = 0) %>%
    purrr::map(function(x) {
      x %>%
        as.data.frame() %>%
        tibble::rownames_to_column(var = 'feature') %>%
        tibble::as_tibble()})

}


### PROBLEM + ALGORITHM PARAMETERS

# dynamic function [simulate] parameters
prob.sim.params <- ansimo.feat.stats %>%
  names() %>%
  purrr::map(function(x) {
    data.table::CJ(dag = dags,
                   type = types,
                   y.var = y.var,
                   z.var = x,
                   n = NA,
                   sample.size = sample.size,
                   parallel = parallel)}) %>%
  magrittr::set_names(paste0(names(ansimo.feat.stats),'.simulate'))

# dynamic function [preserve strata] parameters
prob.groundtruth.params <- ansimo.feat.stats %>%
  names() %>%
  purrr::map(function(x) {
    data.table::data.table()
    # data.table::CJ(y.var = y.var,
    #                z.var = x)
    }) %>%
  magrittr::set_names(paste0(names(ansimo.feat.stats), '.groundtruth'))

# algorithm parameters
alg.params <- list(
  metadeconfoundR = data.table::CJ(status.var = status.var,
                                   meta.str = meta.str))


### ADD PROBLEMS, ALGORITHMS, THEN EXPERIMENTS
# algorithm for simulated + preserved data
batchtools::addAlgorithm('metadeconfoundR', fun = calculate)

# add problems
purrr::iwalk(ansimo.feat.stats, function(.x, .y) {

  batchtools::addProblem(name = paste0(.y, '.groundtruth'),
                         data = .x,
                         fun = preserve.ground.truth)
  batchtools::addProblem(name = paste0(.y, '.simulate'),
                         data = .x,
                         fun = simulate.data)
})

# replicate simulation problems
batchtools::addExperiments(prob.designs = prob.sim.params,
                           algo.designs = alg.params,
                           repls = replicates)

# preserve ground truth problems - run once
batchtools::addExperiments(prob.designs = prob.groundtruth.params,
                           algo.designs = alg.params)

### TESTING
batchtools::summarizeExperiments()

# test
jobs <- batchtools::getJobPars() %>%
  batchtools::unwrap() %>%
  tibble::as_tibble() %>%
  dplyr::group_by(problem, type) %>%
  tidyr::nest() %>%
  dplyr::mutate(data = purrr::map(data, ~ as_tibble(.))) %>%
  dplyr::mutate(job.id = purrr::map_int(data, function(x) {
    x %>%
      magrittr::use_series(job.id) %>%
      purrr::pluck(1)}))

# function to apply on returned results
reduce.results <- function(result.list) {
  result.list %>%
    purrr::map(~ mutate_if(., ~ !is.character(.), ~ as.character(.))) %>%
    dplyr::bind_rows(.id = 'id') %>%
    tidyr::gather(key = 's.var', value = 'value', -c(id, feature)) %>%
    tidyr::spread(key = 'id', value = 'value') %>%
    dplyr::mutate_at(c('Ds','Ps','Qs'), ~ as.double(.))
}

### SUBMIT [TEST] JOBS
batchtools::submitJobs(jobs$job.id)
status <- batchtools::waitForJobs(jobs$job.id)

### COLLECT RESULTS
if (status) {
  results <- batchtools::findDone() %>%
    magrittr::use_series(job.id) %>%
    batchtools::reduceResultsList(reduce.results) %>%
    magrittr::set_names(batchtools::findDone()$job.id) %>%
    dplyr::bind_rows(.id = 'id') %>%
    dplyr::mutate(id = as.integer(id)) %>%
    dplyr::mutate_if(~ ((is.character(.) & (length(unique(.)) < 6))),
                     ~ as.factor(.))

  saveme <- dplyr::nest_join(jobs, results, by = c('job.id' = 'id'))

  saveRDS(saveme, file = '/fast/AG_Forslund/Morgan/ansimo/proc-data/test.jobs.rds')}

## follow up with issues from test run ^
sxmorgan/ansimo documentation built on June 26, 2020, 7:59 p.m.