modules/complete_tf_cpc/main.R

##' ---
##' title: "Appendix: `complete_tf_cpc` module"
##' author:
##'   - Marco Garieri
##'   - Alexander Matrunich
##'   - Christian A. Mongeau Ospina
##'   - Aydan Selek
##'   - Bo Werth\
##'
##'     Food and Agriculture Organization of the United Nations
##' date: "`r format(Sys.time(), '%e %B %Y')`"
##' output:
##'    pdf_document
##' ---

##' This document gives a faithful step-by-step sequence of the operations
##' performed in the `complete_tf_cpc` module. For a narrative version of
##' the module's approach, please see its main document.

# format(Sys.time(), "%F-%H-%M")
PLUGIN_VERSION <- "2021-10-26-15-00"

##+ setup, include=FALSE
knitr::opts_chunk$set(echo = FALSE, eval = FALSE)

startTime = Sys.time()

# Libraries ####
suppressPackageStartupMessages(library(data.table))
library(stringr)
library(magrittr)
library(scales)
library(tidyr, warn.conflicts = FALSE)
library(futile.logger)
suppressPackageStartupMessages(library(dplyr, warn.conflicts = FALSE))
library(faosws)
library(faoswsUtil)
library(faoswsTrade)
library(faoswsFlag)
library(bit64)
`%!in%` = Negate(`%in%`)
# Always source files in R/ (useful for local runs)
sapply(dir("R", full.names = TRUE), source)

##+ check_parameters

##+ init

## **Flow chart:**
##
## ![Aggregate complete_tf to total_trade](assets/diagram/trade_3.png?raw=true "livestock Flow")

# Settings ####

# Should we do just pre-processing reports?
# If TRUE no auxiliary files will be read (unless they are required)
# and the module will stop as soon as reports on raw data are done.
only_pre_process <- FALSE

# If this is set to TRUE, the module will download the whole dataset
# saved on SWS (year specific) and will do a setdiff by comparing this
# set and the dataset generated by the module: all values saved on SWS
# that are not generated by the current run should be considered "wrong"
# (e.g., generated by a previous run of the module that had a bug) and
# will then be set to NA. See issue #164
remove_nonexistent_transactions <- TRUE

# If set to TRUE, an automatic HS6 mapping will be created so that
# unmapped codes with normal mapped can be recovered with HS6 codes
generate_hs6mapping <- TRUE

# Package build ID (it is included into report directory name)
build_id <- "master"

# Should we stop after HS-FCL mapping?
stop_after_mapping <- FALSE

set.seed(2507)

# Size for sampling. Set NULL if no sampling is required.
samplesize <- NULL

# Logging level. There are following levels (`trace` shows everything in log):
# trace, debug, info, warn, error, fatal

# Additional logger for technical data
futile.logger::flog.logger("dev", "TRACE")
futile.logger::flog.threshold("TRACE", name = "dev")

# Parallel backend will be used only if required packages are installed.
# It will be switched to FALSE if packages are not available.
multicore <- TRUE

## If TRUE, reported values will be in $, if FALSE in k$
dollars <- FALSE

# If TRUE, impute outliers, if FALSE no imputation occurs
detect_outliers <- FALSE

# Print general log to console
general_log2console <- FALSE

# Save current options (will be reset at the end)
old_options <- options()

dev_sws_set_file <- "modules/complete_tf_cpc/sws.yml"

# Switch off dplyr's progress bars globally
options(dplyr.show_progress = FALSE)

# max.print in RStudio is too small
options(max.print = 99999L, scipen = 999)

# Development (SWS-outside) mode addons ####
if (faosws::CheckDebug()){
  set_sws_dev_settings(dev_sws_set_file)
  HS_DESCR <- "https://comtrade.un.org/Data/cache/classificationHS.json"
} else {
  # In order to have all columns aligned. Issue #119
  options(width = 1000L)

  # Remove domain from username
  USER <- regmatches(
    swsContext.username,
    regexpr("(?<=/).+$", swsContext.username, perl = TRUE)
  )

  # XXX: this could/should be an SWS datatable
  # HS_DESCR <- "/srv/shiny-server/TRADEvalidation/files/classificationHS.json"
  HS_DESCR <- file.path(Sys.getenv("R_SWS_SHARE_PATH"), "trade/datatables/classificationHS.json")

  options(error = function(){
    dump.frames()

    filename <- file.path(Sys.getenv("R_SWS_SHARE_PATH"),
                          USER,
                          "complete_tf_cpc")

    dir.create(filename, showWarnings = FALSE, recursive = TRUE)

    save(last.dump, file = file.path(filename, "last.dump.RData"))
  })
}

# E-mail addresses of people that will get notified.
EMAIL_RECIPIENTS <- ReadDatatable("ess_trade_people")$fao_email
EMAIL_RECIPIENTS <- gsub(" ", "", EMAIL_RECIPIENTS)

# Remove S.T.:
EMAIL_RECIPIENTS <- EMAIL_RECIPIENTS[!grepl("yy", EMAIL_RECIPIENTS)]

# Stop if required parameters were not set
# The out_coef was removed as a parameter given that the
# outlier detection/imputation was disabled.
#stopifnot(!is.null(swsContext.computationParams$out_coef))
stopifnot(!is.null(swsContext.computationParams$year))

##' # Parameters

# The calls to `exists()` below are useful if the parameters
# were already set in an interactive session

##' - `year`: year for processing.
if (!exists('year', inherits = FALSE)) {
  year <- as.integer(swsContext.computationParams$year)
}
flog.info("Year: %s", year, name = "dev")

stopifnot(!any(is.na(USER), USER == ""))

reportdir <- reportdirectory(USER, year, build_id, browsedir = CheckDebug())
report_txt <- file.path(reportdir, "report.txt")
dev_log <- file.path(reportdir, "development.log")

# Send general log messages
if (general_log2console) {
  # to console and a file
  flog.appender(appender.tee(report_txt))
} else {
  # to a file only
  flog.appender(appender.file(report_txt))
}

# Send technical log messages to a file and console
flog.appender(appender.tee(dev_log), name = "dev")

# Read SWS module run parameters ####

flog.info("Plugin version: %s", PLUGIN_VERSION, name = "dev")


# ##' - `out_coef`: coefficient for outlier detection, i.e., the `k` parameter in
# ##' the *Outlier Detection and Imputation* section.
# # See coef argument in ?boxplot.stats
# if (!exists('out_coef', inherits = FALSE)) {
#   out_coef <- as.numeric(swsContext.computationParams$out_coef)
# }
# Note: the outlier detection/imputation is hardcoded to be disabled.
# In any case, the coefficient is left here in case it is eventually required.
out_coef <- 1000
# flog.info("Coefficient for outlier detection: %s", out_coef)

#if (!CheckDebug()) {
#  updateInfoTable(year = year, table = 'complete_tf_runs_info',
#                  mode = 'restart')
#}

flog.info("SWS-session is run by user %s", USER, name = "dev")

flog.debug("User's computation parameters:",
           swsContext.computationParams, capture = TRUE,
           name = "dev")

flog.info("R session environment: ",
          sessionInfo(), capture = TRUE, name = "dev")

PID <- Sys.getpid()

# Check that all packages are up to date ####

check_versions(c('faoswsUtil', 'faoswsTrade', 'dplyr'),
               c('0.2.11',     '0.1.1',       '0.5.0'))

# Register CPU cores ####
if (multicore) multicore <- register_cpu_cores()

##+ swsdebug

## ## local data
## install.packages("//hqfile4/ess/Team_working_folder/A/SWS/faosws_0.8.2.9901.tar.gz",
##                  repos = NULL,
##                  type = "source")
## ## SWS data
## install.packages("faosws",
##                  repos = "http://hqlprsws1.hq.un.fao.org/fao-sws-cran/")

##+ hschapters, eval = TRUE

hs_chapters <- c(1:24, 33, 35, 38, 40:41, 43, 50:53) %>%
  formatC(width = 2, format = "d", flag = "0") %>%
  as.character %>%
  shQuote(type = "sh") %>%
  paste(collapse = ", ")

##'   - `hs_chapters`: can not be set by the user (it is provided by
##'   Team B/C and hardcoded). The HS chapters are the following:

##'     `r paste(formatC(hs_chapters, width = 2, format = "d", flag = "0"), collapse = ' ')`

##'

flog.info("HS chapters to be selected:", hs_chapters,  capture = TRUE)

##' # Download auxiliary tables

# If running the whole module (only_pre_process = FALSE) the various
# helper files will be read before everything else and a check will
# be done so that if there is any issue in reading any of these tables
# the module should exit immediately (i.e., it makes no sense to read
# the whole trade data, do computations, etc. if the module is going
# to fail because a table could not be read at some later step).
# They are ordered by increasing time required for reading.

# TODO: there are basic checks on the tables (mainly that they have
# data), but more detailed checks should be needed (see #132)

trademap_year <- ReadDatatable(paste0("ess_trademap_", year))

# If not available, will have zero rows.
# General mapping will be done only the first time to integrate year-1.
trademap_year_available <- nrow(trademap_year) > 0

if (trademap_year_available == FALSE) {
  # Now, check whether a table for previous year actually exists...
  if (paste0("ess_trademap_", year - 1) %in% FetchDatatableNames()) {

    trademap_year <- ReadDatatable(paste0("ess_trademap_", year - 1))

    # ... if it does, copy previous one, and save
    if (nrow(trademap_year) > 0) {
      trademap_year[, year := year + 1]
    }
  }
}

# The general mapping procedure uses FAO area codes, so converting M49
# to FAL. At some point, this will need to be changed (use always M49)
trademap_year <-
  merge(
    trademap_year,
    as.data.table(m49faomap)[,
                             .(m49 = as.character(m49), fao)
                             ],
    by.x = "area",
    by.y = "m49",
    all.x = TRUE
  )

# FIX M49/FAO areas conversions
trademap_year[area == "1248", fao := 41]
trademap_year[area == "250",  fao := 68]
trademap_year[area == "380",  fao := 106]
trademap_year[area == "578",  fao := 162]
trademap_year[area == "756",  fao := 211]
trademap_year[area == "840",  fao := 231]

setnames(trademap_year, c("area", "fao"), c("m49", "area"))

if (!only_pre_process) {

  ##' - `comtradeunits`: Translation of the `qunit` variable (supplementary
  ##' quantity units) in Tariffline data into intelligible unit of measurement,
  ##' which correspond to the standards of quantity recommended by the *World
  ##' Customs Organization* (WCO) (e.g., `qunit`=8 corresponds to *kg*).
  ##' See: http://unstats.un.org/unsd/tradekb/Knowledgebase/UN-Comtrade-Reference-Tables

  flog.trace("[%s] Reading in 'comtradeunits' datatable", PID, name = "dev")
  #data("comtradeunits", package = "faoswsTrade", envir = environment())
  comtradeunits <- ReadDatatable('comtradeunits')

  stopifnot(nrow(comtradeunits) > 0)

  setnames(comtradeunits, c("ctu_qunit", "ctu_wco", "ctu_desc"), c("qunit", "wco", "desc"))

  comtradeunits[, qunit := as.integer(qunit)]

  ##' - `EURconversionUSD`: Annual EUR/USD currency exchange rates table from SWS.

  flog.trace("[%s] Reading in 'eur_conversion_usd' datatable", PID, name = "dev")
  EURconversionUSD <- ReadDatatable('eur_conversion_usd')

  stopifnot(nrow(EURconversionUSD) > 0)
  stopifnot(year %in% EURconversionUSD$eusd_year)

  ##' - `fclunits`: For UNSD Tariffline units of measurement are converted to
  ##' meet FAO standards. According to FAO standard, all weights are reported in
  ##' tonnes, animals in heads or 1000 heads and for certain commodities,
  ##' only the value is provided.

  flog.trace("[%s] Reading in 'fclunits' datatable", PID, name = "dev")
  #data("fclunits", package = "faoswsTrade", envir = environment())
  fclunits <- ReadDatatable('fclunits')

  stopifnot(nrow(fclunits) > 0)

  setnames(fclunits, c("fcu_fcl", "fcu_fclunit"), c("fcl", "fclunit"))

  fclunits[, fcl := as.integer(fcl)]

  ##' - `fclcodes`: List of valid FCL codes.

  flog.trace("[%s] Reading in 'fcl_codes' datatable", PID, name = "dev")
  fcl_codes <- as.numeric(ReadDatatable('fcl_2_cpc')$fcl)

  stopifnot(length(fcl_codes) > 0)

  ##' - `livestock_weights`: Table of livestock weights.

  flog.trace("[%s] Reading in 'livestock_weights' datatable", PID, name = "dev")
  livestock_weights <- ReadDatatable('livestock_weights')
  stopifnot(nrow(livestock_weights) > 0)

  ##' - `hs6standard`: HS6standard will be used as last resort for mapping.

  flog.trace("[%s] Reading in 'standard_hs12_6digit' datatable", PID, name = "dev")
  hs6standard <- ReadDatatable('standard_hs12_6digit')

  stopifnot(nrow(hs6standard) > 0)

  hs6standard <-
    hs6standard[
      !duplicated(hs2012_code)
      ][,
        .(hs6 = as.integer(hs2012_code), hs2012_code, faostat_code)
        ]

  ##@ if (trademap_year_available == FALSE) {

  ##' - `hsfclmap4`: Additional mapping between HS and FCL codes (extends `hsfclmap`).

  flog.trace("[%s] Reading in 'hsfclmap4' datatable", PID, name = "dev")
  add_map <- ReadDatatable('hsfclmap4')

  stopifnot(nrow(add_map) > 0)

  add_map <- add_map[!is.na(year) & !is.na(reporter_fao) & !is.na(hs)]

  add_map[, hs := stringr::str_replace_all(hs, ' ', '')]

  add_map[hs_chap < 10 & stringr::str_sub(hs, 1, 1) != '0', hs := paste0("0", hs)]

  ## XXX change some FCL codes that are not valid
  add_map[fcl == 389, fcl := 390]
  add_map[fcl == 654, fcl := 653]

  setkeyv(add_map, c("reporter_fao", "flow", "hs", "year"))

  ##' - `hsfclmap`: Mapping between HS and FCL codes extracted from MDB files
  ##' used to archive information existing in the previous trade system
  ##' (Shark/Jellyfish). This mapping table contains (identifier: `hsfclmap5`)
  ##' also some "corrections" to the original mapping found in the MDB files.
  ##' These are contained in the `correction_*` variables (e.g.,
  ##' `corrections_fcl`), and if for a given HS range one or more of these
  ##' variables are non-missing they will replace the original corresponding
  ##' variable (e.g., if `corresponding_fcl` is non-missing, it will replace
  ##' `fcl`). Missing HS to FCL links in the MDB files are mapped by Team B/C
  ##' and stored in a table (identifier: `hsfclmap4`) that will extend the
  ##' original mapping table. *[Note: for reference, the actual name of the
  ##' initial mapping table is `hsfclmap3`; the naming convention of these
  ##' tables should probably be made more logical or, at least, more easily
  ##' identifiable.]* The resulting mapping table gets subsetted with the
  ##' condition that the`startyear` and `endyear` of the HS to FCL links
  ##' should satisfy the condition: $startyear <= year <= endyear$.

  flog.trace("[%s] Reading in 'hsfclmap5' datatable", PID, name = "dev")
  if ((is.null(swsContext.computationParams$rdsfile) || !swsContext.computationParams$rdsfile) && !faosws::CheckDebug()) {
    hsfclmap3 <- readRDS(file.path(Sys.getenv("R_SWS_SHARE_PATH"), "trade/datatables/hsfclmap5.rds"))
  } else {
    hsfclmap3 <- ReadDatatable('hsfclmap5')
  }

  stopifnot(nrow(hsfclmap3) > 0)
  ##@ }

  ##' - `force_mirroring`: Datatable of those reporters that need to be
  ##' treated as non-reporters, as mirroring is required.

  flog.trace("[%s] Reading in 'force_mirroring' datatable", PID, name = "dev")
  force_mirroring <- ReadDatatable('force_mirroring')

  stopifnot(nrow(force_mirroring) > 0)

  ##' - `corrections_table`: Table with corrections applied during the
  ##' validation process.

  flog.trace("[%s] Reading in corrections dataset", PID, name = "dev")
  corrections_dir <-
    file.path(Sys.getenv('R_SWS_SHARE_PATH'), 'trade/validation_tool_files')

  # Corrections are stored into single-country files
  corrections_table_all <-
    lapply(
      file.path(dir(corrections_dir, pattern = '^[0-9]+$', full.names = TRUE),
                'corrections_table.rds'),
      readRDS
    ) %>%
    bind_rows()

  # Check whether the folder where the unapplied corrections
  # are going to be stored exists, if not then create it
  if (!file.exists(file.path(corrections_dir, 'unapplied'))) {
    dir.create(file.path(corrections_dir, 'unapplied'), recursive = TRUE)
  }

}

# (unsdpartnersblocks is required in pre-processing)
#
##' - `unsdpartnersblocks`: UNSD Tariffline reporter and partner dimensions use
##' different lists of geographic area codes. The partner dimension is more
##' detailed than the reporter dimension. Since we can not split trade flows of
##' the reporter dimension, trade flows of the corresponding partner dimensions
##' have to be assigned the reporter dimension's geographic area code. For
##' example, the code 842, used for the United States, includes Virgin Islands
##' and Puerto Rico and thus the reported trade flows of those territories.
##' Analogous steps are taken for France, Italy, Norway, Switzerland and US
##' Minor Outlying Islands.

flog.trace("[%s] Reading in unsdpartnersblocks datatable", PID, name = "dev")
unsdpartnersblocks <- ReadDatatable('unsdpartnersblocks')

stopifnot(nrow(unsdpartnersblocks) > 0)

##' # Download raw data and basic operations

##' 1. Download Eurostat data (ES) available in
##' `trade-input-data:ce_combinednomenclature_unlogged_YEAR`
##' datatables (label: "EU Commission - Combined Nomenclature YEAR").

flog.trace("[%s] Reading in Eurostat data", PID, name = "dev")

if ((is.null(swsContext.computationParams$rdsfile) || !swsContext.computationParams$rdsfile) && !faosws::CheckDebug()) {

  local_esdata_file <-
    paste0(Sys.getenv("R_SWS_SHARE_PATH"),
           "/trade/datatables/ce_combinednomenclature_unlogged_", year, ".rds")

  if (!file.exists(local_esdata_file)) {
    stop("Local esdata file does not exist.")
  }

  chapters <- c(1:24, 33, 35, 38, 40:41, 43, 50:53) %>%
    formatC(width = 2, format = "d", flag = "0")

  esdata <- readRDS(local_esdata_file)

  if (!("chapter" %in% names(esdata))) {
    esdata[, chapter := substr(product_nc, 1, 2)]
  }

  esdata <- esdata[chapter %in% chapters]

} else {
  esdata <- ReadDatatable(
    paste0("ce_combinednomenclature_unlogged_", year),
    columns = c(
      "period",
      "declarant",
      "partner",
      "flow",
      "product_nc",
      "value_1k_euro",
      "qty_ton",
      "sup_quantity",
      "stat_regime"
    ),
    where = paste0("chapter IN (", hs_chapters, ")")
  )
}

