# analysis/scripts/routine_package.R

# Create the routine chemistry dataset package.

# run this script from the project root directory with
# Rscript analysis/scripts/routine_package.R <out directory>

# produces <out directory>.zip, e.g.
# analysis/out/routine_chemistry_v$(PKGVERS).zip

library(atmoschem.process)
library(magrittr)
library(DBI)
library(RSQLite)

out_dir = commandArgs(trailingOnly = TRUE)[1]
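# commandArgs gives NA when no argument was supplied; fail early with usage
if (is.na(out_dir)) {
  stop('usage: Rscript analysis/scripts/routine_package.R <out directory>')
}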
dir.create(out_dir, showWarnings = FALSE, recursive = TRUE)
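# only data recorded before processing_end (set in the environment) is packaged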
end_time = Sys.getenv('processing_end')
old_processed_dir = file.path('analysis', 'intermediate')
config = read_csv_dir('analysis/config')

fmt_decimals = function(x, k) format(round(x, k), trim = TRUE, nsmall = k)
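# e.g. fmt_decimals(3.14159, 2) returns "3.14"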

# combine site/data source hourly files into a single site hourly file, using
# column names from report_columns
get_site_df = function(site) {
  site_sources = config$dataloggers$name[config$dataloggers$site == site]
  out = data.frame(time = numeric())
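  # select all measurements recorded before the processing end (exclusive)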
  query = paste0("select * from measurements where time<'", end_time, "'")
  for (s in site_sources) {
    dbpath = file.path('analysis', 'intermediate',
                       paste0('hourly_', site, '_', s, '.sqlite'))
    db = dbConnect(SQLite(), dbpath)
    meas = dbGetQuery(db, query)
    dbDisconnect(db)
    meas$time = as.POSIXct(meas$time, tz = 'EST')
    # update out times
    out = merge(out, meas[, 'time', drop = FALSE], all = TRUE)
    # get only the parameters included in the dataset
    s_cols = config$report_columns[config$report_columns$site == site, ] %>%
      subset(data_source == s)
    mtypes = config$channels[config$channels$site == site &
                             config$channels$data_source == s, ]
    params = c('time', s_cols$measurement)
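    # keep time plus the value.* and flag.* columns of those parameters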
    meas = meas[, sub('^value\\.|^flag\\.', '', names(meas)) %in% params]
    # format the decimals for each parameter, based on the channels file
    for (p in params[-1]) {
      report_decimals = mtypes$report_decimals[mtypes$name == p]
      if (length(report_decimals) == 1 && !is.na(report_decimals)) {
        param_col = paste0('value.', p)
        meas[, param_col] = fmt_decimals(meas[, param_col], report_decimals)
      }
    }
    for (i in seq_len(nrow(s_cols))) {
      # copy values only within the column's active time range, leaving NA
      # elsewhere so that the merging works correctly later
      param = s_cols$measurement[i]
      # add units to vals column names
      units = mtypes$units[match(param, mtypes$name)]
      column = s_cols$column[i]
      val_col = paste0(column, ' (', units, ')')
      flag_col = paste0(column, ' (flag)')
      if (!val_col %in% names(out)) out[, val_col] = NA
      if (!flag_col %in% names(out)) out[, flag_col] = NA
      times = s_cols$times[i]
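      # times is an interval string (e.g. '[2016-01-01, 2019-01-01)'); strip
      # the brackets to recover the start and end of the range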
      stime = as.POSIXct(gsub('^[[(]|,.*$', '', times), tz = 'EST')
      etime = as.POSIXct(gsub('^.*, ?|[])]$', '', times), tz = 'EST')
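      # copy only the hours that fall inside [stime, etime)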
      param_times = meas$time[meas$time >= stime & meas$time < etime]
      out[match(param_times, out$time), c(val_col, flag_col)] =
        meas[match(param_times, meas$time), paste0(c('value.', 'flag.'), param)]
    }
  }
  names(out)[1] = 'Time (EST)'
  out
}

for (site in config$sites$abbreviation) {
  message('Organizing ', site, ' data')
  csv_file = paste0('old_', site, '.csv')
  old_processed_file = file.path(old_processed_dir, csv_file)
  out_file = file.path(out_dir, paste0('hourly_', site, '.csv'))

  if (site != 'QC') {
    # reprocessing all QC data, so don't care about the old QC files
    oldp = read.csv(old_processed_file, na.strings = c('NA', '-999'),
                    check.names = FALSE)
    oldp$`Time (EST)` = as.POSIXct(oldp$`Time (EST)`, tz = 'EST')
    names(oldp) = gsub('\\(AQS_flag\\)', '(AQS flag)', names(oldp))
  }

  newp = get_site_df(site)
  # make sure flag column names are all formatted consistently
  names(newp) = gsub('\\(NARSTO\\)', '(flag)', names(newp))
  names(newp) = gsub('\\(AQS\\)', '(AQS flag)', names(newp))

  # fix a silly inconsistency in the lodge data
  if (site == 'WFMB') {
    names(newp) = gsub('Precip \\(mm since 00Z\\)', 'Precip since 00Z (mm)',
                       names(newp))
    names(newp) = gsub('Precip \\(flag\\)', 'Precip since 00Z (flag)',
                       names(newp))
  }

  if (site != 'QC') {
    # make sure datasets have the same columns
    all_cols = union(names(newp), names(oldp))
    oldp[setdiff(names(newp), names(oldp))] = NA
    newp[setdiff(names(oldp), names(newp))] = NA
  } else {
    all_cols = names(newp)
  }

  # put the columns in the correct ordering
  cols = data.frame(name = all_cols)
  cols$time = cols$name == 'Time (EST)'
  cols$param = sub(' \\([^(]*\\)$', '', cols$name)
  cols$flag = grepl('flag\\)$', cols$name)
  cols$aqs_flag = grepl('AQS flag\\)$', cols$name)
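  # Time sorts first, then parameters alphabetically, each value column
  # followed by its NARSTO flag and then its AQS flag column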
  cols$order = with(cols, order(!time, param, flag, aqs_flag))
  cols = cols[cols$order, ]
  newp = newp[, cols$name]

  if (site == 'QC') {
    pout = newp
  } else {
    oldp = oldp[, cols$name]
    pout = rbind(oldp, newp)
  }

  # somehow the old PSP data has a couple of duplicate times
  pout = pout[!duplicated(pout$`Time (EST)`), ]

  # add missing hours
  hours = seq.POSIXt(min(pout$`Time (EST)`), max(pout$`Time (EST)`), by = 'hour')
  pout = merge(data.frame('Time (EST)' = hours, check.names = FALSE), pout,
               all.x = TRUE)

  # replace NA flags with M1
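  # (M1 is the NARSTO code for a missing value)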
  flag_cols = with(cols, which(flag & !aqs_flag))
  for (i in flag_cols) pout[is.na(pout[, i]), i] = 'M1'

  # replace M1/M2 data with NA
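  # (the sort above puts each value column directly before its flag column,
  # so i - 1 indexes the matching value column)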
  for (i in flag_cols) pout[startsWith(pout[, i], 'M'), i - 1] = NA

  pout$`Time (EST)` = format(pout$`Time (EST)`, format = '%Y-%m-%d %H:%M')
  write.csv(pout, file = out_file, na = '', row.names = FALSE)
}

# Supplementary data

## site info
write.csv(config$sites, file = file.path(out_dir, 'sites.csv'), na = '',
          row.names = FALSE)

## instrument info
# match column measurements to measurement instruments
measurement_insts = merge(config$channel_instruments, config$instruments,
                          by.x = c('site', 'instrument'),
                          by.y = c('site', 'name'))
column_insts = atmoschem.process:::merge_timerange(config$report_columns,
                                                   measurement_insts, 'times')
instr_cols = c('site', 'column', 'times', 'brand', 'model', 'serial_number')
instruments = column_insts[, instr_cols]
instr_path = file.path(out_dir, 'instruments.csv')
write.csv(instruments, file = instr_path, na = '', row.names = FALSE)

## flag definitions
write.csv(narsto_flags[, 1:2], file = file.path(out_dir, 'flags.csv'),
          row.names = FALSE)

## readme
file.copy(file.path('analysis', 'docs', 'routine.md'),
          file.path(out_dir, 'README.txt'), overwrite = TRUE)

# zip
# paths in the zipped file are determined by the working directory
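# (this must come last: the relative paths above are invalid after setwd)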
setwd(dirname(out_dir))
routine_dir = basename(out_dir)
zip(paste0(routine_dir, '.zip'), routine_dir)