R/FS_fRawDataProcessing.R


#' This routine processes FLUXNET2015 and AmeriFlux raw data into a format that can be read by REddyProc.
#' In addition to creating REddyProc input files, this routine also generates (1) FLUXNET2015 dataframes built from the gap-filled data products,
#' and (2) output datasets for plotting purposes.
#' If the user so chooses, 4 composite figures can also be exported for data QC purposes.

#' @export
#' @title Process raw flux data (FLUXNET2015, AmeriFlux)
#' @param inpath character string, path to the directory containing the raw flux data
#' @param outpath character string, path to the directory where data/figures will be exported (creates '1_Processed data' and '2_Documents' subdirectory trees, including '/1_REddyProc inputs', '/2_FLUXNET2015 data products', and '/3_Raw data for plotting')
#' @param site.meta dataframe, site metadata file contained in FluxSynthU
#' @param flag numeric, quality-control flag values to exclude (e.g. 'flag = c(2:3)' excludes 'medium' (2) and 'poor' (3) gap-fills) (FLUXNET2015 only)
#' @param batch_process logical, if TRUE, batch processes all sites within the inpath directory (defaults to FALSE).

#'
#' @importFrom stringr str_remove str_detect
#' @importFrom patchwork plot_annotation
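#'
#' @examples
#' \dontrun{
#' # Hypothetical usage sketch: the paths below are placeholders and 'my_site_meta'
#' # stands in for the site metadata dataframe bundled with FluxSynthU.
#' FS_fRawDataProcessing(inpath = "~/FluxData/raw",
#'                       outpath = "~/FluxData/processed",
#'                       site.meta = my_site_meta,
#'                       flag = c(2:3),
#'                       batch_process = TRUE)
#' }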




# Author: Kenny Smith

# Version notes -----------------------------------------------------------
# v5.0.KS
# 201005
# Major overhaul to ensure more flexible coding. Functionally there is nothing new here, but the code is more organized and relies on less hard-coding


FS_fRawDataProcessing <- function(inpath, outpath, site.meta, flag, batch_process = FALSE) {

  # Required packages -------------------------------------------------------
  # require(tidyverse) # bread 'n butta (ggplot2, tidyr, dplyr, etc)
  # require(lubridate) # for easily manipulating date strings
  # require(patchwork) # creates composite figures with a clean interface
  # require(lutz) # allows you to determine UTC offset based on lat/long coords
  # require(svDialogs)
  # require(shiny)
  # require(beepr)



  # File handling -----------------------------------------------------------

  if (!dir.exists(outpath)) {
    message('Warning! No such directory exists!')
    x <- askYesNo(paste("Would you like to create a new directory in: ", outpath))

    # askYesNo() can return NA if the user cancels, so test with isTRUE()
    if (isTRUE(x)) {
      dir.create(file.path(outpath), showWarnings = FALSE, recursive = TRUE)
    }
  }

  data_outpath <- paste0(outpath, "/1_Processed data")
  doc_outpath <- paste0(outpath, "/2_Documents")


  # Create main directories for the exported data and documents/figures
  dir.create(file.path(data_outpath), showWarnings = FALSE)
  dir.create(file.path(doc_outpath), showWarnings = FALSE)

  # Create subdirectories to house the exported data
  dir.create(file.path(paste0(data_outpath, "/1_REddyProc inputs")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/2_FLUXNET2015 data products")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/3_Raw data for plotting")), showWarnings = FALSE)

  dir.create(file.path(paste0(data_outpath, "/4_REddyProc outputs/1_after-night-part")), showWarnings = FALSE, recursive = TRUE)
  dir.create(file.path(paste0(data_outpath, "/4_REddyProc outputs/2_after-day-part")), showWarnings = FALSE, recursive = TRUE)
  dir.create(file.path(paste0(data_outpath, "/5_Stacked REddyProc outputs/1_after-night-part")), showWarnings = FALSE, recursive = TRUE)
  dir.create(file.path(paste0(data_outpath, "/5_Stacked REddyProc outputs/2_after-day-part")), showWarnings = FALSE, recursive = TRUE)

  dir.create(file.path(paste0(data_outpath, "/6_Complete output datasets")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/7_GPPsat data")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/8_Critical dates")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/9_Complete output datasets (winter only)")), showWarnings = FALSE)
  dir.create(file.path(paste0(data_outpath, "/10_Winter NEE v Tsoil")), showWarnings = FALSE)


  # Create subdirectories to house the exported figs
  dir.create(file.path(paste0(doc_outpath, "/1_Initial site quality check")), showWarnings = FALSE)
  dir.create(file.path(paste0(doc_outpath, "/2_Critical dates")), showWarnings = FALSE)
  dir.create(file.path(paste0(doc_outpath, "/3_Winter NEE v Tsoil")), showWarnings = FALSE)
  dir.create(file.path(paste0(doc_outpath, "/4_Winter NEE v Tsoil - Summary Stats")), showWarnings = FALSE)



  # find all raw data files (including rejected sites)
  all.files <- list.files(paste(inpath), recursive = TRUE)

  # select only the non-rejected ('keep') files
  keep.files <- all.files[ !grepl("rejected sites", all.files) ]

  # extract paths for FLUXNET2015, AmeriFlux, and NEON (half-)hourly data
  fluxnet.files <- keep.files[grep(pattern = '\\FULLSET_H', keep.files)]
  amf.files <- keep.files[grep(pattern = '\\BASE_H', keep.files)]
  neon.files <- keep.files[grep(pattern = '\\NEO', keep.files)]

  # concatenate the flux datasets
  files <- c(fluxnet.files, amf.files, neon.files)

  # list of all available sites with database type (eg, 'AMF_CA-Qcu')
  db.and.sites <- unique(substr(x = files, start = 1,stop = 10))

  # matching database for each
  all.site.db <- substr(x = files, start = 1,stop = 3)

  neon_index <- all.site.db %in% "NEO"

  # all available site names
  flx.amf.site.names <- substr(x = files[!neon_index], start = 5,stop = 10)
  neon.site.names <- substr(x = files[neon_index], start = 5,stop = 8)
  all.site.names <- c(flx.amf.site.names, neon.site.names)

  # update db.and.sites to fix NEON names
  db.and.sites[neon_index] <- gsub("_N", "", db.and.sites[neon_index])
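  # Worked example of the indexing above (filenames are illustrative, not actual records):
  #   "AMF_CA-Qcu_BASE_HH_..."          -> substr(1,10) = "AMF_CA-Qcu"; substr(1,3) = "AMF"; substr(5,10) = "CA-Qcu"
  #   a NEON file starting "NEO_ABCD_N" -> substr(1,10) = "NEO_ABCD_N", which gsub("_N", "") trims to "NEO_ABCD"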



  # Select site(s) or batch process all sites at once
  if (batch_process) {

    selected.db_sites <- db.and.sites
    selected.sites <- all.site.names

    export.files <- TRUE
    export.plot.data <- TRUE
    export.plots <- TRUE

  } else {

    # give a description for your routine:
    routine_description <- "Prepare raw flux data for QC and gap-filling / flux-partitioning"


    # Here the user provides arguments for how the routine behaves
    choices <- fRawDataProc_UserInputs(db.and.sites, descrip = routine_description)

    # Assign user choices to objects
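    # (in the selection tree returned by fRawDataProc_UserInputs(), a ticked box shows up
    #  as a non-NULL 'stselected' attribute on the corresponding list element)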
    export.files <- !is.null(attributes(choices[[1]]$`Export datafiles for REddyProc?`)$stselected)
    export.plot.data <- !is.null(attributes(choices[[1]]$`Export plotting files?`$`Export plotting data as .txt? (needed for Shiny QC)`)$stselected)
    export.plots <- !is.null(attributes(choices[[1]]$`Export plotting files?`$`Export plots as .png? (16-panel; 1-6 MB per plot)`)$stselected)



    if (length(choices[["AMF"]]) > 0) {
      AMF_selected <- paste0("AMF_", choices[["AMF"]])
    } else {AMF_selected <- NULL}

    if (length(choices[["FLX"]]) > 0) {
      FLX_selected <- paste0("FLX_", choices[["FLX"]])
    } else {FLX_selected <- NULL}

    if (length(choices[["NEO"]]) > 0) {
      NEO_selected <- paste0("NEO_", choices[["NEO"]])
    } else {NEO_selected <- NULL}



    # Includes database name in site selection
    selected.db_sites <- c(AMF_selected, FLX_selected, NEO_selected)

    # Omits database name in site selection
    selected.sites <- c(choices[["AMF"]], choices[["FLX"]], choices[["NEO"]])

  }


  # User inputs -------------------------------------------------------------



  # Loop thru sites ---------------------------------------------------------
  for (i in 1:length(selected.db_sites)) {



    # Get site info -----------------------------------------------------------
    # The current site (and database)
    db_site <- selected.db_sites[i]

    # Extract the database name and site name from the 'db_site' string
    db <- substr(x = db_site, start = 1, stop = 3)
    site <- str_remove(db_site, paste0(db, "_"))


    # Check whether site exists in more than one database
    site.test <- selected.db_sites[grep(pattern = site, selected.db_sites)]


    if (length(site.test)>1){
      message(paste('Site', site, 'exists in both FLUXNET2015 and AmeriFlux databases, defaulting to AMF'))
      # beep('ping')
      db <- 'AMF'
    }



    # Get site metadata
    site.info <- fGetSiteInfo(dat = site.meta,
                 site = site,
                 db = db,
                 site_col = "site_ID",
                 db_col = "database",
                 site_name_col = "site_Name",
                 start_yr_col = "start_year",
                 end_yr_col = "end_year",
                 exclude_yr_col = "exclude_years",
                 lat_col = "lat",
                 long_col = "long",
                 country_col = "country",
                 IGBP_col = "IGBP",
                 elev_col = "elev_orig",
                 MAT_col = "MAT_orig",
                 MAP_col = "MAP_orig")


    message('')
    message('')
    message('Processing site (', min(which(selected.sites %in% site)), '/',
            length(selected.sites), '):')
    message('\t\t', site, ', ', db, " (", site.info$country, ")")
    message('\t\tName: ', site.info$info)
    message('\t\tlat: ', round(site.info$lat,3), '\tlong: ', round(site.info$long,3))
    message('\t\tMAT: ', round(site.info$MAT,1), '°C\tMAP: ', round(site.info$MAP,0),  'mm')
    message('\t\tForest Type: ', site.info$IGBP)

    message('')
    message('  ', 'Years of record: ', site.info$start_year, '-', site.info$end_year)



    message('')
    message('  ', 'Reading flux data from:  ')
    message('    ', paste0(inpath))

    # Find only the files that match your site selection
    # this should always be a single file per site for FLUXNET and AmeriFlux,
    # but one file per year for NEON sites - will need to update this
    logical_test_for_selected_sites <- all.site.names %in% site & all.site.db %in% db
    file <- files[logical_test_for_selected_sites]



    # Import data -------------------------------------------------------------
    if (db == 'FLX') {
      dat <- read.csv(paste(inpath, file, sep ="/"), na.strings = -9999, stringsAsFactors = FALSE)
    } else if (db == 'AMF') {
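      # AmeriFlux BASE files carry two metadata lines above the column headers, hence skip = 2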
      dat <- read.csv(paste(inpath, file, sep ="/"), na.strings = -9999, stringsAsFactors = FALSE, skip = 2)
    } else if (db == 'NEO') {
      dat <- read.table(paste(inpath, file, sep ="/"), na.strings = -9999, stringsAsFactors = FALSE, header=TRUE)
    }




    # The following line of code looks for any 'character' columns and converts them to 'numeric.'
    dat <- fCharacter2Numeric(dat)

    # The following line of code looks for any 'logical' columns and converts them to 'numeric.'
    dat <- fLogic2Numeric(dat)



    # Data handling -----------------------------------------------------------
    # parse data based on database type (db)
    message('')
    message('  ', 'Parsing flux data...')

    if (db == "FLX") {
      # fParseFLX2015() is a user function that takes a FLUXNET2015 half-hourly (HH) or hourly (HR) datafile,
      # parses out date strings, and selects the data columns of interest.
      # It takes 4 arguments: 'dat' (the dataframe), 'tz_name' and 'UTC_offset' (the timezone name and its
      # offset in hours from UTC), and 'flag' (the QC flag values to exclude; flagged records are set to NA)

      dat_processed <- fParseFLX2015(dat, site.info$tz_name, site.info$UTC_offset, flag)
    } else if (db == "AMF"){
      # fParseAmeriFlux() reads AmeriFlux database files
      # these do not have qc flags
      dat_processed <- fParseAmeriFlux(site, dat, site.info$tz_name, site.info$UTC_offset)
    } else if (db == 'NEO') {
      # fParseNEON() reads NEON database files
      # these do not have qc flags
      dat_processed <- fParseNEON(dat, site.info$tz_name, site.info$UTC_offset)
    }


    # clear variables that will not be used again
    rm(dat)

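    # Identify variables in the processed output that are entirely NA (ignoring the qc_ flag
    # columns) so they can be reported to the user below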
    test_for_missing <- sapply(dat_processed$k_out, function(x)all(is.na(x)))
    missing_all <- colnames(dat_processed$k_out)[test_for_missing]
    test_for_qc <- str_detect(missing_all, "qc_")
    missing_dat <- missing_all[!test_for_qc]
    missing_dat <- str_remove(missing_dat, "k_")

    if (length(missing_dat) >= 1){
      message('')
      message('  ', 'The following data were not available for site ', site, ': ')
      message('    ', paste(as.character(missing_dat),collapse=", ",sep=""))
    }

    years_of_record <- dat_processed$years_of_record


    # Data visualization ------------------------------------------------------
    if (export.plots == TRUE) {
      message('')
      message('  ', 'Generating plots...')

      # Grab your plotting data
      k_plots_24h <- dat_processed$k_plots

      # Subset your plotting data for midday hours only (includes 10 am up to 2 pm)
      k_plots_midday <- subset(dat_processed$k_plots, k_HH >= 10 & k_HH < 14)

      # Create full time series plots of 16 variables
      # (i.e. years are stacked end-to-end)
      fig_full_midday <- fPlot_FullTimeSeries_16panel_InitialQC(site.info, dat = k_plots_midday, POSIXdate_col = "k_POSIXdate_plotting")
      fig_full_24h <- fPlot_FullTimeSeries_16panel_InitialQC(site.info, dat = k_plots_24h, POSIXdate_col = "k_POSIXdate_plotting")

      # Create interannual time series plots of 16 variables
      # (i.e. data are plotted as a function of decimal day, with 'year' as the grouping variable)
      fig_interann_midday <- fPlot_InterannualTimeSeries_16panel_InitialQC(site.info, dat = k_plots_midday, DOY_col = "k_dd", year_col = "k_YY")
      fig_interann_24h <- fPlot_InterannualTimeSeries_16panel_InitialQC(site.info, dat = k_plots_24h, DOY_col = "k_dd", year_col = "k_YY")


      # Annotate each plot with information about how the (half-)hourly data were filtered
      Fig1_FullTimeSeries_midday <- fig_full_midday$fig_full +
        plot_annotation(caption = "Midday (1000-1330 hrs)", theme = theme(plot.caption = element_text(size = 13, face="italic")))

      Fig2_FullTimeSeries_24h <- fig_full_24h$fig_full +
        plot_annotation(caption = "Full day (0-2400 hrs)", theme = theme(plot.caption = element_text(size = 13, face="italic")))

      Fig3_InterannualTimeSeries_midday <- fig_interann_midday$fig_interann +
        plot_annotation(caption = "Midday (1000-1330 hrs)", theme = theme(plot.caption = element_text(size = 13, face="italic")))

      Fig4_InterannualTimeSeries_24h <- fig_interann_24h$fig_interann +
        plot_annotation(caption = "Full day (0-2400 hrs)", theme = theme(plot.caption = element_text(size = 13, face="italic")))


      rm(k_plots_24h, k_plots_midday, fig_full_midday, fig_full_24h, fig_interann_midday, fig_interann_24h)

    }


    # Export plotting data and figures ----------------------------------------
    if (export.plot.data) {
      # Extract plotting data
      k_plots_24h <- dat_processed$k_plots

      plot.data.suffix <- "raw_data_for_plotting.txt"
      plot.data.outpath <- paste0(data_outpath, "/3_Raw data for plotting/") # MAKE SURE THIS ENDS WITH A '/'
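      # exported files follow the pattern "<db>_<site>_<startyr>-<endyr>-raw_data_for_plotting.txt",
      # e.g. "AMF_CA-Qcu_2001-2010-raw_data_for_plotting.txt" (years illustrative)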

      message('')
      message('  ', 'Exporting plot data to:  ')
      message('    ', paste0(plot.data.outpath))

      write.table(k_plots_24h, file = paste0(plot.data.outpath, sprintf("%s_%s_%d-%d-%s",
                                                                        db, site, years_of_record[1], tail(years_of_record,1), plot.data.suffix)),
                  sep = "\t", row.names = FALSE,
                  col.names = TRUE,
                  append = FALSE)
    }


    if (export.plots) {
      # export plots
      plot.png_outpath <- paste0(doc_outpath, "/1_Initial site quality check/")

      ## We want to send our exported plots to subdirectories based on date of creation using: dir.create(paste0(plot.png_outpath, Sys.Date()))
      dir.create(paste0(plot.png_outpath, Sys.Date()))

      timestamped.outpath.plots <- paste0(plot.png_outpath, Sys.Date())


      message('')
      message('  ', 'Exporting plots to:  ')
      message('    ', paste0(timestamped.outpath.plots))
      message('')
      message('    ', 'Exporting plot: 1/4')
      ggsave(path=timestamped.outpath.plots, filename = paste0(sprintf("%s_%s_%d-%d-%s",
                                                                       db, site, years_of_record[1], tail(years_of_record,1),
                                                                       "QC_FullTimeSeries_midday.png")),
             plot = Fig1_FullTimeSeries_midday, width = 16, height = 12, dpi=300)
      message('    ', 'Exporting plot: 2/4')
      ggsave(path=timestamped.outpath.plots, filename = paste0(sprintf("%s_%s_%d-%d-%s",
                                                                       db, site, years_of_record[1], tail(years_of_record,1),
                                                                       "QC_FullTimeSeries_24h.png")),
             plot = Fig2_FullTimeSeries_24h, width = 16, height = 12, dpi=300)
      message('    ', 'Exporting plot: 3/4')
      ggsave(path=timestamped.outpath.plots, filename = paste0(sprintf("%s_%s_%d-%d-%s",
                                                                       db, site, years_of_record[1], tail(years_of_record,1),
                                                                       "QC_Interannual_midday.png")),
             plot = Fig3_InterannualTimeSeries_midday, width = 16, height = 12, dpi=300)
      message('    ', 'Exporting plot: 4/4')
      ggsave(path=timestamped.outpath.plots, filename = paste0(sprintf("%s_%s_%d-%d-%s",
                                                                       db, site, years_of_record[1], tail(years_of_record,1),
                                                                       "QC_Interannual_24h.png")),
             plot = Fig4_InterannualTimeSeries_24h, width = 16, height = 12, dpi=300)

    }


    if (export.plot.data | export.plots) {
      rm(Fig1_FullTimeSeries_midday, Fig2_FullTimeSeries_24h, Fig3_InterannualTimeSeries_midday, Fig4_InterannualTimeSeries_24h)
    }


    # Output annual files for REddyProc ---------------------------------------
    if (export.files) {

      k_out <- dat_processed$k_out

      # Set outpath and file naming structure for the REddyProc export file
      REP.suffix <- "ToBePartitioned.txt"

      REP.outpath <- paste0(data_outpath, "/1_REddyProc inputs/")
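      # per-year filenames follow the pattern "<db>_<site>-<year>-ToBePartitioned.txt",
      # e.g. "AMF_CA-Qcu-2005-ToBePartitioned.txt" (year illustrative)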

      message('')
      message('  ', 'Exporting REddyProc input files to:  ')
      message('    ', paste0(REP.outpath))

      for (j in dat_processed$years_of_record) {

        # Logical test to determine whether the hour column contains any half-hours (hourly datafiles will return a vector of all TRUE)
        test_for_hourly_dat <- fCheckInteger(unique(k_out$k_HH))
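        # (hourly files presumably yield unique hours 0, 1, ..., 23 -> all integers -> all TRUE,
        #  while half-hourly files also contain 0.5, 1.5, ... -> at least one FALSE)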


        # Depending on whether we have hourly or half-hourly data, we want to format the output file so that REddyProc will behave.
        # The following code looks for whether the dataset is hourly or half-hourly, then runs 1 of 2 different functions:
        # fHRTimestampforREddyProc() - for hourly files
        # fHHTimestampforREddyProc() - for half-hourly files
        # Both perform a similar routine, outlined below.

        # REddyProc is expecting input files formatted in a very particular way: https://www.bgc-jena.mpg.de/bgi/index.php/Services/REddyProcWebDataFormat
        # Unfortunately, when we parse out our data, it often violates this required format (especially in leap years).
        # To get around this issue, these functions generate a list of dataframes for each year from 1980-2100 (plenty of time to work with).

        # Each data frame contains the Year, DoY, and Hour columns formatted EXACTLY how REddyProc likes.
        # We then match our data to these columns according to the exact same timestamp (Year, DoY, Hour) that we parsed out earlier using fParseFLX2015().

        ### CAREFUL HERE - So far I haven't had any issues with this method, but it's worth being cautious.
        if (!FALSE %in% test_for_hourly_dat) {
          k_this_year <- fHRTimestampforREddyProc(k_out, j, YY= "k_YY", DD = "k_doy", HH = "k_HH")
        } else {
          k_this_year <- fHHTimestampforREddyProc(k_out, j, YY= "k_YY", DD = "k_doy", HH = "k_HH")
        }


        # if any column is all nans, change to -9999 so that REddyProc does not skip creating output columns for it
        ### CAREFUL HERE - this may cause problems, not adequately tested
        for (k in 1:ncol(k_this_year)) {
          if (sum(is.na(k_this_year[,k])) == nrow(k_this_year)) {
            k_this_year[,k] = -9999
          }
        }

        # Specify the columns to keep for REddyProc (REP)
        cols_for_REP <- c("k_YY", "k_doy", "k_HH", "k_NEE", "k_LE", "k_H", "k_SW_in", "k_Tair", "k_Tsoil", "k_RH", "k_VPD", "k_ustar", "k_PPFD")

        k_REP <- k_this_year[,cols_for_REP]

        write.table(k_REP, file = paste0(REP.outpath, sprintf("%s_%s-%d-%s", db, site, j, REP.suffix)),
                    sep = "\t", row.names = FALSE,
                    col.names = c("Year", "DoY", "Hour", "NEE", "LE", "H", "Rg",
                                  "Tair", "Tsoil", "rH", "VPD", "Ustar", "PPFD"),
                    append = FALSE)

        # clear large variables
        rm(k_REP, k_this_year)

      }



      # Output data products (FLUXNET only) -------------------------------------
      if (db == "FLX") {
        k_FLX_products <- dat_processed$k_FLX_products

        # Set outpath and file naming structure for the REddyProc export file
        FLX_prod.suffix <- "FLUXNET2015_Products.txt"
        FLX_prod.outpath <- paste0(data_outpath, "/2_FLUXNET2015 data products/")

        message('')
        message('  ', 'Exporting FLUXNET2015 gapfilled product files to:  ')
        message('    ', paste0(FLX_prod.outpath))


        varnames <- c("Year", "DoY", "Hour", "NEE_U50_FLX", "NEE_U50_QC_FLX",
                      "GPP_NT_U50_FLX", "RECO_NT_U50_FLX", "GPP_DT_U50_FLX", "RECO_DT_U50_FLX")

        write.table(k_FLX_products, file = paste0(FLX_prod.outpath, sprintf("%s_%s_%d-%d-%s",
                                                                            db, site, years_of_record[1], tail(years_of_record,1), FLX_prod.suffix)),
                    sep = "\t", row.names = FALSE,
                    col.names = varnames,
                    append = FALSE)


        # clear large variables
        rm(k_FLX_products)
      }


      # clear large variables
      rm(k_out)

    }

    message('')
    message('Finalizing site (', min(which(selected.sites %in% site)), '/',
            length(selected.sites), '): ', site, ', ', db)
    message('')
    message('')
  }


  # clear large variables
  rm(dat_processed)
}






# Prior version notes -----------------------------------------------------
# v3.0.KS
# 200826
# added more site meta data to exported figures

# v1.0.KS
# 200512
# fRawDataProcessingKS is a function that takes raw (half-)hourly flux data from NEON, AmeriFlux, and FLUXNET2015 databases
# and exports 3 types of files:

# 1. Dataframes formatted for REddyProc gap-filling/partitioning routine (stored in "./Data/Processed data/REddyProc Inputs/")
# 2. Dataframes with additional datetime variables for plotting with the raw data screening Shiny application (files stored in "./Data/Processed data/Raw data for plotting/")
# 3. 16-panel .png figures (4 plots total: 24-h and midday, continuous time series and interannual) for QA/QC

# By default, bad data (flagged as val = 3) are removed from FLUXNET2015 data during processing.
# This was done to streamline the data pipeline.