stopifnot(nrow(esdata) > 0)

# Sample, if required

if (!is.null(samplesize)) {
  esdata <- sample_n(esdata, samplesize)
  warning(sprintf("Eurostat data was sampled with size %d", samplesize))
}

flog.info("Raw Eurostat data preview:", rprt_glimpse0(esdata), capture = TRUE)

##' 1. Download Tariff line data (TL) available in
##' `trade-input-data:ct_tariffline_unlogged_YEAR`
##' datatables (label: "UNSD Tariffline YEAR").

flog.trace("[%s] Reading in Tariffline data", PID, name = "dev")

# Source of the Tariffline data:
# * outside debug mode, with the `rdsfile` parameter unset or FALSE, a
#   pre-saved RDS file on the SWS share is read and filtered by chapter;
# * otherwise the SWS datatable is queried for the chapters of interest.
# The scalar, short-circuiting `&&` is used (an `if` condition must be a
# single value), as in the Eurostat condition.
if ((is.null(swsContext.computationParams$rdsfile) || !swsContext.computationParams$rdsfile) && !faosws::CheckDebug()) {

  local_tldata_file <-
    paste0(Sys.getenv("R_SWS_SHARE_PATH"),
           "/trade/datatables/ct_tariffline_unlogged_", year, ".rds")

  if (!file.exists(local_tldata_file)) {
    stop("Local tldata file does not exist.")
  }

  # Two-digit, zero-padded HS chapters (same list used to build `hs_chapters`)
  chapters <- c(1:24, 33, 35, 38, 40:41, 43, 50:53) %>%
    formatC(width = 2, format = "d", flag = "0")

  tldata <- readRDS(local_tldata_file)
  tldata <- setDT(tldata)
  tldata <- tldata[chapter %in% chapters]

} else {
  tldata <- ReadDatatable(
    paste0("ct_tariffline_unlogged_", year),
    columns = c(
      "tyear",
      "rep",
      "prt",
      "flow",
      "comm",
      "tvalue",
      "weight",
      "qty",
      "qunit",
      "chapter"
    ),
    where = paste0("chapter IN (", hs_chapters, ")")
  )
}

# There is no point in continuing without Tariffline records
stopifnot(nrow(tldata) > 0)

# Bridge between the "legacy" and new UNSD data format: there are new units
# that should be taken into account, but this still needs to be done. For
# now, convert "easy" units (1,000 kilos and grams), and set the remaining
# ones to NA, so that they are estimated with the UV.
# NOTE: this is done only for a subset of countries. These countries are
# those for which the new measurement units would have a huge impact as
# weight is missing (e.g. USA).


# use_new_data_format <-
#   ReadDatatable('ess_trade_use_new_unsd_format', where = paste("year =", year))

# if (nrow(use_new_data_format) > 0) {

flog.trace("[%s] Patching Tariffline data", PID, name = "dev")

# Build `tldata_patch`: Tariffline records coming from the new UNSD format.
# As for the legacy data, a pre-saved RDS file is read when the `rdsfile`
# parameter is unset/FALSE outside debug mode; otherwise SWS is queried and
# the result is converted to the legacy layout (names and units).
if ((is.null(swsContext.computationParams$rdsfile) || !swsContext.computationParams$rdsfile) && !faosws::CheckDebug()) {

  local_tldata_file_patch <-
    paste0(Sys.getenv("R_SWS_SHARE_PATH"),
           "/trade/datatables/ct_tariffline_unlogged_", year, "_PATCH.rds")

  if (!file.exists(local_tldata_file_patch)) {
    stop("Local tldata patch file does not exist.")
  }

  tldata_patch <- readRDS(local_tldata_file_patch)

} else {

  # `use_new_data_format` (reporters for which the new format is used) is
  # needed by the query below, but the only call that defined it is commented
  # out earlier in this script. Read it here unless it was already set (e.g.,
  # in an interactive session), so that the query does not fail with
  # "object not found".
  # NOTE(review): table name and `year` filter are taken from that
  # commented-out call; confirm they are still valid.
  if (!exists("use_new_data_format")) {
    use_new_data_format <-
      ReadDatatable('ess_trade_use_new_unsd_format', where = paste("year =", year))
  }

  # An empty list of reporters would produce an invalid `IN ()` clause
  if (nrow(use_new_data_format) == 0) {
    stop("No reporters using the new UNSD format were found for year ", year)
  }

  tldata_patch <-
    ReadDatatable(
      table = paste0("ct_tariffline_v2_unlogged_", year),
      columns =
        c(
          "refperiodid",
          "reportercode",
          "partnercode",
          "flowcode",
          "cmdcode",
          "primaryvalue",
          "netwgt",
          "qty",
          "qtyunitcode",
          "chapter"
        ),
      where = paste0("chapter IN (", hs_chapters, ") AND reportercode IN (",
                     paste(shQuote(use_new_data_format$area, type = 'sh'),
                           collapse = ', '), ")")
    )

  # Rename/recode the new-format columns to the legacy Tariffline ones.
  # Flow codes containing "M" become '1', everything else '2'.
  # `qunit` is stored as character from here on.
  tldata_patch <-
    tldata_patch[,
                 .(tyear = substr(refperiodid, 1, 4), rep = as.character(reportercode),
                   flow = ifelse(grepl('M', flowcode), '1', '2'), comm = cmdcode,
                   prt = as.character(partnercode), tvalue = primaryvalue,
                   weight = netwgt, qty, qunit = as.character(qtyunitcode), chapter
                 )
                 ]

  tldata_patch[qunit == "-1", qunit := '1']

  # The new measurement units that are easy to convert are:
  # 15, "Weight in grams"
  # 21, "Weight in thousand of kilograms"
  # When `weight` is missing or zero it is derived from `qty`, then the unit
  # is relabelled as '8'.
  # NOTE(review): after relabelling, `qty` is still expressed in the original
  # unit (grams / thousand kg) while `qunit` says '8'; confirm this is wanted.
  tldata_patch[qunit == "15" & (is.na(weight) | dplyr::near(weight, 0)), weight := qty / 1000]
  tldata_patch[qunit == "15", `:=`(qty = weight * 1000, qunit = '8')]

  tldata_patch[qunit == "21" & (is.na(weight) | dplyr::near(weight, 0)), weight := qty * 1000]
  tldata_patch[qunit == "21", `:=`(qty = weight / 1000, qunit = '8')]

  # For the remaining units, we set them to NA
  tldata_patch[
    as.numeric(qunit) > 13 & (is.na(weight) | dplyr::near(weight, 0)),
    `:=`(qty = NA_real_, weight = NA_real_, qunit = '1')
    ]

  # `qunit` is character: it must be converted before comparing with 13,
  # otherwise the comparison is done on strings ("2" > "13" is TRUE) and
  # legacy units 2-9 would be overwritten as well.
  tldata_patch[
    as.numeric(qunit) > 13 & !(is.na(weight) | dplyr::near(weight, 0)),
    `:=`(qty = weight, qunit = '8')
    ]
}

message("TRADE: original data ", nrow(tldata))

# tldata <- tldata[!(rep %in% use_new_data_format$area)]
tldata_patch_countries <- unique(tldata_patch$rep)
tldata <- tldata[rep %!in% tldata_patch_countries, ]

message("TRADE: unpatched data ", nrow(tldata))

tldata <- rbind(tldata, tldata_patch, fill = TRUE)

message("TRADE: patched data ", nrow(tldata))
# }

###### National data, in UNSD tariffline legacy format
#
# VENEZUELA
# Only the records of the year being processed are requested.
national_data_tariffline <-
  ReadDatatable(
    "national_tariffline_ven",
    where = sprintf("tyear IN ('%s')", year)
  )

if (nrow(national_data_tariffline) > 0) {
  # Keep the columns of `tldata`, in the same order, so both can be stacked
  national_data_tariffline <-
    national_data_tariffline[, .SD, .SDcols = names(tldata)]

  # This shouldn't really happen, but let's check: a reporter must not be
  # present in both the national and the Tariffline data.
  if (length(intersect(national_data_tariffline$rep, tldata$rep)) > 0) {
    stop("Duplicated country")
  }

  tldata <- rbind(tldata, national_data_tariffline)
}

#### Fix zero weights:
# These cannot be anything but NAs
tldata[near(weight, 0) & is.na(qty) & qunit == 1, weight := NA_real_]
# Likely actual NAs
tldata[near(weight, 0) & ((qty >= 10 & tvalue >= 10) | qty > 100 | tvalue > 1000), weight := NA_real_]
# Remaining cases should be analysed.


# Sample, if required

if (!is.null(samplesize)) {
  tldata <- sample_n(tldata, samplesize)
  warning(sprintf("Tariffline data was sampled with size %d", samplesize))
}

flog.info("Raw Tariffline data preview:", rprt_glimpse0(tldata), capture = TRUE)

##' 1. Keep only `stat_regime` = 4 in ES.

##' 1. Remove European-aggregated data (i.e., totals) from ES.

# * Only regime 4 is relevant for Eurostat data
# * Remove totals, 1010 = 'European Union', 1011 = 'Extra-European Union', see
#   http://ec.europa.eu/eurostat/documents/3859598/5889816/KS-BM-05-002-EN.PDF
# * Removing stat_regime as it is not needed anymore
esdata <-
  esdata[
    stat_regime == "4" & !(declarant == 'EU' | partner %in% c('1010', '1011'))
    ][,
      stat_regime := NULL
      ]

flog.info("Records after removing 4th regime and EU totals: %s", nrow(esdata))

##' 1. Use standard (common) variable names (e.g., `declarant` becomes `reporter`) in ES and TL.

adaptTradeDataNames(esdata)
adaptTradeDataNames(tldata)

esdata[, hs6 := as.integer(str_sub(hs, 1, 6))]
tldata[, hs6 := as.integer(str_sub(hs, 1, 6))]

##' 1. Keep only HS codes of interest, i.e., drop codes that do not
##' participate in further processing. Such solution drops,
##' e.g., all HS codes shorter than 6 digits.

esdata <- filterHS6FAOinterest(esdata)
tldata <- filterHS6FAOinterest(tldata)


##' 1. Remove non numeric reporters / partners / hs codes from ES and TL.

esdata <- removeNonNumeric(esdata)
tldata <- removeNonNumeric(tldata)

##' 1. Use standard (common) variable types in ES and TL.

adaptTradeDataTypes(esdata)
adaptTradeDataTypes(tldata)


flog.trace("[%s] TL: removing zero-value and zero-weight and zero|missing qty", PID, name = "dev")
# Nothing can be done about these.
tldata <- tldata[!(near(value, 0) & near(weight, 0) & (near(qty, 0) | is.na(qty)))]

esdata <- tbl_df(esdata)
tldata <- tbl_df(tldata)

##' 1. Apply specific HS corrections. Some HS codes in some countries
##' need specific HS corrections. As on 2018-03-08 only a subset of
##' HS codes for a given TL reporter are corrected (tonnes were reported
##' instead of kilograms, so raw data was multiplied by 1000).

# XXX As of 20171130 this is relevant only for a given reporter in
# a given year, but in the future it should probably be generalised

# esdata <- specificCorrectionsHS(esdata)
tldata <- specificCorrectionsHS(tldata)

##' 1. Convert ES geonomenclature country/area codes to FAO codes.

##+ geonom2fao
esdata <- esdata %>%
  dplyr::mutate(
    reporter = convertGeonom2FAO(reporter),
    partner  = convertGeonom2FAO(partner)
  ) %>%
  # XXX issue 147
  dplyr::filter(!is.na(partner))


# M49 to FAO area list ####

##' 1. TL M49 codes (which are different from official M49) are
##' converted in FAO country codes using a specific conversion
##' table (`unsdpartnersblocks`) provided by Team ENV. Then,
##' these M49 codes are converted to FAO codes.

flog.trace("[%s] TL: converting M49 to FAO area list", PID, name = "dev")

tldata <- tldata %>%
  left_join(
    unsdpartnersblocks %>%
      select_(
        wholepartner = ~unsdpb_rtcode,
        part         = ~unsdpb_formula
      ) %>%
      dplyr::mutate(
        wholepartner = as.numeric(wholepartner),
        part         = as.numeric(part)
      ) %>%
      # Exclude EU grouping and old countries
      filter_(
        ~wholepartner %in% c(251, 381, 579, 581, 757, 842)
      ),
    by = c("partner" = "part")
  ) %>%
  dplyr::mutate_(
    partner  = ~ifelse(is.na(wholepartner), partner, wholepartner),
    m49rep   = ~reporter,
    m49par   = ~partner,
    # Conversion from Comtrade M49 to FAO area list
    reporter = ~as.integer(convertComtradeM49ToFAO(m49rep)),
    partner  = ~as.integer(convertComtradeM49ToFAO(m49par))
  )

##' 1. Remove invalid reporters (i.e., keep countries/areas that
##' existed in the year considered).

tldata <- removeInvalidReporters(tldata)

##' 1. Force mirroring, i.e., remove countries that appear as official
##' reporters so that the mirroring procedure will estimate their data.

esdata <-
  anti_join(
    esdata,
    force_mirroring,
    by = c('reporter' = 'fao_code', 'year')
  )

tldata <-
  anti_join(
    tldata,
    force_mirroring,
    by = c('reporter' = 'fao_code', 'year')
  )

##' 1. Remove ES reporters from TL.

flog.trace("[%s] TL: dropping reporters already found in Eurostat data", PID, name = "dev")

#### UNITED KINGDOM ISSUE -- use UNSD data instead of EUROSTAT####
if (year >= 2020L){
  setDT(esdata)
  esdata <- esdata[reporter != '229', ]
  esdata <- tbl_df(esdata)
}

# They will be replaced by ES data
tldata <- tldata %>%
  anti_join(
    esdata %>%
      select_(~reporter) %>%
      distinct(),
    by = "reporter"
  )

# XXX create all reporters

tldata_rep_table <- tldata %>%
  dplyr::select(reporter, flow) %>%
  distinct() %>%
  dplyr::mutate(name = faoAreaName(reporter, "fao"))

# XXX: bring back
#rprt_writetable(tldata_rep_table, subdir = 'preproc')

# XXX this is a duplication: a function should be created.
to_mirror_raw <- bind_rows(
  dplyr::select(esdata, year, reporter, partner, flow),
  dplyr::select(tldata, year, reporter, partner, flow)
) %>%
  dplyr::mutate(flow = recode(flow, '4' = 1L, '3' = 2L)) %>%
  flowsToMirror(names = TRUE)

# XXX: bring back
#rprt_writetable(to_mirror_raw, 'flows', subdir = 'preproc')

if (only_pre_process) stop("Stop after reports on raw data")

##@ if (trademap_year_available == FALSE) {

##' 1. Apply explicit corrections to the HS-FCL mapping.

#data("hsfclmap3", package = "hsfclmap", envir = environment())
# NOTE: it is pulling now v5
# FCL, startyear, endyear codes can be overwritten by corrections
# Make sure the mapping is a data.table (conversion is done by reference)
setDT(hsfclmap3)

# Wherever a correction is available, it overrides the original value
hsfclmap3[!is.na(correction_fcl), fcl := as.numeric(correction_fcl)]
hsfclmap3[!is.na(correction_startyear), startyear := correction_startyear]
hsfclmap3[!is.na(correction_endyear), endyear := correction_endyear]

# The correction columns are not needed anymore
hsfclmap3[, grep('correction', names(hsfclmap3), value = TRUE) := NULL]

##' 1. Extend the `endyear` for those combinations of `area` / `flow` /
##' `fromcode` / `tocode` for which `endyear` < `year`.

# Extend endyear to 2050
# `maxy` is the latest endyear of each area/flow/fromcode/tocode combination;
# when it is before 2050, the link(s) carrying that endyear are prolonged.
hsfclmap3[, maxy := max(endyear), by = .(area, flow, fromcode, tocode)]

hsfclmap3[maxy < 2050 & endyear == maxy, endyear := 2050]

hsfclmap3[, maxy := NULL]
# / Extend endyear to 2050


# ADD UNMAPPED CODES

# Check that all FCL codes are valid

fcl_diff <- setdiff(unique(add_map$fcl), fcl_codes)

fcl_diff <- fcl_diff[!is.na(fcl_diff)]

fcl_diff <- setdiff(fcl_diff, 0)

if (length(fcl_diff) > 0) {
  warning(paste('Invalid FCL codes:', paste(fcl_diff, collapse = ', ')))
}

# Check that years are in a valid range

if (min(add_map$year) < 2000) {
  warning('The minimum year should not be lower than 2000.')
}

if (max(add_map$year) > as.numeric(format(Sys.Date(), '%Y'))) {
  warning('The maximum year should not be greater than the current year.')
}

# Check that there are no duplicate codes
# (a duplicate is more than one row for the same reporter/year/flow/hs)

if (nrow(add_map[, .N, .(reporter_fao, year, flow, hs)][N > 1]) > 0) {
  warning('Removing duplicate HS codes by reporter/year/flow.')

  # Per reporter/year/flow/hs group: `n` is the number of rows and
  # `hs_ext_perc` the share of rows with a non-missing `hs_extend`
  add_map[, `:=`(n = .N, hs_ext_perc = sum(!is.na(hs_extend))/.N), .(reporter_fao, year, flow, hs)]

  # Prefer cases where hs_extend is available
  # NOTE(review): as written, this filter keeps (a) every row of the groups
  # where no row has `hs_extend` (duplicated groups included) and (b)
  # single-row groups with `hs_extend`; duplicated groups where some row has
  # `hs_extend` are dropped entirely because of `n == 1`. Confirm that this
  # is the intended de-duplication.
  add_map <- add_map[hs_ext_perc == 0 | (hs_ext_perc > 0 & !is.na(hs_extend) & n == 1)]

  # Drop the helper columns
  add_map[, c("n", "hs_ext_perc") := NULL]
}

# Raise warning if countries were NOT in mapping.

if (length(setdiff(unique(add_map$reporter_fao), hsfclmap3$area)) > 0) {
  warning('Some countries were not in the original mapping.')
}

# Reshape the additional mapping so that it can be stacked on `hsfclmap3`:
# the (space-free) HS code is used as both ends of the HS range and the link
# is valid from `year` to 2050. `recordnumb` is a placeholder that is filled
# in right below.
add_map <-
  add_map[,
          .(area = reporter_fao,
            flow,
            fromcode = gsub(' ', '', hs),
            tocode = gsub(' ', '', hs),
            fcl = as.numeric(fcl),
            startyear = year,
            endyear = 2050L,
            recordnumb = NA_integer_,
            area_name = reporter_name
          )
          ]

max_record <- max(hsfclmap3$recordnumb)

# Number the new records after the last existing one. `seq_len()` is used
# instead of `(max_record + 1):(max_record + nrow(add_map))` because, with
# zero rows, the latter counts backwards and returns two values, which
# cannot be assigned to an empty table.
add_map$recordnumb <- max_record + seq_len(nrow(add_map))

