library(knitr)
opts_knit$set(root.dir="../../..") # file paths are relative to the root of the project directory
opts_chunk$set(echo=FALSE)
library(tradeflows)
library(dplyr)
library(tidyr)
library(ggplot2)

# Comment out to pass tableanalysed as a parameter in the parent environment
recenttable <- 'raw_comext_monthly_201709' 
archivetable <- 'raw_comext_monthly_2016S1'
# Use a product which doesn't have a zero quantity
# Tropical product (data starts only in 2012)
# productanalysed <- 44072960 
# Spruce sawnwood sub-product with the most number of lines (data starts as early as  2000)
productanalysed <- "44071091"
# Another spruce
# productanalysed <- 44071031
con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows")

This document first tries to run the existing clean function - written for Comtrade data - on Comext data. Then it describes the development process needed to make the clean function work with Comext data.

Run the existing tradeharvester::transfer7zfolder2db() function on the Comext data

Note: 0 zero values and empty strings were replaced by NA (NULL in Mysql) during the load operation performed by `

wp <- tbl(con, recenttable) %>% 
    filter(productcode == 44071031) %>% 
    collect()
# Descriptive statistics
# number of rows
wp %>% count()
# Summary of data columns for that product
summary(wp[c("tradevalue", "weight","quantity")])
# Number of 0 values
wp %>% filter(tradevalue == 0) %>% count()
wp %>% filter(weight== 0) %>% count()
wp %>% filter(quantity == 0) %>% count()

Crash test run the clean() function alone

clean(wp)

Run components of the clean() function one by one

# sanitycheck OK
wp %>% 
    sanitycheck() 

# filterworldeu28 OK
wp %>% 
    # add dummy reporter and partner columns 
    # mutate(reporter = NA,
    #        partner = NA) %>%  
    # Dummy reporter & partner are not necessary since 20170911, 
    # the filterworldeu28() function now checks for the presence
    # of a "reporter" column and gives a warning if not present.
    filterworldeu28()

# removeduplicatedflows
wp %>% 
    mutate(lastchanged = NA,
           flow = flowcode,
           reporter = reportercode) %>% # add dummy columns
    removeduplicatedflows()

# addconversionfactorandprice --> price/weight is missing, 
# add pricew = tradevalue / weight to the function
wp %>% 
    addconversionfactorandprice()

# addregion --> leads to erroneous region names since reportercde and partnercode a
# are different between the Comtrade and Comext.
wp %>% 
    addregion()

# extractprices
price <- wp %>% 
    mutate(flow = flowcode, 
           regionreporter = 1, 
           year = substr(as.character(period),1,4),
           unit = NA) %>% 
    addconversionfactorandprice() %>% 
    extractprices()

# extractconversionfactors
conversionfactor <- extractconversionfactors(dtf, geoaggregation = geoaggregation)

Develop a new clean function for Comext data

Workflow

Prices are computed on a per-product basis. Trade flows data cleaning steps for a given product:

Reflections

Reflections on where to locate functions between the tradeflows and eutradeflows packages

The goal is to reuse as many components from the Comtrade function as possible. When it is not possible to reuse and or modify a function written for Comtrade, then write a new function, which should still be present in the tradeflows package, but in a separate script called cleancomext.R. These - Comext specific - functions might be moved later to the "eutradeflows" package if they become too numerous. However, there is no need to complexify the package structure. If there is only a handfull of - Comext speficic - functions, then they are well placed in tradeflows package.

Reflection on monthly and yearly prices

There is a difference between:

Reflections on storing prices and conversion factors

The 3 informations: price, pricew and conversion factor (cv) could be storred in the same database table since they are used at the same (time and geographical) aggregation level. I first added a conversion factor calculation to the extractprices() function. Then realised there were Inf et NaN values in the conversion factor. Which were normally filtered separately. Inf, NaN and NA values do not appear in the same rows for the price, pricew and conversion factor columns. That means that rows excluded vary between the 3 calculations. This means I will keep the price, pricew and cv calculation functions separate. I'll keep the 2 tables separate in the database too. Since I can't think of an absolute necessary to keep them together. And there is a slight chance that I might need different aggregation level for the 2 informations in the future.

Extract prices and write them to the database

# wp stands for wood products
wparchive <- tbl(con, archivetable) %>% 
    filter(productcode == productanalysed) %>% 
    collect()
wprecent <- tbl(con, recenttable) %>% 
    filter(productcode == productanalysed) %>% 
    collect()
# Reassamble recent and archive data in one data frame    
wp <-  rbind(wparchive, wprecent)

# Extract year with integer division
wp$year <- wp$period %/% 100
years <- unique(wp$year)
# Are there any missing years?
identical(min(years):max(years), as.integer(years))

wp <- wp  %>% 
    addconversionfactorandprice() 

prices <- wp %>%
    # grouping by productcode is technicaly not necessary, because
    # cleaning operations are performed product by product.
    # But it doesn't cost much to keep the product information in the grouping,
    # and it might prevent confusion in later data processing,
    # so keep "productcode" in the grouping.
    extractprices(grouping = c("productcode", "flowcode", "year")) %>% 
    # rename year to period
    rename(period = year)


# What DB is currently connected?
message("Connected to the `", RMariaDB::dbGetInfo(con)$dbname, "` database.")

# Write prices to the database
RMariaDB::dbWriteTable(con, "vld_comext_price", prices,
                     append = TRUE,
                     row.names = FALSE)
prices %>% 
    gather(key, value, -productcode, -flowcode, - year) %>% 
    # Add flow names for the plot
    left_join(data_frame(flowcode = c(1,2), flow = c("Import", "Export")),
              by = "flowcode") %>%
    ggplot(aes(x = year, y = value, color = key)) + 
    geom_line() +
    facet_wrap(~flow)

Extract pricew and write them to the database

pricew <- wp %>% 
    extractpricew(grouping = c("productcode", "flowcode", "year")) %>% 
    rename(period = year)
RMariaDB::dbWriteTable(con, "vld_comext_pricew", pricew,
                     append = TRUE,
                     row.names = FALSE)

Extract conversion factors and write them to the database

# current calculation has potentially 12 flows between 2 partner in each
# flow direction for each product per year. 
# I could also perform instead a year grouping before the calculation
# so that there would only be one flow per year.
# And then use the year variable in the grouping. 
cv <- wp %>% 
    extractconversionfactors(grouping = c("productcode", "flowcode", "year")) %>% 
    rename(period = year)
# Before using append=TRUE, you should empty the table
RMariaDB::dbSendQuery(con, "TRUNCATE table vld_comext_cv")
RMariaDB::dbWriteTable(con, "vld_comext_cv", cv,
                     append = TRUE,
                     row.names = FALSE)

developt the cleancomextmonthly1product() function

# Assign variables which will be used as function parameters in the real function
RMariaDBcon <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") 
tablearchive <- "raw_comext_monthly_2016S1"
tablerecent <- "raw_comext_monthly_201709"
tableprice <- "vld_comext_price"
tablepricew <- "vld_comext_pricew"
tablecv <- "vld_comext_cv"
# should have been copied from the vld_comext_monthly_template
tablewrite <- "vld_comext_monthly" 
productanalysed

# load trade flows from the database into a data frame
# Load recent data
dtfr <- tbl(RMariaDBcon, tablerecent) %>% 
    filter(productcode == productanalysed) %>% 
    # Add quantity units
    eutradeflows::addunit2tbl(RMariaDBcon, 
                              maintbl = ., 
                              tableunit = "vld_comext_unit")  %>% 
    collect()
beginrecentdata <- min(dtfr$period)

# Load archive data, for periods before the begin of recent data
dtfa <- tbl(RMariaDBcon, tablearchive) %>% 
    filter(productcode == productanalysed & 
               period < beginrecentdata) %>% 
    # Add quantity units
    eutradeflows::addunit2tbl(RMariaDBcon, 
                              maintbl = ., 
                              tableunit = "vld_comext_unit")  %>% 
    collect()


# Combine archive and recent data
dtf <- rbind(dtfa, dtfr)

# Remove unnecessary objects
rm(dtfa)
rm(dtfr)


# Add prices and conversion factors to the data frame
dtf <- addconversionfactorandprice(dtf)
# Prepare conversion factors and prices,
# compute median prices, price bounds and conversion factors in a data frame
# Extract year with integer division
dtf$year <- dtf$period %/% 100
years <- unique(dtf$year)
# Are there any missing years?
if(!identical(min(years):max(years), as.integer(years))){
    warning("These years are missing from the data: ",
            setdiff(min(years):max(years), years))
}

# Edit column names to matche comtrade columns
dtf <- mutate(dtf,
              # `unit` column hardcoded in estimatquantity() 
              unit = unitcode,
              flag = 0)

price <- extractprices(dtf, grouping = c("productcode", "flowcode", 
                                         "year", "unit")) 
pricew <- extractpricew(dtf, grouping = c("productcode", "flowcode", 
                                          "year", "unit"))
price <- price %>% 
    left_join(pricew, by = c("productcode", "flowcode", "year", "unit"))
cvf <- extractconversionfactors(dtf, grouping = c("productcode", "flowcode", 
                                          "year", "unit"))
# Store rows before the change
nrowbeforechange <- nrow(dtf)

# Estimate quantity
dtf <- estimatequantity(dtf, price, cvf) 

# Shave price
# based on upper and lower prices added above
# by the estimatequantity() function
dtf <- shaveprice(dtf)


count(dtf, flag)

# Before writing prices back to the database, rename some columns 

dtf <- mutate(dtf,
              unitcode = unit)


# Use database columns to select which columns to keep in the 
# data frame
# get column names
columnswrite  <- RMariaDB::dbListFields(RMariaDBcon, "vld_comext_monthly_template")
dtf <- select_(dtf, .dots = columnswrite)
# Delete existing data for the given product
query <- paste("DELETE FROM ", tablewrite,
               "WHERE productcode = ", productanalysed)
res <- RMariaDB::dbSendQuery(RMariaDBcon, query)
RMariaDB::dbClearResult(res)
message(paste("Writing", nrow(dtf), "flows to the database."))
# Write dtf
RMariaDB::dbWriteTable(con, name = tablewrite, 
                     value = dtf, append=TRUE, row.names = FALSE)

# Write prices and conversion factors
price <- rename(price, period = year)

RMariaDB::dbWriteTable(con, name = tableprice, 
                     value = price, append=TRUE, row.names = FALSE)
RMariaDB::dbWriteTable(con, name = tablepricew, 
                     value = pricew, append=TRUE, row.names = FALSE)
RMariaDB::dbWriteTable(con, name = tablecv, 
                     value = cvf, append=TRUE, row.names = FALSE)

Use the function

# "44079190"
# productanalysed <- "44071091"
# Another spruce
# productanalysed <- 44071031

# The function returns a data frame invisibly which can be storred here
# in dtf for inspection
dtf <- cleancomextmonthly1product(con ,
                                  productanalysed = "44071091",
                                  tablearchive = "raw_comext_monthly_2016S1",
                                  tablerecent = "raw_comext_monthly_201709",
                                  tablewrite = "vld_comext_monthly",
                                  tableprice = "vld_comext_price",
                                  tablepricew = "vld_comext_pricew",
                                  tablecv = "vld_comext_cv")


dtf %>% count(flag)

# Plot quantity and quantity raw
ggplot(dtf, aes(x = quantity, y = tradevalue)) +
    geom_point() +
    facet_wrap(~flag)

ggplot(dtf, aes(x = weight, y = tradevalue)) +
    geom_point() +
    facet_wrap(~flag)

ggplot(dtf, aes(x = weight, y = quantity)) +
    geom_point() +
    facet_wrap(~flag)
prices <- dtf %>% group_by(year, flowcode, unit) %>% 
    summarise(medianprice = unique(medianprice),
              medianpricew = unique(medianpricew))

p <- ggplot(prices, aes(x = year)) 
p + geom_line(aes(y = medianprice))
p + geom_line(aes(y = medianpricew))

Develop clean comext function for many products

RMariaDBcon <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") 
tablearchive <- "raw_comext_monthly_2016S1"
tablerecent <- "raw_comext_monthly_201709"
tablewrite <- "vld_comext_monthly" 

# Create an new empty vld_monthly table by copying frmo the template


# Get a vector of products available in the database
# Products in the recent data
dtfr <- tbl(RMariaDBcon, tablerecent) %>% 
    distinct(productcode) %>% collect()
# Products in the archive data
dtfa <- tbl(RMariaDBcon, tablearchive) %>% 
    distinct(productcode) %>% collect()
# Combine both in a vector
products <- union(dtfr$productcode, dtfa$productcode)

# Keep only 8 digit product codes
# two digit products do not have a unit and it doesn't make sense to clean them
products <- products[nchar(products)>2]

# Loop on all products
lapply(X = head(products), 
       FUN = cleancomextmonthly1product,
       RMariaDBcon = RMariaDBcon,
       tablearchive = tablearchive,
       tablerecent = tablerecent,
       tablewrite = tablewrite # add ... here
       )


# There were several errors cleaning product code: 44011100, see below
# 1. error concerning unit prefered 
# 2. error concerning length of 'dimnames' [2] not equal to array extent
#    this was due to a mistake in the if condition, 
#    an inferior sign was inside nrow(price<0) instead of nrow(price)<0 

Run cleancomextmonthly on the server

# Update packages 
devtools::install_git('ssh://git@bitbucket.org/paul4forest/tradeharvester.git',install_vignettes = TRUE) 
devtools::install_github("EuropeanForestInstitute/tradeflows")
devtools::install_github("stix-global/eutradeflows")

# Create the database structure for validated data
eutradeflows::createdbstructure(sqlfile = "vld_comext.sql", dbname = "tradeflows")

library(tradeflows)
# Connect to the database
con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows")

# Clean one product 44079910
dtf <- cleancomextmonthly1product(con ,
                                  productanalysed = "44071091",
                                  tablearchive = "raw_comext_monthly_2016S1",
                                  tablerecent = "raw_comext_monthly_201710",
                                  tablewrite = "vld_comext_monthly",
                                  tablepriceconversion =
                                      "vld_comext_priceconversion")


# Clean all products available in the database
if(FALSE){
    cleancomextmonthly(con ,
                       tablearchive = "raw_comext_monthly_2016S1",
                       tablerecent = "raw_comext_monthly_201710",
                       tablewrite = "vld_comext_monthly",
                       tabletemplate = "vld_comext_monthly_template",
                       tablepriceconversion = "vld_comext_priceconversion")
}

# Disconnect from the database
RMariaDB::dbDisconnect(con)

In the shell follow the log with

less comextcleaninglog.txt 
# Or check the mysql tables

In thet shell start the call that will be used in crontab

Rscript -e "library(tradeflows); cleancomext('tradeflows')" 

Issues

20170928 Speed of selecting archive data

Since productcode is stored as a character variable in SQL, is the filtering on productcode faster when productcode is a character, instead of a numeric value?

# numeric
system.time(dtfa <- tbl(RMariaDBcon, tablearchive) %>% 
    filter(productcode == 44071091) %>% 
    collect())
#  user  system elapsed 
# 0.180   0.004  47.816 

# character
system.time(dtfa <- tbl(RMariaDBcon, tablearchive) %>% 
    filter(productcode == "44071091") %>% 
    collect())
#  user  system elapsed 
# 0.188   0.004   5.349 

Yes, filtering only lasts 6 seconds for a productcode encoded as character. In comarison, filtering lasts 48 seconds when the productcode is encoded as a numeric variable.

20170928 Sum trade values works also with na.rm=TRUE

# In the database, sumarise is translated to SQL
tbl(RMariaDBcon, tablearchive) %>%
    summarise(n = sum(tradevalue, na.rm = TRUE)) %>% 
    explain() %>% collect()
# On a data frame

20170928 addunit2tbl function

The addunit2tbl function was originally developped for 2 tbl objects, Will it work for one data frame and one tbl object? The answer is no, it returns the error:

Error: `x` and `y` must share the same src, set `copy` = TRUE (may be slow)
eutradeflows::addunit2tbl(RMariaDBcon, maintbl = dtf, 
                          tableunit = "vld_comext_unit") 

In the end I decided to add units during the data frame load statement. See above, under the comment: "load trade flows from the database into a data frame"

20171005 product 44011100 unitprefered : replacement has length zero

Instruction to reproduce the error below. Now fixed by checking if(nrow(price)>0) before extracting the prefered quantity unit.

cleancomextmonthly1product(con ,
                           productanalysed = "44011100",
                           tablearchive = "raw_comext_monthly_2016S1",
                           tablerecent = "raw_comext_monthly_201709",
                           tablewrite = "vld_comext_monthly",
                           tableprice = "vld_comext_price",
                           tablepricew = "vld_comext_pricew",
                           tablecv = "vld_comext_cv")
# Error:
# no non-missing arguments to max; returning -InfError in dtf$unit[dtf$unit == "No Quantity"] <- unitprefered : 
#   replacement has length zero

# In debug mode, with a breakpoint in the cleancomextmonthly1product() function, 
# I can see that the price data frame is empty because 
# there are no quantities for that product.



# Sum quantities for all produts
# To find out if completely missing quantities appear frequently
dtf <- tbl(RMariaDBcon, tablerecent) %>% 
    group_by(productcode) %>% 
    summarise(quantity = sum(quantity)) %>% 
    collect()

dtf %>% filter(is.na(quantity))

20180330 no validated data

Raw data is present in the database. The output of the cleancomext() function shows empty message for each product. Cleaning one product individually seems to work but cleaning all products doesn't work.

# I created a table dump on the server
# dumptable("tradeflows", "raw_comext_monthly_201803")
# Downloaded the file, now loading the dump

# con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows")
# Create fake validation table
RMariaDB::dbSendQuery(con, sprintf("DROP TABLE IF EXISTS `%s`;",
                                 "vld_comext_monthly_to_delete"))
RMariaDB::dbSendQuery(con, sprintf("CREATE TABLE %s LIKE %s;",
                                 "vld_comext_monthly_to_delete",
                                 "vld_comext_monthly_template"))
# Try to validate one product
dtf <- cleancomextmonthly1product(con ,productanalysed = "44071091", tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201803", tablewrite = "vld_comext_monthly_to_delete", tablepriceconversion = "vld_comext_priceconversion")

# Other products
# 44011000
# 44011100
# 44011200

# All products
cleancomextmonthly(con, tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201803", tablewrite = "vld_comext_monthly", tabletemplate = "vld_comext_monthly_template", tablepriceconversion = "vld_comext_priceconversion")

# Find the imm product code to load it in the timeseries and sankey interface
eutradeflows::classificationimm %>%  filter(productcode == 44071091)

# Run the cleancomext function by copy pasting its content at the R prompt
# Feed the server with parameters of the cleancomext() function
dbname <- "tradeflows"
rawtabletemplate = "raw_comext_monthly_template"
vldtabletemplate = "vld_comext_monthly_template"
tablewrite = "vld_comext_monthly"
tablepriceconversion = "vld_comext_priceconversion"
templatecharacters = "template"
logfile = paste0('~/public_html/log/validate',format(Sys.Date(), '%Y'),'.txt')

# Also run the cleancomextmonthly() function by pasting all its content at the R prompt
RMariaDBcon <- con
tabletemplate = "vld_comext_monthly_template"
tablepriceconversion = "vld_comext_priceconversion"
tabletemplate = "vld_comext_monthly_template"
logfile = file.path("~/log", "cleaningerrorlog.txt")

# The function contains a loop on product codes 
# which makes the following call for each product
# this call works fine, the data is cleaned
cleancomextmonthly1product(RMariaDBcon = RMariaDBcon,
                           productanalysed = productcode,
                           tablearchive = tablearchive,
                           tablerecent = tablerecent,
                           tablewrite = tablewrite,
                           tablepriceconversion = tablepriceconversion)

# But the call is inside a try-catch statement
# And running this try-catch statement doesn't work
# Its seems to catche the warning and stop the execution
# I added a message for debugging
tryCatch({
            cleancomextmonthly1product(RMariaDBcon = RMariaDBcon,
                                       productanalysed = productcode,
                                       tablearchive = tablearchive,
                                       tablerecent = tablerecent,
                                       tablewrite = tablewrite,
                                       tablepriceconversion = tablepriceconversion)
        }, error = function(errorcondition){
            writeerror2log(errorcondition, logfile,
                      paste("productcode:", productcode))
        }, warning = function(warningcondition){
            message(warningcondition)
            writeerror2log(warningcondition, logfile,
                      paste("productcode:", productcode))
        }
        )

# 
# Lets try a few product codes
productcode <- 44071091

2018 Update to the Comext data structure

See ../issues_comext/units_2018_update

RMariaDB::dbDisconnect(con)


EuropeanForestInstitute/tradeflows documentation built on Oct. 7, 2019, 10:57 a.m.