# load calibration data into SQLite
# run like so:
# Rscript analysis/load_calibration.R <site>
# produces file analysis/intermediate/cals_<site>.sqlite
library(atmoschem.process)
library(magrittr)
suppressPackageStartupMessages(library(lubridate))
library(DBI)
library(RSQLite)
site = commandArgs(trailingOnly = T)[1]
raw_folder = paste0('raw_data_v', Sys.getenv('raw_version'))
config = read_csv_dir('analysis/config')
is_psp_42C_cal = function(f)
startsWith(basename(f), 'Pinnacle_42C')
is_psp_API300EU_cal = function(f)
startsWith(basename(f), 'Pinnacle_API300EU_CO_Weekly')
is_psp_ASRC_TEI42i_Y_NOy_cal = function(f)
startsWith(basename(f), 'Pinnacle_ASRC_TEI42i_Y_NOy_146i_Weekly') |
startsWith(basename(f), 'Pinnacle_ASRC_TEI42i_Y_NOy_146i_WEEKLY') |
startsWith(basename(f), 'Pinnacle_ASRC_TEI42i_Y_NOy_T700_Weekly')
is_psp_DEC_TEI42i_NOy_cal = function(f)
startsWith(basename(f), 'Pinnacle DEC TEI42i NOy Weekly') |
startsWith(basename(f), 'Pinnacle_DEC_TEI42i_NOy_Weekly')
is_psp_TEI43i_SO2_cal = function(f)
startsWith(basename(f), 'Pinnacle_TEI43i_SO2_Weekly') |
startsWith(basename(f), 'Pinnacle_TEI43i_SO2_146i_Weekly')
is_psp_TEI49i_O3_49i_cal = function(f)
startsWith(basename(f), 'Pinnacle_TEI49i_O3_49i_Weekly') |
startsWith(basename(f), 'Pinnacle_TEI49i_O3_Weekly')
transform_psp_calibrations = function(f) {
# figure out which function to use for a Pinnacle calibration file
if (is_psp_42C_cal(f)) {
transform_psp_42C_calibrations(f)
} else if (is_psp_API300EU_cal(f)) {
transform_psp_API300EU_calibrations(f)
} else if (is_psp_ASRC_TEI42i_Y_NOy_cal(f)) {
transform_psp_ASRC_TEI42i_Y_NOy_calibrations(f)
} else if (is_psp_DEC_TEI42i_NOy_cal(f)) {
transform_psp_DEC_TEI42i_NOy_calibrations(f)
} else if (is_psp_TEI43i_SO2_cal(f)) {
transform_psp_TEI43i_SO2_calibrations(f)
} else if (is_psp_TEI49i_O3_49i_cal(f)) {
transform_psp_TEI49i_O3_49i_calibrations(f)
} else {
warning(paste('Transform not implemented for', f))
NULL
}
}
# decide what to do with a calibration file using its site and instrument
transform_calibration = function(f, site, ds) {
if (site == 'PSP') {
transform_psp_calibrations(f)
} else if (site == 'WFMS' && ds == 'Teledyne_300EU') {
transform_wfms_300EU(f)
} else if (site == 'WFMS' && ds == 'Thermo_42C') {
transform_wfms_42C(f)
} else if (site == 'WFMS' && ds == 'Thermo_42Cs') {
transform_wfms_42Cs(f)
} else if (site == 'WFMS' && ds == 'Thermo_43C') {
transform_wfms_43C(f)
} else if (site == 'WFMB' && ds == 'Thermo_42i') {
transform_wfml_42i(f)
} else if (site == 'WFMB' && ds == 'Thermo_48C') {
transform_wfml_48C(f)
}
}
# calibration-related functions
min_ma = function(x, k) min(suppressWarnings(runmed(x, k, 'constant')), na.rm = TRUE)
max_ma = function(x, k) max(suppressWarnings(runmed(x, k, 'constant')), na.rm = TRUE)
# add time around a character clock time interval
pad_time_interval = function(interval, start, end) {
stime = as.POSIXct(gsub('^[[(]|,.*$', '', interval), tz = 'EST',
format = '%H:%M')
etime = as.POSIXct(gsub('^.*, ?|[])]$', '', interval), tz = 'EST',
format = '%H:%M')
stime_min = as.POSIXct('00:00', tz = 'EST', format = '%H:%M')
etime_max = as.POSIXct('23:59', tz = 'EST', format = '%H:%M')
stime = max(stime - start, stime_min)
etime = min(etime + end, etime_max)
c(stime, etime) %>%
strftime(format = '%H:%M', tz = 'EST') %>%
paste(collapse = ',') %>%
paste0('[', ., ')')
}
# get a set of calibration results for a scheduled autocal
get_cal_results = function(dt_int, t_int, p, d, agg_f) {
stime = gsub('^[[(]|,.*$', '', t_int)
etime = gsub('^.*, ?|[])]$', '', t_int)
dbpath = file.path('analysis', 'intermediate',
paste0('raw_', site, '_', d, '.sqlite'))
db = dbConnect(SQLite(), dbpath)
sql_str = '
select time,
? as value
from measurements
where time>=?
and time<=?
and substr(time, 12, 5)>=?
and substr(time, 12, 5)<=?
order by time asc'
valname = dbQuoteIdentifier(db, paste0('value.', p))
sql = sqlInterpolate(db, sql_str, valname,
format(int_start(dt_int), tz = 'EST'),
format(int_end(dt_int), tz = 'EST'), stime, etime)
meas = try(dbGetQuery(db, sql))
dbDisconnect(db)
meas$time = as.POSIXct(meas$time, tz = 'EST')
meas
if (nrow(meas) == 0) {
warning('No measurements found for ', p, ' / ', d)
return(data.frame())
}
meas %>%
transform(date = as.Date(time, tz = 'EST')) %>%
aggregate(value ~ date, FUN = agg_f, data = .)
}
# Find the PSP conversion efficiency check time period using measurements from
# the overall calibration check period (meas) and the recorded NO2 "instrument
# response" value from the cal sheet (no2val)
find_42C_ce = function(meas, no2val) {
# get the calibrator info
calibrators = config$psp_no2_calibrators
calibrator = calibrators[findInterval(meas$time[1], as.POSIXct(calibrators$start_time, tz = 'EST')), ]
candidates = meas %>%
transform(no2 = nox - no) %>%
# split measurements by discontinuities and give each group an ID
transform(dnox = nox - lag2(nox), dno2 = no2 - lag2(no2)) %>%
transform(discontinuous = abs(dnox) > 1 | abs(dno2) > 1) %>%
transform(new_group = discontinuous & !lag2(discontinuous)) %>%
transform(group = cumsum(!is.na(new_group) & new_group)) %>%
subset(!discontinuous) %>%
# remove short-lived groups
transform(stime = ave(time, group, FUN = min),
etime = ave(time, group, FUN = max)) %>%
subset(etime - stime >= as.difftime(10, units = 'mins')) %>%
# get calibration results for each group
transform(nocal = ave(no, group, FUN = function(x) min_ma(x, 5)),
noxcal = ave(nox, group, FUN = function(x) max_ma(x, 5))) %>%
subset(select = c('stime', 'etime', 'nocal', 'noxcal')) %>%
unique %>%
transform(no2cal = noxcal - nocal)
if (calibrator$model == '146i') {
candidates = candidates %>%
# no2cal, no2val are both measured NOx - measured NO, should be close
# (within 10%)
transform(pctdiff = abs(100 * no2cal / no2val - 100)) %>%
subset(pctdiff < 10)
} else if (calibrator$model == 'T700U') {
candidates = candidates %>%
# in this case no2val is certified NOx - measured NO
transform(noval = calibrator$certified_nox - no2val) %>%
# look for NO values within 10% of the implied cal sheet NO value
transform(pctdiff = abs(100 * nocal / noval - 100)) %>%
subset(pctdiff < 10)
} else {
stop('Calibrator', calibrator$model, 'is not even possible. What??')
}
if (!nrow(candidates)) return(NA)
candidates[order(candidates$pctdiff), ] %>%
head(1) %>%
with(list(start_time = stime, end_time = etime, no = nocal, nox = noxcal))
}
# get the manual cals
files = file.path('analysis', 'raw', raw_folder, site, 'calibrations', '*', '*',
'*') %>%
Sys.glob
# ignore Pinnacle audit and calibration files-- currently only care about the
# weekly check files
files = files[!grepl('audit|calibration', basename(files), ignore.case = T)]
message('Loading ', length(files), ' files into cals_', site, '.sqlite...')
mcals = data.frame(f = files) %>%
transform(ds = gsub('^.*calibrations/|/[0-9]{4}/[^/].*$', '', f)) %>%
with(mapply(transform_calibration, f = f, site = site, ds = ds,
SIMPLIFY = F)) %>%
do.call(rbind, .)
# ideally, the data source would be assigned via the config files. But for now,
# hardcoded
mcals$data_source = switch(site, WFMS = 'campbell', WFMB = 'campbell',
PSP = 'envidas')
mcals$times = as_interval(mcals$times)
mcals$measured_value = as.numeric(mcals$measured_value)
# WFMS calibrations apply to both campbell and envidas data sources, so they
# need to be duplicated
if (site == 'WFMS') {
envidas_mcals = mcals %>%
subset(int_start(times) >= as.POSIXct('2019-01-01', tz = 'EST')) %>%
transform(data_source = 'envidas') %>%
# NOx_Avg is NOX, NOy_Avg is NOY in envidas (all uppercase)
transform(measurement_name = toupper(sub('_Avg', '', measurement_name)))
# Campbell CO data is offset by 200 compared to Envidas CO
co_cal = envidas_mcals$measurement_name == 'CO'
envidas_mcals$measured_value[co_cal] =
envidas_mcals$measured_value[co_cal] - 200
mcals = rbind(mcals, envidas_mcals)
}
if (site == 'PSP') {
# find NO conversion efficiency results, which weren't recorded in the PSP cal
# sheets
# get NO and NOx from sqlite
dbpath = file.path('analysis', 'intermediate', 'raw_PSP_envidas.sqlite')
db = dbConnect(SQLite(), dbpath)
sql_str = 'select time, ? as no, ? as nox from measurements order by time asc'
sql = sqlInterpolate(db, sql_str, dbQuoteIdentifier(db, 'value.NO'),
dbQuoteIdentifier(db, 'value.NOx'))
meas = dbGetQuery(db, sql)
dbDisconnect(db)
meas$time = as.POSIXct(meas$time, tz = 'EST')
# find the 42C CE results in the raw data since they weren't recorded in the
# PSP cal sheets
ce_mcals = subset(mcals, measurement_name == 'NOx' & type == 'CE')
nox_ces = meas %>%
transform(is_cal = time %within% as.list(ce_mcals$times)) %>%
transform(new_cal = is_cal & !lag2(is_cal)) %>%
transform(cal_id = cumsum(new_cal)) %>%
subset(is_cal) %>%
# guess 42C CE times for each calibration
split(., .$cal_id) %>%
lapply(function(x) {
# find the matching interval in ce_mcals
mcal_i = ce_mcals[x$time[1] %within% ce_mcals$times, ]
guess = find_42C_ce(x, mcal_i$measured_value)
if (!is.list(guess)) {
data.frame(measurement_name = 'NO', type = 'CE',
start_time = int_start(mcal_i$times),
end_time = int_end(mcal_i$times),
provided_value = mcal_i$provided_value, measured_value = NA,
nox = NA)
} else {
data.frame(measurement_name = 'NO', type = 'CE',
start_time = guess$start_time, end_time = guess$end_time,
provided_value = mcal_i$provided_value,
measured_value = guess$no, nox = guess$nox)
}
}) %>%
do.call(rbind, .) %>%
# fix time formatting messed up by rbind
transform(start_time = as.POSIXct(start_time, tz = 'EST', origin = '1970-01-01'),
end_time = as.POSIXct(end_time, tz = 'EST', origin = '1970-01-01')) %>%
transform(times = interval(start_time, end_time)) %>%
subset(select = -c(measurement_name, start_time, end_time))
names(nox_ces)[3:4] = c('measured_value.NO', 'measured_value.NOx')
nox_ces = reshape(nox_ces, varying = 3:4, timevar = 'measurement_name',
idvar = names(nox_ces)[-(3:4)], direction = 'long')
nox_ces$corrected = F
nox_ces$data_source = 'envidas'
# add NO CE values to the other manual cals
mcals = mcals %>%
# remove the values that came from the cal sheet
subset(!(measurement_name == 'NOx' & type == 'CE')) %>%
# append the new values
rbind(nox_ces[, names(mcals)])
}
# fix up mcal formatting
mcals = mcals %>%
transform(start_time = int_start(times),
end_time = int_end(times))
mcals$times = NULL
mcals$manual = T
# get the automated cals
# first let's get a data frame of the cal times
acals0 = config$autocals %>%
subset(type %in% c('zero', 'span', 'CE')) %>%
transform(dt_int = as_interval(dates))
acals0 = acals0[acals0$site == site, ]
# WFMS autocalibrations apply to both campbell and envidas data sources, so they
# need to be duplicated
if (site == 'WFMS') {
envidas_acals = acals0 %>%
transform(data_source = 'envidas') %>%
# NOx is NOX, NOy is NOY in envidas (all uppercase)
transform(measurement_name = toupper(sub('_Avg', '', measurement_name)))
# envidas data starts in 2019
int_start(envidas_acals$dt_int) =
pmax(int_start(envidas_acals$dt_int), as.POSIXct('2019-01-01', tz = 'EST'))
acals0 = rbind(acals0, envidas_acals)
}
if (nrow(acals0)) {
message('Calculating autocal results...')
# for ongoing schedules use the current time as the end
int_end(acals0$dt_int[is.na(int_end(acals0$dt_int))]) = Sys.time()
acals_list = list()
for (i in 1:nrow(acals0)) {
acalsi = acals0[i, ]
agg_f = switch(acalsi$type, zero = function(x) min_ma(x, 5),
span = function(x) max_ma(x, 5),
CE = function(x) max_ma(x, 5), function(x) NA)
# add a few minutes on either end to account for possible clock drift
timesp3 = pad_time_interval(acalsi$times, as.difftime(3, units = 'mins'),
as.difftime(3, units = 'mins'))
# spans calibrations tend to spike at the beginning, so remove the first 15
# minutes
if (acalsi$type == 'span') {
timesp3 = pad_time_interval(timesp3, as.difftime(-15, units = 'mins'), 0)
}
c1 = with(acalsi, get_cal_results(dt_int, timesp3, measurement_name,
data_source, agg_f))
if (nrow(c1) == 0) next()
names(c1)[2] = 'measured_value'
stime = hm(gsub('^[[(]|,.*$', '', acalsi$times))
etime = hm(gsub('^.*, ?|[])]$', '', acalsi$times))
c1$start_time = as.POSIXct(as.character(c1$date), tz = 'EST') + stime
c1$end_time = as.POSIXct(as.character(c1$date), tz = 'EST') + etime
c1$date = NULL
c1$measurement_name = acalsi$measurement_name
c1$data_source = acalsi$data_source
c1$type = acalsi$type
c1$provided_value = acalsi$value
acals_list[[i]] = c1
}
acals = do.call(rbind, acals_list)
# add acals to the mcals
acals$corrected = F
acals$manual = F
all_cals = rbind(mcals, acals[, names(mcals)])
} else {
all_cals = mcals
}
# write to sqlite file
# fix time formatting for sqlite compatibility
all_cals$start_time = format(all_cals$start_time, '%Y-%m-%d %H:%M:%S', tz = 'EST')
all_cals$end_time = format(all_cals$end_time, '%Y-%m-%d %H:%M:%S', tz = 'EST')
interm_dir = file.path('analysis', 'intermediate')
dir.create(interm_dir, F, T)
dbpath = paste0('cals_', site, '.sqlite') %>%
file.path(interm_dir, .)
db = dbConnect(SQLite(), dbpath)
dbWriteTable(db, 'calibrations', all_cals, overwrite = T)
dbDisconnect(db)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.