##' 1. Add additional codes that were not present in the HS-FCL
##' original mapping file.

hsfclmap3 <- rbind(add_map, hsfclmap3)

# Years are compared with the (integer) `year` parameter later on
hsfclmap3[, `:=`(startyear = as.integer(startyear), endyear = as.integer(endyear))]

# / ADD UNMAPPED CODES

# XXX: bring back
#flog.info("HS->FCL mapping table preview:",
#          rprt_glimpse0(hsfclmap3), capture = TRUE)

# XXX: bring back
#rprt(hsfclmap3, "hsfclmap", year)

##' 1. Keep HS-FCL links for which `startyear` <= `year` & `endyear` >= `year`

# Links valid in the year being processed; the year bounds are dropped once
# they have been used for the selection.
hsfclmap <- hsfclmap3[startyear <= year & endyear >= year]

hsfclmap[, c("startyear", "endyear") := NULL]

# The all-years table is kept as a tibble from here on.
hsfclmap3 <- tbl_df(hsfclmap3)

# Workaround issue #123: ranges whose lower bound is greater than the upper
# bound are logged and then discarded.
hsfclmap[, fromgtto := as.numeric(fromcode) > as.numeric(tocode)]

from_gt_to <- hsfclmap[["recordnumb"]][hsfclmap[["fromgtto"]]]

if (length(from_gt_to) > 0) {
  flog.warn(paste0("In following records of hsfclmap fromcode greater than tocode: ",
                   paste0(from_gt_to, collapse = ", ")))
}

hsfclmap <- hsfclmap[fromgtto == FALSE]

hsfclmap[, fromgtto := NULL]

# At least one link must be left for the year
stopifnot(nrow(hsfclmap) > 0)

hsfclmap <- tbl_df(hsfclmap)

flog.info("Rows in mapping table after dplyr::filtering by year: %s", nrow(hsfclmap))

##' 1. Generate HS to FCL map at HS6 level (if switched on)

# Builds `hs6fclmap` (reporter / flow / hs6 / fcl / fcl_links). When
# `generate_hs6mapping` is off, an empty table with the same columns is
# created so that later steps can still reference it.
if (generate_hs6mapping) {

  # hs6fclmap ####

  flog.trace("[%s] Extraction of HS6 mapping table", PID, name = "dev")

  ##'     1. Universal (all years) HS6 mapping table.

  flog.trace("[%s] Universal (all years) HS6 mapping table", PID, name = "dev")

  hs6fclmap_full <- extract_hs6fclmap(hsfclmap3, parallel = multicore)

  ##'     1. Current year specific HS6 mapping table.

  flog.trace("[%s] Current year specific HS6 mapping table", PID, name = "dev")

  hs6fclmap_year <- extract_hs6fclmap(hsfclmap, parallel = multicore)

  # Both tables are stacked; only rows with `fcl_links == 1L` are kept and
  # exact duplicates are removed.
  hs6fclmap <- bind_rows(hs6fclmap_full, hs6fclmap_year) %>%
    filter_(~fcl_links == 1L) %>%
    distinct()

  # Reporters found in the Tariff line data but not in the HS6 map
  reporters_not_in_mapping <-
    setdiff(
      (tldata %>% count(reporter))$reporter,
      unique(hs6fclmap$reporter)
    )

  # Reporter '252' never gets a fallback map
  reporters_not_in_mapping <- setdiff(reporters_not_in_mapping, '252')

  if (length(reporters_not_in_mapping) > 0) {
    # Fallback HS and HS6 maps for countries not in the mapping file,
    # or recently added. The fallback consists of the most common
    # HS6-FCL combination for all countries, and for HS dummy entries
    # (required, HS6 will be used)

    # Count each hs6/fcl pair, sort by hs6 and decreasing count (ties broken
    # by fcl) and keep the first pair of each hs6.
    hs6fclmap_fallback <- as.data.table(hs6fclmap)[, .N, .(hs6, fcl)]
    hs6fclmap_fallback <- hs6fclmap_fallback[order(hs6, -N, fcl)]
    hs6fclmap_fallback <- hs6fclmap_fallback[, .SD[1], hs6][, N := NULL]

    # The same fallback links are used for both flows (1 and 2)
    hs6fclmap_fallback <-
      rbind(
        data.table(flow = 1, hs6fclmap_fallback, fcl_links = 1),
        data.table(flow = 2, hs6fclmap_fallback, fcl_links = 1)
      )

    # Only codes with exactly six characters are kept
    hs6fclmap_fallback <- hs6fclmap_fallback[nchar(hs6) == 6]

    # One copy of the fallback table for every reporter without a map
    hs6fclmap_fallback <-
      data.table(
        reporter = rep(reporters_not_in_mapping, each = nrow(hs6fclmap_fallback)),
        hs6fclmap_fallback
      )

    hs6fclmap <- tbl_df(rbind(hs6fclmap, hs6fclmap_fallback))

    # Dummy HS entries (code '000001', fcl 0), one per flow, built by
    # overwriting the first two rows of `hsfclmap3`.
    # NOTE(review): this assumes `hsfclmap3` has at least two rows.
    hsfclmap3_fallback <-
      hsfclmap3[1:2, ] %>%
      mutate(
        fromcode = '000001', tocode = '000001',
        fcl = 0,
        startyear = year, endyear = 2050,
        area_name = "",
        flow = 1:2
      )

    # Replicate the two dummy rows for every reporter without a map
    hsfclmap3_fallback <-
      rbindlist(
        lapply(
          reporters_not_in_mapping,
          function(x) mutate(hsfclmap3_fallback, area = x)
        )
      )

    hsfclmap3_fallback <- tbl_df(hsfclmap3_fallback)

    # Record numbers continue after the highest one in `hsfclmap3`
    hsfclmap3_fallback <-
      mutate(
        hsfclmap3_fallback,
        recordnumb = max(hsfclmap3$recordnumb) + 1:nrow(hsfclmap3_fallback)
      )

    hsfclmap3 <- bind_rows(hsfclmap3, hsfclmap3_fallback)
  }

} else {

  # A dummy zero-row dataframe needs to be created
  hs6fclmap <-
    data_frame(
      reporter  = integer(),
      flow      = integer(),
      hs6       = integer(),
      fcl       = double(),
      fcl_links = integer()
    )
}

# XXX: bring back
#rprt(hs6fclmap, "hs6fclmap")

##@ }

##' # Specific operations on Eurostat data

##' 1. Add variables that will contain flags. (Note: flags are set in various
##' steps in the code. Please, refer to the "Flag Management in the Trade module"
##' document.)

esdata <- generateFlagVars(esdata)

# For each of value, weight and quantity, rows where the figure is not
# missing get status flag 'Y' and method flag 'h' (the `qty` column is
# addressed as variable 'quantity').
esdata <- esdata %>%
  setFlag3(!is.na(value),  type = 'status', flag = 'Y', variable = 'value') %>%
  setFlag3(!is.na(weight), type = 'status', flag = 'Y', variable = 'weight') %>%
  setFlag3(!is.na(qty),    type = 'status', flag = 'Y', variable = 'quantity') %>%
  setFlag3(!is.na(value),  type = 'method', flag = 'h', variable = 'value') %>%
  setFlag3(!is.na(weight), type = 'method', flag = 'h', variable = 'weight') %>%
  setFlag3(!is.na(qty),    type = 'method', flag = 'h', variable = 'quantity')


##' 1. Remove in ES those reporters with area codes that are not included in
##' MDB commodity mapping area list.

flog.trace("[%s] ES: dropping reporters not found in the mapping table", PID, name = "dev")

# Areas against which ES reporters are checked: the year-filtered HS->FCL map
# when no trademap exists for the year, the trademap of the year otherwise.
es_mapped_areas <-
  if (trademap_year_available == FALSE) {
    ##+ es-treat-unmapped
    unique(hsfclmap$area)
  } else {
    unique(trademap_year$area)
  }

# Records of reporters without mapping are kept aside before being dropped
esdata_not_area_in_fcl_mapping <-
  filter_(esdata, ~!(reporter %in% es_mapped_areas))

esdata <- filter_(esdata, ~reporter %in% es_mapped_areas)

# XXX: bring back
#rprt_writetable(esdata_not_area_in_fcl_mapping)


flog.info("ES records after removing areas not in HS->FCL map: %s", nrow(esdata))

# Pre-fill FCL codes from the trademap of the year, when it has any rows.
setDT(esdata)

# `map_src` records where the FCL code of each record comes from
esdata[, map_src := NA_character_]

if (nrow(trademap_year) > 0) {
  # Left join on reporter / hs / flow. Links coming from the trademap carry
  # the sentinel `recordnumb = -1`, which is used right below to tag them.
  esdata <-
    merge(
      esdata,
      trademap_year[, .(area, hs, flow, recordnumb = -1, fcl)],
      by.x = c("reporter", "hs", "flow"),
      by.y = c("area", "hs", "flow"),
      all.x = TRUE
    )

  esdata[recordnumb == -1, map_src := "previous"]
}
# NOTE(review): when `trademap_year` has no rows, `recordnumb` and `fcl` are
# not added here, yet the next step refers to both; confirm that `esdata`
# already carries them in that case.

# Map the records that still have no FCL code. Records that already have one
# are set aside and bound back at the end.
if (nrow(esdata[is.na(fcl)]) > 0) {

  # In case already mapped
  esdata_mapped <- esdata[!is.na(fcl)]

  # ES trade data mapping to FCL ####
  message(sprintf("[%s] Convert Eurostat HS to FCL", PID))

  ##' 1. Map HS codes to FCL.

  ##'     1. Extract HS6-FCL mapping table.

  message(sprintf("[%s] Convert Eurostat HS to FCL, HS6 LINKS", PID))

  # Note that the links are extracted from the whole `esdata`, i.e., before
  # the already mapped records are removed below.
  esdatahs6links <- mapHS6toFCL(esdata, hs6fclmap)

  ##'     1. Extract specific HS-FCL mapping table.

  message(sprintf("[%s] Convert Eurostat HS to FCL, HS LINKS", PID))

  esdatalinks <- mapHS2FCL(tradedata   = esdata,
                           maptable    = hsfclmap3,
                           hs6maptable = hs6fclmap,
                           year        = year,
                           parallel    = multicore)

  ##'     1. Use HS6-FCL or HS-FCL mapping table.

  # Only unmapped records go on; their (empty) `recordnumb` and `fcl`
  # columns are removed before the links are added.
  esdata <- esdata[is.na(fcl)]

  esdata[, c("recordnumb", "fcl") := NULL]

  esdata <- add_fcls_from_links(esdata,
                                hs6links = esdatahs6links,
                                links    = esdatalinks)

  ##'     1. Use HS6 standard for unmapped codes.

  # Where no FCL was found but `hs6standard` has a `faostat_code` for the
  # hs6, that code is used and `map_src` is set to 'standard'.
  esdata <- esdata %>%
    left_join(
      hs6standard %>% dplyr::select(-hs2012_code),
      by = 'hs6'
    ) %>%
    dplyr::mutate(
      fcl_pre = fcl,
      fcl     = ifelse(is.na(fcl_pre) & !is.na(faostat_code), faostat_code, fcl_pre),
      map_src = ifelse(is.na(fcl_pre) & !is.na(faostat_code), 'standard', map_src)
    ) %>%
    dplyr::select(-faostat_code, -fcl_pre)

  setDT(esdata)

  # Put back the records that were already mapped; `fill = TRUE` as the two
  # tables do not necessarily have the same columns.
  esdata <- rbind(esdata_mapped, esdata, fill = TRUE)

  esdata <- tbl_df(esdata)
}

# FCL is stored as numeric from here on
esdata$fcl <- as.numeric(esdata$fcl)

flog.info("Records after HS-FCL mapping: %s", nrow(esdata))

# XXX: bring back
#rprt(esdata, "hs2fcl_fulldata", tradedataname = "esdata")

flog.trace("[%s] ES: dropping unmapped records", PID, name = "dev")

##' 1. Remove unmapped FCL codes (i.e., transactions with no HS to FCL link).

# Records without an FCL code are set aside; only mapped records go on.
esdata_unmapped <- dplyr::filter(esdata, is.na(fcl))

esdata <- dplyr::filter(esdata, !is.na(fcl))

flog.info("ES records after removing non-mapped HS codes: %s", nrow(esdata))

##' 1. Add FCL units, i.e., the target units of measurement of the items
##' (e.g., tonnes, heads).

esdata <- addFCLunits(tradedata = esdata, fclunits = fclunits)

##' 1. Specific conversions: some FCL codes are reported in Eurostat
##' with different supplementary units than those reported in FAOSTAT,
##' thus a conversion is done.

## specific supplementary unit conversion: factor applied to `qty` by FCL
es_spec_conv <-
  data_frame(
    fcl  = c(1057L, 1068L, 1072L, 1079L, 1083L, 1140L, 1181L),
    conv = c(0.001, 0.001, 0.001, 0.001, 0.001, 0.001,  1000)
  )

# Quantities of the listed FCL codes are rescaled and get method flag 'i';
# all other quantities are left untouched.
esdata <- esdata %>%
  left_join(es_spec_conv, by = 'fcl') %>%
  dplyr::mutate(qty = ifelse(is.na(conv), qty, qty * conv)) %>%
  setFlag3(!is.na(conv), type = 'method', flag = 'i', variable = 'quantity') %>%
  dplyr::select(-conv)

##' # Specific operations on Tariff line data

##' 1. Do mathematical conversions on specific `qunit`s: 6 (pairs),
##' 9 (thousands), and 11 (dozens) become 5 (units), by multiplying
##' them by 2, 1000, and 12, respectively.

# Mathematical conversion of qunit 6, 9, and 11 to 5, done by reference
setDT(tldata)
tldata[qunit ==  6, `:=`(qty = qty * 2,    qunit = 5)]
tldata[qunit ==  9, `:=`(qty = qty * 1000, qunit = 5)]
tldata[qunit == 11, `:=`(qty = qty * 12,   qunit = 5)]

##+ reexptoexp ####
##' 1. Re-imports become imports and re-exports become exports.
flog.trace("[%s] TL: recoding reimport/reexport", PID, name = "dev")

# Flow codes:
#   1 = Import, 2 = Export, 4 = re-Import, 3 = re-Export

tldata[flow == 4L, flow := 1L]
tldata[flow == 3L, flow := 2L]

# tl-aggregate-multiple-rows ####

##' 1. Identical combinations of `reporter` / `partner` / `commodity` /
##' `flow` / `year` / `qunit` are pre-aggregated.

flog.trace("[%s] TL: aggregation of similar flows", PID, name = "dev")

tldata <- preAggregateMultipleTLRows(tldata)

tldata <- tbl_df(tldata)

##' 1. Add variables that will contain flags. (Note: flags are set in various
##' steps in the code. Please, refer to the "Flag Management in the Trade module"
##' document.)

# The PID and the "dev" logger name are passed as in all the other trace
# calls: without them the "[%s]" placeholder was never filled in and the
# message was not sent to the "dev" logger.
flog.trace("[%s] TL: add flag variables", PID, name = "dev")
tldata <- generateFlagVars(tldata)

# Flags set here:
#   - status 'Y' and method 'h' on every non-missing value, weight, quantity;
#   - method 's' on all variables when `nrows > 1`;
#   - status 'I' and method 'i' on weight when `imputed_puv > 0` and on
#     quantity when `imputed_qbyw > 0`.
tldata <- tldata %>%
  setFlag3(!is.na(value),    type = 'status', flag = 'Y', variable = 'value') %>%
  setFlag3(!is.na(weight),   type = 'status', flag = 'Y', variable = 'weight') %>%
  setFlag3(!is.na(qty),      type = 'status', flag = 'Y', variable = 'quantity') %>%
  setFlag3(!is.na(value),    type = 'method', flag = 'h', variable = 'value') %>%
  setFlag3(!is.na(weight),   type = 'method', flag = 'h', variable = 'weight') %>%
  setFlag3(!is.na(qty),      type = 'method', flag = 'h', variable = 'quantity') %>%
  setFlag3(nrows > 1,        type = 'method', flag = 's', variable = 'all') %>%
  setFlag3(imputed_puv > 0,  type = 'status', flag = 'I', variable = 'weight') %>%
  setFlag3(imputed_qbyw > 0, type = 'status', flag = 'I', variable = 'quantity') %>%
  # NOTE: method is i as it is an identity, not a statistical method
  setFlag3(imputed_puv > 0,  type = 'method', flag = 'i', variable = 'weight') %>%
  setFlag3(imputed_qbyw > 0, type = 'method', flag = 'i', variable = 'quantity')


##+ drop_reps_not_in_mdb ####

##' 1. Area codes not mapping to any FAO country in the HS to FCL mapping
##' codes are removed.

# Reporters that are absent from the mapping tables are dropped, as their
# data cannot be processed in any case.

flog.trace("[%s] TL: dropping reporters not found in the mapping table", PID, name = "dev")

# Areas come from the year-filtered HS->FCL map when no trademap exists for
# the year, from the trademap of the year otherwise; reporters of the HS6
# map are always accepted.
tl_mapped_areas <-
  if (trademap_year_available == FALSE) hsfclmap$area else trademap_year$area

avail_rep_list <- unique(c(tl_mapped_areas, hs6fclmap$reporter))

tldata_not_area_in_fcl_mapping <-
  filter_(tldata, ~!(reporter %in% avail_rep_list))

tldata <- filter_(tldata, ~reporter %in% avail_rep_list)

# Reporter 252 ("Unspecified") is removed from the data
tldata <- filter_(tldata, ~!reporter %in% 252)

# XXX: bring back
#rprt_writetable(tldata_not_area_in_fcl_mapping)

flog.info("TL records after removing areas not in HS->FCL map: %s", nrow(tldata))

# Pre-fill FCL codes from the trademap of the year, when it has any rows
# (same procedure as for the Eurostat data).
setDT(tldata)

# `map_src` records where the FCL code of each record comes from
tldata[, map_src := NA_character_]

if (nrow(trademap_year) > 0) {
  # Left join on reporter / hs / flow. Links coming from the trademap carry
  # the sentinel `recordnumb = -1`, which is used right below to tag them.
  tldata <-
    merge(
      tldata,
      trademap_year[, .(area, hs, flow, recordnumb = -1, fcl)],
      by.x = c("reporter", "hs", "flow"),
      by.y = c("area", "hs", "flow"),
      all.x = TRUE
    )

  tldata[recordnumb == -1, map_src := "previous"]
}
# NOTE(review): when `trademap_year` has no rows, `recordnumb` and `fcl` are
# not added here, yet the next step refers to both; confirm that `tldata`
# already carries them in that case.

