library(knitr) opts_knit$set(root.dir="../../..") # file paths are relative to the root of the project directory opts_chunk$set(echo=FALSE) library(tradeflows) library(dplyr) library(tidyr) library(ggplot2) # Comment out to pass tableanalysed as a parameter in the parent environment recenttable <- 'raw_comext_monthly_201709' archivetable <- 'raw_comext_monthly_2016S1' # Use a product which doesn't have a zero quantity # Tropical product (data starts only in 2012) # productanalysed <- 44072960 # Spruce sawnwood sub-product with the most number of lines (data starts as early as 2000) productanalysed <- "44071091" # Another spruce # productanalysed <- 44071031 con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows")
This document first tries to run the existing clean function - written for Comtrade data - on Comext data. Then it describes the development process needed to make the clean function work with Comext data.
tradeharvester::transfer7zfolder2db()
function on the Comext dataNote: 0 zero values and empty strings were replaced by NA (NULL in Mysql) during the load operation performed by `
wp <- tbl(con, recenttable) %>% filter(productcode == 44071031) %>% collect() # Descriptive statistics # number of rows wp %>% count() # Summary of data columns for that product summary(wp[c("tradevalue", "weight","quantity")]) # Number of 0 values wp %>% filter(tradevalue == 0) %>% count() wp %>% filter(weight== 0) %>% count() wp %>% filter(quantity == 0) %>% count()
clean(wp)
# sanitycheck OK wp %>% sanitycheck() # filterworldeu28 OK wp %>% # add dummy reporter and partner columns # mutate(reporter = NA, # partner = NA) %>% # Dummy reporter & partner are not necessary since 20170911, # the filterworldeu28() function now checks for the presence # of a "reporter" column and gives a warning if not present. filterworldeu28() # removeduplicatedflows wp %>% mutate(lastchanged = NA, flow = flowcode, reporter = reportercode) %>% # add dummy columns removeduplicatedflows() # addconversionfactorandprice --> price/weight is missing, # add pricew = tradevalue / weight to the function wp %>% addconversionfactorandprice() # addregion --> leads to erroneous region names since reportercde and partnercode a # are different between the Comtrade and Comext. wp %>% addregion() # extractprices price <- wp %>% mutate(flow = flowcode, regionreporter = 1, year = substr(as.character(period),1,4), unit = NA) %>% addconversionfactorandprice() %>% extractprices() # extractconversionfactors conversionfactor <- extractconversionfactors(dtf, geoaggregation = geoaggregation)
Prices are computed on a per-product basis. Trade flows data cleaning steps for a given product:
The goal is to reuse as many components from the Comtrade function
as possible. When it is not possible to reuse and or modify a function
written for Comtrade, then write a new function, which should still be
present in the tradeflows package, but in a separate script called
cleancomext.R
. These - Comext specific - functions
might be moved later to the "eutradeflows" package if they become too numerous.
However, there is no need to complexify the package structure.
If there is only a handfull of - Comext speficic - functions, then they are well placed in tradeflows package.
There is a difference between:
The 3 informations: price, pricew and conversion factor (cv) could be storred in the same database table since they are used at the same (time and geographical) aggregation level. I first added a conversion factor calculation to the extractprices() function. Then realised there were Inf et NaN values in the conversion factor. Which were normally filtered separately. Inf, NaN and NA values do not appear in the same rows for the price, pricew and conversion factor columns. That means that rows excluded vary between the 3 calculations. This means I will keep the price, pricew and cv calculation functions separate. I'll keep the 2 tables separate in the database too. Since I can't think of an absolute necessary to keep them together. And there is a slight chance that I might need different aggregation level for the 2 informations in the future.
# wp stands for wood products wparchive <- tbl(con, archivetable) %>% filter(productcode == productanalysed) %>% collect() wprecent <- tbl(con, recenttable) %>% filter(productcode == productanalysed) %>% collect() # Reassamble recent and archive data in one data frame wp <- rbind(wparchive, wprecent) # Extract year with integer division wp$year <- wp$period %/% 100 years <- unique(wp$year) # Are there any missing years? identical(min(years):max(years), as.integer(years)) wp <- wp %>% addconversionfactorandprice() prices <- wp %>% # grouping by productcode is technicaly not necessary, because # cleaning operations are performed product by product. # But it doesn't cost much to keep the product information in the grouping, # and it might prevent confusion in later data processing, # so keep "productcode" in the grouping. extractprices(grouping = c("productcode", "flowcode", "year")) %>% # rename year to period rename(period = year) # What DB is currently connected? message("Connected to the `", RMariaDB::dbGetInfo(con)$dbname, "` database.") # Write prices to the database RMariaDB::dbWriteTable(con, "vld_comext_price", prices, append = TRUE, row.names = FALSE)
prices %>% gather(key, value, -productcode, -flowcode, - year) %>% # Add flow names for the plot left_join(data_frame(flowcode = c(1,2), flow = c("Import", "Export")), by = "flowcode") %>% ggplot(aes(x = year, y = value, color = key)) + geom_line() + facet_wrap(~flow)
pricew <- wp %>% extractpricew(grouping = c("productcode", "flowcode", "year")) %>% rename(period = year) RMariaDB::dbWriteTable(con, "vld_comext_pricew", pricew, append = TRUE, row.names = FALSE)
# current calculation has potentially 12 flows between 2 partner in each # flow direction for each product per year. # I could also perform instead a year grouping before the calculation # so that there would only be one flow per year. # And then use the year variable in the grouping. cv <- wp %>% extractconversionfactors(grouping = c("productcode", "flowcode", "year")) %>% rename(period = year) # Before using append=TRUE, you should empty the table RMariaDB::dbSendQuery(con, "TRUNCATE table vld_comext_cv") RMariaDB::dbWriteTable(con, "vld_comext_cv", cv, append = TRUE, row.names = FALSE)
# Assign variables which will be used as function parameters in the real function RMariaDBcon <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") tablearchive <- "raw_comext_monthly_2016S1" tablerecent <- "raw_comext_monthly_201709" tableprice <- "vld_comext_price" tablepricew <- "vld_comext_pricew" tablecv <- "vld_comext_cv" # should have been copied from the vld_comext_monthly_template tablewrite <- "vld_comext_monthly" productanalysed # load trade flows from the database into a data frame # Load recent data dtfr <- tbl(RMariaDBcon, tablerecent) %>% filter(productcode == productanalysed) %>% # Add quantity units eutradeflows::addunit2tbl(RMariaDBcon, maintbl = ., tableunit = "vld_comext_unit") %>% collect() beginrecentdata <- min(dtfr$period) # Load archive data, for periods before the begin of recent data dtfa <- tbl(RMariaDBcon, tablearchive) %>% filter(productcode == productanalysed & period < beginrecentdata) %>% # Add quantity units eutradeflows::addunit2tbl(RMariaDBcon, maintbl = ., tableunit = "vld_comext_unit") %>% collect() # Combine archive and recent data dtf <- rbind(dtfa, dtfr) # Remove unnecessary objects rm(dtfa) rm(dtfr) # Add prices and conversion factors to the data frame dtf <- addconversionfactorandprice(dtf) # Prepare conversion factors and prices, # compute median prices, price bounds and conversion factors in a data frame # Extract year with integer division dtf$year <- dtf$period %/% 100 years <- unique(dtf$year) # Are there any missing years? if(!identical(min(years):max(years), as.integer(years))){ warning("These years are missing from the data: ", setdiff(min(years):max(years), years)) } # Edit column names to matche comtrade columns dtf <- mutate(dtf, # `unit` column hardcoded in estimatquantity() unit = unitcode, flag = 0) price <- extractprices(dtf, grouping = c("productcode", "flowcode", "year", "unit")) pricew <- extractpricew(dtf, grouping = c("productcode", "flowcode", "year", "unit")) price <- price %>% left_join(pricew, by = c("productcode", "flowcode", "year", "unit")) cvf <- extractconversionfactors(dtf, grouping = c("productcode", "flowcode", "year", "unit")) # Store rows before the change nrowbeforechange <- nrow(dtf) # Estimate quantity dtf <- estimatequantity(dtf, price, cvf) # Shave price # based on upper and lower prices added above # by the estimatequantity() function dtf <- shaveprice(dtf) count(dtf, flag) # Before writing prices back to the database, rename some columns dtf <- mutate(dtf, unitcode = unit) # Use database columns to select which columns to keep in the # data frame # get column names columnswrite <- RMariaDB::dbListFields(RMariaDBcon, "vld_comext_monthly_template") dtf <- select_(dtf, .dots = columnswrite) # Delete existing data for the given product query <- paste("DELETE FROM ", tablewrite, "WHERE productcode = ", productanalysed) res <- RMariaDB::dbSendQuery(RMariaDBcon, query) RMariaDB::dbClearResult(res) message(paste("Writing", nrow(dtf), "flows to the database.")) # Write dtf RMariaDB::dbWriteTable(con, name = tablewrite, value = dtf, append=TRUE, row.names = FALSE) # Write prices and conversion factors price <- rename(price, period = year) RMariaDB::dbWriteTable(con, name = tableprice, value = price, append=TRUE, row.names = FALSE) RMariaDB::dbWriteTable(con, name = tablepricew, value = pricew, append=TRUE, row.names = FALSE) RMariaDB::dbWriteTable(con, name = tablecv, value = cvf, append=TRUE, row.names = FALSE)
# "44079190" # productanalysed <- "44071091" # Another spruce # productanalysed <- 44071031 # The function returns a data frame invisibly which can be storred here # in dtf for inspection dtf <- cleancomextmonthly1product(con , productanalysed = "44071091", tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201709", tablewrite = "vld_comext_monthly", tableprice = "vld_comext_price", tablepricew = "vld_comext_pricew", tablecv = "vld_comext_cv") dtf %>% count(flag) # Plot quantity and quantity raw ggplot(dtf, aes(x = quantity, y = tradevalue)) + geom_point() + facet_wrap(~flag) ggplot(dtf, aes(x = weight, y = tradevalue)) + geom_point() + facet_wrap(~flag) ggplot(dtf, aes(x = weight, y = quantity)) + geom_point() + facet_wrap(~flag) prices <- dtf %>% group_by(year, flowcode, unit) %>% summarise(medianprice = unique(medianprice), medianpricew = unique(medianpricew)) p <- ggplot(prices, aes(x = year)) p + geom_line(aes(y = medianprice)) p + geom_line(aes(y = medianpricew))
RMariaDBcon <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") tablearchive <- "raw_comext_monthly_2016S1" tablerecent <- "raw_comext_monthly_201709" tablewrite <- "vld_comext_monthly" # Create an new empty vld_monthly table by copying frmo the template # Get a vector of products available in the database # Products in the recent data dtfr <- tbl(RMariaDBcon, tablerecent) %>% distinct(productcode) %>% collect() # Products in the archive data dtfa <- tbl(RMariaDBcon, tablearchive) %>% distinct(productcode) %>% collect() # Combine both in a vector products <- union(dtfr$productcode, dtfa$productcode) # Keep only 8 digit product codes # two digit products do not have a unit and it doesn't make sense to clean them products <- products[nchar(products)>2] # Loop on all products lapply(X = head(products), FUN = cleancomextmonthly1product, RMariaDBcon = RMariaDBcon, tablearchive = tablearchive, tablerecent = tablerecent, tablewrite = tablewrite # add ... here ) # There were several errors cleaning product code: 44011100, see below # 1. error concerning unit prefered # 2. error concerning length of 'dimnames' [2] not equal to array extent # this was due to a mistake in the if condition, # an inferior sign was inside nrow(price<0) instead of nrow(price)<0
# Update packages devtools::install_git('ssh://git@bitbucket.org/paul4forest/tradeharvester.git',install_vignettes = TRUE) devtools::install_github("EuropeanForestInstitute/tradeflows") devtools::install_github("stix-global/eutradeflows") # Create the database structure for validated data eutradeflows::createdbstructure(sqlfile = "vld_comext.sql", dbname = "tradeflows") library(tradeflows) # Connect to the database con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") # Clean one product 44079910 dtf <- cleancomextmonthly1product(con , productanalysed = "44071091", tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201710", tablewrite = "vld_comext_monthly", tablepriceconversion = "vld_comext_priceconversion") # Clean all products available in the database if(FALSE){ cleancomextmonthly(con , tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201710", tablewrite = "vld_comext_monthly", tabletemplate = "vld_comext_monthly_template", tablepriceconversion = "vld_comext_priceconversion") } # Disconnect from the database RMariaDB::dbDisconnect(con)
In the shell follow the log with
less comextcleaninglog.txt # Or check the mysql tables
In thet shell start the call that will be used in crontab
Rscript -e "library(tradeflows); cleancomext('tradeflows')"
Since productcode is stored as a character variable in SQL, is the filtering on productcode faster when productcode is a character, instead of a numeric value?
# numeric system.time(dtfa <- tbl(RMariaDBcon, tablearchive) %>% filter(productcode == 44071091) %>% collect()) # user system elapsed # 0.180 0.004 47.816 # character system.time(dtfa <- tbl(RMariaDBcon, tablearchive) %>% filter(productcode == "44071091") %>% collect()) # user system elapsed # 0.188 0.004 5.349
Yes, filtering only lasts 6 seconds for a productcode encoded as character. In comarison, filtering lasts 48 seconds when the productcode is encoded as a numeric variable.
# In the database, sumarise is translated to SQL tbl(RMariaDBcon, tablearchive) %>% summarise(n = sum(tradevalue, na.rm = TRUE)) %>% explain() %>% collect() # On a data frame
The addunit2tbl function was originally developped for 2 tbl objects, Will it work for one data frame and one tbl object? The answer is no, it returns the error:
Error: `x` and `y` must share the same src, set `copy` = TRUE (may be slow)
eutradeflows::addunit2tbl(RMariaDBcon, maintbl = dtf, tableunit = "vld_comext_unit")
In the end I decided to add units during the data frame load statement. See above, under the comment: "load trade flows from the database into a data frame"
Instruction to reproduce the error below.
Now fixed by checking if(nrow(price)>0)
before extracting the prefered
quantity unit.
cleancomextmonthly1product(con , productanalysed = "44011100", tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201709", tablewrite = "vld_comext_monthly", tableprice = "vld_comext_price", tablepricew = "vld_comext_pricew", tablecv = "vld_comext_cv") # Error: # no non-missing arguments to max; returning -InfError in dtf$unit[dtf$unit == "No Quantity"] <- unitprefered : # replacement has length zero # In debug mode, with a breakpoint in the cleancomextmonthly1product() function, # I can see that the price data frame is empty because # there are no quantities for that product. # Sum quantities for all produts # To find out if completely missing quantities appear frequently dtf <- tbl(RMariaDBcon, tablerecent) %>% group_by(productcode) %>% summarise(quantity = sum(quantity)) %>% collect() dtf %>% filter(is.na(quantity))
Raw data is present in the database. The output of the cleancomext() function shows empty message for each product. Cleaning one product individually seems to work but cleaning all products doesn't work.
# I created a table dump on the server # dumptable("tradeflows", "raw_comext_monthly_201803") # Downloaded the file, now loading the dump # con <- RMariaDB::dbConnect(RMariaDB::MariaDB(), dbname = "tradeflows") # Create fake validation table RMariaDB::dbSendQuery(con, sprintf("DROP TABLE IF EXISTS `%s`;", "vld_comext_monthly_to_delete")) RMariaDB::dbSendQuery(con, sprintf("CREATE TABLE %s LIKE %s;", "vld_comext_monthly_to_delete", "vld_comext_monthly_template")) # Try to validate one product dtf <- cleancomextmonthly1product(con ,productanalysed = "44071091", tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201803", tablewrite = "vld_comext_monthly_to_delete", tablepriceconversion = "vld_comext_priceconversion") # Other products # 44011000 # 44011100 # 44011200 # All products cleancomextmonthly(con, tablearchive = "raw_comext_monthly_2016S1", tablerecent = "raw_comext_monthly_201803", tablewrite = "vld_comext_monthly", tabletemplate = "vld_comext_monthly_template", tablepriceconversion = "vld_comext_priceconversion") # Find the imm product code to load it in the timeseries and sankey interface eutradeflows::classificationimm %>% filter(productcode == 44071091) # Run the cleancomext function by copy pasting its content at the R prompt # Feed the server with parameters of the cleancomext() function dbname <- "tradeflows" rawtabletemplate = "raw_comext_monthly_template" vldtabletemplate = "vld_comext_monthly_template" tablewrite = "vld_comext_monthly" tablepriceconversion = "vld_comext_priceconversion" templatecharacters = "template" logfile = paste0('~/public_html/log/validate',format(Sys.Date(), '%Y'),'.txt') # Also run the cleancomextmonthly() function by pasting all its content at the R prompt RMariaDBcon <- con tabletemplate = "vld_comext_monthly_template" tablepriceconversion = "vld_comext_priceconversion" tabletemplate = "vld_comext_monthly_template" logfile = file.path("~/log", "cleaningerrorlog.txt") # The function contains a loop on product codes # which makes the following call for each product # this call works fine, the data is cleaned cleancomextmonthly1product(RMariaDBcon = RMariaDBcon, productanalysed = productcode, tablearchive = tablearchive, tablerecent = tablerecent, tablewrite = tablewrite, tablepriceconversion = tablepriceconversion) # But the call is inside a try-catch statement # And running this try-catch statement doesn't work # Its seems to catche the warning and stop the execution # I added a message for debugging tryCatch({ cleancomextmonthly1product(RMariaDBcon = RMariaDBcon, productanalysed = productcode, tablearchive = tablearchive, tablerecent = tablerecent, tablewrite = tablewrite, tablepriceconversion = tablepriceconversion) }, error = function(errorcondition){ writeerror2log(errorcondition, logfile, paste("productcode:", productcode)) }, warning = function(warningcondition){ message(warningcondition) writeerror2log(warningcondition, logfile, paste("productcode:", productcode)) } ) # # Lets try a few product codes productcode <- 44071091
See ../issues_comext/units_2018_update
RMariaDB::dbDisconnect(con)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.