modules/production_input_validation/main.R

##' # Production Input Validation Module
##'
##' **Description:**
##'
##' This module examins whether the production data are valid.
##'
##' **Author: Michael C. J. Kao**
##'
##' **Validation:**
##'
##' Currently there are 5 tests in the suite:
##'
##' 1. Check whether flags are valid.
##' 2. Check whether missing values are specified correctly
##' 3. Check whether the production (input) values are within feasible range.
##' 4. Check whether the area harvested (output) values are within feasible range.
##' 5. Check whether the yield (productivity) values are within feasible range.
##' 6. Check whether the production triplet (input, output productivity) are
##'    balanced.
##'
##' **Auto-corrections:**
##'
##' 1. Flag (M, -) --> (M, u) this autocorrection has been removed
##' 2. Flag (E, t) --> (E, -)
##' 3. Flag (E, e) --> (I, e)
##' 4. Flag (E, p) --> (E, f)
##' 5. Conflicting area harvested and production are removed.
##' 6. Zero yield are removed.
##'
##' **Steps:**
##'
##' 1. Perform auto-correction
##' 2. Remove previous production processing.
##' 3. Perform validation on non-processed data
##' 4. Save auto-corrected data back to the database
##' 5. Send email to user if input data contains invalid entries.
##'
##' ---


##' ## Initialisation
##'

##' Load the libraries
suppressMessages({
    library(faosws)
    library(faoswsUtil)
    library(faoswsFlag)
    library(faoswsImputation)
    library(faoswsProduction)
    library(faoswsProcessing)
    library(faoswsEnsure)
    library(magrittr)
    library(dplyr)
    library(sendmailR)
})

##' Set up for the test environment and parameters
R_SWS_SHARE_PATH = Sys.getenv("R_SWS_SHARE_PATH")

##' Load the configuration if in debug mode
if(CheckDebug()){

    library(faoswsModules)
    SETTINGS = ReadSettings("sws.yml")

    R_SWS_SHARE_PATH = SETTINGS[["share"]]

    ## Define where your certificates are stored
    SetClientFiles(SETTINGS[["certdir"]])

    GetTestEnvironment(baseUrl = SETTINGS[["server"]],
                       token = SETTINGS[["token"]])

}

##' Obtain computation parameter, this parameter determines whether only
##' selected session should be validated or the complete production domain.
validationRange = swsContext.computationParams$validation_selection


##' Get session key and dataset configuration
sessionKey = swsContext.datasets[[1]]
datasetConfig = GetDatasetConfig(domainCode = sessionKey@domain,
                                 datasetCode = sessionKey@dataset)


##' Obtain the complete imputation Datakey
completeImputationKey = getCompleteImputationKey("production")

##' Selected the key based on the input parameter
selectedKey =
    switch(validationRange,
           "session" = sessionKey,
           "all" = completeImputationKey)


##' Build processing parameters, the processing parameters contain parameters on
##' how the data should be processed.
processingParameters =
    productionProcessingParameters(datasetConfig = datasetConfig)



##' ---
##' ## Define auto-correction.
##'
##' Certain invalid data can be corrected if a correction rule is agreed a
##' priori. Both function should be removed from the module, possibly to the
##' ensure package.
##'