# Map the records that still have no FCL code. Records that already have one
# are set aside and bound back at the end.
if (nrow(tldata[is.na(fcl)]) > 0) {

  # In case already mapped
  tldata_mapped <- tldata[!is.na(fcl)]

  ##' 1. Map HS codes to FCL.
  ##+ tl_hs2fcl ####


  ##'     1. Extract HS6-FCL mapping table.

  message(sprintf("[%s] Convert UNSD HS to FCL, HS6 LINKS", PID))

  # Note that the links are extracted from the whole `tldata`, i.e., before
  # the already mapped records are removed below.
  tldatahs6links <- mapHS6toFCL(tldata, hs6fclmap)

  ##'     1. Extract specific HS-FCL mapping table.

  message(sprintf("[%s] Convert UNSD HS to FCL, HS LINKS", PID))

  tldatalinks <- mapHS2FCL(tradedata   = tldata,
                           maptable    = hsfclmap3,
                           hs6maptable = hs6fclmap,
                           year        = year,
                           parallel    = multicore)

  ##'     1. Use HS6-FCL or HS-FCL mapping table.

  # Only unmapped records go on; their (empty) `recordnumb` and `fcl`
  # columns are removed before the links are added.
  tldata <- tldata[is.na(fcl)]

  tldata[, c("recordnumb", "fcl") := NULL]

  tldata <- add_fcls_from_links(tldata,
                                hs6links = tldatahs6links,
                                links    = tldatalinks)

  ##'     1. Use HS6 standard for unmapped codes.

  # Where no FCL was found but `hs6standard` has a `faostat_code` for the
  # hs6, that code is used and `map_src` is set to 'standard'.
  tldata <- tldata %>%
    left_join(
      hs6standard %>% dplyr::select(-hs2012_code),
      by = 'hs6'
    ) %>%
    dplyr::mutate(
      fcl_pre = fcl,
      fcl     = ifelse(is.na(fcl_pre) & !is.na(faostat_code), faostat_code, fcl_pre),
      map_src = ifelse(is.na(fcl_pre) & !is.na(faostat_code), 'standard', map_src)
    ) %>%
    dplyr::select(-faostat_code, -fcl_pre)

  setDT(tldata)

  # Put back the records that were already mapped; `fill = TRUE` as the two
  # tables do not necessarily have the same columns.
  tldata <- rbind(tldata_mapped, tldata, fill = TRUE)

  tldata <- tbl_df(tldata)
}

# FCL is stored as numeric from here on
tldata$fcl <- as.numeric(tldata$fcl)

flog.info("Records after HS-FCL mapping: %s", nrow(tldata))

# XXX: bring back
#rprt(tldata, "hs2fcl_fulldata", tradedataname = "tldata")

flog.trace("[%s] TL: dropping unmapped records", PID, name = "dev")

##' 1. Remove unmapped FCL codes (i.e., transactions with no HS to FCL link).

# Records without an FCL code are set aside; only mapped records go on.
tldata_unmapped <- dplyr::filter(tldata, is.na(fcl))

tldata <- dplyr::filter(tldata, !is.na(fcl))

flog.info("TL records after removing non-mapped HS codes: %s", nrow(tldata))


# When some HS codes could not be mapped (in ES or in TL), build a table of
# the unmapped reporter / flow / HS combinations, add an HS6 description and
# a suggested FCL/CPC code, save it as CSV and (outside debug mode) send it
# by e-mail so that the codes can be mapped manually.
if (any(nrow(esdata_unmapped) > 0, nrow(tldata_unmapped) > 0)) {
  # Generate unmapped trademap
  unmapped_trademap <-
    dplyr::bind_rows(
      dplyr::select(esdata_unmapped, year, reporter, flow, hs),
      dplyr::select(tldata_unmapped, year, reporter, flow, hs)
    ) %>%
    dplyr::distinct() %>%
    setDT()

  # Same layout as the trademap datatable, with empty fields to be filled in
  unmapped_trademap <-
    unmapped_trademap[,
                      .(year, area = as.character(reporter), flow, hs,
                        cpc = NA_character_, fcl = NA_character_, map_src = NA_character_,
                        hs_description = NA_character_, cpc_description = NA_character_,
                        notes = NA_character_, hs6 = substr(hs, 1, 6)
                      )
                      ]

  unmapped_trademap[, geographicAreaM49 := faoswsUtil::fs2m49(area)]

  # HS descriptions (XXX: this is repeated somewhere below)
  tmp <- RJSONIO::fromJSON(HS_DESCR)

  stopifnot(length(tmp) > 0)

  # Only six-character codes are kept; everything up to the last " - " is
  # stripped from the description text.
  hs6_descr <-
    data.table(
      hs = sapply(tmp$results, function(x) x[['id']]),
      hs6_description = lapply(tmp$results, function(x) x[['text']])
    )[
      nchar(hs) == 6,
      .(hs6 = hs, hs6_description = str_replace(hs6_description, '^.* - *', ''))
      ]

  rm(tmp)
  # / HS descriptions

  # Extract the most common FCL for HS6. Only on tldata, as this is the one
  # likely unmapped, though something different could be done for esdata.
  #
  # The counts are explicitly grouped by `hs6` before taking the first row:
  # without it, the result depends on whether `count()` returns a grouped
  # table, and on an ungrouped one `slice(1)` keeps a single row overall
  # instead of one row per HS6.
  common_fcl_hs6 <-
    tldata %>%
    dplyr::count(hs6 = substr(hs, 1, 6), suggested_fcl = fcl) %>%
    dplyr::group_by(hs6) %>%
    dplyr::arrange(-n) %>%
    dplyr::slice(1) %>%
    dplyr::ungroup() %>%
    dplyr::select(-n) %>%
    dplyr::left_join(hs6_descr, by = 'hs6') %>%
    setDT()

  unmapped_trademap <-
    merge(
      unmapped_trademap,
      common_fcl_hs6,
      by = 'hs6',
      all.x = TRUE
    )

  unmapped_trademap[!is.na(hs6_description), hs_description := hs6_description]

  unmapped_trademap[, c('area', 'hs6', 'hs6_description') := NULL]

  # The suggested CPC is derived from the suggested FCL (padded to four
  # digits), when there is at least one suggestion.
  if (nrow(unmapped_trademap[is.na(suggested_fcl)]) != nrow(unmapped_trademap)){
    unmapped_trademap[
      !is.na(suggested_fcl),
      measuredItemCPC :=
        fcl2cpc(stringr::str_pad(suggested_fcl, 4, 'left', 0), version = "2.1")
      ]

  } else {
    unmapped_trademap[ , measuredItemCPC := NA]
  }


  # Add the descriptions of the M49 area and of the CPC item
  unmapped_trademap <-
    nameData('trade', 'total_trade_cpc_m49', unmapped_trademap)

  setnames(
    unmapped_trademap,
    c('measuredItemCPC', 'measuredItemCPC_description',
      'geographicAreaM49', 'geographicAreaM49_description'),
    c('suggested_cpc', 'suggested_cpc_description',
      'area', 'country_name')
  )

  # So that the leading 0 doesn't disappear in Excel.
  # TODO: An actual Excel file can be sent, with HS as char.
  unmapped_trademap[, hs := paste0("'", hs)]
  unmapped_trademap[, suggested_cpc := paste0("'", suggested_cpc)]

  unmapped_csv_filename <-
    tempfile(pattern = "unmapped_codes_", fileext = ".csv")

  write.csv(unmapped_trademap, unmapped_csv_filename, row.names = FALSE)

  # No e-mail is sent when running in debug mode
  if (!CheckDebug()) {
    send_mail(
      from    = "SWS-trade-module@fao.org",
      to      = EMAIL_RECIPIENTS,
      subject = paste0("Trade plugin: unmapped codes, year ", year),
      body    = c(paste("Once all CPC/FCL codes are added, append the file to the ESS trademap", year, "datatable on SWS. Before doing so (and AFTER the mappping of unmapped codes is done), open the CSV file with a text editor and remove all the ' characters in front of hs and cpc (those were added so that leading zeros do not disappear). Also, remove all columns after 'notes'. Avoid inserting accented characters in the free text fields in this file: not doing so may make SWS angry."), unmapped_csv_filename),
      remove  = TRUE
    )
  }

}

flog.trace("[%s] Saving binary file with unique mapped codes", PID, name = "dev")

# Stack the mapped ES and TL records (ES records get `nrows = 1`), convert
# reporter and partner to M49 and FCL to CPC 2.1, keep the columns listed in
# `select()` and add `n`, the sum of `nrows` within each
# reporter / partner / flow / hs / cpc / fcl / qunit combination.
rawdata <-
  bind_rows(mutate(esdata, nrows = 1), tldata) %>%
  dplyr::mutate(
    reporter = fs2m49(as.character(reporter)),
    partner  = fs2m49(as.character(partner)),
    # FCL codes are zero-padded to four digits before the conversion
    cpc      = fcl2cpc(sprintf("%04d", fcl), version = "2.1")
  ) %>%
  dplyr::select(year, reporter, partner, flow, hs, cpc, fcl, qunit,
                value, weight, qty, map_src, recordnumb, nrows) %>%
  group_by(reporter, partner, flow, hs, cpc, fcl, qunit) %>%
  dplyr::mutate(n = sum(nrows)) %>%
  ungroup() %>%
  setDT()

# Folder (one per year) on the shared drive where the raw data are saved,
# one RDS file per reporter.
raw_dir <-
  file.path(
    Sys.getenv("R_SWS_SHARE_PATH"),
    paste0("trade/validation_tool_files/rawtrade/", year)
  )

# `recursive = TRUE` so that missing parent folders are created as well;
# without it `dir.create()` fails (with just a warning) when any of them is
# missing, and `saveRDS()` then stops on the first file.
if (!dir.exists(raw_dir)) {
  dir.create(raw_dir, recursive = TRUE)
}

for (i in unique(rawdata$reporter)) {
  saveRDS(rawdata[reporter == i], file.path(raw_dir, paste0(i, ".rds")))
}

# September 2022 update: the `new_harvest` variable helps to check whether a
# new country has been added to the raw data. If so, the SWS trademap
# datatable needs to be updated to allow the visualisation and to perform
# changes, if needed. The previous manual corrections are kept.
#
# The check compares the number of distinct reporters in the raw data with
# the number of distinct `m49` codes in the trademap of the year.

new_harvest <-
  length(unique(rawdata$reporter)) > length(unique(trademap_year$m49))

# (Re)write the `ess_trademap_<year>` datatable on SWS, either because it
# does not exist yet for the year or because the raw data contain more
# reporters than the existing one.
if (trademap_year_available == FALSE | new_harvest == TRUE) {
  # The result will be the final trademap

  # HS descriptions
  tmp <- RJSONIO::fromJSON(HS_DESCR)

  stopifnot(length(tmp) > 0)

  # Only six-character codes are kept; everything up to the last " - " is
  # stripped from the description text.
  hs6_descr <-
    data.table(
      hs = sapply(tmp$results, function(x) x[['id']]),
      description = lapply(tmp$results, function(x) x[['text']])
    )[
      nchar(hs) == 6,
      .(hs6 = hs, description = str_replace(description, '^.* - *', ''))
      ]

  rm(tmp)
  # / HS descriptions

  # `reporter` is not accepted as colname in SWS datatables...
  # Distinct year / area / flow / hs / cpc / fcl / map_src combinations of
  # the raw data, with empty description and notes fields.
  trademap_year_new <-
    unique(
      rawdata[,
              .(year, area = reporter, flow, hs, cpc, fcl, map_src,
                hs_description = NA_character_,
                cpc_description = NA_character_,
                notes = NA_character_)
              ]
    )

  # Column names (and order) of the final table, restored further below
  orig_names <- copy(names(trademap_year_new))

  trademap_year_new[, fcl := as.character(fcl)]

  # Helper columns: `measuredItemCPC` for nameData(), `hs6` for the merge
  trademap_year_new[, `:=`(measuredItemCPC = cpc, hs6 = substr(hs, 1, 6))]

  trademap_year_new <- merge(trademap_year_new, hs6_descr, by = "hs6", all.x = TRUE)

  trademap_year_new[, hs_description := description]

  trademap_year_new <- faoswsUtil::nameData("trade", "completed_tf_cpc_m49", trademap_year_new)

  trademap_year_new[, cpc_description := measuredItemCPC_description]

  # Drop the helper columns and go back to the original layout
  trademap_year_new <- trademap_year_new[, orig_names, with = FALSE]

  if (trademap_year_available == FALSE) {
    # No datatable for the year yet: everything is inserted
    changeset <- Changeset(paste0("ess_trademap_", year))

    AddInsertions(changeset, trademap_year_new)

    Finalise(changeset)
  } else {

    # Links whose `map_src` contains 'manu' (case insensitive) are manual
    # corrections: they replace the newly generated links that have the same
    # area / year / flow / hs.
    manual_correction <- trademap_year[grepl('manu', map_src, ignore.case=TRUE), ]
    # The `m49` column takes the place of `area`
    manual_correction[, area:= NULL]
    setnames(manual_correction, 'm49', 'area')
    manual_correction_short <- manual_correction[, c("area", "year", "flow", "hs"), with = FALSE]
    manual_correction_short[, delete:= TRUE]

    # Full join, then removal of all rows that match a manual correction
    trademap_year_new2 <- merge(trademap_year_new, manual_correction_short, by = c("area", "year", "flow", "hs"),
                               all= TRUE)
    trademap_year_new2 <- trademap_year_new2[is.na(delete),]
    trademap_year_new2[, delete:= NULL]
    trademap_year_new <- rbind(trademap_year_new2, manual_correction)

    changeset <- Changeset(paste0("ess_trademap_", year))

    # The whole content currently saved in the datatable is deleted first...
    olddat <- ReadDatatable(paste0("ess_trademap_", year), readOnly = FALSE)

    AddDeletions(changeset, olddat)

    Finalise(changeset)

    ## Add
    # ... and then the new table is inserted.
    # NOTE(review): the same `changeset` object is finalised twice; confirm
    # that it can be reused after the first Finalise().
    AddInsertions(changeset, trademap_year_new)
    Finalise(changeset)
  }

  # NOTE: now the codes saved are the ones actually mapped. An option can be
  # to APPEND new mapped codes to previous map, but see below:

  ## XXX: This will APPEND new mapped codes with respect to previous
  ## year map, thus it will potentially explode at some point (in the
  ## sense that there are going to be a lot of unnecessary links)
  #trademap_year <-
  #  rbind(
  #    trademap_year,
  #    trademap_year_new[
  #      !trademap_year_prev[,
  #        .(area, flow, hs, cpc, fcl)],
  #      on = c("area", "flow", "hs", "cpc", "fcl")
  #    ]
  #  )
}

#all_unique_data_mapped <-
#  bind_rows(esdata, tldata) %>%
#  dplyr::select(year, rep_fao = reporter, flow, hs, fcl) %>%
#  distinct() %>%
#  dplyr::mutate(
#    geographicAreaM49 = fs2m49(as.character(rep_fao)),
#    measuredItemCPC   = fcl2cpc(sprintf("%04d", fcl), version = "2.1")
#  ) %>%
#  nameData("trade", "total_trade_cpc_m49", .) %>%
#  dplyr::select(
#    year,
#    reporter = geographicAreaM49_description,
#    rep_m49  = geographicAreaM49,
#    everything()
#  )
#
#saveRDS(all_unique_data_mapped,
#        file.path(reportdir, "datadir/all_unique_data_mapped.rds"))

# Optional early exit once the HS->FCL mapping is done
if (stop_after_mapping) {
  stop("Stop after HS->FCL mapping")
}

############# Units of measurement in TL ####

##' 1. Add FCL units, i.e., the target units of measurement of the items
##' (e.g., tonnes, heads).

flog.trace("[%s] TL: add FCL units", PID, name = "dev")

tldata <- addFCLunits(tldata, fclunits = fclunits)

# Attach the `wco` unit that corresponds to each Comtrade `qunit`
tldata <- tldata %>%
  dplyr::mutate(qunit = as.integer(qunit)) %>%
  left_join(dplyr::select(comtradeunits, qunit, wco), by = "qunit")

## Dataset with all matches between Comtrade and FAO units
ctfclunitsconv <- tldata %>%
  dplyr::select(qunit, wco, fclunit) %>%
  distinct() %>%
  dplyr::arrange(qunit) %>%
  as.data.table()

################ Conv. factor (TL) ################
flog.trace("[%s] TL: conversion factors", PID, name = "dev")

##### Table for conv. factor

##' 1. General conversions: some FCL codes are reported in Tariffline with
##' different units than those reported in FAOSTAT, thus a conversion is done.

# `conv` is the factor by which `qty` is multiplied to express it in the FCL
# unit. It starts at 0, which marks the unit combinations for which no
# general factor is set below; NA marks the cases with no quantity at all.
ctfclunitsconv$conv <- 0
# Missing quantity
ctfclunitsconv[qunit == 1,                               conv :=   NA]
# Missing quantity
ctfclunitsconv[fclunit == "$ value only",                conv :=   NA]
ctfclunitsconv[fclunit == "mt"         & wco == "l",     conv := .001]
ctfclunitsconv[fclunit == "heads"      & wco == "u",     conv :=    1]
ctfclunitsconv[fclunit == "1000 heads" & wco == "u",     conv := .001]
ctfclunitsconv[fclunit == "number"     & wco == "u",     conv :=    1]
ctfclunitsconv[fclunit == "mt"         & wco == "kg",    conv := .001]
ctfclunitsconv[fclunit == "mt"         & wco == "m³",    conv :=    1]
ctfclunitsconv[fclunit == "mt"         & wco == "carat", conv := 5e-6]


##### Add conv factor to the dataset

tldata <- left_join(tldata, ctfclunitsconv, by = c("qunit", "wco", "fclunit"))

##' 1. Specific conversions: some FCL codes are reported in Tariff line
##' with different supplementary units than those reported in FAOSTAT,
##' thus a conversion is done.

#### Commodity specific conversion

# FCL / wco pairs that need a specific factor to convert qty to metric tons:
# the FCL unit is "mt", the weight is missing or zero, and no general
# conversion factor is available (conv is still 0).
fcl_spec_mt_conv <- tldata %>%
  dplyr::filter(fclunit == "mt" & (is.na(weight) | near(weight, 0)) & conv == 0) %>%
  dplyr::select(fcl, wco) %>%
  distinct()

# Reset conv to NA, as its zero is not useful anymore
tldata <- dplyr::mutate(tldata, conv = ifelse(conv == 0, NA, conv))

# FCL codes measured in 'heads', '1000 heads' or 'number', for which weight
# has to be converted.
fcl_spec_head_conv <- tldata %>%
  dplyr::filter(fclunit %in% c('heads', '1000 heads', 'number')) %>%
  dplyr::select(fcl, fclunit) %>%
  distinct()

# Conversion of TL quantities to the FCL unit (`qtyfcl`).
#
# This used to run only when `fcl_spec_mt_conv` had rows; otherwise `qtyfcl`
# was set to NA for every record and the `convspec_mt` / `convspec_head`
# columns were never created, although they are required right after this
# step (to set the flags and to combine TL with ES). The conversion is now
# always performed: with no commodity specific factor the joins below simply
# add missing values and the general conversions still apply.

# weight > mt
# Median, by FCL and wco unit, of the tonnes per unit of quantity observed
# in records that report both weight and quantity.
conversion_factors_fcl_mt <- tldata %>%
  dplyr::filter(!is.na(weight) & !is.na(qty)) %>%
  dplyr::mutate(qw = (weight/qty)/1000) %>%
  group_by(fcl, wco) %>%
  dplyr::summarise(convspec_mt = median(qw, na.rm = TRUE)) %>%
  ungroup()

