# * Author: Bangyou Zheng (Bangyou.Zheng@csiro.au)
# * Created: 04:05 PM Thursday, 13 June 2013
# * Copyright: AS IS
# *
# Condor functions for APSIM
#' Generate the simulation prefix with MD5
#'
#' Each row of \code{factors} is collapsed into a string of the form
#' \code{name1=value1,name2=value2,...} (with spaces removed) and hashed
#' with MD5 to give a fixed-length, file-system-safe prefix.
#'
#' @param factors A data.frame of factors, one simulation per row
#' @return A character vector of MD5 hashes, one per row of \code{factors}
#' @export
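#' @examples
#' \dontrun{
#' # A minimal sketch of the expected input; the factor names and levels
#' # below are hypothetical.
#' factors <- expand.grid(Site = c('Gatton', 'Emerald'),
#'     Sowing = c('15-may', '15-jun'),
#'     stringsAsFactors = FALSE)
#' # Returns one 32-character MD5 hash per row, e.g. the hash of
#' # "Site=Gatton,Sowing=15-may"
#' simPrefix(factors)
#' }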
simPrefix <- function(factors)
{
library(digest)
v_names <- names(factors)
sim_title <- as.character(apply(factors, 1, FUN = function(x)
{
x <- gsub(' ', '', x)
# x <- gsub('/', '_', x)
x <- paste(paste(v_names, x, sep = '='), collapse = ',')
return(x)
}))
# Hash each simulation title into a fixed-length file prefix
prefix <- unlist(lapply(sim_title, digest, algo = 'md5'))
return(prefix)
}
#' Compress the input files with 7-Zip
#'
#' @param input A character vector of files and folders to compress
#' @param output The path of the output archive
#' @param sevenz The path to the 7z.exe executable
#' @export
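#' @examples
#' \dontrun{
#' # A minimal sketch; the paths below are hypothetical and 7-Zip must be
#' # installed at the default location (or pass its path via `sevenz`).
#' compressInputs(input = c('Input/Sims', 'RCode.R'),
#'     output = 'Input.7z')
#' }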
compressInputs <- function(input, output,
sevenz = 'C:\\Program Files\\7-Zip\\7z.exe')
{
# Build the 7-Zip command: a solid LZMA archive with maximum compression,
# using the directory of the output file as the working directory (-w)
cmd <- sprintf('"%s" a -t7z -m0=LZMA -mmt=on -mx9 -md=64m -mfb=64 -ms=4g -sccUTF-8 -sfx "-w%s" "%s" %s',
sevenz,
dirname(output),
output,
paste(paste0('"', input, '"'), collapse = ' '))
system(cmd)
}
#' Prepare the task table for APSIM simulations
#'
#' @param project The project name in the ClusterRun system
#' @param factors A named list of factors used to generate the simulations
#' @param sharedrive The path of the shared drive, as seen by the condor clients, that holds the project inputs and outputs
#' @param local The local path of the project folder
#' @param r_code The R script executed for each simulation on the condor clients
#' @param par_factor The parallel factors to write into the ClusterRun database
#' @param input_factor The factors that identify the input files
#' @param skip_factor The factors to skip here because they are processed on the condor clients
#' @param r_script The path to the Rscript.exe file
#' @param folder_prefix The output folder that stores the output files of all simulations
#' @param ids The starting id of the simulations
#' @param save_factor Whether to save the factor levels and task table of each parallel level into an RData file
#' @return The id of the last task, which can be passed as \code{ids} in a subsequent call
#' @export
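#' @examples
#' \dontrun{
#' # A minimal sketch of a call; all names and paths are hypothetical and a
#' # ClusterRun task database must be reachable through RClusterRun.
#' factors <- list(Site = c('Gatton', 'Emerald'),
#'     Sowing = c('15-may', '15-jun'),
#'     Density = c(100, 200))
#' last_id <- clusterRunPreparingTask(
#'     project = 'MyProject',
#'     factors = factors,
#'     sharedrive = 'Z:',
#'     local = 'C:/MyProject',
#'     par_factor = 'Site',
#'     input_factor = 'Site')
#' }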
clusterRunPreparingTask <- function(
project,
factors,
sharedrive,
local,
par_factor,
input_factor,
skip_factor = NULL,
folder_prefix = NULL,
r_script = 'R\\bin\\i386\\Rscript.exe',
r_code = 'RCode.R',
ids = 0, save_factor = FALSE)
{
library(digest)
library(dplyr) # provides bind_rows() used to accumulate the task table
if (ids == 0)
{
# Clean all tasks in the condor system
library(RClusterRun)
con <- connectTaskDB()
cleanTasks(con, project)
disconnectTaskDB(con)
}
# Generate factors for several targets
c_factor <- factors
if (!is.null(skip_factor))
{
c_factor <- c_factor[!(names(c_factor) %in% skip_factor)]
}
c_par_factor <- c_factor[(names(c_factor) %in% par_factor)]
c_par_factor <- expand.grid(c_par_factor, stringsAsFactors = FALSE)
c_com_factor <- expand.grid(
c_factor[!(names(c_factor) %in% par_factor)],
stringsAsFactors = FALSE)
updateTask <- function(tasks) {
n <- 100000
count <- ceiling(nrow(tasks) / n)
for (k in seq_len(count)) {
start_pos <- (k - 1) * n + 1
end_pos <- min(k * n, nrow(tasks))
con <- connectTaskDB()
addTasks(con, project, tasks[seq(start_pos, end_pos),])
disconnectTaskDB(con)
}
}
# Generate the task for all simulations
remaining_mysqls <- NULL
for (i in seq_len(nrow(c_par_factor)))
{
# Combine the common factors with the i-th level of the parallel factors
i_factors <- c_com_factor
for (j in seq_along(par_factor))
{
i_factors[[par_factor[j]]] <- c_par_factor[[i, par_factor[j]]]
}
v_names <- names(i_factors)
sim_title <- as.character(apply(i_factors, 1, FUN = function(x)
{
x <- gsub(' ', '', x)
# x <- gsub('/', '_', x)
x <- paste(paste(v_names, x, sep = '='), collapse = ',')
return(x)
}))
file_prefix <- unlist(lapply(sim_title, digest, algo = 'md5'))
mysql <- as.data.frame(matrix(NA, nrow = nrow(i_factors), ncol = 0))
mysql$id <- seq_along(file_prefix) + ids
ids <- max(mysql$id)
if (is.null(input_factor) || length(input_factor) == 0) {
mysql$inputs <- ""
} else {
input_name <- i_factors[,input_factor]
if (length(input_factor) > 1)
{
input_name <- apply(i_factors[,input_factor], 1, paste, collapse = '_')
}
mysql$inputs <- sprintf('%s\\%s\\Input\\Sims\\%s.Rds', sharedrive, project, input_name)
}
mysql$commands <- paste(r_script, ' ', r_code, ' ',
sim_title, ' ', file_prefix, ' 1>nul 2>nul', sep = '')
if (is.null(folder_prefix))
{
i_folder_output <- paste(c_par_factor[i,], collapse = '_')
} else
{
i_folder_output <- paste0(folder_prefix, '/',
paste(c_par_factor[i,], collapse = '_'))
}
output_dir <- sprintf('%s\\%s\\Output\\%s', sharedrive, project, i_folder_output)
mysql$outputs <- paste(output_dir, '\\',
file_prefix, '.Rds', sep = '')
remaining_mysqls <- bind_rows(remaining_mysqls, mysql)
if (nrow(remaining_mysqls) > 100000) {
updateTask(remaining_mysqls)
remaining_mysqls <- NULL
}
# con <- connectTaskDB()
# addTasks(con, project, mysql)
# disconnectTaskDB(con)
#
if (save_factor) {
output_files <- i_factors
output_files$prefix <- file_prefix
output_files$outputs <- paste0(i_folder_output, '/', file_prefix, '.Rds')
file_path <- file.path(local, project, 'Factor',
sprintf('%s.RData', i_folder_output))
if (!file.exists(dirname(file_path))) {
dir.create(dirname(file_path), recursive = TRUE)
}
save(output_files, mysql, file = file_path, compress = TRUE)
}
}
if (!is.null(remaining_mysqls)) {
updateTask(remaining_mysqls)
}
# Create output folder
folder_outputs <- apply(c_par_factor, 1, paste, collapse = '_')
if (!is.null(folder_prefix))
{
folder_outputs <- paste0(folder_prefix, '/', folder_outputs)
}
lapply(paste0(local, '/', project, '/Output/',
folder_outputs), dir.create, showWarnings = FALSE, recursive = TRUE)
# Return the last id so that subsequent calls can continue the numbering
ids
}
#' Post-process APSIM outputs into a netcdf file
#'
#' @param filename The filename of the netcdf file
#' @param years The years in the APSIM outputs and the netcdf file
#' @param factors The factors in the netcdf file and the APSIM outputs
#' @param traits The vector of traits to write into the netcdf file
#' @param par_factor The factors that are processed in parallel
#' @param base The base folder that stores the APSIM output files
#' @param factor_files A data.frame, or a vector of RData files, specifying the output file names and factor levels
#' @param base_sub The sub-folder that stores the single output files
#' @param merge_out_nc Whether to merge the parallel outputs into one array before updating the netcdf file
#' @param yearvar Variable name of the year in the APSIM output file
#' @param cpu_num The number of CPUs used for parallel processing
#' @param tmp_folder The temporary folder that stores the merged APSIM output files
#' @export
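#' @examples
#' \dontrun{
#' # A minimal sketch of a call; all names and paths are hypothetical and the
#' # rapsim, ncdf4cf, ncdf4 and snowfall packages must be available.
#' factors <- list(Site = c('Gatton', 'Emerald'),
#'     Sowing = c('15-may', '15-jun'))
#' apsimOut2Nc(filename = 'apsim.nc',
#'     years = 1900:2010,
#'     factors = factors,
#'     traits = c('yield', 'biomass'),
#'     par_factor = 'Site',
#'     base = 'C:/MyProject',
#'     factor_files = Sys.glob('C:/MyProject/Factor/*.RData'),
#'     cpu_num = 4)
#' }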
apsimOut2Nc <- function(filename, years, factors, traits, par_factor,
base, factor_files,
base_sub = NULL,
merge_out_nc = TRUE,
yearvar = 'year',
cpu_num = 14,
tmp_folder = file.path('Results', 'tmpNc')
)
{
library(rapsim)
library(ncdf4cf)
library(ncdf4)
# print('Create a empty netcdf file.')
ncCreateApsim(filename, years, factors, traits)
par_factor <- names(factors[(names(factors) %in% par_factor)])
#print('Create the temp folder to store the merge RData files of APSIM.')
# Check whether tmp folder existing
if (!file.exists(tmp_folder))
{
dir.create(tmp_folder)
}
# Merge the single APSIM output files of one parallel level into a
# temporary RData file that is later written into the netcdf file
mergeApsimOut <- function(i, par_grid, filename,
factors, years, traits, factor_files, base,
yearvar, tmp_folder, base_sub)
{
output <- file.path(tmp_folder,
paste0(paste(par_grid[i,], collapse = '_'), '.RData'))
if (!file.exists(output))
{
library(rapsim)
library(ncdf4)
library(digest)
if (is.character(factor_files))
{
if (length(factor_files) > 1)
{
vars <- load(factor_files[grep(paste(par_grid[i,], collapse = '_'), factor_files)])
} else
{
load(factor_files)
}
}
pos <- rep(TRUE, nrow(output_files))
par_f_names <- names(par_grid)
for (j in seq(along = par_f_names))
{
pos <- pos & output_files[[par_f_names[j]]] %in% par_grid[i,j]
}
if (!is.null(output_files$outputs))
{
files <- paste0(base, '/Output/',
output_files$outputs)
} else
{
if (is.null(base_sub))
{
files <- paste0(base, '/Output/',
paste(par_grid[i,], collapse = '_'), '/',
output_files$prefix[pos], '.RData')
} else
{
files <- paste0(base, '/Output/',
base_sub, '/',
output_files$prefix[pos], '.RData')
}
}
# sum(!file.exists(files))
nc <- nc_open(filename)
args <- c(list(nc), as.list(unlist(par_grid[i,])))
names(args) <- c('nc', names(par_grid))
start <- do.call(ncGetStart, args)
dims <- c(length(years), as.numeric(unlist(lapply(factors, length))))
dims[match(names(par_grid), names(factors)) + 1] <- 1
res.array <- array(NA, dim = dims)
outputs <- ncReadApsimOut(nc, files, start,
res.array,
yearvar = yearvar,
traits = traits)
save(outputs, file = output, compress = TRUE)
nc_close(nc)
return(invisible())
}
}
#print('Merge the single APSIM output files into a bigger RData files.')
# par_factor <- names(factors)[names(factors) %in% par_factor]
par_grid <- expand.grid(factors[par_factor], stringsAsFactors = FALSE)
library(snowfall)
sfInit(parallel = TRUE, cpus = cpu_num)
sfLapply(seq(length = nrow(par_grid)),
mergeApsimOut, par_grid, filename,
factors, years, traits, factor_files, base,
yearvar, tmp_folder, base_sub)
sfStop()
#print('Write RData files into netcdf file.')
if (merge_out_nc)
{
dims <- c(length(years), as.numeric(unlist(lapply(factors, length))))
res.array <- array(NA, dim = dims)
s_outputs <- list()
s_outputs$start <- rep(1, length(factors) + 1)
s_outputs$count <- dims
for (j in seq(along = traits))
{
s_outputs[[traits[j]]] <- res.array
}
for (i in seq(nrow(par_grid)))
{
load(file = file.path(tmp_folder,
paste0(paste(par_grid[i,], collapse = '_'), '.RData')))
for (k in seq(along = traits))
{
# Build an index list with one element per dimension of the result array.
# Unfilled elements stay as the empty symbol (a "missing" index), so the
# do.call on `[<-` below behaves like x[, , start, , ] <- value, placing
# this parallel level at its start position along the parallel dimensions.
FUN <- function(x) NULL
empty_arg <- formals(FUN)
dim_list <- replicate(length(dims), unname(empty_arg))
for (j in seq_len(ncol(par_grid)))
{
pos <- match(names(par_grid[j]), names(factors)) + 1
dim_list[pos] <- outputs$start[pos]
}
s_outputs[[traits[k]]] <- do.call(`[<-`, c(list(s_outputs[[traits[k]]]), dim_list, list(outputs[[traits[k]]])))
}
}
nc <- nc_open(filename, write=TRUE)
ncUpdateApsim(nc, outputs = s_outputs)
# nc_sync(nc)
nc_close(nc)
} else
{
nc <- nc_open(filename, write=TRUE)
par_grid <- expand.grid(factors[par_factor], stringsAsFactors = FALSE)
for (i in seq(nrow(par_grid)))
{
load(file = file.path(tmp_folder,
paste0(paste(par_grid[i,], collapse = '_'), '.RData')))
ncUpdateApsim(nc, outputs = outputs)
nc_sync(nc)
}
nc_close(nc)
}
}