##' Function to perform flag correction
autoFlagCorrection = function(data,
                              flagObservationStatusVar = "flagObservationStatus",
                              flagMethodVar = "flagMethod"){
    dataCopy = copy(data)

     ## Correction (1): (M, -) --> (M, u)
     ## correctionFilter =
     ##     dataCopy[[flagObservationStatusVar]] == "M" &
     ##     dataCopy[[flagMethodVar]] == "-"
     ## 
     ## 
     ## dim1=dim(dataCopy[correctionFilter,])
     ## message("Number of (M,-) replaced with (M,u)", dim1[1])
     ## 
     ## 
     ## dataCopy[correctionFilter,
     ##          `:=`(c(flagObservationStatusVar, flagMethodVar),
     ##               .("M", "u"))]

     ## Correction (2): (E, t) --> (E, -)
    
    
    correctionFilter =
        dataCopy[[flagObservationStatusVar]] == "E" &
        dataCopy[[flagMethodVar]] == "t"
    
    dim2=dim(dataCopy[correctionFilter,])
    message("Number of (E, t) replaced with (E, -)", dim2[1])
    
    
    
    dataCopy[correctionFilter,
             `:=`(c(flagObservationStatusVar, flagMethodVar),
                  .("E", "-"))]

    ## Correction (3): (E, e) --> (I, e)
    correctionFilter =
        dataCopy[[flagObservationStatusVar]] == "E" &
        dataCopy[[flagMethodVar]] == "e"
    
    
    dim3=dim(dataCopy[correctionFilter,])
    message("Number of (E, e) replaced with (I, e)", dim3[1])
    
    dataCopy[correctionFilter,
             `:=`(c(flagObservationStatusVar, flagMethodVar),
                  .("I", "e"))]

    ## Correction (4): (E, p) --> (E, f)
    correctionFilter =
        dataCopy[[flagObservationStatusVar]] == "E" &
        dataCopy[[flagMethodVar]] == "p"
    
    
    dim4=dim(dataCopy[correctionFilter,])
    message("Number of (E, p) replaced with (E, f)", dim4[1])
    
    
    dataCopy[correctionFilter,
             `:=`(c(flagObservationStatusVar, flagMethodVar),
                  .("E", "f"))]

    dataCopy
    
 ##   data.table(dim1[1],dim2[1],dim3[1],dim4[1])
    
    
}

##' Function to perform value correction
autoValueCorrection = function(data,
                               processingParameters,
                               formulaParameters){
    correctedData =
        with(formulaParameters,
             with(processingParameters,
                  data %>%
                  removeZeroConflict(data = .,
                                     value1 = productionValue,
                                     value2 = areaHarvestedValue,
                                     observationFlag1 = productionObservationFlag,
                                     observationFlag2 =
                                         areaHarvestedObservationFlag,
                                     methodFlag1 = productionMethodFlag,
                                     methodFlag2 = areaHarvestedMethodFlag,
                                     missingObservationFlag =
                                         missingValueObservationFlag,
                                     missingMethodFlag =
                                         missingValueMethodFlag
                                 ##     getSummary=TRUE
                                     ) %>%
                  removeZeroYield(data = .,
                                  yieldValue = yieldValue,
                                  yieldObsFlag = yieldObservationFlag,
                                  yieldMethodFlag = yieldMethodFlag
                                 ## getSummary=TRUE
                                  
                                  )
                  )
             )
    correctedData
}

##' ## Perform Validation
##'

##' Extract the selected item code which will be looped over, we need to loop
##' over items as different item can have different formula. As we need to check
##' production is balanced, we need to substitute the correct formula for each
##' commodity.
selectedItem = getQueryKey("measuredItemCPC", selectedKey)

invalidFlagCombination=list()