fcl_spec_mt_conv <- fcl_spec_mt_conv %>%
  left_join(conversion_factors_fcl_mt, by = c("fcl", "wco")) %>%
  # Zero quantities will be imputed
  # (as.numeric() keeps the column numeric also when the table is empty)
  dplyr::mutate(convspec_mt = as.numeric(ifelse(is.na(convspec_mt), 0, convspec_mt)))

# XXX some weights are missing: e.g., reporter = 20, fcl = 946, year = 2015
# They can be imputed as the median of existing weights (see below), but this
# needs to be discussed:
#  livestock_weights %>%
#    unite(fcl_live, fcl, livestock, sep = '#') %>%
#    complete(reporter_fao, fcl_live) %>%
#    separate(fcl_live, c('fcl', 'livestock'), sep = '#') %>%
#    dplyr::mutate(fcl = as.integer(fcl)) %>%
#    group_by(fcl) %>%
#    dplyr::mutate(liveweight = ifelse(is.na(liveweight), median(liveweight, na.rm = TRUE), liveweight))

# weight > heads
# Positive live weights by reporter and FCL, for the FCL codes that are
# measured in heads / 1000 heads / number.
fcl_spec_head_conv <- livestock_weights %>%
  tbl_df() %>%
  dplyr::select(reporter = reporter_fao, fcl, liveweight) %>%
  left_join(fcl_spec_head_conv, by = 'fcl') %>%
  dplyr::filter(!is.na(fclunit)) %>%
  dplyr::filter(!is.na(liveweight), liveweight > 0) %>%
  dplyr::rename(convspec_head = liveweight)

### Add commodity specific conv.factors to dataset

tldata <- tldata %>%
  left_join(fcl_spec_mt_conv,   by = c("fcl", "wco")) %>%
  left_join(fcl_spec_head_conv, by = c("reporter", "fcl", "fclunit"))

# Row identifier, used to find the records that are not converted directly
tldata$id <- seq_len(nrow(tldata))

tldata$qtyfcl <- rep(NA_real_, nrow(tldata))

# Records for which the quantity can be set directly, without any
# commodity specific factor. The value -1 is a temporary marker for records
# whose quantity has to stay missing.
tldata_converted <- tldata %>%
  dplyr::mutate(
    qtyfcl =
      case_when(
        .$fclunit == 'mt' & !is.na(.$weight)                     ~ .$weight / 1000,
        .$fclunit == 'heads' & !is.na(.$qty) & .$wco == 'u'      ~ .$qty,
        .$fclunit == 'number' & !is.na(.$qty) & .$wco == 'u'     ~ .$qty,
        # This requires a flag change
        .$fclunit == '1000 heads' & !is.na(.$qty) & .$wco == 'u' ~ .$qty / 1000,
        # -1 will be set to NA just below
        .$fclunit == '$ value only'                              ~ -1,
        # Nothing can be done for these
        is.na(.$weight) & is.na(.$qty)                           ~ -1,
        TRUE                                                     ~ .$qtyfcl
      )
  ) %>%
  dplyr::filter(!is.na(qtyfcl)) %>%
  dplyr::mutate(qtyfcl = ifelse(qtyfcl == -1, NA, qtyfcl))

# Everything that did not match any of the cases above
tldata_not_converted <-
  anti_join(tldata, tldata_converted, by = 'id')

########## Conversion of units

#### FCL specific conv

tldata_to_convert <- tldata_not_converted %>%
  dplyr::mutate(
    qtyfcl =
      case_when(
        !is.na(.$convspec_mt)                               ~ .$qty * .$convspec_mt,
        !is.na(.$convspec_head) & .$fclunit != '1000 heads' ~ .$weight / .$convspec_head,
        !is.na(.$convspec_head) & .$fclunit == '1000 heads' ~ .$weight / .$convspec_head / 1000,
        #### Common conv
        # If no specific conv. factor, we apply general
        TRUE                                                ~ .$qty * .$conv
      )
  )

tldata <-
  bind_rows(
    tldata_converted,
    tldata_to_convert
  ) %>%
  # Decimals make no sense for heads
  dplyr::mutate(qtyfcl = ifelse(fclunit %in% 'heads', round(qtyfcl, 0), qtyfcl))

rm(tldata_converted, tldata_to_convert, tldata_not_converted)
invisible(gc())

# Estimation of weight for livestock will be kept for later,
# specifically after doImputation() has been used.

###' 1. If the `weight` variable is available and the final unit
###' of measurement is tonnes then `weight` is used as `quantity`.
## (already done above)
#cond_w <- tldata$fclunit == 'mt' & !is.na(tldata$weight) & tldata$weight > 0
#
#tldata$qtyfcl <- ifelse(cond_w, tldata$weight / 1000, tldata$qtyfcl)

# Weight is always in tonnes
# (the reported weight is divided by 1000)
tldata$weight <- tldata$weight / 1000

# XXX sometimes the condition is TRUE, but it doesn't mean that
# the conversion factors were indeed used. See, e.g.,
# tldata %>% filter(year == 2016, reporter == 154, fcl == 1062)
# TRUE when a commodity specific factor (to mt or to heads) is available
cond_q <- !is.na(tldata$convspec_mt) | !is.na(tldata$convspec_head)

# XXX
# Flag on weight as qty (which underwent a change) will populate weight
#
# Method flag 'i' on weight for positive weights and for the records with a
# commodity specific factor.
tldata <- tldata %>%
  setFlag3(weight > 0, type = 'method', flag = 'i', variable = 'weight') %>%
  setFlag3(cond_q,     type = 'method', flag = 'i', variable = 'weight')

##' # Combine Trade Data Sources

######### Value from USD to thousands of USD

##+ es_convcur

##' 1. Convert currency of monetary values from EUR to USD using the
##' `EURconversionUSD` table (required only for ES).

# Exchange rate of the year being processed.
# NOTE(review): nothing checks that exactly one rate is found for `year`;
# with no match `eur_usd` has length zero and the multiplication below
# cannot work on a non-empty `esdata`.
eur_usd <- as.numeric(EURconversionUSD[eusd_year == year, eusd_exchangerate])

esdata$value <- esdata$value * eur_usd

# Positive values get method flag 'i', as they have been converted
esdata <- esdata %>%
  setFlag3(value > 0, type = 'method', flag = 'i', variable = 'value')


##' 1. Convert data in thousands of dollars.

# Depending on `dollars`, either the ES values are multiplied by 1000 or the
# TL values are divided by 1000, so that both sources are in the same unit.
if (dollars) {
  esdata <- esdata %>%
    dplyr::mutate(value = value * 1000) %>%
    setFlag3(value > 0, type = 'method', flag = 'i', variable = 'value')
} else { ## This means it is in k$
  tldata <- tldata %>%
    dplyr::mutate(value = value / 1000) %>%
    setFlag3(value > 0, type = 'method', flag = 'i', variable = 'value')
}

##+ tl_aggregate

# Copy of the TL data as they are at this point of the process
tldata_mid <- tldata

###' 1. Assign 'weight' flags to 'qty' flags in TL XXX.
#
# NO: this isn't needed as below qty = weight and it has already its own flag
#
#tldata <- tldata %>%
#  dplyr::mutate_each_(funs(swapFlags(., swap='\\1\\2\\2'), !is.na(weight)),
#               ~starts_with('flag_'))

# ES quantity in FCL units: `weight` when the FCL unit is "mt", `qty`
# otherwise.
esdata <- esdata %>%
  dplyr::mutate(qtyfcl = ifelse(fclunit == "mt", weight, qty))

# Assign `qty` flags to `weight` flags in ES but
# only when `fclunit` is different from "mt".


esdata <- esdata %>%
  dplyr::mutate_each_(funs(swapFlags(., swap='\\1\\3\\3', fclunit != "mt")),
                      ~starts_with('flag_'))


##' 1. Combine Tariff line and Eurostat data sources in a single data set:
##'     - TL: assign `weight` to `qty`.
##'     - ES: assign `weight` to `qty` if `fclunit` is "mt", else keep `qty`.

##+ combine_es_tl
flog.trace("[%s] Combine TL and ES data sets", PID, name = "dev")
# Both sources are reduced to the same columns, with `qtyfcl` renamed to
# `qty`. ES has neither `convspec_head` nor `wco`, so they are added as
# missing values.
tradedata <- bind_rows(
  tldata %>%
    dplyr::select(year, reporter, partner, flow, fcl, fclunit, hs,
                  value, weight, qty = qtyfcl,
                  convspec_head, starts_with('flag_'), wco),
  esdata %>%
    dplyr::mutate(convspec_head = NA_real_, wco = NA_character_) %>%
    dplyr::select(year, reporter, partner, flow, fcl, fclunit, hs,
                  value, weight, qty = qtyfcl,
                  convspec_head, starts_with('flag_'), wco)
)

# XXX this is fine, but probably the name of the function should be changed
tradedata <- tradedata %>%
  dplyr::mutate_each_(funs(swapFlags(., swap='\\1\\2')), ~starts_with('flag_'))

### Check for double counting of HS codes
#hs_many_lengths = getHsManyLengths(tradedata)
#rprt_writetable(hs_many_lengths, subdir = 'details')

flog.trace("[%s] Outlier detection and imputation", PID, name = "dev")
##+ calculate_median_uv

##' 1. Unit values are calculated for each observation at the HS level as ratio
##' of monetary value over quantity: $uv = value / qty$.

# Flag rows lacking a usable quantity (unless the item is value-only) or a
# usable value, then derive the unit value for the remaining rows. The UV is
# rounded to 10 decimals to avoid floating point number problems (issue #54).
tradedata <- dplyr::mutate(
  tradedata,
  no_quant = fclunit != '$ value only' & (near(qty, 0) | is.na(qty)),
  no_value = near(value, 0) | is.na(value),
  uv       = round(ifelse(no_quant | no_value, NA, value / qty), 10)
)

##+ boxplot_uv

# Without outlier detection every row is marked as a non-outlier so that the
# `outlier` column always exists downstream.
if (!detect_outliers) {
  tradedata$outlier <- FALSE
} else {
  tradedata <- detectOutliers(tradedata = tradedata,
                              method = "boxplot",
                              parameters = list(out_coef = out_coef))
}

##' # Imputation

##+ impute_qty_uv

##' 1. Imputation of missing quantities by applying the method presented
##' in the *Missing Quantities Imputation* subsection of the *faoswsTrade:
##' `complete_tf_cpc` and `total_trade_CPC` modules* document
##' (*Standardization, editing and outlier detection* section). The
##' `flagTrade` variable is given a value of 1 if an imputation was performed.

## These flags are also assigned to monetary values. This may need to be
## revised (monetary values are not supposed to be modified).

##' OCTOBER 2021: Team BC decided to drop the hierarchical imputation
##' methodology; missing quantities are now imputed by applying the global
##' unit value changes.
##' The old methodology used the computeMedianUnitValue and doImputation
##' functions, so bring them back in case the team changes its mind.

# Impute quantities from unit values (UV): the reporter's previous-year UV at
# total-trade level is updated with the World UV variation of the current
# year, and the quantity is derived as value / UV.
setDT(tradedata)

# World-level unit value variations by CPC item, year and flow.
global_UVs <- ReadDatatable('ess_trade_global_unit_values')
global_UVs <- global_UVs[region=='World',]
# Recode flow to numbers: 'import' becomes 1, anything else becomes 2.
global_UVs[, flow:= ifelse(flow=='import', 1, 2)]
global_UVs[, c("measured_item_cpc_description", "region"):= NULL]

# FCL <-> CPC correspondence, reduced to the two code columns and to one row
# per CPC code (the first one found).
map <- ReadDatatable("fcl2cpc_ver_2_1")
map[, c(names(map)[names(map) %!in% c('fcl', 'cpc')]):= NULL]
map <- map[!duplicated(cpc),]
out <- merge(global_UVs, map, by.x = "measured_item_cpc", by.y = "cpc", all.x = TRUE)
#### Take the previous year's UVs from Total trade level####
geoKeys <- GetCodeList("trade", "total_trade_cpc_m49", "geographicAreaM49")[type == "country"][,code] %>%
  Dimension(name = "geographicAreaM49", keys = .)

itemKeys <- GetCodeList("trade", "total_trade_cpc_m49", "measuredItemCPC")[, code] %>%
  Dimension(name = "measuredItemCPC", keys = .)

eleKeys <- c("5630", "5930"
              , "5638", "5639", "5938", "5939"
             ) %>%
  Dimension(name = "measuredElementTrade", keys = .)

timeDim <- Dimension(name = "timePointYears", keys = as.character(year-1))

key <- DatasetKey(domain = "trade", dataset = "total_trade_cpc_m49", dimensions = list(
    geographicAreaM49 = geoKeys,
    measuredElementTrade = eleKeys,
    measuredItemCPC = itemKeys,
    timePointYears = timeDim
    ))

# Denormalized output: the previous-year figure is read below from the column
# `Value_timePointYears_<year - 1>`.
data_previous_year <- GetData(key, omitna = FALSE, normalized = FALSE)
data_previous_year[, c(names(data_previous_year)[grepl('flag', names(data_previous_year))]):= NULL]
# Drop elements 5630 / 5930 for CPC codes starting with '021'.
# NOTE(review): presumably these items are measured in another unit; confirm.
data_previous_year <- data_previous_year[!(measuredElementTrade==5630 & stringr::str_sub(measuredItemCPC, 1, 3) == '021'),]
data_previous_year <- data_previous_year[!(measuredElementTrade==5930 & stringr::str_sub(measuredItemCPC, 1, 3) == '021'),]
# Elements starting with '56' are mapped to flow 1, the others to flow 2.
data_previous_year[, flow:= ifelse(substr(measuredElementTrade, 1, 2)=='56', 1, 2)]


#####
# Keep the World variation of the current year and attach it to each
# reporter's previous-year UV (matched on CPC item and flow).
out_new <- out[time_point_years == as.character(year),]
setnames(out_new, c('measured_item_cpc', 'time_point_years'), c('measuredItemCPC', 'timePointYears'))
merged1 <- merge(out_new, data_previous_year, by = c('measuredItemCPC', 'flow'))

# Estimated current UV = previous-year UV * (1 + World variation).
merged1 <- merged1[, variation_unit_value:= 1+ variation_unit_value]
setnames(merged1, c(paste0('Value_timePointYears_', as.character(year-1))), c('value_old'))
merged1 <- merged1[, unitvalue_estimation_new:= value_old*variation_unit_value]

# Strip leading zeros so that FCL codes compare equal once made numeric.
merged1$fcl <- sub("^0+", "", merged1$fcl)

# `tradedata` identifies reporters with FAO area codes, so convert M49 here.
merged1 <- merged1 %>%
  dplyr::mutate_(
    geographicAreaM49 = ~m492fs(as.character(geographicAreaM49)))
# merged1 <- merged1[geographicAreaM49=='360', geographicAreaM49:='101']
merged1$geographicAreaM49 <- as.integer(merged1$geographicAreaM49)
setDT(merged1)
merged1[, c("unit_value", "variation_unit_value", "measuredElementTrade", "value_old", "measuredItemCPC"):= NULL]
merged1[, c(names(merged1)[grepl('flag', names(merged1))]):= NULL]
merged1$timePointYears <- as.integer(merged1$timePointYears)
merged1$fcl <- as.numeric(merged1$fcl)

# NOTE(review): if `merged1` holds more than one row per
# (year, reporter, flow, fcl), this left merge duplicates `tradedata` rows;
# verify that the key is unique.
tradedata <- merge(tradedata, merged1, by.x = c('year', 'reporter', 'flow', 'fcl'),
                               by.y = c('timePointYears', 'geographicAreaM49', 'flow', 'fcl'), all.x = TRUE)

# `value` is multiplied by 1000 before being divided by the estimated UV.
tradedata <- tradedata[, imputed_qty:= (value / unitvalue_estimation_new)*1000]

# Replace the quantity of rows without quantity or flagged as outliers.
# NOTE(review): when no UV estimate is available `imputed_qty` is NA, so the
# quantity of such rows becomes NA while `flagTrade` is still set to 1.
tradedata <- tradedata[, qty := ifelse(no_quant==TRUE | outlier==TRUE, imputed_qty, qty)]
tradedata <- tradedata[, flagTrade := ifelse(no_quant==TRUE | outlier==TRUE, 1, 0)]
tradedata <- tradedata[, c("unitvalue_estimation_new", "imputed_qty"):=NULL]
# tradedata <- tradedata[, weight:=qty]
# Convert back from data.table. The result has to be assigned: calling
# tbl_df() on its own does not modify `tradedata`.
tradedata <- tbl_df(tradedata)

# XX BRING BACK If Team BC decides again to use hierarchical imputation methodology
# tradedata <- computeMedianUnitValue(tradedata = tradedata)
# tradedata <- doImputation(tradedata = tradedata)

# Team BC: do not assign imputed flag to imputed flows that
# account for less than 10% of the total flow.
# The share is computed within (year, reporter, flow, fcl). `flagTrade` is
# kept only when the item is not value-only and the share is strictly above
# 0.1; otherwise it is reset to 0.
# NOTE(review): `sum(qty)` has no `na.rm`, so a single NA quantity in a group
# makes `qty_perc` (and possibly `flagTrade`) NA for the whole group; confirm
# this is intended.
tradedata <-
  tradedata %>%
  group_by(year, reporter, flow, fcl) %>%
  dplyr::mutate(qty_perc = qty / sum(qty)) %>%
  ungroup() %>%
  dplyr::mutate(
    flagTrade = ifelse(fclunit != '$ value only' & qty_perc > 0.1, flagTrade, 0)
  ) %>%
  dplyr::select(-qty_perc)

flog.trace("[%s] Flag stuff", PID, name = "dev")
# XXX using flagTrade for the moment, but should go away
# (Team BC: quantities below 1 tonne do not get imputed flag)
# Rows still marked in `flagTrade` with a quantity above 1 get status 'I' and
# method 'e' on the quantity variable.
tradedata <- tradedata %>%
  setFlag2(flagTrade > 0 & qty > 1,
           type = 'status', flag = 'I', var = 'quantity') %>%
  setFlag2(flagTrade > 0 & qty > 1,
           type = 'method', flag = 'e', var = 'quantity')

##' # Additional operations

##' 1. Separate flags.

###### TODO (Christian) Rethink/refactor
# separate flag_method and flag_status into 2 variables each one: _v and _q
# Each `flag_*` column is split at character positions 1 and 2 into three
# pieces: a leading piece `x` (discarded) and the `_v` and `_q` pieces, which
# `convert = TRUE` turns into numbers where possible.
flag_vars <- colnames(tradedata)[grep('flag_', colnames(tradedata))]
for (var in flag_vars) {
  tradedata <- separate_(tradedata, var, 1:2,
                         into = c('x', paste0(var, '_', c('v', 'q'))),
                         convert = TRUE) %>%
    dplyr::select(-x)
}

