R/cdtCompute_WB_Procs.R

Defines functions computeWBProcs

computeWBProcs <- function(){
    message <- .cdtData$GalParams$message
    Insert.Messages.Out(message[['6']], TRUE, "i")

    if(.cdtData$GalParams$data.type == "cdtstation"){
        prec <- getStnOpenData(.cdtData$GalParams$cdtstation$prec)
        if(is.null(prec)) return(NULL)
        prec <- getCDTdataAndDisplayMsg(prec, .cdtData$GalParams$Tstep, .cdtData$GalParams$cdtstation$prec)
        if(is.null(prec)) return(NULL)

        etp <- getStnOpenData(.cdtData$GalParams$cdtstation$etp)
        if(is.null(etp)) return(NULL)
        etp <- getCDTdataAndDisplayMsg(etp, .cdtData$GalParams$Tstep, .cdtData$GalParams$cdtstation$etp)
        if(is.null(etp)) return(NULL)

        if(!any(prec$id %in% etp$id)){
            Insert.Messages.Out(message[['7']], TRUE, 'e')
            return(NULL)
        }
        if(!any(prec$dates %in% etp$dates)){
            Insert.Messages.Out(message[['8']], TRUE, 'e')
            return(NULL)
        }

        ##################
        inx <- match(prec$dates, etp$dates)
        inx <- inx[!is.na(inx)]
        etp$dates <- etp$dates[inx]
        etp$data <- etp$data[inx, , drop = FALSE]
        prec$data <- prec$data[prec$dates %in% etp$dates, , drop = FALSE]
        daty <- etp$dates

        ##################
        jnx <- match(prec$id, etp$id)
        jnx <- jnx[!is.na(jnx)]
        etp$id <- etp$id[jnx]

        stn.id <- etp$id
        stn.lon <- etp$lon[jnx]
        stn.lat <- etp$lat[jnx]
        etp$data <- etp$data[, jnx, drop = FALSE]
        prec$data <- prec$data[, prec$id %in% etp$id, drop = FALSE]

        ##################

        if(.cdtData$GalParams$swhc$multi){
            swhc <- getStnOpenData(.cdtData$GalParams$swhc$file)
            if(is.null(swhc)) return(NULL)

            swhc.id <- as.character(swhc[1, -1])
            # swhc.lon <- as.numeric(swhc[2, -1])
            # swhc.lat <- as.numeric(swhc[3, -1])
            swhc <- as.numeric(swhc[nrow(swhc), -1])

            if(!isTRUE(all.equal(stn.id, swhc.id))){
                Insert.Messages.Out(message[['9']], TRUE, 'e')
                return(NULL)
            }
        }else swhc <- .cdtData$GalParams$swhc$cap.max

        if(.cdtData$GalParams$wb$multi){
            wb1 <- getStnOpenData(.cdtData$GalParams$wb$file)
            if(is.null(wb1)) return(NULL)

            wb1.id <- as.character(wb1[1, -1])
            # wb1.lon <- as.numeric(wb1[2, -1])
            # wb1.lat <- as.numeric(wb1[3, -1])
            wb1 <- as.numeric(wb1[nrow(wb1), -1])

            if(!isTRUE(all.equal(stn.id, wb1.id))){
                Insert.Messages.Out(message[['10']], TRUE, 'e')
                return(NULL)
            }
        }else wb1 <- .cdtData$GalParams$wb$wb1

        ##################

        index <- cdt.index.DailyYears(daty, .cdtData$GalParams$hdate$start.month, .cdtData$GalParams$hdate$start.day)
        index <- index$index

        ##################
        miss1 <- is.na(index[[1]][1])
        ix <- do.call(c, index)
        startDaty <- min(which(!is.na(ix)))
        endDaty <- max(which(!is.na(ix)))

        if(!.cdtData$GalParams$hdate$separate.year){
            index <- if(miss1) list(index[[1]], unlist(index[-1])) else list(ix)
        }
        rg.date <- range(as.Date(daty, '%Y%m%d'), na.rm = TRUE)
        daty <- format(seq(rg.date[1], rg.date[2], 'day'), '%Y%m%d')

        ##################

        WB <- lapply(index, function(idx){
            Water.Balance(prec$data[idx, , drop = FALSE], etp$data[idx, , drop = FALSE], swhc, wb1)
        })
        if(miss1) WB[[1]][] <- NA
        WB <- do.call(rbind, WB)
        WB <- WB[startDaty:endDaty, , drop = FALSE]
        WB <- round(WB, 1)
        WB[is.na(WB)] <- .cdtData$Config$missval

        ##################

        xhead <- rbind(stn.id, stn.lon, stn.lat)
        chead <- c('ID.STN', 'LON', paste0(toupper(.cdtData$GalParams$Tstep), '/LAT'))
        infohead <- cbind(chead, xhead)

        WB <- rbind(infohead, cbind(daty, WB))
        writeFiles(WB, .cdtData$GalParams$output)
        rm(prec, etp, WB)
    }

    #######################################################

    if(.cdtData$GalParams$data.type == "cdtdataset"){
        prec <- try(readRDS(.cdtData$GalParams$cdtdataset$prec), silent = TRUE)
        if(inherits(prec, "try-error")){
            Insert.Messages.Out(paste(message[['11']], .cdtData$GalParams$cdtdataset$prec), TRUE, 'e')
            return(NULL)
        }
        if(.cdtData$GalParams$Tstep != prec$TimeStep){
            Insert.Messages.Out(paste(message[['12']], .cdtData$GalParams$Tstep), TRUE, 'e')
            return(NULL)
        }

        etp <- try(readRDS(.cdtData$GalParams$cdtdataset$etp), silent = TRUE)
        if(inherits(etp, "try-error")){
            Insert.Messages.Out(paste(message[['11']], .cdtData$GalParams$cdtdataset$etp), TRUE, 'e')
            return(NULL)
        }
        if(.cdtData$GalParams$Tstep != etp$TimeStep){
            Insert.Messages.Out(paste(message[['13']], .cdtData$GalParams$Tstep), TRUE, 'e')
            return(NULL)
        }

        ##################
        SP1 <- defSpatialPixels(list(lon = prec$coords$mat$x, lat = prec$coords$mat$y))
        SP2 <- defSpatialPixels(list(lon = etp$coords$mat$x, lat = etp$coords$mat$y))
        if(is.diffSpatialPixelsObj(SP1, SP2, tol = 1e-04)){
            Insert.Messages.Out(message[['14']], TRUE, 'e')
            return(NULL)
        }
        rm(SP1, SP2)

        ##################
        if(prec$chunksize != etp$chunksize){
            Insert.Messages.Out(message[['15']], TRUE, 'e')
            return(NULL)
        }

        ##################
        if(!any(prec$dateInfo$date %in% etp$dateInfo$date)){
            Insert.Messages.Out(message[['8']], TRUE, 'e')
            return(NULL)
        }

        ##################
        inx <- match(prec$dateInfo$date, etp$dateInfo$date)
        inx <- inx[!is.na(inx)]
        etp$dateInfo$date <- etp$dateInfo$date[inx]
        etp$dateInfo$index <- etp$dateInfo$index[inx]

        pdaty <- prec$dateInfo$date %in% etp$dateInfo$date
        prec$dateInfo$date <- prec$dateInfo$date[pdaty]
        prec$dateInfo$index <- prec$dateInfo$index[pdaty]

        daty <- prec$dateInfo$date

        ##################

        if(.cdtData$GalParams$swhc$multi){
            swhc <- getNCDFSampleData(.cdtData$GalParams$swhc$file)
            if(is.null(swhc)){
                Insert.Messages.Out(message[['16']], TRUE, 'e')
                return(NULL)
            }

            SP1 <- defSpatialPixels(swhc[c('lon', 'lat')])
            SP2 <- defSpatialPixels(list(lon = etp$coords$mat$x, lat = etp$coords$mat$y))
            if(is.diffSpatialPixelsObj(SP1, SP2, tol = 1e-04)){
                Insert.Messages.Out(message[['17']], TRUE, 'e')
                return(NULL)
            }
            rm(SP1, SP2)

            jfile <- getIndex.AllOpenFiles(.cdtData$GalParams$swhc$file)
            swhc <- .cdtData$OpenFiles$Data[[jfile]][[2]]$z
        }else swhc <- .cdtData$GalParams$swhc$cap.max

        if(.cdtData$GalParams$wb$multi){
            wb1 <- getNCDFSampleData(.cdtData$GalParams$wb$file)
            if(is.null(wb1)){
                Insert.Messages.Out(message[['18']], TRUE, 'e')
                return(NULL)
            }

            SP1 <- defSpatialPixels(wb1[c('lon', 'lat')])
            SP2 <- defSpatialPixels(list(lon = etp$coords$mat$x, lat = etp$coords$mat$y))
            if(is.diffSpatialPixelsObj(SP1, SP2, tol = 1e-04)){
                Insert.Messages.Out(message[['19']], TRUE, 'e')
                return(NULL)
            }
            rm(SP1, SP2)

            jfile <- getIndex.AllOpenFiles(.cdtData$GalParams$wb$file)
            wb1 <- .cdtData$OpenFiles$Data[[jfile]][[2]]$z
        }else wb1 <- .cdtData$GalParams$wb$wb1

        ##################

        index <- cdt.index.DailyYears(daty, .cdtData$GalParams$hdate$start.month, .cdtData$GalParams$hdate$start.day)
        index <- index$index

        miss1 <- is.na(index[[1]][1])
        ix <- do.call(c, index)
        startDaty <- min(which(!is.na(ix)))
        endDaty <- max(which(!is.na(ix)))

        if(!.cdtData$GalParams$hdate$separate.year){
            index <- if(miss1) list(index[[1]], unlist(index[-1])) else list(ix)
        }

        rg.date <- range(as.Date(daty, '%Y%m%d'), na.rm = TRUE)
        daty <- format(seq(rg.date[1], rg.date[2], 'day'), '%Y%m%d')

        ##################

        index.out <- prec
        index.out$varInfo$name <- "wb"
        index.out$varInfo$units <- "mm"
        index.out$varInfo$longname <- "Daily water balance"

        dataset <- paste0("WaterBalance_", .cdtData$GalParams$Tstep)
        outDIR <- file.path(.cdtData$GalParams$output, dataset)
        dir.create(outDIR, showWarnings = FALSE, recursive = TRUE)

        datadir <- file.path(outDIR, 'DATA')
        dir.create(datadir, showWarnings = FALSE, recursive = TRUE)
        datafileIdx <- file.path(outDIR, paste0(dataset, '.rds'))

        ##################

        chunkfile <- sort(unique(prec$colInfo$index))
        chunkcalc <- split(chunkfile, ceiling(chunkfile/prec$chunkfac))

        do.parChunk <- if(prec$chunkfac > length(chunkcalc)) TRUE else FALSE
        do.parCALC <- if(do.parChunk) FALSE else TRUE

        GalParams <- .cdtData$GalParams
        cdtParallelCond <- .cdtData$Config$parallel

        parsL <- doparallel.cond(do.parCALC & (length(chunkcalc) > 5))
        ret <- cdt.foreach(seq_along(chunkcalc), parsL, GUI = TRUE,
                           progress = TRUE, FUN = function(j)
        {
            rr <- readCdtDatasetChunk.sequence(chunkcalc[[j]], GalParams$cdtdataset$prec, cdtParallelCond, do.par = do.parChunk)
            rr <- rr[prec$dateInfo$index, , drop = FALSE]

            et <- readCdtDatasetChunk.sequence(chunkcalc[[j]], GalParams$cdtdataset$etp, cdtParallelCond, do.par = do.parChunk)
            et <- et[etp$dateInfo$index, , drop = FALSE]

            if(GalParams$swhc$multi){
                ix <- prec$colInfo$index %in% chunkcalc[[j]]
                capmx <- swhc[prec$colInfo$id[ix]]
            }else capmx <- swhc

            if(GalParams$wb$multi){
                ix <- prec$colInfo$index %in% chunkcalc[[j]]
                wb0 <- wb1[prec$colInfo$id[ix]]
            }else wb0 <- wb1

            WB <- lapply(index, function(idx){
                Water.Balance(rr[idx, , drop = FALSE], et[idx, , drop = FALSE], capmx, wb0)
            })

            if(miss1) WB[[1]][] <- NA
            WB <- do.call(rbind, WB)
            WB <- WB[startDaty:endDaty, , drop = FALSE]

            writeCdtDatasetChunk.sequence(WB, chunkcalc[[j]], index.out, datadir, cdtParallelCond, do.par = do.parChunk)

            rm(rr, et, WB); gc()
            return(0)
        })

        ##################

        index.out$dateInfo$date <- daty
        index.out$dateInfo$index <- seq_along(daty)

        con <- gzfile(datafileIdx, compression = 6)
        open(con, "wb")
        saveRDS(index.out, con)
        close(con)

        rm(etp, prec, index.out)
    }

    return(0)
}
rijaf-iri/CDT documentation built on July 3, 2024, 2:54 a.m.