analysis/scripts/clean_old_routine.R

## reorganize and clean the original processed routine data files

library(atmoschem.process)
library(magrittr)

out_dir = 'analysis/intermediate'
site = commandArgs(trailingOnly = TRUE)[1]
config = read_csv_dir('analysis/config')

if (site == 'PSP') {
  # will need to read conversion efficiency values
  library(DBI)
  library(RSQLite)
}

# When deriving a value from two measurements, decide which of the two flags
# takes priority
prioritize_flags = function(f1, f2) {
  # later flags in this vector are prioritized over the earlier flags
  flags = c('V0', 'V1', 'V4', 'M1')
  # check for unexpected flags
  unexpected = setdiff(na.omit(c(f1, f2)), flags)
  if (length(unexpected)) {
    stop('unprioritized flags: ', paste(unexpected, collapse = ', '))
  }
  i1 = match(f1, flags)
  i2 = match(f2, flags)
  # an NA flag defers to the other flag, so don't let pmax propagate NAs
  iout = pmax(i1, i2, na.rm = TRUE)
  flags[iout]
}
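
## For example (hypothetical flag vectors), the higher-priority flag wins
## elementwise:
##   prioritize_flags(c('V0', 'V1', NA), c('V4', 'V0', 'M1'))
##   #> [1] "V4" "V1" "M1"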

## invert a list of standard name -> raw aliases into a raw -> standard lookup vector
map_to_dict = function(l) {
  sdf = stack(l)
  setNames(as.character(sdf$ind), sdf$values)
}
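
## For example, map_to_dict(list(WD = 'RWD', WS = 'RWS')) returns the lookup
## vector c(RWD = 'WD', RWS = 'WS')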

params_map = list(
    NO = 'NO_TEI',
    NO2 = 'NO2_TEI',
    NOy = 'NOy_TEI',
    `PM2.5` = 'TMC',
    Precip = 'PRECIP',
    SO2 = 'SO2(MDL0.2)',
    T = 'Temp',
    `Ultrafine PM` = 'Ultrafine',
    WD = 'RWD',
    WS = 'RWS',
    WS_Max = 'WS_MAX'
)
units_map = list(
    `AQS flag` = c('AQS', 'AQS_flag'),
    degrees = c('Az.', 'Azimuth', 'deg', 'Deg'),
    flag = c('NARSTO', 'NARSTO_flag'),
    `g/m3` = c('gm-3', 'g/m^3', 'gm^-3'),
    `in` = c('In', 'INCHES', 'Inches'),
    mbar = 'mb',
    `mm since 00Z` = c('MM Since 00Z', 'MM Since 00LT'),
    `M/m` = 'Mm-1',
    `particles/cm3` = 'particles/cm4',
    ppbv = c('ppb', 'ppbv '),
    `W/m2` = c('watts/(meter squared)', 'w/m2', 'w/m^2', 'wm-2'),
    `ug/m3` = c('ug/m3', 'ugm-3', 'ug/m³', 'μg/m3'),
    `%` = c('pct', 'pc', 'percent'),
    `degrees C` = c('deg C', 'oC', 'oc', '°C')
)
drop_list = c(
    'CH4 (ppbv)',
    'CNC',
    'H2O2 (ppbv)',
    'LWC (volts)',
    'LWC (g/m3)',
    'MWS (m/s)',
    'NMHC (ug/m3)',
    'NMHC (ppbv)',
    'PSA (cm2/m3)',
    'SO2(MDL2) (ppbv)',
    'SR_2 (W/m2)',
    'Ultrafine PM (AQS flag)',
    'WD_ELM (dir_qualifier)',
    'WS_ELM (m/s)',
    'WD_raw (degrees)',
    'WS_raw (m/s)'
)

params_dict = map_to_dict(params_map)
units_dict = map_to_dict(units_map)

standardize_col_names = function(params, units) {
  units = units %>%
    replace(., . %in% c('', 'NA'), NA) %>%
    ifelse(. %in% names(units_dict), units_dict[.], .)
  params = ifelse(params %in% names(params_dict), params_dict[params],
                  params)
  ## rename the precip value columns and their following flag columns
  precip_i = which(units == 'mm since 00Z')
  if (length(precip_i)) {
    params[rep(precip_i, each = 2) + 0:1] = 'Precip since 00Z'
    units[precip_i] = 'mm'
  }
  ## format column names
  paste0(params, ifelse(is.na(units), '', paste0(' (', units, ')')))
}
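
## For example (hypothetical header values, after QC columns are resolved in
## read_processed below):
##   standardize_col_names(c('Site', 'RWD', 'RWD'), c(NA, 'Deg', 'NARSTO'))
##   #> [1] "Site" "WD (degrees)" "WD (flag)"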

patch_wfms = function(f, df) {
  ## rename wind columns for consistency
  if (any(grepl('WD_V ', names(df)))) {
    ## WD_V is preferred over older plain WD (though both are pretty
    ## sucky before 2011)
    if (basename(f) == 'WFMS2009.csv') {
      ## combine earlier WD with WD_V
      ## (the time column has already been renamed to 'Time (EST)' by now)
      early = df$`Time (EST)` < as.POSIXct('2009-09-01', tz = 'EST')
      df$`WD_V (degrees)`[early] = df$`WD (degrees)`[early]
      df$`WD_V (flag)`[early] = df$`WD (flag)`[early]
    }
    df[, grepl('WD ', names(df))] = NULL
    names(df) = names(df) %>%
      sub('WD_V ', 'WD ', .)
  }
  if (basename(f) < 'WFMS2011_v4.csv') {
    ## all wind speeds/directions are raw means before 2011
    names(df) = names(df) %>%
      sub('WS ', 'WS_raw ', .) %>%
      sub('WD ', 'WD_raw ', .)
  }
  # Flag very suspicious (probably wrong) CO dip with V4
  suspicious_co = df$`Time (EST)` >= as.POSIXct('1999-03-01', tz = 'EST') &
    df$`Time (EST)` < as.POSIXct('2001-05-02 05:00', tz = 'EST')
  if (any(suspicious_co)) {
    # use %in% so NA flags count as not-M1 (NA subscripts would be an error)
    make_v4 = suspicious_co & !df$`CO (flag)` %in% 'M1'
    df$`CO (flag)`[make_v4] = 'V4'
  }
  # Wind speed max values are not calculated correctly
  df$`WS_Max (m/s)` = NA
  df$`WS_Max (flag)` = 'M1'
  # In July 2014, something knocked the wind direction sensor off by 140
  # degrees. Brian, not yet knowing the true offset or timing of the error,
  # corrected the winds by 75 degrees, gradually phasing in the full 75 degree
  # correction over the month of July. I dug around in the data and found that
  # the error occurred abruptly at 9pm July 18th. Rich corrected the issue in
  # January 2015 and found that the true offset was 140 degrees. After the
  # sensor was corrected, Brian forgot to remove the original 75 degree
  # correction, so it mistakenly continued through to 2018 when he left and the
  # old dataset ends. Here I'm undoing Brian's adjustments and applying a more
  # accurate adjustment.
  phasein = df$`Time (EST)` >= as.POSIXct('2014-07-01', tz = 'EST') &
    df$`Time (EST)` < as.POSIXct('2014-08-01', tz = 'EST')
  if (any(phasein)) {
    phasein_hours = which(phasein)
    h_since_jul1 = difftime(df$`Time (EST)`[phasein_hours],
                            as.POSIXct('2014-07-01', tz = 'EST'),
                            units = 'hours')
    # He phased the correction in by minute. This is the best I can do to
    # correct it without looking at the minute data. If some minutes are flagged
    # or missing in an hour then this will be very slightly inaccurate.
    correction = 75 * (29.5 + as.numeric(h_since_jul1) * 60) / (60 * 24 * 31)
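    # e.g. the first phased hour (h = 0) gets 75 * 29.5 / 44640, about 0.05
    # degrees, and the last hour of July (h = 743) gets about 74.95 degrees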
    df$`WD (degrees)`[phasein_hours] =
      round((df$`WD (degrees)`[phasein_hours] - correction) %% 360, 1)
    # Flag the exact hour that the event occurred
    jul18 = which(df$`Time (EST)` == as.POSIXct('2014-07-18 21:00', tz = 'EST'))
    df$`WD (degrees)`[jul18] = NA
    df$`WD (flag)`[jul18] = 'M1'
    df$`WS (m/s)`[jul18] = NA
    df$`WS (flag)`[jul18] = 'M1'
  }
  corrected75 = df$`Time (EST)` >= as.POSIXct('2014-08-01', tz = 'EST')
  if (any(corrected75)) {
    df$`WD (degrees)`[corrected75] =
      round((df$`WD (degrees)`[corrected75] - 75) %% 360, 1)
  }
  off140 = df$`Time (EST)` >= as.POSIXct('2014-07-18 22:00', tz = 'EST') &
    df$`Time (EST)` < as.POSIXct('2015-01-14 12:00', tz = 'EST')
  if (any(off140)) {
    df$`WD (degrees)`[off140] = round((df$`WD (degrees)`[off140] + 140) %% 360, 1)
  }
  df
}

patch_wfml = function(f, df) {
  ## fix incorrect NMHC units (fixed = TRUE so the parentheses match literally)
  names(df) = sub('NMHC (ppmv)', 'NMHC (ppmC)', names(df), fixed = TRUE)
  ## rename wind columns for consistency
  if (any(grepl('WD_V ', names(df)))) {
    ## WD_V is preferred over older plain WD (though both are pretty
    ## sucky before 2011)
    if (basename(f) == 'WFML2009.csv') {
      ## combine earlier WD with WD_V
      early = df$`Time (EST)` < as.POSIXct('2009-09-01', tz = 'EST')
      df$`WD_V (degrees)`[early] = df$`WD (degrees)`[early]
      df$`WD_V (flag)`[early] = df$`WD (flag)`[early]
    }
    df[, grepl('WD ', names(df))] = NULL
    names(df) = names(df) %>%
      sub('WD_V ', 'WD ', .)
  }
  if (basename(f) < 'WFML2012_v03.csv') {
    ## all wind speeds are raw means before 2012
    names(df) = names(df) %>%
      sub('WS ', 'WS_raw ', .) %>%
      sub('WD ', 'WD_raw ', .)
  }
  df
}

patch_psp = function(f, df) {
  ## fix some wind info
  if (basename(f) == 'PSP1995_v02.csv') {
    ## original (1995) WD/WS is raw
    names(df) = names(df) %>%
      sub('WS ', 'WS_raw ', .) %>%
      sub('WD ', 'WD_raw ', .)
  }
  ## RW[SD] is vector averaged and MWS is raw mean
  names(df) = names(df) %>%
    sub('RWS ', 'WS ', .) %>%
    sub('RWD ', 'WD ', .) %>%
    sub('MWS ', 'WS_raw ', .)
  # wind speeds/directions from 2009-04-14 09:00 until 2016-07-26 11:00:00 are
  # raw averages instead of vector averages
  raw = df$`Time (EST)` >= as.POSIXct('2009-04-14 09:00', tz = 'EST') &
    df$`Time (EST)` < as.POSIXct('2016-07-26 11:00:00', tz = 'EST')
  if (any(raw)) {
    df$`WD (degrees)`[raw] = NA
    df$`WD (flag)`[raw] = 'M1'
    df$`WS (m/s)`[raw] = NA
    df$`WS (flag)`[raw] = 'M1'
  }
  # Conversion efficiencies were miscalculated 2018-08 and 2018-09. See
  # https://github.com/ASRCsoft/atmoschem.process/issues/127.
  if (basename(f) == 'PSP2018_v04.csv') {
    # undo Brian's CE correction
    mon08 = df$`Time (EST)` >= '2018-08-01' & df$`Time (EST)` < '2018-09-01'
    mon09 = df$`Time (EST)` >= '2018-09-01' & df$`Time (EST)` < '2018-10-01'
    df$`NO2 (ppbv)`[mon08] = df$`NO2 (ppbv)`[mon08] * 0.922
    df$`NO2 (ppbv)`[mon09] = df$`NO2 (ppbv)`[mon09] * 0.929
    # ^ those two constants came from the excel worksheets for those months
    # get calculated CE values
    dbpath = file.path('analysis', 'intermediate', 'processedcals_PSP.sqlite')
    db = dbConnect(SQLite(), dbpath)
    sql_text = "
select *
  from calibrations
 where data_source='envidas'
   and measurement_name='NO2'
   and type='CE'
 order by time asc"
    ce_cals = dbGetQuery(db, sql_text)
    dbDisconnect(db)
    ce_cals$time = as.POSIXct(ce_cals$time, tz = 'EST')
    ce_cals$manual = as.logical(ce_cals$manual)
    # apply new CE correction
    needs_ce = mon08 | mon09
    no2_conf = config$channels %>%
      subset(site == 'PSP' & data_source == 'envidas' & name == 'NO2')
    df$`NO2 (ppbv)`[needs_ce] =
      with(df[needs_ce, ], atmoschem.process:::ceff_correct(`Time (EST)`,
                                                            `NO2 (ppbv)`,
                                                            ce_cals, no2_conf))
  }
  df
}

patch_qc = function(f, df) {
  ## fix methane with incorrect units
  if (df$`Time (EST)`[1] == as.POSIXct('2008-01-01 00:00', tz = 'EST')) {
    df$`CH4 (ppbC)` = 1000 * df$`CH4 (ug/m3)`
  }
  if ('CH4 (ug/m3)' %in% names(df)) {
    df[, 'CH4 (ug/m3)'] = NULL
  }
  df
}

make_site_specific_fixes = function(site, f, df) {
  if (site == 'WFMS') {
    patch_wfms(f, df)
  } else if (site == 'WFML') {
    patch_wfml(f, df)
  } else if (site == 'PSP') {
    patch_psp(f, df)
  } else if (site == 'QC') {
    patch_qc(f, df)
  } else {
    # unknown sites pass through unchanged
    df
  }
}

## read processed files in the old awkward format
read_processed = function(f, index_cols = 5) {
  message('parsing ', f)
  site = basename(dirname(f))
  ## organize the header data
  headers = read.csv(f, skip = 3, nrows = 1, check.names = F,
                     stringsAsFactors = F)
  params = names(headers)
  units = as.character(headers)
  units[1:index_cols] = NA
  ## get parameters for QC columns
  for (n in 1:2) {
    ## two passes, in case a parameter has two consecutive QC columns
    qc_params = which(params == 'QC')
    ## get the parameter name from the previous column
    params[qc_params] = params[qc_params - 1]
  }
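  ## e.g. a hypothetical params row c('O3', 'QC', 'CO', 'QC', 'QC') becomes
  ## c('O3', 'O3', 'CO', 'CO', 'CO') after the two passes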
  columns = standardize_col_names(params, units)
  ## get the data
  na_strings = c('-999', '#DIV/0!', '', 'NA', '#N/A')
  df = read.csv(f, header = F, na.strings = na_strings,
                skip = 5, fileEncoding = 'UTF-8', col.names = columns,
                check.names = F)
  ## remove the last column if it's unlabeled (and therefore empty)
  if (any(params[4:length(params)] == '')) {
    if (tail(params, 1) == '') {
      df = df[, 1:(ncol(df) - 1)]
      params = params[1:ncol(df)]
      units = units[1:ncol(df)]
      warning('Unlabeled column removed')
    }
    if (any(params == '')) warning('Unlabeled column, not removed')
  }
  ## remove empty rows
  non_empty = !is.na(df[, 1]) | !is.na(df[, 2]) | !is.na(df[, 3])
  df = df[non_empty, ]
  ## get a nice datetime
  first_date = as.character(df[1, 3])
  if (nchar(tail(strsplit(first_date, '/')[[1]], 1)) == 2) {
    date_format = '%m/%d/%y %H:%M'
  } else {
    date_format = '%m/%d/%Y %H:%M'
  }
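  ## e.g. a first date of '1/1/95' selects '%m/%d/%y %H:%M'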
  df$time = as.POSIXct(paste(df[, 3], df[, 4]),
                       format = date_format, tz = 'EST')
  if (any(is.na(df$time))) warning('unrecognized time values')
  ## remove some columns. names(df) includes the new time column (last), so
  ## pad the keep vector with TRUE to avoid recycling it away
  if (any(names(df) %in% drop_list)) {
    drop_params = params[which(names(df) %in% drop_list)]
    keep = !params %in% drop_params
    df = df[, c(keep, TRUE)]
    units = units[keep]
    params = params[keep]
  }
  ncols = ncol(df)
  ## make sure non-flag columns are numeric
  for (n in (index_cols + 1):(ncols - 1)) {
    if (!grepl('.*flag', units[n], ignore.case = T)) {
      if (is(df[, n], 'factor') || is(df[, n], 'character')) {
        warning('non-numeric values in ', names(df)[n])
        tmp_char = as.character(df[, n])
        old_nas = is.na(df[, n])
        df[, n] = as.numeric(tmp_char)
        new_nas = is.na(df[, n])
        print(tmp_char[head(which(new_nas & !old_nas))])
      }
    }
  }
  ## organize the index columns
  df = df[, c(ncols, (index_cols + 1):(ncols - 1))]
  ## combine columns with identical params and units. read.csv ran with
  ## check.names = F, so duplicate names are kept verbatim rather than being
  ## given '.1' suffixes -- find them with duplicated()
  dup_cols = which(duplicated(names(df)))
  for (dup_i in rev(dup_cols)) {
    orig_i = match(names(df)[dup_i], names(df))
    fill = is.na(df[[orig_i]])
    df[[orig_i]][fill] = df[[dup_i]][fill]
    df[[dup_i]] = NULL
  }
  names(df)[1] = 'Time (EST)'
  df = make_site_specific_fixes(site, f, df)
  if (any(grepl('^WD_V', names(df))))
    warning('WD_V was not correctly replaced in ', f)
  df
}

add_missing_cols = function(dfin, cols) {
  missing = cols[!cols %in% names(dfin)]
  if (length(missing)) dfin[, missing] = NA
  dfin
}

merge_dfs = function(dflist) {
  all_cols = dflist %>%
    lapply(names) %>%
    unlist %>%
    unique
  dflist %>%
    lapply(function(x) add_missing_cols(x, all_cols)) %>%
    do.call(rbind, .)
}
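
## For example (hypothetical frames), merge_dfs(list(data.frame(a = 1),
## data.frame(a = 2, b = 3))) returns two rows of columns a and b, with b NA
## in the first row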

order_columns = function(x) {
  ## reorder columns alphabetically (mostly)
  params = gsub(' \\(.*$', '', x)
  units = gsub('^.* \\((.*)\\)$|^[^()]*$', '\\1', x)
  order(params != 'Time', params, units %in% c('flag', 'AQS flag'),
        units == 'AQS flag')
}
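
## For example (hypothetical names):
##   cols = c('CO (flag)', 'Time (EST)', 'CO (AQS flag)', 'CO (ppbv)')
##   cols[order_columns(cols)]
##   #> [1] "Time (EST)" "CO (ppbv)" "CO (flag)" "CO (AQS flag)"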

write_site_file = function(site, ...) {
  if (site == 'WFMB') {
    # used to be WFML (lodge)
    site_path = file.path('analysis/raw/routine_chemistry_v0.1', 'WFML')
  } else {
    site_path = file.path('analysis/raw/routine_chemistry_v0.1', site)
  }
  files = site_path %>%
    list.files(full.names = T) %>%
    subset(., !grepl('instruments', .)) %>%
    sort
  dflist = lapply(files, read_processed, ...)
  finaldf = merge_dfs(dflist)
  finaldf = finaldf[, order_columns(names(finaldf))]
  # fix sea level pressure calculations, which incorrectly used inside
  # temperatures in these files
  site_info = subset(config$sites, abbreviation == site)
  site_elev = site_info$elevation
  finaldf$`SLP (mbar)` =
    with(finaldf, sea_level_pressure(`BP (mbar)`, `T (degrees C)`, site_elev))
  finaldf$`SLP (flag)` =
    with(finaldf, prioritize_flags(`BP (flag)`, `T (flag)`))
  out_path = file.path(out_dir, paste0('old_', site, '.csv'))
  write.csv(finaldf, file = out_path, row.names = F)
}


dir.create(out_dir, showWarnings = FALSE, recursive = TRUE)
## missing WFMS 1990 4th quarter?

if (site == 'PSP') {
  write_site_file('PSP', index_cols = 4)
} else {
  write_site_file(site)
}