# Flags at (year, reporter, partner, flow, fcl) level: a flag is set (1) for
# the aggregate if it is set for at least one of the aggregated rows.
tradedata_flags <- tradedata %>%
  group_by_(~year, ~reporter, ~partner, ~flow, ~fcl) %>%
  dplyr::summarise_each_(funs(sum(.)), vars = ~starts_with('flag_')) %>%
  ungroup() %>%
  dplyr::mutate_each_(funs(as.integer(. > 0)), vars = ~starts_with('flag_'))

##' 1. Aggregate values, quantities, and flags by FCL codes.

# Aggregation by fcl
# Sum value, weight, quantity and `flagTrade` by
# (year, reporter, partner, flow, fcl, fclunit). `nfcl` counts how many rows
# were collapsed into each aggregate. NAs are ignored in the sums, so a group
# with only missing figures sums to zero.
flog.trace("[%s] Aggregation by FCL", PID, name = "dev")
tradedata <- tradedata %>%
  dplyr::mutate_(nfcl = 1) %>%
  group_by_(~year, ~reporter, ~partner, ~flow, ~fcl, ~fclunit) %>%
  dplyr::summarise_each_(
    funs(sum(., na.rm = TRUE)),
    vars = c("value", "weight", "qty", "flagTrade", "nfcl")
  ) %>%
  ungroup()

# XXX not all weights were estimated, so after aggregation they are zero
# Zero weights are therefore turned back into NA.
tradedata <-
  tradedata %>%
  dplyr::mutate(weight = ifelse(near(weight, 0), NA, weight))


# Attach the aggregated flag indicators computed before the aggregation.
flog.trace("[%s] Flags again", PID, name = "dev")
tradedata <- left_join(tradedata,
                       tradedata_flags,
                       by = c('year', 'reporter', 'partner', 'flow', 'fcl'))

###### TODO (Christian) Rethink/refactor
# unite _v and _q into one variable
# Each pair of indicator columns `<flag>_v` / `<flag>_q` is encoded back into
# a single three-digit number: 100 + 10 * (value flag set) + (quantity flag set).
flag_vars <- sort(unique(sub('_[vq]$', '', colnames(tradedata)[grep('flag_', colnames(tradedata))])))
for (var in flag_vars) {
  var_v <- paste0(var, '_v')
  var_q <- paste0(var, '_q')

  tradedata[[var]] <- 100 + (tradedata[[var_v]]>0)*10 + (tradedata[[var_q]]>0)*1
}
# Drop the `_v` / `_q` helper columns by their exact names. Matching them with
# a regular expression on the trailing letter would also remove a united
# column whose flag letter is itself 'v' or 'q' (e.g. `flag_method_q`), and an
# empty match would drop every column.
tradedata <- tradedata[
  setdiff(
    colnames(tradedata),
    c(paste0(flag_vars, '_v'), paste0(flag_vars, '_q'))
  )
]

# Aggregates built from more than one row get method flag 's' on all variables.
tradedata <- tradedata %>%
  setFlag2(nfcl > 1,  type = 'method', flag = 's', variable = 'all')

##' 1. Map FCL codes to CPC (version 2.1).

# Adding CPC2 extended code
# FCL codes are zero-padded to four digits before being mapped to CPC 2.1.
flog.trace("[%s] Add CPC item codes", PID, name = "dev")
tradedata <- tradedata %>%
  dplyr::mutate_(cpc = ~fcl2cpc(sprintf("%04d", fcl), version = "2.1"))

# Not resolve mapping fcl2cpc
# Vector of the distinct FCL codes for which no CPC code was found.
no_mapping_fcl2cpc <- tradedata %>%
  select_(~fcl, ~cpc) %>%
  filter_(~is.na(cpc)) %>%
  distinct_(~fcl) %>%
  select_(~fcl) %>%
  unlist()

##' 1. Map FAO area codes to M49. Countries with FAOSTAT code 252
##' ("Unspecified") are converted to M49 code 896 ("Other nei").

# The system expects M49 area codes, so translate reporter and partner.
flog.trace("[%s] Convert FAO area codes to M49", PID, name = "dev")
tradedata <- dplyr::mutate(
  tradedata,
  reporterM49 = fs2m49(as.character(reporter)),
  # XXX issue 34: FAO partner code 252 is forced to M49 code '896'
  partnerM49  = ifelse(partner == 252, '896', fs2m49(as.character(partner)))
)

# FAO area codes (as reporter or as partner) that have no M49 equivalent
countries_not_mapping_M49 <-
  bind_rows(
    dplyr::select(tradedata, fc = reporter, m49 = reporterM49),
    dplyr::select(tradedata, fc = partner,  m49 = partnerM49)
  ) %>%
  dplyr::filter(is.na(m49)) %>%
  dplyr::distinct() %>%
  dplyr::select(fc) %>%
  unlist()

##+ mirror_estimation

##' # Mirror Trade Estimation

##' 1. Create a table with the list of reporters and partners
##' combined as areas and count the number of flows that the
##' areas declare as reporting countries. The partners that
##' never show up as reporters or the reporters that do not
##' report a flow will have a number of flows equal to zero
##' and will be mirrored.
flog.trace("[%s] Mirroring", PID, name = "dev")

# Areas/flows that need to be mirrored; area 252 is never mirrored.
to_mirror <- flowsToMirror(tradedata) %>%
  dplyr::filter(area != 252)

##' 1. Swap the reporter and partner dimensions: the value previously appearing
##' as reporter country code becomes the partner country code (and vice versa).

##' 1. Invert the flow direction: an import becomes an export (and vice versa).

##' 1. Calculate monetary mirror value by adding (removing) a 12% mark-up on
##' imports (exports) to account for the difference between CIF and FOB prices.

## Mirroring for non reporting countries
tradedata <- mirrorNonReporters(tradedata, mirror = to_mirror)

# Add an auxiliary variable "mirrored" that will be removed later
# (it is 1 for reporter/flow combinations listed in `to_mirror`, NA otherwise)
tradedata <- tradedata %>%
  left_join(
    to_mirror %>% dplyr::mutate(mirrored = 1L),
    by = c('reporter' = 'area', 'flow')
  )

flog.trace("[%s] Flags to mirrored flows", PID, name = "dev")

# Mirrored rows get status 'T' on all variables, method 'i' on the value and
# method 'c' on the quantity.
tradedata <- tradedata %>%
  setFlag2(!is.na(mirrored), type = 'status', flag = 'T', var = 'all') %>%
  setFlag2(!is.na(mirrored), type = 'method', flag = 'i', var = 'value') %>%
  setFlag2(!is.na(mirrored), type = 'method', flag = 'c', var = 'quantity') %>%
  dplyr::select(-mirrored)

##' # Flag aggregation

##' Flags are aggregated as mentioned in the *Flags* section in
##' the main documentation or, more in depth, in the "Flag Management
##' in the Trade module" document.

################################################
# TODO Rethink/refactor: clean flags for fclunit != "$ value only"
################################################

##+ completed_trade_flow

###### TODO (Christian) Rethink/refactor
# separate flag_method and flag_status into 2 variables each one: _v and _q
# Same splitting as done before the aggregation by FCL: each `flag_*` column
# is cut at character positions 1 and 2, the leading piece `x` is discarded
# and the `_v` / `_q` pieces are kept (converted to numbers where possible).
flag_vars <- colnames(tradedata)[grep('flag_', colnames(tradedata))]
for (var in flag_vars) {
  tradedata <- separate_(tradedata, var, 1:2,
                         into = c('x', paste0(var, '_', c('v', 'q'))),
                         convert = TRUE) %>%
    dplyr::select(-x)
}

##' # Output for SWS

##' 1. Filter observations with missing CPC codes.

##' 1. Rename dimensions to comply with SWS standard,
##' e.g., `geographicAreaM49Reporter`.

##' 1. Calculate unit value (US$ per quantity unit) at CPC
##' level if the quantity is larger than zero.

# Modified in order to have X in the table
# Weight of each observation status flag. The aggregation loop below keeps,
# for each row, the flag with the smallest weight among the flags set.
flagWeightTable_status <- frame_data(
  ~flagObservationStatus, ~flagObservationWeights,
  'Y',                   1.00, # This acts as blank
  '',                    0.99,
  'X',                   0.90,
  'T',                   0.80,
  'E',                   0.75,
  'I',                   0.50,
  'M',                   0.00
)

# There is no native "method" table
# The flag column is still named `flagObservationStatus` so that the
# aggregation loop below can treat both tables in the same way.
flagWeightTable_method <- frame_data(
  ~flagObservationStatus, ~flagObservationWeights,
  'h',                   1.00,
  '',                    0.99,
  'q',                   0.95,
  'p',                   0.90,
  'i',                   0.80,
  'e',                   0.60,
  'f',                   0.50,
  'c',                   0.40,
  '-',                   0.30,
  's',                   0.20
)

# XXX This will need a refactoring
# For each flag type (status, method) and each variable (v = value,
# q = quantity), collapse the per-flag indicator columns into one final flag
# per row, stored in flagSTATUS_v, flagSTATUS_q, flagMETHOD_v, flagMETHOD_q.
flog.trace("[%s] Cycle on status and method flags", PID, name = "dev")
for (i in c('status', 'method')) {
  for (j in c('v', 'q')) {

    # Indicator columns for this type/variable; zeros become NA so that flags
    # that are not set do not take part in the minimum below.
    dummies <-
      tradedata %>%
      dplyr::select(matches(paste0('flag_', i, '_._', j))) %>%
      dplyr::mutate_all(funs(ifelse(equals(., 0), NA, .)))

    # Flag letter encoded in each column name (second-to-last piece).
    flags <- sub('.*_(.)_.$', '\\1', colnames(dummies))

    flagWeightTable <-
      switch(i, status = flagWeightTable_status, method = flagWeightTable_method)

    # Weight table rows in the same order as the columns of `dummies`.
    found_flags <- flagWeightTable[match(flags, flagWeightTable$flagObservationStatus),]

    var <- paste0('flag', toupper(i), '_', j)

    # Multiply each indicator column by the weight of its flag and take the
    # row-wise minimum. Rows with no flag set give Inf (with a warning, which
    # is suppressed here).
    final_flags <- suppressWarnings(apply(t(t(as.matrix(dummies)) * (found_flags$flagObservationWeights)), 1, min, na.rm = TRUE))

    final_flags[is.infinite(final_flags)] <- NA

    # Translate the winning weight back into its flag letter.
    final_flags <- found_flags$flagObservationStatus[match(final_flags, found_flags$flagObservationWeights)]

    tradedata[[var]] <- final_flags
  }
}

# Build the output table: keep reporters with a code below 1000, drop rows
# without a CPC code, rename the columns to the SWS dimension names and
# recompute the unit value where the quantity is positive.
flog.trace("[%s] Complete trade flow CPC", PID, name = "dev")
complete_trade_flow_cpc <- tradedata %>%
  # 5000 is, e.g., world (here reporter is in FAL)
  filter_(~reporter < 1000) %>%
  select_(~-fcl) %>%
  filter_(~!(is.na(cpc))) %>%
  transmute_(geographicAreaM49Reporter = ~reporterM49,
             geographicAreaM49Partner  = ~partnerM49,
             flow                      = ~flow,
             timePointYears            = ~year,
             measuredItemCPC           = ~cpc,
             value                     = ~value,
             weight                    = ~weight,
             qty                       = ~qty,
             unit                      = ~fclunit,
             flagObservationStatus_v   = ~flagSTATUS_v,
             flagObservationStatus_q   = ~flagSTATUS_q,
             flagMethod_v              = ~flagMETHOD_v,
             flagMethod_q              = ~flagMETHOD_q) %>%
  ## unit of monetary values is "1000 $"
  dplyr::mutate(uv = ifelse(qty > 0, value * 1000 / qty, NA))

##' 1. Keep officially reported weight in kilograms for livestock.
##' If a country also reported the weight in addition to quantities in
##' "heads" or "1000 heads", the weight is kept and saved to SWS.

# Livestock rows (unit in heads): keep only the reported weight, stored as
# `Value` under the import (5610) or export (5910) element.
complete_trade_flow_cpc_live <-
  dplyr::filter(complete_trade_flow_cpc, unit %in% c('heads', '1000 heads')) %>%
  # XXX for now, no flags: the existing flag columns are replaced by blanks
  dplyr::select(-starts_with('flag')) %>%
  dplyr::mutate(
    flagObservationStatus = '',
    flagMethod            = '',
    measuredElementTrade  = ifelse(flow == 1, '5610', '5910')
  ) %>%
  # only the weight is needed here
  dplyr::rename(Value = weight) %>%
  dplyr::select(-value, -qty, -uv, -unit, -flow)

# The weight is not needed anymore in the main table
complete_trade_flow_cpc <- dplyr::select(complete_trade_flow_cpc, -weight)


##' Re-impute (at CPC level)

# Reporters (M49 codes, as character) listed in the `reimpute_uv` datatable
# for the year being processed. The datatable's `year` column is renamed to
# `reimp_year` so that `year` below refers to the script-level variable.
to_reimpute <- ReadDatatable('reimpute_uv')[, .(m49 = geographic_area_m49, reimp_year = year)]

to_reimpute <- as.character(to_reimpute[reimp_year == year]$m49)

# Re-imputation at CPC level, done only when at least one reporter is listed
# for the current year. Unit values (UV) of the listed reporters that moved
# too much with respect to their recent history are replaced by the average
# of the previous years updated with the World variation, and quantities are
# recomputed from them.
if (length(to_reimpute) > 0) {

  setDT(complete_trade_flow_cpc)

  flog.trace("[%s] Re-impute at CPC level", PID, name = "dev")

  allReportersDim_tot <-
    Dimension(name = "geographicAreaM49", keys = to_reimpute)

  allElementsDim_tot <-
    c("5638", "5639", "5630", "5938", "5939", "5930") %>%
    Dimension(name = "measuredElementTrade", keys = .)

  allItemsDim_tot <-
    GetCodeList("trade", "total_trade_cpc_m49", "measuredItemCPC")$code %>%
    Dimension(name = "measuredItemCPC", keys = .)

  # UV elements of the listed reporters for the three previous years
  # (year - 3, year - 2, year - 1).
  total_trade_key_uv <-
    DatasetKey(
      domain = "trade",
      dataset = "total_trade_cpc_m49",
      dimensions =
        list(
          allReportersDim_tot,
          allElementsDim_tot,
          allItemsDim_tot,
          Dimension(name = "timePointYears", keys = as.character(year - 3:1))
        )
    )

  prev_totals <- GetData(key = total_trade_key_uv, omitna = TRUE, flags = FALSE)

  # Second digit of the element code: '6' is mapped to flow 1, else flow 2.
  prev_totals[, flow := ifelse(substr(measuredElementTrade, 2, 2) == '6', 1, 2)]

  prev_totals[, measuredElementTrade := NULL]

  setnames(prev_totals, c('geographicAreaM49', 'Value'), c('geographicAreaM49Reporter', 'uv_prev'))

  # Do the average of the previous years and keep only
  # those for which there was data in 2 or 3 years
  prev_totals_avg <-
    prev_totals[,
                .(n = .N, uv_prev = mean(uv_prev)),
                .(geographicAreaM49Reporter, measuredItemCPC, flow)
                ][
                  n %in% 2:3
                  ][,
                    n := NULL
                    ]

  # World UV by CPC in the previous year
  # (median over the reporters downloaded above, i.e. those in `to_reimpute`)
  prev_totals_median <-
    prev_totals[
      timePointYears  == (year - 1)
      ][,
        .(uv_tot_median_prev = median(uv_prev)), .(measuredItemCPC, flow)
        ]


  # Current-year totals by reporter, item and flow, with their UV.
  current_totals <-
    complete_trade_flow_cpc[,
                            .(value = sum(value), qty = sum(qty)),
                            .(timePointYears, geographicAreaM49Reporter, measuredItemCPC, flow)
                            ][,
                              uv := value / qty * 1000
                              ]

  # World UV by CPC in current year
  current_totals_median <-
    current_totals[
      !is.na(uv)
      ][,
        .(uv_total_median = median(uv), n = .N),
        .(measuredItemCPC, flow)
        ]

  all_totals_median <-
    merge(
      current_totals_median,
      prev_totals_median,
      by = c('measuredItemCPC', 'flow'),
      all.x = TRUE
    )

  # Relative change of the median UV between previous and current year.
  all_totals_median[, variation := uv_total_median / uv_tot_median_prev - 1]

  # Keep variations within +/- 50%, or based on more than 50 reporters.
  all_totals_median <- all_totals_median[variation %between% c(-0.5, 0.5) | n > 50]

  all_totals_median[, n := NULL]

  uv_total_variation <-
    merge(
      current_totals,
      prev_totals_avg,
      by = c('geographicAreaM49Reporter', 'measuredItemCPC', 'flow'),
      all.x = TRUE
    )

  # Ratio between the current UV and the average of the previous years.
  uv_total_variation[!is.na(uv_prev), x := uv / uv_prev]

  # Keep only the cases where the ratio is outside [0.5, 1.5]; rows with a
  # missing ratio are dropped as well (NA in a data.table filter).
  uv_total_variation <- uv_total_variation[!(x %between% c(0.5, 1.5))]

  uv_total_imputed <-
    merge(
      uv_total_variation,
      all_totals_median,
      by = c("flow", "measuredItemCPC"),
      all.x = TRUE
    )

  # Imputed UV = average previous UV * (1 + World variation).
  uv_total_imputed[,
                   `:=`(
                     estimated_upwards = ifelse(x > 1, TRUE, FALSE),
                     uv_imputed = uv_prev * (1 + variation)
                   )
                   ]

  uv_total_imputed <-
    uv_total_imputed[,
                     .(geographicAreaM49Reporter, flow, timePointYears,
                       measuredItemCPC, uv_imputed, estimated_upwards)
                     ]

  # Within each reporter/flow/item: `same_uv` tells whether all partners share
  # (nearly) the same UV, `top_partner` marks the partner(s) with the largest
  # value.
  complete_trade_flow_cpc[,
                          `:=`(
                            same_uv     = near(sd(uv), 0),
                            top_partner = value == max(value, na.rm = TRUE)
                          ),
                          by = c("timePointYears", "geographicAreaM49Reporter", "flow", "measuredItemCPC")
                          ]

  complete_trade_flow_cpc <-
    merge(
      complete_trade_flow_cpc,
      uv_total_imputed,
      by = c("geographicAreaM49Reporter", "flow", "timePointYears", "measuredItemCPC"),
      all.x = TRUE
    )

  ###### THIS SHOULD BE A TEMPORARY FIX FOR SOME USA ITEMS
  ## (keep total fixed, from external source, and distribute flows)

  #if (year == 2017L) {

  #  flog.trace("[%s] USA 2017 fix", PID, name = "dev")

  #  allReportersDim_tot <-
  #    Dimension(name = "geographicAreaM49", keys = "840")

  #  allElementsDim_tot <-
  #    Dimension(name = "measuredElementTrade", keys = c("5610", "5910"))

  #  allItemsDim_tot <-
  #    GetCodeList("trade", "total_trade_cpc_m49", "measuredItemCPC")$code %>%
  #    Dimension(name = "measuredItemCPC", keys = .)

  #  total_trade_key_usa <-
  #    DatasetKey(
  #      domain = "trade",
  #      dataset = "total_trade_cpc_m49",
  #        dimensions =
  #          list(
  #            allReportersDim_tot,
  #            allElementsDim_tot,
  #            allItemsDim_tot,
  #            Dimension(name = "timePointYears", keys = as.character(year))
  #          )
  #    )

  #  usa_total <- GetData(key = total_trade_key_usa, omitna = TRUE)

  #  usa_total_protected <-
  #    usa_total[
  #      flagObservationStatus == "" & flagMethod == "p"
  #    ][,
  #      flow := ifelse(substr(measuredElementTrade, 1, 2) == "56", 1, 2)
  #    ][,
  #      list(
  #        geographicAreaM49Reporter = geographicAreaM49,
  #        flow,
  #        measuredItemCPC,
  #        timePointYears = as.integer(timePointYears),
  #        total = Value
  #      )
  #    ]

  #  complete_trade_flow_cpc <-
  #    merge(
  #      complete_trade_flow_cpc,
  #      usa_total_protected,
  #      by = c("geographicAreaM49Reporter", "flow", "measuredItemCPC", "timePointYears"),
  #      all.x = TRUE
  #    )

  #  complete_trade_flow_cpc[
  #    !is.na(total),
  #    proportion := qty / sum(qty, na.rm = TRUE),
  #    by = c('geographicAreaM49Reporter', 'flow', 'timePointYears', 'measuredItemCPC')
  #  ]

  #  complete_trade_flow_cpc[!is.na(total), qty_prop := total * proportion]

  #  complete_trade_flow_cpc[!is.na(qty_prop), uv_imputed := value / qty_prop * 1000]

  #  complete_trade_flow_cpc[, c("total", "proportion", "qty_prop") := NULL]

  #  if (nrow(complete_trade_flow_cpc[!is.na(uv_imputed)]) > 0) {
  #    flog.trace("[%s] USA 2017 fix applied", PID, name = "dev")
  #  } else {
  #    flog.trace("[%s] USA 2017 fix NOT applied", PID, name = "dev")
  #  }

  #}

  ###### / TEMPORARY FIX

  # TODO: save metadata of these
  # FIXME: 840 and 36 are temporary (20190621)
  # Apply the imputed UV (and the quantity derived from it) to flows that have
  # an imputed UV and either (a) already carry an imputed quantity ('I') and
  # share the UV with the other partners or are the top partner, or (b) are
  # reported by 840 or 36.
  complete_trade_flow_cpc[
    (!is.na(uv_imputed) & flagObservationStatus_q == "I" & (same_uv == TRUE | top_partner == TRUE)) |
      (!is.na(uv_imputed) & (geographicAreaM49Reporter == "840" | geographicAreaM49Reporter == "36")),
    `:=`(
      uv                      = uv_imputed,
      qty                     = value / (uv_imputed / 1000),
      flagObservationStatus_q = "I",
      flagMethod_q            = "e"
    )
    ]

  # The same rows seen from the partner's side (reporter/partner swapped and
  # flow inverted), carrying the re-imputed quantity.
  mirrored_reimputed <-
    complete_trade_flow_cpc[
      (!is.na(uv_imputed) & flagObservationStatus_q == "I" & (same_uv == TRUE | top_partner == TRUE)) |
        (!is.na(uv_imputed) & (geographicAreaM49Reporter == "840" | geographicAreaM49Reporter == "36")),
      ][,
        .(
          timePointYears,
          geographicAreaM49Reporter = geographicAreaM49Partner,
          geographicAreaM49Partner = geographicAreaM49Reporter,
          flow = ifelse(flow == 1, 2, 1),
          measuredItemCPC,
          qty_mirror_imputed = qty
        )
        ]


  complete_trade_flow_cpc <-
    merge(
      complete_trade_flow_cpc,
      mirrored_reimputed,
      by = c('timePointYears', 'geographicAreaM49Reporter',
             'geographicAreaM49Partner', 'flow', 'measuredItemCPC'),
      all.x = TRUE
    )

  # Mirrored flows (quantity status 'T') take the re-imputed quantity of the
  # flow they mirror; their UV is then recomputed.
  complete_trade_flow_cpc[
    !is.na(qty_mirror_imputed) & flagObservationStatus_q == 'T',
    qty := qty_mirror_imputed,
    ][
      !is.na(qty_mirror_imputed) & flagObservationStatus_q == 'T',
      uv := value / qty * 1000
      ]


  # Remove the auxiliary columns and go back to a tibble.
  complete_trade_flow_cpc[, c("same_uv", "top_partner", "uv_imputed", "estimated_upwards", "qty_mirror_imputed") := NULL]

  complete_trade_flow_cpc <- tbl_df(complete_trade_flow_cpc)

}


