anomaliesCalcProcs <- function(GeneralParameters){
GUI <- TRUE
progress <- TRUE
#############
## pour parallel
GeneralParameters <- GeneralParameters
cdtParallelCond <- .cdtData$Config$parallel
intstep <- GeneralParameters$intstep
outstep <- GeneralParameters$outstep
#############
if(GeneralParameters$outdir$update){
don.anom <- try(readRDS(GeneralParameters$outdir$dir), silent = TRUE)
if(inherits(don.anom, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], GeneralParameters$outdir$dir), TRUE, 'e')
return(NULL)
}
if(outstep != don.anom$params$outstep){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['8']], outstep), TRUE, 'e')
return(NULL)
}
## check seasonal outstep (start month and length)
if(GeneralParameters$data.type != don.anom$params$data.type){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['8']], GeneralParameters$data.type), TRUE, 'e')
return(NULL)
}
}else{
if(!dir.exists(GeneralParameters$outdir$dir)){
Insert.Messages.Out(paste(GeneralParameters$outdir$dir, .cdtData$EnvData[['message']][['6']]), TRUE, 'e')
return(NULL)
}
if(GeneralParameters$climato$clim.exist){
don.climato <- try(readRDS(GeneralParameters$climato$clim.file), silent = TRUE)
if(inherits(don.climato, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], GeneralParameters$climato$clim.file), TRUE, "e")
return(NULL)
}
if(outstep != don.climato$params$outstep){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['9']], outstep), TRUE, 'e')
return(NULL)
}
if(GeneralParameters$data.type != don.climato$params$data.type){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['9']], GeneralParameters$data.type), TRUE, 'e')
return(NULL)
}
}else{
allyears <- GeneralParameters$climato$all.years
year1 <- GeneralParameters$climato$start.year
year2 <- GeneralParameters$climato$end.year
minyear <- GeneralParameters$climato$min.year
xwin <- GeneralParameters$climato$window
if(any(is.na(c(year1, year2, minyear)))){
Insert.Messages.Out(.cdtData$EnvData[['message']][['10']], TRUE, 'e')
return(NULL)
}
}
}
#####################################################
if(GeneralParameters$data.type == "cdtstation"){
don <- getStnOpenData(GeneralParameters$cdtstation$file)
if(is.null(don)) return(NULL)
don <- getCDTdataAndDisplayMsg(don, intstep, GeneralParameters$cdtstation$file)
if(is.null(don)) return(NULL)
miss.val <- getStnOpenDataInfo(GeneralParameters$cdtstation$file)[[3]]$miss.val
daty <- don$dates
# year <- as.numeric(substr(daty, 1, 4))
}
if(GeneralParameters$data.type == "cdtdataset"){
don <- try(readRDS(GeneralParameters$cdtdataset$index), silent = TRUE)
if(inherits(don, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], GeneralParameters$cdtdataset$index), TRUE, 'e')
return(NULL)
}
if(intstep != don$TimeStep){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['11']], intstep), TRUE, 'e')
return(NULL)
}
daty <- don$dateInfo$date
# year <- as.numeric(substr(daty, 1, 4))
}
#####################################################
computeCLIMATO <- FALSE
if(GeneralParameters$data.type == "cdtstation"){
if(GeneralParameters$outdir$update){
outDIR <- dirname(GeneralParameters$outdir$dir)
if(length(don$id) != length(don.anom$data$id)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['13']], TRUE, 'e')
return(NULL)
}
if(any(don$id != don.anom$data$id)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['14']], TRUE, 'e')
return(NULL)
}
mean.file <- file.path(outDIR, 'CDTMEAN', 'CDTMEAN.rds')
sds.file <- file.path(outDIR, 'CDTSTD', 'CDTSTD.rds')
dat.moy <- try(readRDS(mean.file), silent = TRUE)
if(inherits(dat.moy, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], don.anom$mean.file), TRUE, 'e')
return(NULL)
}
if(don.anom$params$anomaly == "Standardized"){
dat.sds <- try(readRDS(sds.file), silent = TRUE)
if(inherits(dat.sds, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], don.anom$sds.file), TRUE, 'e')
return(NULL)
}
}
clim.index.file <- file.path(outDIR, "Climatology.rds")
don.climato <- try(readRDS(clim.index.file), silent = TRUE)
if(inherits(don.climato, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], clim.index.file), TRUE, 'e')
return(NULL)
}
index0 <- NULL
index0$id <- don.climato$index
anomaly.fonct <- don.anom$params$anomaly
datadir <- file.path(outDIR, 'CDTSTATIONS')
}else{
outDIR <- file.path(GeneralParameters$outdir$dir, "ANOMALIES_data")
dir.create(outDIR, showWarnings = FALSE, recursive = TRUE)
if(GeneralParameters$climato$clim.exist){
if(length(don$id) != length(don.climato$data$id)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['15']], TRUE, 'e')
return(NULL)
}
if(any(don$id != don.climato$data$id)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['16']], TRUE, 'e')
return(NULL)
}
dirClim <- dirname(GeneralParameters$climato$clim.file)
mean.file <- file.path(dirClim, 'CDTMEAN', 'CDTMEAN.rds')
sds.file <- file.path(dirClim, 'CDTSTD', 'CDTSTD.rds')
dat.moy <- try(readRDS(mean.file), silent = TRUE)
if(inherits(dat.moy, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], mean.file), TRUE, 'e')
return(NULL)
}
if(GeneralParameters$anomaly == "Standardized"){
dat.sds <- try(readRDS(sds.file), silent = TRUE)
if(inherits(dat.sds, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], sds.file), TRUE, 'e')
return(NULL)
}
}
file.copy(file.path(dirClim, 'CDTMEAN'), outDIR, recursive = TRUE, overwrite = TRUE)
file.copy(file.path(dirClim, 'CDTSTD'), outDIR, recursive = TRUE, overwrite = TRUE)
file.copy(GeneralParameters$climato$clim.file, outDIR, recursive = TRUE, overwrite = TRUE)
index0 <- NULL
index0$id <- don.climato$index
}else computeCLIMATO <- TRUE
anomaly.fonct <- GeneralParameters$anomaly
datadir <- file.path(outDIR, 'CDTSTATIONS')
dir.create(datadir, showWarnings = FALSE, recursive = TRUE)
}
}
##############################
if(GeneralParameters$data.type == "cdtdataset"){
if(GeneralParameters$outdir$update){
outDIR <- dirname(GeneralParameters$outdir$dir)
index.file.moy <- file.path(outDIR, 'CDTMEAN', 'CDTMEAN.rds')
index.file.sds <- file.path(outDIR, 'CDTSTD', 'CDTSTD.rds')
index.file.anomal <- file.path(outDIR, "CDTANOM", "CDTANOM.rds")
index.out <- try(readRDS(index.file.anomal), silent = TRUE)
if(inherits(index.out, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], index.file.anomal), TRUE, 'e')
return(NULL)
}
SP1 <- defSpatialPixels(list(lon = don$coords$mat$x, lat = don$coords$mat$y))
SP2 <- defSpatialPixels(list(lon = index.out$coords$mat$x, lat = index.out$coords$mat$y))
if(is.diffSpatialPixelsObj(SP1, SP2, tol = 1e-04)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['17']], TRUE, 'e')
return(NULL)
}
rm(SP1, SP2)
clim.index.file <- file.path(outDIR, "Climatology.rds")
don.climato <- try(readRDS(clim.index.file), silent = TRUE)
if(inherits(don.climato, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], clim.index.file), TRUE, 'e')
return(NULL)
}
index0 <- NULL
index0$id <- don.climato$index
rm(don.climato)
}else{
outDIR <- file.path(GeneralParameters$outdir$dir, "ANOMALIES_data")
dir.create(outDIR, showWarnings = FALSE, recursive = TRUE)
if(GeneralParameters$climato$clim.exist){
dirClim <- dirname(GeneralParameters$climato$clim.file)
index.file.moy <- file.path(dirClim, 'CDTMEAN', 'CDTMEAN.rds')
index.file.sds <- file.path(dirClim, 'CDTSTD', 'CDTSTD.rds')
index.data.climato <- try(readRDS(index.file.moy), silent = TRUE)
if(inherits(index.data.climato, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], index.file.moy), TRUE, 'e')
return(NULL)
}
SP1 <- defSpatialPixels(list(lon = don$coords$mat$x, lat = don$coords$mat$y))
SP2 <- defSpatialPixels(list(lon = index.data.climato$coords$mat$x, lat = index.data.climato$coords$mat$y))
if(is.diffSpatialPixelsObj(SP1, SP2, tol = 1e-04)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['18']], format = TRUE)
return(NULL)
}
rm(SP1, SP2)
file.copy(file.path(dirClim, 'CDTMEAN'), outDIR, recursive = TRUE, overwrite = TRUE)
file.copy(file.path(dirClim, 'CDTSTD'), outDIR, recursive = TRUE, overwrite = TRUE)
file.copy(GeneralParameters$climato$clim.file, outDIR, recursive = TRUE, overwrite = TRUE)
index0 <- NULL
index0$id <- don.climato$index
rm(index.data.climato)
}else computeCLIMATO <- TRUE
}
}
#####################################################
# Aggrgate
if(intstep != outstep){
Insert.Messages.Out(.cdtData$EnvData[['message']][['19']], TRUE, "i")
idatyAggr <- rep(TRUE, length(daty))
if(GeneralParameters$outdir$update | GeneralParameters$climato$clim.exist){
daty0 <- if(intstep == "monthly") paste0(daty, 1) else daty
daty0 <- as.Date(daty0, "%Y%m%d")
daty.range <- get.range.date.time(GeneralParameters$date.range, intstep)
idatyAggr <- daty0 >= daty.range$start & daty0 <= daty.range$end
daty <- daty[idatyAggr]
}
########################
startMonth <- GeneralParameters$seasonal$start.mon
seasonLength <- GeneralParameters$seasonal$length.mon
aggr.pars <- GeneralParameters$aggr.series
agg.index <- cdt.index.aggregate(daty, intstep, outstep,
seasonLength = seasonLength,
startMonth = startMonth)
if(aggr.pars$min.frac$unique){
ifull <- (agg.index$nba / agg.index$nb0) >= aggr.pars$min.frac$all
}else{
ifull <- sapply(agg.index$nb.mon, function(x){
all(x$nba / x$nb0 >= aggr.pars$min.frac$month[x$mo])
})
}
if(all(!ifull)){
Insert.Messages.Out(.cdtData$EnvData[['message']][['21']], TRUE, 'e')
return(NULL)
}
daty <- agg.index$date
#########################
if(GeneralParameters$data.type == 'cdtstation'){
don$data <- cdt.data.aggregateTS(don$data[idatyAggr, , drop = FALSE], agg.index, aggr.pars)
don$dates <- daty
don$data <- round(don$data, 5)
out.cdt.aggregate <- file.path(datadir, paste0("Aggregated_data_", outstep, ".csv"))
if(GeneralParameters$outdir$update & file.exists(out.cdt.aggregate)){
don0 <- try(utils::read.table(out.cdt.aggregate, sep = ',', na.strings = miss.val,
colClasses = 'character', stringsAsFactors = FALSE), silent = TRUE)
if(inherits(don0, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], out.cdt.aggregate), TRUE, 'e')
return(NULL)
}
capition <- don0[1:3, 1]
don0 <- splitCDTData1(don0)
headers <- do.call(rbind, don0[c('id', 'lon', 'lat')])
i0 <- don0$dates %in% daty
if(any(i0)){
don0$dates <- don0$dates[!i0]
don0$data <- don0$data[!i0, , drop = FALSE]
}
daty <- c(don0$dates, daty)
don$data <- rbind(don0$data, don$data)
o0 <- order(daty)
daty <- daty[o0]
don$data <- don$data[o0, , drop = FALSE]
}else{
headers <- do.call(rbind, don[c('id', 'lon', 'lat')])
capition <- c('Stations', 'LON', paste(toupper(outstep), 'LAT', sep = '/'))
}
entete <- cbind(capition, headers)
cdtdata <- rbind(entete, cbind(daty, don$data))
writeFiles(cdtdata, out.cdt.aggregate, na = miss.val)
rm(cdtdata)
}
if(GeneralParameters$data.type == "cdtdataset"){
dataset.name <- paste0("Aggregated_Data_", outstep)
outputDIR <- file.path(outDIR, dataset.name)
dataDIR <- file.path(outputDIR, "DATA")
dir.create(dataDIR, showWarnings = FALSE, recursive = TRUE)
file.index <- file.path(outputDIR, paste0(dataset.name, ".rds"))
##########
index.agg <- don
index.agg$TimeStep <- outstep
##########
readPrevAggr <- FALSE
if(GeneralParameters$outdir$update & file.exists(file.index)){
don0 <- try(readRDS(file.index), silent = TRUE)
if(inherits(don0, "try-error")){
Insert.Messages.Out(paste(.cdtData$EnvData[['message']][['7']], file.index), TRUE, 'e')
return(NULL)
}
don0Index <- don0$dateInfo$index
i0 <- don0$dateInfo$date %in% daty
if(any(i0)){
don0$dateInfo$date <- don0$dateInfo$date[!i0]
don0$dateInfo$index <- don0$dateInfo$index[!i0]
## order: remove missing index, if exist
don0$dateInfo$index <- order(don0$dateInfo$index)
}
index.agg$dateInfo$date <- c(don0$dateInfo$date, daty)
index.agg$dateInfo$index <- c(don0$dateInfo$index, max(don0$dateInfo$index) + seq_along(daty))
o0 <- order(index.agg$dateInfo$date)
index.agg$dateInfo$date <- index.agg$dateInfo$date[o0]
index.agg$dateInfo$index <- index.agg$dateInfo$index[o0]
daty <- index.agg$dateInfo$date
readPrevAggr <- TRUE
}else{
index.agg$dateInfo$date <- daty
index.agg$dateInfo$index <- seq_along(daty)
}
##########
con <- gzfile(file.index, compression = 7)
open(con, "wb")
saveRDS(index.agg, con)
close(con)
##########
chunkfile <- sort(unique(don$colInfo$index))
chunkcalc <- split(chunkfile, ceiling(chunkfile / don$chunkfac))
##########
do.parChunk <- if(don$chunkfac > length(chunkcalc)) TRUE else FALSE
do.parCALC <- if(do.parChunk) FALSE else TRUE
parsL <- doparallel.cond(do.parCALC & (length(chunkcalc) > 10))
ret <- cdt.foreach(seq_along(chunkcalc), parsL, GUI, progress, FUN = function(jj)
{
don.data <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], GeneralParameters$cdtdataset$index, cdtParallelCond, do.par = do.parChunk)
don.data <- don.data[don$dateInfo$index, , drop = FALSE]
don.data <- don.data[idatyAggr, , drop = FALSE]
cdtdata <- cdt.data.aggregateTS(don.data, agg.index, aggr.pars)
if(readPrevAggr){
don.data0 <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], file.index, cdtParallelCond, do.par = do.parChunk)
don.data0 <- don.data0[don0Index, , drop = FALSE]
don.data0 <- don.data0[!i0, , drop = FALSE]
cdtdata <- rbind(don.data0, cdtdata)
cdtdata <- cdtdata[o0, , drop = FALSE]
}
writeCdtDatasetChunk.sequence(cdtdata, chunkcalc[[jj]], index.agg, dataDIR, cdtParallelCond, do.par = do.parChunk)
rm(don.data, cdtdata); gc()
return(0)
})
don <- index.agg
GeneralParameters$cdtdataset$index <- file.index
}
# intstep <- outstep
Insert.Messages.Out(.cdtData$EnvData[['message']][['20']], TRUE, "s")
}
#####################################################
# climatology
if(computeCLIMATO){
Insert.Messages.Out(.cdtData$EnvData[['message']][['22']], TRUE, "i")
daty0 <- daty
year <- as.numeric(substr(daty0, 1, 4))
iyear <- if(allyears) rep(TRUE, length(year)) else year >= year1 & year <= year2
daty0 <- daty0[iyear]
year <- year[iyear]
if(length(unique(year)) < minyear){
Insert.Messages.Out(.cdtData$EnvData[['message']][['23']], TRUE, 'e')
return(NULL)
}
GeneralParameters$climato$start.year <- min(year)
GeneralParameters$climato$end.year <- max(year)
index <- cdt.index.Climatologies(daty0, outstep, xwin)
index0 <- NULL
index0$id <- index$id
#####################################################
if(GeneralParameters$data.type == "cdtstation"){
don$data <- don$data[iyear, , drop = FALSE]
dat.clim <- .cdt.Climatologies(index, don$data, minyear, outstep, xwin)
dat.moy <- round(dat.clim$mean, 2)
dat.sds <- round(dat.clim$sd, 2)
rm(dat.clim)
#########################################
out.cdt.clim.moy <- file.path(datadir, paste0(outstep, "_Climatology_mean.csv"))
out.cdt.clim.sds <- file.path(datadir, paste0(outstep, "_Climatology_std.csv"))
dataOUT1 <- file.path(outDIR, 'CDTMEAN')
dir.create(dataOUT1, showWarnings = FALSE, recursive = TRUE)
index.file.moy <- file.path(dataOUT1, 'CDTMEAN.rds')
dataOUT2 <- file.path(outDIR, 'CDTSTD')
dir.create(dataOUT2, showWarnings = FALSE, recursive = TRUE)
index.file.sds <- file.path(dataOUT2, 'CDTSTD.rds')
file.index.clim <- file.path(outDIR, "Climatology.rds")
##################
output <- list(params = GeneralParameters,
data = list(id = don$id, lon = don$lon, lat = don$lat),
index = index$id)
out.dat.index <- gzfile(file.index.clim, compression = 7)
saveRDS(output, out.dat.index)
close(out.dat.index)
out.cdt.moy <- gzfile(index.file.moy, compression = 7)
saveRDS(dat.moy, out.cdt.moy)
close(out.cdt.moy)
out.cdt.sds <- gzfile(index.file.sds, compression = 7)
saveRDS(dat.sds, out.cdt.sds)
close(out.cdt.sds)
##################
xhead <- rbind(don$id, don$lon, don$lat)
chead <- c('ID.STN', 'LON', 'INDEX/LAT')
infohead <- cbind(chead, xhead)
dat.moy1 <- rbind(infohead, cbind(index$id, dat.moy))
writeFiles(dat.moy1, out.cdt.clim.moy, na = '-99')
dat.sds1 <- rbind(infohead, cbind(index$id, dat.sds))
writeFiles(dat.sds1, out.cdt.clim.sds, na = '-99')
Insert.Messages.Out(.cdtData$EnvData[['message']][['24']], TRUE, "s")
rm(dat.moy1, dat.sds1)
}
#####################################################
if(GeneralParameters$data.type == "cdtdataset"){
ncdfOUT1 <- file.path(outDIR, 'DATA_NetCDF', 'CDTMEAN')
dir.create(ncdfOUT1, showWarnings = FALSE, recursive = TRUE)
ncdfOUT2 <- file.path(outDIR, 'DATA_NetCDF', 'CDTSTD')
dir.create(ncdfOUT2, showWarnings = FALSE, recursive = TRUE)
datadir1 <- file.path(outDIR, 'CDTMEAN', 'DATA')
dir.create(datadir1, showWarnings = FALSE, recursive = TRUE)
index.file.moy <- file.path(outDIR, 'CDTMEAN', 'CDTMEAN.rds')
datadir2 <- file.path(outDIR, 'CDTSTD', 'DATA')
dir.create(datadir2, showWarnings = FALSE, recursive = TRUE)
index.file.sds <- file.path(outDIR, 'CDTSTD', 'CDTSTD.rds')
file.index.clim <- file.path(outDIR, "Climatology.rds")
##################
output <- list(params = GeneralParameters, index = index$id)
out.dat.index <- gzfile(file.index.clim, compression = 7)
saveRDS(output, out.dat.index)
close(out.dat.index)
##################
index.out <- don
index.out$varInfo$longname <- paste(outstep, "climatology from:", don$varInfo$longname)
index.out$dateInfo$date <- index$id
index.out$dateInfo$index <- seq_along(index$id)
file.index.mean <- gzfile(index.file.moy, compression = 7)
saveRDS(index.out, file.index.mean)
close(file.index.mean)
file.index.sds <- gzfile(index.file.sds, compression = 7)
saveRDS(index.out, file.index.sds)
close(file.index.sds)
#########################################
chunkfile <- sort(unique(don$colInfo$index))
chunkcalc <- split(chunkfile, ceiling(chunkfile / don$chunkfac))
do.parChunk <- if(don$chunkfac > length(chunkcalc)) TRUE else FALSE
do.parCALC <- if(do.parChunk) FALSE else TRUE
parsL <- doparallel.cond(do.parCALC & (length(chunkcalc) > 10))
ret <- cdt.foreach(seq_along(chunkcalc), parsL, GUI = GUI,
progress = progress, FUN = function(jj)
{
don.data <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], GeneralParameters$cdtdataset$index, cdtParallelCond, do.par = do.parChunk)
don.data <- don.data[don$dateInfo$index, , drop = FALSE]
don.data <- don.data[iyear, , drop = FALSE]
dat.clim <- .cdt.Climatologies(index, don.data, minyear, outstep, xwin)
writeCdtDatasetChunk.sequence(dat.clim$mean, chunkcalc[[jj]], index.out, datadir1, cdtParallelCond, do.par = do.parChunk)
writeCdtDatasetChunk.sequence(dat.clim$sd, chunkcalc[[jj]], index.out, datadir2, cdtParallelCond, do.par = do.parChunk)
rm(dat.clim, don.data); gc()
})
Insert.Messages.Out(.cdtData$EnvData[['message']][['24']], TRUE, "s")
##########################################
Insert.Messages.Out(.cdtData$EnvData[['message']][['25']], TRUE, "i")
x <- index.out$coords$mat$x
y <- index.out$coords$mat$y
dx <- ncdf4::ncdim_def("Lon", "degreeE", x)
dy <- ncdf4::ncdim_def("Lat", "degreeN", y)
xy.dim <- list(dx, dy)
nc.grd <- ncdf4::ncvar_def(index.out$varInfo$name, index.out$varInfo$units, xy.dim, -99, index.out$varInfo$longname, "float", compression = 9)
######################
ret <- cdt.foreach(seq_along(index$id), doparallel.cond(FALSE),
GUI = GUI, progress = progress, FUN = function(jj)
{
id <- index$id[jj]
dat.moy <- readCdtDatasetChunk.multi.dates.order(index.file.moy, id, cdtParallelCond, onedate = TRUE)
dat.moy <- dat.moy$z
dat.moy[is.na(dat.moy)] <- -99
filenc <- file.path(ncdfOUT1, paste0("clim_", id, ".nc"))
nc <- ncdf4::nc_create(filenc, nc.grd)
ncdf4::ncvar_put(nc, nc.grd, dat.moy)
ncdf4::nc_close(nc)
dat.sds <- readCdtDatasetChunk.multi.dates.order(index.file.sds, id, cdtParallelCond, onedate = TRUE)
dat.sds <- dat.sds$z
dat.sds[is.na(dat.sds)] <- -99
filenc <- file.path(ncdfOUT2, paste0("clim_", id, ".nc"))
nc <- ncdf4::nc_create(filenc, nc.grd)
ncdf4::ncvar_put(nc, nc.grd, dat.sds)
ncdf4::nc_close(nc)
return(0)
})
Insert.Messages.Out(.cdtData$EnvData[['message']][['26']], TRUE, "s")
rm(index, index.out)
}
}
#####################################################
# anomaly
Insert.Messages.Out(.cdtData$EnvData[['message']][['2']], TRUE, "i")
date.range <- GeneralParameters$date.range
if(outstep == "annual"){
iyearAnom <- daty >= date.range$start.year & daty <= date.range$end.year
datyAnom <- daty[iyearAnom]
}else if(outstep == "seasonal"){
daty.range <- get.range.date.time(date.range, 'monthly')
daty0 <- do.call(c, lapply(strsplit(daty, "_"), '[[', 1))
daty0 <- as.Date(paste0(daty0, '-1'))
iyearAnom <- daty0 >= daty.range$start & daty0 <= daty.range$end
datyAnom <- daty[iyearAnom]
}else{
daty0 <- if(outstep == "monthly") paste0(daty, 1) else daty
daty0 <- as.Date(daty0, "%Y%m%d")
daty.range <- get.range.date.time(date.range, outstep)
iyearAnom <- daty0 >= daty.range$start & daty0 <= daty.range$end
datyAnom <- daty[iyearAnom]
}
index1 <- cdt.index.Anomalies(datyAnom, index0, outstep)
daty <- index1$date
index1 <- index1$index
#####################################################
if(GeneralParameters$data.type == "cdtstation"){
don$data <- don$data[iyearAnom, , drop = FALSE]
data.sds <- if(anomaly.fonct == "Standardized") dat.sds else NULL
anom <- .cdt.Anomalies(index1, don$data, dat.moy, data.sds, anomaly.fonct)
#########################################
if(GeneralParameters$outdir$update){
anom.file.rds <- file.path(outDIR, 'CDTANOM', 'CDTANOM.rds')
anom.file.csv <- file.path(outDIR, 'CDTSTATIONS', paste0(outstep, "_Anomaly.csv"))
data.anom.rds <- readRDS(anom.file.rds)
data.anom.csv <- utils::read.table(anom.file.csv, sep = ",", colClasses = "character", stringsAsFactors = FALSE)
infohead <- data.anom.csv[1:3, , drop = FALSE]
data.anom.csv <- data.anom.csv[-(1:3), -1]
ixold <- match(daty, don.anom$data$dates)
ixold <- ixold[!is.na(ixold)]
if(length(ixold) == 0){
don.anom$data$dates <- c(don.anom$data$dates, daty)
data.anom.rds <- rbind(data.anom.rds, anom)
anom <- as.data.frame(anom)
names(anom) <- names(data.anom.csv)
data.anom.csv <- rbind(data.anom.csv, anom)
}else{
don.anom$data$dates <- c(don.anom$data$dates[-ixold], daty)
data.anom.rds <- rbind(data.anom.rds[-ixold, , drop = FALSE], anom)
anom <- as.data.frame(anom)
names(anom) <- names(data.anom.csv)
data.anom.csv <- rbind(data.anom.csv[-ixold, , drop = FALSE], anom)
}
saveRDS(data.anom.rds, anom.file.rds)
tmp <- data.frame(cbind(don.anom$data$dates, data.anom.csv))
names(tmp) <- names(infohead)
data.anom.csv <- rbind(infohead, tmp)
writeFiles(data.anom.csv, anom.file.csv, na = .cdtData$Config$missval.anom)
saveRDS(don.anom, GeneralParameters$outdir$dir)
.cdtData$EnvData$output <- don.anom
.cdtData$EnvData$PathAnom <- outDIR
rm(tmp, don, anom, data.anom.csv, data.anom.rds, don.anom)
}else{
out.dat.index <- gzfile(file.path(outDIR, "Anomaly.rds"), compression = 7)
out.cdt.anomal <- file.path(datadir, paste0(outstep, "_Anomaly.csv"))
dataOUT3 <- file.path(outDIR, 'CDTANOM')
dir.create(dataOUT3, showWarnings = FALSE, recursive = TRUE)
out.cdt.anom <- gzfile(file.path(dataOUT3, 'CDTANOM.rds'), compression = 7)
#############
saveRDS(anom, out.cdt.anom)
close(out.cdt.anom)
#############
xhead <- rbind(don$id, don$lon, don$lat)
chead <- c('ID.STN', 'LON', paste0(toupper(outstep), "/LAT"))
infohead <- cbind(chead, xhead)
#############
anom <- round(anom, 2)
anom <- rbind(infohead, cbind(daty, anom))
writeFiles(anom, out.cdt.anomal, na = .cdtData$Config$missval.anom)
#############
output <- list(params = GeneralParameters,
data = don[c('id', 'lon', 'lat', 'dates')])
.cdtData$EnvData$output <- output
.cdtData$EnvData$PathAnom <- outDIR
saveRDS(output, out.dat.index)
close(out.dat.index)
rm(output, don, anom)
}
Insert.Messages.Out(.cdtData$EnvData[['message']][['2a']], TRUE, "s")
}
#####################################################
if(GeneralParameters$data.type == "cdtdataset"){
ncdfOUT3 <- file.path(outDIR, 'DATA_NetCDF', 'CDTANOM')
datadir3 <- file.path(outDIR, 'CDTANOM', 'DATA')
index.file.anomal <- file.path(outDIR, 'CDTANOM', 'CDTANOM.rds')
if(GeneralParameters$outdir$update){
anomaly.fonct <- don.anom$params$anomaly
out.anom.index <- gzfile(GeneralParameters$outdir$dir, compression = 7)
ixold <- match(daty, don.anom$dates)
ixold <- ixold[!is.na(ixold)]
if(length(ixold) == 0){
newdaty <- c(index.out$dateInfo$date, daty)
newindex <- c(index.out$dateInfo$index, max(index.out$dateInfo$index) + seq_along(daty))
}else{
newdaty <- c(index.out$dateInfo$date[-ixold], daty)
newindex <- c(index.out$dateInfo$index[-ixold], max(index.out$dateInfo$index[-ixold]) + seq_along(daty))
}
##########
index.out$dateInfo$date <- newdaty
index.out$dateInfo$index <- newindex
saveRDS(index.out, index.file.anomal)
don.anom$dates <- newdaty
saveRDS(don.anom, out.anom.index)
close(out.anom.index)
.cdtData$EnvData$output <- don.anom
.cdtData$EnvData$PathAnom <- outDIR
}else{
anomaly.fonct <- GeneralParameters$anomaly
out.anom.index <- gzfile(file.path(outDIR, "Anomaly.rds"), compression = 7)
dir.create(ncdfOUT3, showWarnings = FALSE, recursive = TRUE)
dir.create(datadir3, showWarnings = FALSE, recursive = TRUE)
index.out <- don
index.out$varInfo$longname <- paste(outstep, "anomaly from:", don$varInfo$longname)
index.out$dateInfo$date <- daty
index.out$dateInfo$index <- seq_along(daty)
##########
saveRDS(index.out, index.file.anomal)
output <- list(params = GeneralParameters, dates = daty)
saveRDS(output, out.anom.index)
close(out.anom.index)
.cdtData$EnvData$output <- output
.cdtData$EnvData$PathAnom <- outDIR
}
#########################################
chunkfile <- sort(unique(don$colInfo$index))
chunkcalc <- split(chunkfile, ceiling(chunkfile / don$chunkfac))
do.parChunk <- if(don$chunkfac > length(chunkcalc)) TRUE else FALSE
do.parCALC <- if(do.parChunk) FALSE else TRUE
parsL <- doparallel.cond(do.parCALC & (length(chunkcalc) > 10))
ret <- cdt.foreach(seq_along(chunkcalc), parsL, GUI = GUI,
progress = progress, FUN = function(jj)
{
don.data <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], GeneralParameters$cdtdataset$index, cdtParallelCond, do.par = do.parChunk)
don.data <- don.data[don$dateInfo$index, , drop = FALSE]
don.data <- don.data[iyearAnom, , drop = FALSE]
dat.moy <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], index.file.moy, cdtParallelCond, do.par = do.parChunk)
data.sds <- NULL
if(anomaly.fonct == "Standardized")
data.sds <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], index.file.sds, cdtParallelCond, do.par = do.parChunk)
anom <- .cdt.Anomalies(index1, don.data, dat.moy, data.sds, anomaly.fonct)
if(GeneralParameters$outdir$update){
dat.anom <- readCdtDatasetChunk.sequence(chunkcalc[[jj]], index.file.anomal, cdtParallelCond, do.par = do.parChunk)
if(length(ixold) == 0){
anom <- rbind(dat.anom, anom)
}else{
anom <- rbind(dat.anom[-ixold, , drop = FALSE], anom)
}
rm(dat.anom)
}
writeCdtDatasetChunk.sequence(anom, chunkcalc[[jj]], index.out, datadir3, cdtParallelCond, do.par = do.parChunk)
rm(don.data, dat.moy, anom); gc()
})
Insert.Messages.Out(.cdtData$EnvData[['message']][['2a']], TRUE, "s")
#########################################
Insert.Messages.Out(.cdtData$EnvData[['message']][['25']], TRUE, "i")
x <- index.out$coords$mat$x
y <- index.out$coords$mat$y
nx <- length(x)
ny <- length(y)
dx <- ncdf4::ncdim_def("Lon", "degreeE", x)
dy <- ncdf4::ncdim_def("Lat", "degreeN", y)
xy.dim <- list(dx, dy)
if(anomaly.fonct == "Difference") units <- index.out$varInfo$units
if(anomaly.fonct == "Percentage") units <- "percentage"
if(anomaly.fonct == "Standardized") units <- ""
nc.grd <- ncdf4::ncvar_def("anom", units, xy.dim, -9999, index.out$varInfo$longname, "float", compression = 9)
#########################################
chunkfile <- sort(unique(don$colInfo$index))
datyread <- split(daty, ceiling(seq_along(daty) / 50))
do.parChunk <- if(length(chunkfile) > length(datyread)) TRUE else FALSE
do.parCALC <- if(do.parChunk) FALSE else TRUE
parsL <- doparallel.cond(do.parCALC & (length(datyread) > 50))
ret <- cdt.foreach(seq_along(datyread), parsL, GUI = GUI,
progress = progress, FUN = function(jj)
{
daty0 <- datyread[[jj]]
dat.anom <- readCdtDatasetChunk.multi.dates.order(index.file.anomal, daty0, cdtParallelCond, do.par = do.parChunk)
for(j in seq_along(daty0)){
anom <- dat.anom[j, ]
dim(anom) <- c(nx, ny)
anom[is.na(anom) | is.nan(anom) | is.infinite(anom)] <- -9999
filenc <- file.path(ncdfOUT3, paste0("anomaly_", daty0[j], ".nc"))
nc <- ncdf4::nc_create(filenc, nc.grd)
ncdf4::ncvar_put(nc, nc.grd, anom)
ncdf4::nc_close(nc)
}
rm(daty0, dat.anom, anom); gc()
return(0)
})
Insert.Messages.Out(.cdtData$EnvData[['message']][['26']], TRUE, "s")
rm(don, index1, index.out)
}
return(0)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.