##' Perform input validation item by item
for(iter in seq(selectedItem)){

    currentItem = selectedItem[iter]
    message("Performing input validation for item: ", currentItem)

    ## Get the formula (elements) associated with the current item
    currentFormula =
        getProductionFormula(currentItem) %>%
        removeIndigenousBiologicalMeat
    currentElements = with(currentFormula, c(input, productivity, output))

    ## Update the item and element key to for current item
    currentKey = selectedKey
    currentKey@dimensions$measuredItemCPC@keys = currentItem
    currentKey@dimensions$measuredElement@keys = currentElements

    ## Create the formula parameter list
    formulaParameters =
        with(currentFormula,
             productionFormulaParameters(datasetConfig = datasetConfig,
                                         productionCode = output,
                                         areaHarvestedCode = input,
                                         yieldCode = productivity,
                                         unitConversion = unitConversion))

    ## Extract the current data
    currentData =
        currentKey %>%
        GetData(key = .)

    
    
    

    
    if(nrow(currentData) > 0){
        ## NOTE (Michael): The test should be conducted on the raw data, that is
        ##                 excluding any imputation, statistical estimation and
        ##                 previous calculated values. However, auto corrected
        ##                 values will be saved back to the database.

        ## Correct the flag and values here where available based on agreed
        ## correction rules.
        
        
        
        
        ## Count non-valid flag combinations
        
        Flag=copy(flagValidTable)
                Flag[,flagCombination := paste(flagObservationStatus,flagMethod, sep=";")]
                currentData[,flagCombination:=paste(flagObservationStatus,flagMethod, sep=";")]
                a=merge(Flag,currentData, by="flagCombination")
                sum(a[,Valid])
                dima=dim(a)
        numberOfInvalid=dima[1]-sum(a[,Valid])
                message("Number of invalid combination of flag=  ", numberOfInvalid )
       
       
                
                
                
        ## Flag autocorrection
                
                
         autoCorrectedData =
            currentData %>%
            preProcessing(data = .) %>%
            autoFlagCorrection(data = .) %>% 
            denormalise(normalisedData = .,
                         denormaliseKey = processingParameters$elementVar) %>%
             createTriplet(data = ., formula = currentFormula) %>%
             autoValueCorrection(data = .,
                                 processingParameters = processingParameters,
                                 formulaParameters = formulaParameters)

    
        

        ## Remove all values generated by previous production process (e.g.
        ## previous imputation/calculation) and also manual estimates.
        
       
        
        
        rawData =
            autoCorrectedData %>%
            processProductionDomain(data = .,
                                    processingParameters = processingParameters,
                                    formulaParameters = formulaParameters)


        
        
        
        ##' The tests and test messages
        tests = c("flag_validity", "correct_missing_value",
                  "production_range", "areaHarvested_range", "yield_range", "balanced")
        
        ##' Initialise the error list
        errorList = vector("list", length(tests))
        
##        completeData =
##            GetData(selectedKey) %>%
##            preProcessing(data = .)
        
        ##' Check the flags
        errorList[[1]] =
            rawData %>%
            normalise(.)%>%
            ensureFlagValidity(data = .,
                               getInvalidData = TRUE,
                               normalised = TRUE) %>%
            rbind(errorList[[1]], .)
        
        ##' Ensuring missing values are correctly specified
        errorList[[2]] =
            rawData %>%
            normalise(.)%>%
            ensureCorrectMissingValue(data = .,
                                      valueVar = "Value",
                                      flagObservationStatusVar = "flagObservationStatus",
                                      missingObservationFlag = "M",
                                      returnData = FALSE,
                                      getInvalidData = TRUE)%>%
                                      rbind(errorList[[2]], .)
        
        ## Check production value
        errorList[[3]] =
            with(formulaParameters,
            {
                rawData %>%
                    ensureValueRange(data = .,
                                     ensureColumn = productionValue,
                                     getInvalidData = TRUE) %>%
                    normalise %>%
                    rbind(errorList[[3]], .)
            }
            )

        ## Check area harvested value
        errorList[[4]] =
            with(formulaParameters,
            {
                rawData %>%
                    ensureValueRange(data = .,
                                     ensureColumn = areaHarvestedValue,
                                     getInvalidData = TRUE) %>%
                    normalise %>%
                    rbind(errorList[[4]], .)
            }
            )

        ## Check yield value
        errorList[[5]] =
            with(formulaParameters,
            {
                rawData %>%
                    ensureValueRange(data = .,
                                     ensureColumn = yieldValue,
                                     getInvalidData = TRUE,
                                     includeEndPoint = FALSE) %>%
                    normalise %>%
                    rbind(errorList[[5]], .)
            }
            )

        ## Check production domain balanced.
        errorList[[6]] =
            with(formulaParameters,
            {
                rawData %>%
                    ensureProductionBalanced(data = .,
                                             areaVar = areaHarvestedValue,
                                             yieldVar = yieldValue,
                                             prodVar = productionValue,
                                             conversion = unitConversion,
                                             returnData = FALSE,
                                             getInvalidData = TRUE,
                                             normalised = FALSE) %>%
                    normalise %>%
                    rbind(errorList[[6]], .)
            }
            )

        ## Save the auto-corrected data back
        with(formulaParameters,
        {
            autoCorrectedData %>%
                ensureValueRange(data = .,
                                 ensureColumn = productionValue,
                                 getInvalidData = FALSE, 
                                 returnData = TRUE) %>%
                ensureValueRange(data = .,
                                 ensureColumn = areaHarvestedValue,
                                 getInvalidData = FALSE, 
                                 returnData = TRUE) %>%
                ensureValueRange(data = .,
                                 ensureColumn = yieldValue,
                                 getInvalidData = FALSE, 
                                 returnData = TRUE) %>%
                normalise(.) %>%
                postProcessing(.) %>%
                ensureFlagValidity(data = .,
                                   normalised = TRUE,
                                   getInvalidData = FALSE, 
                                   returnData = TRUE) %>%
                SaveData(domain = sessionKey@domain,
                         dataset = sessionKey@dataset,
                         data = .)
            "Module finished successfully"
              ##  ensureProductionBalanced(data = .,
              ##                           areaVar = areaHarvestedValue,
              ##                           yieldVar = yieldValue,
              ##                           prodVar = productionValue,
              ##                           conversion = unitConversion,
              ##                           getInvalidData = FALSE, 
              ##                           returnData = TRUE,
              ##                           normalised = FALSE) %>%
                
        })

    } else {
        message("Current Item has no data")
    }

}