##' 1. Use corrections set by analysts during the validation process.

# Corrections entered by analysts for the year being processed. The `year`
# column is renamed first so that `year` in the filter refers to the
# script-level variable.
corrections_table <- corrections_table_all %>%
  dplyr::rename(correction_year = year) %>%
  dplyr::filter(correction_year == year)

corrections_exist <- nrow(corrections_table) > 0

if (corrections_exist) {
  flog.trace("[%s] Corrections exist", PID, name = "dev")

  corrections_table <- corrections_table %>%
    dplyr::filter(correction_level == 'CPC') %>%
    dplyr::select(-correction_year, -correction_level, -correction_hs) %>%
    # Some of these cases were found, but are probably mistakes: should inform
    # Drop corrections whose input is missing or zero. Both conditions must
    # hold for a row to be kept (with `|` zero inputs were never removed).
    dplyr::filter(!is.na(correction_input) & !near(correction_input, 0)) %>%
    # XXX actually, flow should be integer in complete_trade_flow_cpc
    dplyr::mutate(flow = as.numeric(flow)) %>%
    # XXX Remove duplicate corrections
    # (keep the most recent one for each reporter/partner/item/flow/data_type)
    group_by(reporter, partner, item, flow, data_type) %>%
    dplyr::arrange(dplyr::desc(date_correction)) %>%
    dplyr::slice(1) %>%
    ungroup()

  # One "name: value; name: value; ..." string per correction, with empty
  # strings shown as NA.
  corrections_metadata <- apply(dplyr::select(corrections_table, name_analyst, data_original, correction_type:date_validation),
                                1, function(x) paste(names(x), ifelse(x == '', NA, x), collapse = '; ', sep = ': '))

  corrections_table <- corrections_table %>%
    dplyr::select(-(correction_note:date_validation)) %>%
    dplyr::mutate(correction_metadata = gsub('  *', ' ', corrections_metadata))

  flog.trace("[%s] Apply corrections to reporter", PID, name = "dev")

  # Returns a list with the corrected flows (`corrected`) and the corrections
  # that could not be applied (`to_drop`).
  complete_corrected <- useValidationCorrections(complete_trade_flow_cpc,
                                                 corrections_table)

  # Flows that were obtained by mirroring (status 'T' on value or quantity)
  complete_trade_flow_cpc_mirror <- complete_trade_flow_cpc %>%
    dplyr::mutate(is_mirror = (flagObservationStatus_v %in% 'T' | flagObservationStatus_q %in% 'T')) %>%
    dplyr::filter(is_mirror) %>%
    dplyr::select(-is_mirror)

  # Keys of the corrected flows as seen from the partner's side
  complete_with_corrections_mirror <- complete_corrected$corrected %>%
    dplyr::select(
      geographicAreaM49Reporter = geographicAreaM49Partner,
      geographicAreaM49Partner  = geographicAreaM49Reporter,
      flow,
      measuredItemCPC
    ) %>%
    dplyr::mutate(flow = recode(flow, '2' = 1, '1' = 2), to_correct = TRUE)

  # Mirrored flows whose source flow received a correction
  complete_mirror_to_correct <- complete_trade_flow_cpc_mirror %>%
    left_join(
      complete_with_corrections_mirror,
      by = c(
        'geographicAreaM49Reporter',
        'geographicAreaM49Partner',
        'flow',
        'measuredItemCPC'
      )
    ) %>%
    dplyr::filter(to_correct) %>%
    dplyr::select(-to_correct)

  # Corrections rewritten for the mirrored side: reporter/partner swapped,
  # flow inverted and, for monetary values, the 12% CIF/FOB adjustment
  # (multiplied for flow 1, divided for flow 2).
  corrections_table_mirror <- corrections_table %>%
    dplyr::rename(reporter = partner, partner = reporter) %>%
    dplyr::mutate(flow = recode(flow, '2' = 1, '1' = 2)) %>%
    dplyr::mutate(
      correction_input =
        case_when(
          .$data_type == 'value' & .$flow == 1 ~ .$correction_input * 1.12,
          .$data_type == 'value' & .$flow == 2 ~ .$correction_input / 1.12,
          TRUE                                 ~ .$correction_input
        )
    )

  flog.trace("[%s] Apply corrections to partner (if mirrored)", PID, name = "dev")

  if (nrow(complete_mirror_to_correct)>0) {

    complete_mirror_corrected <- useValidationCorrections(complete_mirror_to_correct,
                                                          corrections_table_mirror)

    complete_all_corrected <- bind_rows(complete_corrected$corrected,
                                        complete_mirror_corrected$corrected)

  } else {

    complete_all_corrected <- complete_corrected$corrected
  }


  # Flows that received no correction at all
  complete_uncorrected <- complete_trade_flow_cpc %>%
    anti_join(
      complete_all_corrected,
      by = c('geographicAreaM49Reporter', 'geographicAreaM49Partner',
             'flow', 'measuredItemCPC')
    )

  complete_trade_flow_cpc <- bind_rows(complete_uncorrected,
                                       complete_all_corrected)

  # Report the corrections that could not be applied and remove them from the
  # stored corrections of the reporters concerned.
  if (nrow(complete_corrected$to_drop) > 0) {
    unapplied_csv_filename <-
      tempfile(pattern = "unapplied_corrections_", fileext = ".csv")

    flog.trace("[%s] Some corrections were not applied", PID, name = "dev")

    warning('Some corrections were not applied. See reports. ', unapplied_csv_filename)
    corrections_unapplied <- complete_corrected$to_drop %>%
      dplyr::mutate(year = year) %>%
      dplyr::select(-correction_metadata) %>%
      left_join(
        corrections_table_all,
        by = c('year', 'reporter', 'partner', 'item',
               'flow', 'data_original',
               'data_type', 'correction_input',
               'correction_type')
      ) %>%
      dplyr::select(year, everything()) %>%
      as.data.frame()

    # The item code is prefixed with a quote in the CSV.
    # NOTE(review): presumably so that spreadsheet software keeps it as text.
    write.csv(
      dplyr::mutate(corrections_unapplied, item = paste0("'", item)),
      unapplied_csv_filename
    )

    if (!CheckDebug()) {
      send_mail(
        from    = "SWS-trade-module@fao.org",
        to      = EMAIL_RECIPIENTS,
        subject = paste0("Trade plugin: corrections unapplied, year ", year),
        body    = c("Some corrections cannot be applied.", unapplied_csv_filename),
        remove  = TRUE
      )
    }

    # XXX: bring back
    #rprt_writetable(corrections_unapplied, subdir = 'preproc')

    # File to check to be loaded in the validation tool: it
    # will contain also the new figures generated by the module
    corrections_unapplied_validation <-
      left_join(
        corrections_unapplied,
        complete_trade_flow_cpc %>%
          dplyr::select(
            geographicAreaM49Reporter,
            geographicAreaM49Partner,
            flow,
            timePointYears,
            measuredItemCPC,
            value,
            qty
          ),
        by = c(
          'flow',
          'reporter' = 'geographicAreaM49Reporter',
          'partner'  = 'geographicAreaM49Partner',
          'item'     = 'measuredItemCPC',
          'year'     = 'timePointYears'
        )
      ) %>%
      dplyr::mutate(data_new = ifelse(data_type == 'qty', qty, value)) %>%
      dplyr::select(year, reporter, partner, item, flow,
                    data_new, everything(), -value, -qty)


    # Make sure the target directory exists before writing into it
    # (no warning if it is already there).
    dir.create(
      file.path(corrections_dir, 'unapplied'),
      showWarnings = FALSE,
      recursive = TRUE
    )

    saveRDS(
      corrections_unapplied_validation,
      file = file.path(corrections_dir, 'unapplied', paste0(year, '.rds'))
    )

    # Original corrections with the unapplied ones removed
    corrections_table_all_rm_unapplied <-
      anti_join(
        corrections_table_all,
        corrections_unapplied,
        by = c('reporter', 'partner', 'year', 'item',
               'flow', 'data_type', 'correction_level')
      )

    # Save the corrections again for reporters that had unapplied corrections
    # XXX add check for error in writing
    invisible(
      sapply(
        unique(corrections_unapplied$reporter),
        function(x) {
          saveRDS(
            dplyr::filter(corrections_table_all_rm_unapplied, reporter == x),
            file = file.path(corrections_dir, x, 'corrections_table.rds')
          )
        },
        USE.NAMES = FALSE
      )
    )

  }

} else {
  flog.trace("[%s] Corrections do not exist", PID, name = "dev")

  # The metadata columns are expected downstream, so create them empty.
  complete_trade_flow_cpc <- complete_trade_flow_cpc %>%
    dplyr::mutate(
      correction_metadata_qty   = NA_character_,
      correction_metadata_value = NA_character_
    )
}

##' 1. Transform dataset separating monetary values, quantities and unit values
##' in different rows.

##' 1. Convert monetary values, quantities and unit values to corresponding SWS
##' element codes. For example, a quantity import measured in metric tons is
##' assigned `5610`.

##+ convert_element

# Element codes used below to decide which correction metadata applies to a
# row: quantity elements take the quantity metadata, unit value elements the
# combined one, every other element the value metadata.
quantityElements <- c("5607", "5608", "5609", "5610", "5907", "5908", "5909", "5910")
uvElements       <- c("5637", "5638", "5639", "5630", "5937", "5938", "5939", "5930")

# XXX: check some NAs in reporters/partners

setDT(complete_trade_flow_cpc)

# Long format: one row per flow and per measure (value, qty, uv). The measure
# name goes to `measuredElementTrade`, the figure to `Value`.
complete_trade_flow_cpc <-
  melt(
    complete_trade_flow_cpc,
    c('geographicAreaM49Reporter', 'geographicAreaM49Partner', 'flow',
      'timePointYears', 'measuredItemCPC', 'unit', 'flagObservationStatus_v',
      'flagObservationStatus_q', 'flagMethod_v', 'flagMethod_q',
      'correction_metadata_qty', 'correction_metadata_value'),
    measure.vars = c('value', 'qty', 'uv'),
    variable.name = 'measuredElementTrade',
    variable.factor = FALSE,
    value.name = 'Value'
  )

# Called without assignment.
# NOTE(review): presumably it replaces the measure names with SWS element
# codes by reference; confirm in `convertMeasuredElementTrade`.
convertMeasuredElementTrade(complete_trade_flow_cpc)

# Rows whose element ended up as "999" are discarded.
complete_trade_flow_cpc <- complete_trade_flow_cpc[measuredElementTrade != "999"]

# Metadata for unit values: both pieces when both exist, otherwise whichever
# of the two is available (NA when none is).
complete_trade_flow_cpc[,
                        correction_metadata_uv :=
                          ifelse(
                            !is.na(correction_metadata_qty),
                            ifelse(
                              !is.na(correction_metadata_value),
                              paste('QTY:', correction_metadata_qty, '| VALUE:', correction_metadata_value),
                              correction_metadata_qty
                            ),
                            correction_metadata_value
                          )
                        ]

# Pick the metadata matching the element of each row.
complete_trade_flow_cpc[,
                        correction_metadata :=
                          ifelse(
                            measuredElementTrade %in% quantityElements,
                            correction_metadata_qty,
                            ifelse(
                              measuredElementTrade %in% uvElements,
                              correction_metadata_uv,
                              correction_metadata_value
                            )
                          )
                        ]

# Drop the columns that are no longer needed.
complete_trade_flow_cpc[,
                        c("flow", "unit", "correction_metadata_qty",
                          "correction_metadata_value", "correction_metadata_uv") := NULL]

#apply_tp_criterion <-
#  ReadDatatable("ess_trade_apply_tp_criterion",
#                where = paste0("year IN ('", year, "')"))
#
#apply_tp_criterion <- apply_tp_criterion[toupper(apply) == "TRUE"]

# Exclude by Tp criteria
#
# For the reporters listed in `to_mirror`, the totals produced by this run
# (per reporter, element and item) are compared with the mean of the
# totals stored in `total_trade_cpc_m49` for the five years preceding
# `year`. A reporter/item/flow is flagged for exclusion when every
# compared element is below 60% of its historical mean and the historical
# mean of the element whose third digit is "1" exceeds 1000. Flagged rows
# are written to disk, e-mailed (outside debug mode) and removed from
# `complete_trade_flow_cpc`.

flog.trace("[%s] Exclude by Tp criteria", PID, name = "dev")

