R/Handlers_csv.R

#' Reads comma-delimited text files containing FLUXNET2015, La Thuile or
#' OzFlux data
#' 
#' @param fileinname flux data file(s), including directory
#' @param vars data.table of variables and their attributes
#' @param datasetname name of the dataset ("LaThuile", "OzFlux", else assumed FLUXNET2015)
#' @param time_vars vector of time variables
#' @param add_psurf whether to add synthesised air pressure (PSurf) as missing values
#' @param site_log log file for the site
#' @param ... further arguments passed to findColIndices and convert_LaThuile
#' 
#' @return list of flux data, variables and timing information
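#'
#' @examples
#' \dontrun{
#' # A minimal usage sketch. The file name is illustrative, and `var_table`
#' # and `log_file` are hypothetical stand-ins for a variable attribute
#' # table and a site log created elsewhere in the workflow.
#' site_data <- ReadCSVFluxData(
#'     fileinname = "FLX_AU-Tum_FLUXNET2015_FULLSET_HH_2002-2014.csv",
#'     vars = var_table,
#'     datasetname = "FLUXNET2015",
#'     time_vars = c("TIMESTAMP_START", "TIMESTAMP_END"),
#'     add_psurf = FALSE,
#'     site_log = log_file)
#' }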

ReadCSVFluxData <- function(
    fileinname, vars, datasetname, time_vars, add_psurf, site_log, ...) {

    ####### First read available variables, corresponding units and ranges ####

    # Get column names and classes:
    # La Thuile sites often have multiple data files; only use the first
    # instance here
    tcol <- findColIndices(fileinname=fileinname[1],
                           var_names=vars$Fluxnet_variable,
                           var_classes=vars$Fluxnet_class,
                           essential_vars=vars$Essential_met,
                           preferred_vars=vars$Preferred_eval,
                           time_vars=time_vars,
                           #dset_vars=dset_vars,
                           site_log=site_log,
                           datasetname=datasetname, ...)


    # If using La Thuile or OzFlux dataset, convert time to Fluxnet2015 format
    if (datasetname %in% c("LaThuile", "OzFlux")) {

      
        if (datasetname == "LaThuile") {
          
          FluxData <- doCall(convert_LaThuile, infiles=fileinname,
                                       tcol=tcol,
                                       site_log=site_log, ...)
          
          #Add synthesised air pressure (PSurf) as missing values. Also add to tcol and
          #remove from failed vars
          if (add_psurf) {
            FluxData         <- cbind(FluxData, PSurf_synth=rep(NA, nrow(FluxData)))
            tcol$names       <- append(tcol$names, "PSurf_synth")
            # setdiff avoids emptying the vector if "PSurf_synth" is absent
            tcol$failed_vars <- setdiff(tcol$failed_vars, "PSurf_synth")
          }
          
          
        } else if (datasetname == "OzFlux") {
          
          FluxData <- convert_OzFlux(infile=fileinname, tcol=tcol)
          
        }
      
        # Rename time vars and column names to match new structure
        time_vars <- c("TIMESTAMP_START", "TIMESTAMP_END")

        tcol$time_names <- time_vars
        tcol$all_names  <- c(tcol$time_names, tcol$names)

        colnames(FluxData) <- c(time_vars, colnames(FluxData)[(length(time_vars)+1):ncol(FluxData)])
        
    # Else assume Fluxnet2015 format
    } else {
      
      # Read flux tower data (skips unwanted columns):
      FluxData <- read.csv(file=fileinname, header=TRUE, colClasses=tcol$classes)
      
    }


    # Sanity check: does the variable order in the file match that specified in tcol?
    # Ignore any possible duplicates in tcol$all_names before the check
    if (any(colnames(FluxData) != unique(tcol$all_names))) {
        error <- paste("Check variable ordering, variables don't match data",
                       "retrieved from file [ function:", match.call()[[1]], "]")
        stop_and_log(error, site_log)
        return(site_log)
    }


    # Split time variables from other variables
    # Extract time stamp data
    time_ind <- sapply(time_vars, function(x) which(colnames(FluxData)==x))
    FluxTime <- FluxData[,time_ind]
    # Remove time stamp variables from Data variable
    FluxData <- FluxData[,-time_ind]


    # Set all missing values to NA
    FluxData[FluxData==Sprd_MissingVal] <- NA


    # Duplicate Fluxnet data column if the same variable needs to be
    # processed several times (e.g. RH converted to RH and Qair)
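    # (e.g. tcol$names may list the file's RH column twice, once to keep as
    # RH and once to convert to Qair, while the file provides it only once)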
    if (ncol(FluxData) != length(tcol$names)) {

        FluxData <- duplicate_columns(data=FluxData, vars=tcol$names)

        # Make sure FluxData now has correct no. of columns
        if (ncol(FluxData) != length(tcol$names)) {
            error <- paste("Duplicate variable names exist but columns could",
                           "not be duplicated correctly [ function:",
                           match.call()[[1]], "]")
            stop_and_log(error, site_log)
            return(site_log)
        }

    }

    # Retrieve original and target units for variables present:
    units <- retrieve_units(vars_present=tcol$names,
                            all_vars=vars)

    # Retrieve acceptable variable ranges:
    var_ranges <- retrieve_ranges(vars_present=tcol$names,
                                  all_vars=vars)

    # Retrieve variable attributes (original Fluxnet name, long name and CF name):
    attributes <- retrieve_atts(vars_present=tcol$names,
                                all_vars=vars)

    # Retrieve variable categories (met/eval):
    categories <- retrieve_varinfo(vars_present=tcol$names,
                                   all_vars=vars, attribute="Category")

    # Retrieve names of ERAinterim variables
    era_vars <- retrieve_varinfo(vars_present=tcol$names,
                                 all_vars=vars, attribute="ERAinterim_variable")

    # Retrieve output variable names
    out_vars <- retrieve_varinfo(vars_present=tcol$names,
                                 all_vars=vars, attribute="Output_variable")

    # Retrieve essential met variable flags
    ess_met <- retrieve_varinfo(vars_present=tcol$names,
                                all_vars=vars, attribute="Essential_met")

    # Retrieve preferred evaluation variable flags
    pref_eval <- retrieve_varinfo(vars_present=tcol$names,
                                  all_vars=vars, attribute="Preferred_eval")

    # Retrieve aggregation method
    aggr_method <- retrieve_varinfo(vars_present=tcol$names,
                                    all_vars=vars, attribute="Aggregate_method")


    ###### Get time step and date information #######

    # Note number of time steps in data:
    ntsteps <- nrow(FluxTime)

    if (!(ntsteps>=12 && ntsteps < 1e9)) {
        error <- paste('Unable to determine number of time steps in:',
                       stripFilename(fileinname))
        stop_and_log(error, site_log)
        return(site_log)
    }

    # and time step size (parse time stamp strings into date-times)
    start <- strptime(FluxTime$TIMESTAMP_START[1], "%Y%m%d%H%M")
    end   <- strptime(FluxTime$TIMESTAMP_END[1], "%Y%m%d%H%M")

    timestepsize <- as.numeric(end) - as.numeric(start)
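    # e.g. for half-hourly data, a first record spanning "200301010000" to
    # "200301010030" gives a time step size of 1800 seconds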

    if ( !(timestepsize>=300 && timestepsize<=86400) ) {
        error <- paste("Time step size must be between",
                       "300 and 86400 seconds. Time step size",
                       timestepsize, "found in file")
        stop_and_log(error, site_log)
        return(site_log)
    }

    # Time steps in a day and number of days in data set
    tstepinday <- 86400/timestepsize
    ndays      <- ntsteps/tstepinday
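    # e.g. an 1800 s time step gives 48 steps per day, so 17520 records
    # correspond to exactly 365 days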


    # Find starting date / time:
    starttime <- findStartTime(start=start)

    intyears  <- Yeardays(starttime$syear,ndays)

    # Create list for function exit:
    filedata <- list(data=FluxData, vars=tcol$names, era_vars=era_vars,
                     attributes=attributes, out_vars=out_vars,
                     essential_met=ess_met, preferred_eval=pref_eval,
                     units=units, var_ranges=var_ranges, categories=categories,
                     aggr_method=aggr_method, time=FluxTime, ntsteps=ntsteps,
                     starttime=starttime, timestepsize=timestepsize,
                     daysPerYr=intyears$daysperyear,
                     ndays=ndays, whole=intyears$whole)

    return(filedata)

}

#' Reads ERA-Interim data and extracts time steps corresponding to obs
#'
#' @param ERA_file ERA-Interim csv file, including directory
#' @param datain list of flux data, as returned by ReadCSVFluxData
#'
#' @return ERA-Interim data extracted for the time steps common with the flux obs
#' @export
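#'
#' @examples
#' \dontrun{
#' # A minimal sketch. The file name is illustrative and `site_data` stands
#' # for the list returned by ReadCSVFluxData.
#' era_data <- read_era(ERA_file = "AU-Tum_era_data.csv", datain = site_data)
#' }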
 
read_era <- function(ERA_file, datain) {

    # read data
    era_data <- read.csv(ERA_file, header=TRUE, colClasses=c("character", "character",
                                                             rep("numeric", 7)))

    # ERAinterim data provided for 1989-2014, need to extract common years with flux obs
    # Find start and end
    obs_start <- datain$time$TIMESTAMP_START
    start_era <- which(era_data$TIMESTAMP_START == obs_start[1])
    end_era   <- which(era_data$TIMESTAMP_START == obs_start[length(obs_start)])

    # Extract correct time steps
    era_data  <- era_data[start_era:end_era,]

    return(era_data)

}

#' Converts La Thuile files to FLUXNET2015 format
#'
#' @param infiles vector of annual input files, including directory
#' @param fair_usage fair use flag(s) to accept, or NA to skip the Fair Use check
#' @param fair_usage_vec vector of fair use flags, named by year
#' @param min_yrs minimum number of consecutive years to process
#' @param tcol column index information, as returned by findColIndices
#' @param add_psurf whether synthesised air pressure (PSurf) is added
#' @param site_log log file for the site
#' @param site_code site code, e.g. "AU-Tum"
#'
#' @import R.utils
#' 
#' @return data frame of flux data with FLUXNET2015-style time stamps
#' @export
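#'
#' @examples
#' \dontrun{
#' # A minimal sketch. `annual_files`, `fair_use_flags` and `col_info` are
#' # hypothetical stand-ins for the objects described above, and the fair
#' # use flag value is illustrative only.
#' converted <- convert_LaThuile(infiles = annual_files,
#'                               fair_usage = "Fair_Use",
#'                               fair_usage_vec = fair_use_flags,
#'                               min_yrs = 2, tcol = col_info,
#'                               add_psurf = TRUE, site_log = log_file,
#'                               site_code = "AU-Tum")
#' }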

convert_LaThuile <- function(
    infiles, fair_usage=NA, fair_usage_vec=NA,
    min_yrs, tcol, add_psurf, site_log, site_code) {

    ### Find files to process ###

    # Find all available data years
    # TODO: could be improved with the stringr package

    # Get years from file names
    all_years <- regmatches(basename(infiles), regexpr("[0-9]{4}", basename(infiles)))

    # Find Fair Use years if applicable
    if (!is.na(fair_usage)) {

        # Find indices for years that comply with fair use policy
        fair_ind <- unlist(sapply(fair_usage, function(x) which(fair_usage_vec==x)))

        # Extract years
        fair_use_years <- names(fair_usage_vec)[fair_ind]

        # Find years that are fair use and have files for
        years <- as.numeric(intersect(all_years,fair_use_years))

    } else {

        warning("Not using Fair Use policy to extract years")

        # Use all available years
        years <- as.numeric(all_years)

    }


    # Check what years are consecutive
    consec <- seqToIntervals(years)
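    # e.g. years c(2000, 2001, 2003) yield one interval row per consecutive
    # run: [2000, 2001] and [2003, 2003]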

    # Find and remove consecutive time periods that are shorter than min_yrs
    rm_ind <- apply(consec, MARGIN=1, function(x) length(x[1]:x[2]) < min_yrs)

    if (any(rm_ind)) consec <- consec[-which(rm_ind), ]


    # If no periods to process, return error
    if (nrow(consec)==0) {

        error <- paste("No years to process,",
                       "available time period too short.")
        stop_and_log(error, site_log)
        return(site_log)
    }


    ### Read data for each year ###

    # Initialise (this should probably be written differently,
    # so data frame is pre-allocated properly)
    data <- vector()

    ### If all years consecutive
    if (nrow(consec)==1) {

        # Loop through years
        for(y in consec[1,1]:consec[1,2]){

            data <- rbind(data, read.csv(infiles[grepl(y, infiles)],
                                         header=TRUE,
                                         colClasses=tcol$classes))
        }

        ### Some years non-consecutive
    } else {

        # Time period from min to max
        tperiod <- seq(min(consec), max(consec), by=1)

        # Available years
        avail_yrs <- apply(consec, MARGIN=1, function(x) x[1]:x[2])
        avail_yrs <- sort(unlist(avail_yrs))

        # Find years missing in the middle
        missing_yrs <- setdiff(tperiod, avail_yrs)


        # Loop through years
        # If year available, read file. Else create
        # an empty year with correct no. of time steps

        # First, remove the path from the file names, in case the file path
        # contains the same string as the year(s), which could lead to the
        # wrong file being read
        infile_short <- sapply(infiles, function(x) strsplit(x, site_code)[[1]][2])

        for (y in 1:length(tperiod)) {

            # Year available
            if (any(avail_yrs==tperiod[y])) {

                # Find file corresponding to year
                file_to_read <- which(grepl(tperiod[y], infile_short))

                data <- rbind(data, read.csv(infiles[file_to_read],
                                             header=TRUE,
                                             colClasses=tcol$classes))

                # Determine number of time steps per day
                if(y==1) tstep_per_day <- (24*60) / (data$Time[1] * 60)
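                # e.g. half-hourly data with a first Time value of 0.5
                # (decimal hours) gives (24*60) / 30 = 48 steps per day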

            } else {

                # Create time information
                time_vec <- create_dummy_year(year=tperiod[y], tstep=tstep_per_day, time=tcol$time_names)

                # Set other variables to missing value (set colnames so row-binding works)
                dummy_mat <- matrix(data=Sprd_MissingVal, nrow=nrow(time_vec), ncol=ncol(data)-length(tcol$time_names))
                colnames(dummy_mat) <- colnames(data)[(length(tcol$time_names)+1):ncol(data)]

                # Append to data frame
                data <- rbind(data, cbind(time_vec, dummy_mat))

            }
        } # years

    } # consec/non-consec


    #### Convert time stamps to Fluxnet2015 format ####

    # Define time step size (in minutes). Maybe already determined elsewhere,
    # in which case it could be passed to this function?
    tstepsize <- (data$Time[2] * 60) - (data$Time[1] * 60)
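    # e.g. Time values 0.5 and 1.0 (decimal hours) give a 30-minute step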

    # Find which columns have time info
    time_cols <- sapply(tcol$time_names, function(x) which(colnames(data)==x))

    # Convert time
    new_time <- t(apply(data[,time_cols], MARGIN=1, function(x) convert_LaThuile_time(x, tstepsize=tstepsize)))
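    # convert_LaThuile_time is expected to return a TIMESTAMP_START /
    # TIMESTAMP_END pair for each record, e.g. "200301010000" / "200301010030"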


    # Check that time and data matrices have same dimensions
    if (nrow(new_time) != nrow(data)) {
        stop("Problem appending La Thuile data into a Fluxnet2015 format")
    }

    
    # Append new time and data
    converted_data <- cbind(new_time, data[,-time_cols])

    return(converted_data)

}

#' Converts OzFlux files to FLUXNET2015 format
#'
#' @param infile input netCDF file, including directory
#' @param tcol column index information, as returned by findColIndices
#'
#' @import ncdf4
#'
#' @return data frame of flux data with FLUXNET2015-style time stamps
#' @export
#' 
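#' @examples
#' \dontrun{
#' # A minimal sketch; the file name and `col_info` are illustrative stand-ins
#' converted <- convert_OzFlux(infile = "AU-Tum_OzFlux_Met.nc", tcol = col_info)
#' }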
convert_OzFlux <- function(infile, tcol) {
  
  #Open file handle
  nc <- nc_open(infile)
  
  #Get data variables
  data_vars <- as.data.frame(lapply(tcol$names, function(var) ncvar_get(nc, var)))
  
  #Set column names
  colnames(data_vars) <- tcol$names
  
  #Get time stamps (this is the END time; pers. comm. Peter Isaac)
  time_var <- ncvar_get(nc, tcol$time_names)
  
  
  ### Convert time stamps to FLUXNET2015 format ###
  
  #Get time origin (should be of format "days since 1800-01-01 00:00:00.0")
  time_origin <- strsplit(ncatt_get(nc, tcol$time_names)$units,
                          "days since ")[[1]][2]

    
  #Convert decimal days to date-time (use GMT as time zone to avoid time offsets)
  end_time_dates <- as.POSIXct(time_var * 24*60*60,  origin=time_origin, tz="GMT")
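  # e.g. with origin "1800-01-01 00:00:00.0", a time value of 74874.0208333
  # days corresponds to an end time of 2005-01-01 00:30 GMT ("200501010030")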
  
  #Calculate start time
  start_time_dates <- end_time_dates - (end_time_dates[2]-end_time_dates[1])
  
  #Then convert to FLUXNET2015 format
  new_start_time <- format(start_time_dates, "%Y%m%d%H%M")
  new_end_time   <- format(end_time_dates, "%Y%m%d%H%M")
  
  #Append start and end time
  new_time <- cbind(new_start_time, new_end_time)
  
  #Append new time and data
  converted_data <- cbind(new_time, data_vars)
  
    
  #Close file
  nc_close(nc)

  return(converted_data)

}