##' ---
##' ## Return Messages and Send Error
##'

##' Send email to user if the data contains invalid entries.
if(max(sapply(errorList, nrow)) > 0){

    ## Initiate email
    from = "I_am_a_magical_unicorn@sebastian_quotes.com"
    to = swsContext.userEmail
    subject = "Validation Result"
    body = paste0("There are 5 tests current in the system:\n",
                  "1. Flag validity: Whether a flag is valid\n",
                  "2. Missing values are correctly specified\n",
                  "3. Production range: [0, Inf)\n",
                  "4. Area Harvested range: [0, Inf)\n",
                  "5. Yield range: (0, Inf)\n",
                  "6. Production balanced")


    ## Function to attach error to email
    createErrorAttachmentObject = function(testName,
                                           testResult,
                                           R_SWS_SHARE_PATH){
        errorAttachmentName = paste0(testName, ".csv")
        errorAttachmentPath =
            paste0(R_SWS_SHARE_PATH, "/kao/", errorAttachmentName)
        write.csv(testResult, file = errorAttachmentPath,
                  row.names = FALSE)
        errorAttachmentObject = mime_part(x = errorAttachmentPath,
                                          name = errorAttachmentName)
       errorAttachmentObject
    }

    ## Create attachment for errors
    testContainInvalidData =
        which(sapply(errorList, nrow) > 0)

    bodyWithAttachment =
        vector("list", length = length(testContainInvalidData) + 1)
    bodyWithAttachment[[1]] = body

    for(i in seq(length(testContainInvalidData))){
        failedIndex = testContainInvalidData[i]
        bodyWithAttachment[[(i + 1)]] =
            createErrorAttachmentObject(tests[[failedIndex]],
                                        errorList[[failedIndex]],
                                        R_SWS_SHARE_PATH)
    }

    ## Send the email
    sendmail(from = from, to = to, subject = subject, msg = bodyWithAttachment)
    stop("Production Input Invalid, please check follow up email on invalid data")
} else {
    msg = "Production Input Validation passed without any error!"
    message(msg)
    msg
}
SWS-Methodology/faoswsProduction documentation built on March 21, 2023, 8:27 p.m.