if (nrow(to_mirror_raw) > 0) { # should always be true, but just in case...

  # Folder (under the SWS share) where the excluded rows are stored, one
  # RDS file per year.
  tp_criterion_dir <-
    file.path(
      Sys.getenv("R_SWS_SHARE_PATH"),
      "trade/validation_tool_files/tp_criterion"
    )

  # `recursive = TRUE` so that missing parent folders are created as
  # well; without it dir.create() only warns and the saveRDS() call
  # further down fails because the folder is still missing.
  if (!dir.exists(tp_criterion_dir)) {
    dir.create(tp_criterion_dir, recursive = TRUE)
  }

  # Current-run totals by reporter/element/item/year, restricted to the
  # reporters in `to_mirror` and to items whose FCL unit is 'mt'.
  mirrored_aggregated_total <-
    complete_trade_flow_cpc[
      geographicAreaM49Reporter %in% fs2m49(as.character(to_mirror$area)) &
        ###########################################################
      # FIXME:
      # for now, removing items that can have heads as unit #####
      ###########################################################
      !(measuredItemCPC %in% fcl2cpc(stringr::str_pad((fclunits %>% dplyr::filter(fclunit != 'mt'))$fcl, 4, 'left', 0)))
      ][
        # Remove unit values (elements whose third digit is "3")
        substr(measuredElementTrade, 3, 3) != "3",
        .(Value = sum(Value)),
        .(geographicAreaM49 = geographicAreaM49Reporter,
          measuredElementTrade, measuredItemCPC, timePointYears)
        ]

  # Dimensions used to query the historical totals: only the reporters,
  # elements and items that appear in the current-run totals.
  allReportersDim_tot <-
    Dimension(
      name = "geographicAreaM49",
      keys = c(na.omit(unique(mirrored_aggregated_total$geographicAreaM49)))
    )

  allElementsDim_tot <-
    Dimension(
      name = "measuredElementTrade",
      keys = unique(mirrored_aggregated_total$measuredElementTrade)
    )

  allItemsDim_tot <-
    Dimension(
      name = "measuredItemCPC",
      keys = unique(mirrored_aggregated_total$measuredItemCPC)
    )

  totaltradekey <-
    DatasetKey(
      domain = "trade",
      dataset = "total_trade_cpc_m49",
      dimensions =
        list(
          allReportersDim_tot,
          allElementsDim_tot,
          allItemsDim_tot,
          # The five years before the one being processed.
          Dimension(name = "timePointYears",
                    keys = as.character((year-5):(year-1)))
        )
    )

  existing_total_data <- GetData(key = totaltradekey)

  # Historical mean per reporter/element/item over the downloaded years.
  existing_data_mean <-
    existing_total_data[,
                        .(Value_mean = mean(Value)),
                        .(geographicAreaM49, measuredElementTrade, measuredItemCPC)
                        ]

  # Inner join: combinations without any historical data are not
  # evaluated (and therefore never excluded).
  mirrored_to_compare <-
    merge(
      mirrored_aggregated_total,
      existing_data_mean,
      by = c('geographicAreaM49', 'measuredElementTrade', 'measuredItemCPC')
    )

  # The criterion is applied only to areas with code below 300 that
  # appear more than once in `to_mirror_raw`.
  # NOTE(review): the meaning of the 300 threshold is not visible here --
  # confirm against the area code list.
  tmp <- (to_mirror_raw %>% count(area) %>% filter(area < 300 & n > 1))$area

  countries_to_apply_tp_criterion <- faoswsUtil::fs2m49(as.character(tmp))

  rm(tmp)

  # Groups are year/reporter/item plus the second digit of the element
  # code, so each group holds the elements of one flow.
  exclude_from_mirroring <-
    mirrored_to_compare[,
                        Value_ratio := Value / Value_mean
                        ][,
                          `:=`(
                            low = sum(Value_ratio < 0.6),
                            n = .N,
                            big_qty = Value_mean[substr(measuredElementTrade, 3, 3) == "1"] > 1000
                          ),
                          .(timePointYears, geographicAreaM49, substr(measuredElementTrade, 2, 2), measuredItemCPC)
                          ][,
                            # low == n means that both elements of a flow are low (e.g.,
                            # import quantity and monetary values are both low). If only
                            # one of the elements is low, that is an outlier, not bad TP.
                            exclude := (low == n & big_qty == TRUE)
                            ][
                              timePointYears == year &
                                geographicAreaM49 %in% countries_to_apply_tp_criterion
                              ][,
                                .(timePointYears, geographicAreaM49Reporter = geographicAreaM49, measuredItemCPC, measuredElementTrade, exclude)
                                ]

  if (nrow(exclude_from_mirroring) > 0) {
    # Bring the `exclude` indicator onto the bilateral rows; rows that
    # were not evaluated get NA, which is turned into FALSE below.
    complete_trade_flow_cpc <-
      merge(
        complete_trade_flow_cpc,
        exclude_from_mirroring,
        by = c("timePointYears", "geographicAreaM49Reporter", "measuredItemCPC", "measuredElementTrade"),
        all.x = TRUE
      )

    complete_trade_flow_cpc[is.na(exclude), exclude := FALSE]

    excluded_using_tp_criteria <- complete_trade_flow_cpc[exclude == TRUE]

    # Temporary CSV that is passed to send_mail() below.
    excluded_tp_csv_filename <-
      tempfile(pattern = "excluded_tp_", fileext = ".csv")

    flog.trace("[%s] Some excluded TP", PID, name = "dev")

    write.csv(excluded_using_tp_criteria, excluded_tp_csv_filename)

    # Copy of the excluded rows stored in the shared folder, named after
    # the year.
    saveRDS(excluded_using_tp_criteria, file.path(tp_criterion_dir, paste0(year, ".rds")))

    if (!CheckDebug()) {
      send_mail(
        from    = "SWS-trade-module@fao.org",
        to      = EMAIL_RECIPIENTS,
        subject = paste0("Trade plugin: Excluded Tp, year ", year),
        body    = c("Excluded by applying Tp criteria.", excluded_tp_csv_filename),
        remove  = TRUE
      )
    }

    # Keep only the rows that were not excluded and drop the helper column.
    complete_trade_flow_cpc <-
      complete_trade_flow_cpc[exclude == FALSE][, exclude := NULL]
  }
}

# / Exclude by Tp criteria


if (corrections_exist) {
  flog.trace("[%s] Generating metadata data.table", PID, name = "dev")

  ##' 1. Generate metadata for corrections.

  # One metadata record for every row that carries a correction note: the
  # five dimension columns plus the four Metadata* columns, with the note
  # itself stored in `Metadata_Value`.
  metad <-
    complete_trade_flow_cpc %>%
    dplyr::filter(!is.na(correction_metadata)) %>%
    dplyr::select(
      geographicAreaM49Reporter,
      geographicAreaM49Partner,
      measuredElementTrade,
      measuredItemCPC,
      timePointYears,
      correction_metadata
    ) %>%
    dplyr::mutate(
      Metadata          = "GENERAL",
      Metadata_Element  = "COMMENT",
      Metadata_Language = "en",
      Metadata_Value    = correction_metadata
    ) %>%
    # The raw note is now duplicated in Metadata_Value.
    dplyr::select(-correction_metadata) %>%
    # Required to be a data.table
    as.data.table()

  ### NOTE: metadata can be split into its fields as shown below (to be
  ### piped right after the mutate() step), though there is still some
  ### work to do on how to store metadata of unit values
  #separate(
  #  correction_metadata, # Or Metadata_Value, if computed above
  #  into = c(
  #    'name_analyst',
  #    'data_original',
  #    'correction_type',
  #    'correction_note',
  #    'note_analyst',
  #    'note_supervisor',
  #    'name_supervisor',
  #    'date_correction',
  #    'date_validation'
  #  ),
  #  sep = ' *; *'
  #) %>%
  #gather(
  #  key,
  #  value,
  #  name_analyst,
  #  data_original,
  #  correction_type,
  #  correction_note,
  #  note_analyst,
  #  note_supervisor,
  #  name_supervisor,
  #  date_correction,
  #  date_validation
  #) %>%
  #select(-key) %>%
  #dplyr::rename(Metadata_Value = value)
}

# Collapse the quantity-specific (`*_q`) and value-specific (`*_v`) flag
# columns into a single flagObservationStatus/flagMethod pair. The
# expressions inside mutate() are evaluated in order, so the unit-value
# rules see (and override) the result of the first two assignments.
complete_trade_flow_cpc <-
  complete_trade_flow_cpc %>%
  dplyr::mutate(
    # Quantity elements take the quantity flags, all others the value flags.
    flagObservationStatus =
      ifelse(
        measuredElementTrade %in% quantityElements,
        flagObservationStatus_q,
        flagObservationStatus_v
      ),
    flagMethod =
      ifelse(
        measuredElementTrade %in% quantityElements,
        flagMethod_q,
        flagMethod_v
      ),
    # The Status flag will be equal to the weakest flag between
    # the numerator and the denominator, in this case the denominator.
    flagObservationStatus =
      ifelse(
        measuredElementTrade %in% uvElements,
        flagObservationStatus_q,
        flagObservationStatus
      ),
    # Unit values always get method flag 'i'.
    flagMethod =
      ifelse(
        measuredElementTrade %in% uvElements,
        'i',
        flagMethod
      )
  ) %>%
  # The correction note and the per-element flags are no longer needed.
  dplyr::select(
    -correction_metadata,
    -flagObservationStatus_v,
    -flagObservationStatus_q,
    -flagMethod_v,
    -flagMethod_q
  )

# Adding weights for livestock: the rows of `complete_trade_flow_cpc_live`
# are appended below the main table.
complete_trade_flow_cpc <-
  bind_rows(complete_trade_flow_cpc, complete_trade_flow_cpc_live)

# bind_rows() goes through dplyr: make sure the result is a data.table
# again before using by-reference operations.
setDT(complete_trade_flow_cpc)

# Dimensions first, then the value and its two flags.
data.table::setcolorder(
  complete_trade_flow_cpc,
  c(
    "geographicAreaM49Reporter",
    "geographicAreaM49Partner",
    "measuredElementTrade",
    "measuredItemCPC",
    "timePointYears",
    "Value",
    "flagObservationStatus",
    "flagMethod"
  )
)

# XXX Temporary workaround: some NAs are given flags and given
# that NAs cannot have flags the system refuses to save them.
# These NAs are unit values computed on a zero quantity. Setting
# Value to zero.
# FIXME: the comment above does not really hold. Check.
set(
  complete_trade_flow_cpc,
  i     = which(is.na(complete_trade_flow_cpc[["Value"]])),
  j     = "Value",
  value = 0
)

# History of the "official" status flag: it was first 'X', then it was
# decided it should be <BLANK>, which the following (disabled) line did:
#complete_trade_flow_cpc[flagObservationStatus == 'X', flagObservationStatus := '']
# THEN X,p was introduced, so 'X' flags need to stay untouched. What is
# turned into <BLANK> now is the status flag 'Y'.
# NOTE(review): presumably 'Y' is the code given upstream to official
# data -- confirm where the flags are first assigned.
set(
  complete_trade_flow_cpc,
  i     = which(complete_trade_flow_cpc[["flagObservationStatus"]] == 'Y'),
  j     = "flagObservationStatus",
  value = ''
)


##' 1. Remove "protected" data from the module's output.

##' 1. Remove transactions saved on SWS that are not generated
##' by the module.

# Overview of this step (only run when `remove_nonexistent_transactions`
# is TRUE):
#   1. download what is stored in `completed_tf_cpc_m49` for `year`;
#   2. identify stored rows with protected flags and remove the matching
#      keys from both the stored data and the module output;
#   3. stored rows whose key is not in the module output are appended to
#      the output with NA value and flags.
# The order matters: protected rows must leave both tables before the
# difference in step 3 is computed.
if (remove_nonexistent_transactions) {
  flog.trace("[%s] Remove non-existent transactions (RNET)", PID, name = "dev")

  # Shortcut for GetCodeList() on the trade/completed_tf_cpc_m49 dataset.
  GetCodeList2 <- function(dimension = NA) {
    GetCodeList(
      domain    = 'trade',
      dataset   = 'completed_tf_cpc_m49',
      dimension = dimension
    )
  }

  # Codes to download: reporters and partners of type 'country', all
  # items, the listed trade elements, and the year being processed.
  Keys <- list(reporters = GetCodeList2(dimension = 'geographicAreaM49Reporter')[type == 'country', code],
               partners  = GetCodeList2(dimension = 'geographicAreaM49Partner')[type == 'country', code],
               items     = GetCodeList2(dimension = 'measuredItemCPC')[, code],
               elements  = GetCodeList2(dimension = 'measuredElementTrade')[code %in% c('5607', '5608', '5609', '5610', '5907', '5908', '5909', '5910', '5622', '5922',
                                                                                        '5630', '5930', '5638', '5938', '5639', '5939', '5637', '5937'), code],
               years     = as.character(year))

  # TODO: use error handling
  key <- DatasetKey(domain     = 'trade',
                    dataset    = 'completed_tf_cpc_m49',
                    dimensions = list(
                      Dimension(name = 'geographicAreaM49Reporter', keys = Keys[['reporters']]),
                      Dimension(name = 'geographicAreaM49Partner',  keys = Keys[['partners']]),
                      Dimension(name = 'measuredItemCPC',           keys = Keys[['items']]),
                      Dimension(name = 'measuredElementTrade',      keys = Keys[['elements']]),
                      Dimension(name = 'timePointYears',            keys = Keys[['years']])))

  flog.trace("[%s] RNET: Download existent SWS dataset", PID, name = "dev")

  # `existing_data` is also used after this `if` block, by the
  # "Remove unchanged flows" step.
  existing_data <- GetData(key = key, omitna = TRUE)

  flog.trace("[%s] Keep protected data", PID, name = "dev")

  # Some flags are "protected", i.e., data with these flags
  # should not be overwritten/removed.
  # The result is a character vector of "<status> <method>" pairs: every
  # combination marked Protected in `flagValidTable`, except (T,c), (I,c),
  # (<BLANK>,c) and (<BLANK>,h), plus (T,q) and (X,p) regardless of what
  # the Protected column says.
  protected_flags <-
    flagValidTable[(Protected == TRUE &
                      !(flagObservationStatus == 'T' &  flagMethod == 'c') &
                      !(flagObservationStatus == 'I' &  flagMethod == 'c') &
                      !(flagObservationStatus == ''  &  flagMethod == 'c') &
                      !(flagObservationStatus == ''  &  flagMethod == 'h')) |
                     # Protect T,q
                     (flagObservationStatus == 'T' &  flagMethod == 'q') |
                     # Protect X,p
                     (flagObservationStatus == 'X' &  flagMethod == 'p'),
                   paste(flagObservationStatus, flagMethod)]

  # Data that should be left untouched
  protected_data <-
    existing_data[paste(flagObservationStatus, flagMethod) %in% protected_flags]

  # XXX If timePointYears will eventually be used they need to
  # have the same class in existing_data and complete_trade_flow_cpc

  # Remove from saved data (anti-join on the four dimension columns)
  existing_data <-
    existing_data[!protected_data,
                  on = c('geographicAreaM49Reporter',
                         'geographicAreaM49Partner',
                         'measuredElementTrade',
                         'measuredItemCPC')]

  # Remove from new data (same anti-join), so that protected values are
  # neither overwritten nor reported as missing below
  complete_trade_flow_cpc <-
    complete_trade_flow_cpc[!protected_data,
                            on = c('geographicAreaM49Reporter',
                                   'geographicAreaM49Partner',
                                   'measuredElementTrade',
                                   'measuredItemCPC')]


  # Difference between what was saved and what the module produced:
  # whatever is not produced in the run should be set to NA. See #164
  # (No need of year as key as all data refer to the same year)
  data_diff <-
    existing_data[!complete_trade_flow_cpc,
                  on = c('geographicAreaM49Reporter',
                         'geographicAreaM49Partner',
                         'measuredElementTrade',
                         'measuredItemCPC')]

  if (nrow(data_diff) > 0) {
    flog.trace("[%s] RNET: Non-existent transactions set to NA", PID, name = "dev")

    # Blank out value and flags (typed NAs keep the column classes).
    data_diff[,`:=`(Value                 = NA_real_,
                    flagObservationStatus = NA_character_,
                    flagMethod            = NA_character_)]

    # Append the blanked rows to the module output so that they are part
    # of what gets saved.
    complete_trade_flow_cpc <- rbind(complete_trade_flow_cpc, data_diff)
  } else {
    flog.trace("[%s] RNET: There are no non-existent transactions", PID, name = "dev")
  }
}

# Keep only the rows that differ from what is already stored on SWS, so
# that unchanged observations are not written again. The result is
# `complete_trade_flow_cpc_changed`, with the same columns as
# `complete_trade_flow_cpc`.

flog.trace("[%s] Remove unchanged flows", PID, name = "dev")

# The stored data is joined on timePointYears too, so the column is made
# character here before the join.
complete_trade_flow_cpc[, timePointYears := as.character(timePointYears)]

# In this part of the script `existing_data` is only created when
# `remove_nonexistent_transactions` is TRUE. Without it there is nothing
# to compare against, so every row is treated as changed instead of
# failing on a missing object.
if (exists("existing_data")) {
  # Suffix the stored columns so they can sit next to the new ones after
  # the join (done by reference on `existing_data`).
  setnames(
    existing_data,
    c("Value", "flagObservationStatus", "flagMethod"),
    c("Value_ex", "flagObservationStatus_ex", "flagMethod_ex")
  )

  # For each new row, look up the stored observation with the same key
  # (rows without a match get NA in the *_ex columns), then drop the rows
  # whose value (within 1e-6) and both flags are equal to the stored
  # ones. `%in% TRUE` makes comparisons involving NA count as "changed",
  # so new rows and rows set to NA are always kept.
  complete_trade_flow_cpc_changed <-
    existing_data[
      complete_trade_flow_cpc,
      on = c("geographicAreaM49Reporter", "geographicAreaM49Partner",
             "measuredElementTrade", "measuredItemCPC", "timePointYears")
      ][
        !((dplyr::near(Value, Value_ex, tol = 0.000001) &
             flagObservationStatus == flagObservationStatus_ex &
             flagMethod == flagMethod_ex) %in% TRUE)
        ]

  complete_trade_flow_cpc_changed[, c("Value_ex", "flagObservationStatus_ex", "flagMethod_ex") := NULL]
} else {
  flog.trace("[%s] No existing SWS data to compare against: keeping all flows",
             PID, name = "dev")

  complete_trade_flow_cpc_changed <- complete_trade_flow_cpc
}

##' # Save data

##' Finally, data is saved in the `completed_tf_cpc_m49` dataset of
##' the `trade` domain.

flog.trace("[%s] Writing data to session/database", PID, name = "dev")

# Only the changed rows are written. The metadata table exists only when
# corrections were applied, so it is passed only in that case. `stats`
# keeps what SaveData() returns and feeds the final report.
stats <-
  if (corrections_exist) {
    SaveData(
      "trade",
      "completed_tf_cpc_m49",
      complete_trade_flow_cpc_changed,
      metadata    = metad,
      waitTimeout = 10800
    )
  } else {
    SaveData(
      "trade",
      "completed_tf_cpc_m49",
      complete_trade_flow_cpc_changed,
      waitTimeout = 10800
    )
  }

# Final report: elapsed time since `startTime` (in minutes) and the
# counters returned by SaveData(). The format string spans several lines
# on purpose: the line breaks and the leading spaces are part of the
# message.
end_message <- sprintf(
  "Module completed in %1.2f minutes.
  Values inserted: %s
  appended: %s
  ignored: %s
  discarded: %s",
  difftime(Sys.time(), startTime, units = "min"),
  stats[["inserted"]],
  stats[["appended"]],
  stats[["ignored"]],
  stats[["discarded"]]
)

# Outside debug mode, notify the recipients that the run completed.
if (!CheckDebug()) {
  # XXX SWS error:
  #
  # 'ERROR: null value in column "discarded" violates not-null constraint
  #   Detail: Failing row contains (2775, 2019-04-10 10:54:00.591, 2014, 0, 2608, 3428166, null, 2019-04-10 10:54, 2019-04-10 10:21, 0.55 hours).'
  #
  # (the run-info table update below is disabled because of that error)
  #
  #updateInfoTable(year = year, table = 'complete_tf_runs_info',
  #                mode = 'save', results = stats)

  send_mail(
    from    = "SWS-trade-module@fao.org",
    to      = EMAIL_RECIPIENTS,
    subject = paste0("Bilateral trade plugin (year ", year, ") ran successfully"),
    body    = end_message
  )
}

## remove value only

flog.trace("[%s] Session/database write completed!", PID, name = "dev")

flog.info(end_message)

# Restore changed options (`old_options` is set outside this section of
# the script)
options(old_options)

# Printed last, so the report is also the visible output of the script.
print(end_message)
# SWS-Methodology/faoswsTrade documentation built on Feb. 13, 2023, 1:04 a.m.