analysis/processing_viewer/server.R

library(atmoschem.process)
library(shiny)
library(magrittr)
library(DBI)
library(RSQLite)
library(ggplot2)
library(cowplot)

# relative path to the sqlite files from the app directory
interm_dir = file.path('..', 'intermediate')
config = read_csv_dir('../config')

# get only true values
is_true = function(x) !is.na(x) & x

# get all available channels from a sqlite measurements file
.get_channels = function(f) {
  if (!file.exists(f)) return(character())
  db = dbConnect(SQLite(), f)
  on.exit(dbDisconnect(db))
  columns = dbListFields(db, 'measurements')
  columns = columns[startsWith(columns, 'value')]
  sub('value\\.', '', columns)
}

# get all available channels (including derived values) for a given datalogger
get_channels = function(site, datalogger) {
  file_suffix = paste0(site, '_', datalogger, '.sqlite')
  raw_path = file.path(interm_dir, paste0('raw_', file_suffix))
  processed_path = file.path(interm_dir, paste0('processed_', file_suffix))
  unique(c(.get_channels(raw_path), .get_channels(processed_path)))
}

## retrieve data from table based on timerange and measure
get_raw = function(s, ds, m, t1, t2) {
  dbpath = file.path(interm_dir, paste0('raw_', s, '_', ds, '.sqlite'))
  param_col = paste0('value.', m)
  param_fcol = paste0('flagged.', m)
  db = dbConnect(SQLite(), dbpath)
  on.exit(dbDisconnect(db))
  q = paste0('select time, ?, ? from measurements where time >= ? and time <= ? order by time asc')
  sql = sqlInterpolate(db, q, dbQuoteIdentifier(db, param_col),
                       dbQuoteIdentifier(db, param_fcol),
                       format(t1, tz = 'EST'), format(t2, tz = 'EST'))
  res = try(dbGetQuery(db, sql))
  if (inherits(res, 'try-error')) return(data.frame())
  res[, 1] = as.POSIXct(res[, 1], tz = 'EST')
  # booleans are stored as numbers in SQLite so they need to be converted
  res[, 3] = !is.na(res[, 3]) & as.logical(res[, 3])
  res
}

get_processed = function(s, ds, m, t1, t2) {
  dbpath = file.path(interm_dir, paste0('processed_', s, '_', ds, '.sqlite'))
  param_col = paste0('value.', m)
  param_fcol = paste0('flagged.', m)
  db = dbConnect(SQLite(), dbpath)
  on.exit(dbDisconnect(db))
  q = paste0('select time, ?, ? from measurements where time >= ? and time <= ? order by time asc')
  sql = sqlInterpolate(db, q, dbQuoteIdentifier(db, param_col),
                       dbQuoteIdentifier(db, param_fcol),
                       format(t1, tz = 'EST'), format(t2, tz = 'EST'))
  res = try(dbGetQuery(db, sql))
  if (inherits(res, 'try-error')) return(data.frame())
  res[, 1] = as.POSIXct(res[, 1], tz = 'EST')
  # booleans are stored as numbers in SQLite so they need to be converted
  res[, 3] = !is.na(res[, 3]) & as.logical(res[, 3])
  res
}

.get_cals = function(s, ds, m, type, t1, t2) {
  dbpath = file.path(interm_dir, paste0('processedcals_', s, '.sqlite'))
  db = dbConnect(SQLite(), dbpath)
  on.exit(dbDisconnect(db))
  # if end_time is missing, crudely estimate it by adding an hour to the
  # start_time
  q = "
select *
  from calibrations
 where data_source = ?
   and measurement_name = ?
   and type = ?
 order by time asc"
  sql = sqlInterpolate(db, q, ds, m, type)
  zeros = dbGetQuery(db, sql)
  zeros$time = as.POSIXct(zeros$time, tz = 'EST')
  zeros$flagged = as.logical(zeros$flagged)
  zeros$filtered = F
  zeros$label = type

  # Get the estimated (median filtered) zeros from the raw values. These
  # calculations are the same as in `drift_correct`
  m_conf = subset(config$channels, site == s & data_source == ds & name == m)
  z_breaks = zeros$time[is_true(zeros$corrected)]
  # If we keep the values before and after the time range [t1,t2], they will
  # influence ggplot2's y-axis bounds calculations, which we don't want. But we
  # also want the preceding and succeeding points so that the zero/span lines
  # can be drawn to the edges of the graph.
  zeros0 = zeros
  zeros = zeros %>%
    transform(lead_time = c(tail(time, -1), NA),
              lag_time = c(NA, head(time, -1))) %>%
    subset(ifelse(is.na(lead_time), time > t1, lead_time > t1) &
           ifelse(is.na(lag_time), time < t2, lag_time < t2))
  fzeros = zeros
  good_zeros = replace(zeros0$value, zeros0$flagged, NA)
  smooth_window = m_conf[[paste0(tolower(type), '_smooth_window')]]
  fzeros$value = estimate_cals(zeros0$time, good_zeros, smooth_window,
                               zeros$time, z_breaks)
  fzeros$filtered = if (nrow(fzeros)) T else logical()
  zeros = rbind(zeros, fzeros)

  cols = c('time', 'value', 'flagged', 'filtered', 'label')
  zeros[, cols]
}

get_cals = function(s, ds, m, t1, t2) {
  zeros = .get_cals(s, ds, m, 'zero', t1, t2)
  spans = .get_cals(s, ds, m, 'span', t1, t2)
  rbind(zeros, spans)
}

get_cal_breaks = function(s, ds, m, type, t1, t2) {
  dbpath = file.path(interm_dir, paste0('cals_', s, '.sqlite'))
  db = dbConnect(SQLite(), dbpath)
  on.exit(dbDisconnect(db))
  q = "
select end_time as time
  from calibrations
 where data_source = ?
   and measurement_name = ?
   and type = ?
   and corrected
   and end_time > ?
   and end_time < ?
 order by end_time asc"
  sql = sqlInterpolate(db, q, ds, m, type,
                       format(t1, '%Y-%m-%d %H:%M:%S', tz = 'EST'),
                       format(t2, '%Y-%m-%d %H:%M:%S', tz = 'EST'))
  res = dbGetQuery(db, sql)
  as.POSIXct(res[, 1], tz = 'EST')
}

get_ceffs = function(s, ds, m, t1, t2) {
  .get_cals(s, ds, m, 'CE', t1, t2)
}

get_hourly = function(s, ds, m, t1, t2) {
  dbpath = file.path(interm_dir, paste0('hourly_', s, '_', ds, '.sqlite'))
  param_col = paste0('value.', m)
  param_fcol = paste0('flag.', m)
  db = dbConnect(SQLite(), dbpath)
  on.exit(dbDisconnect(db))
  q = paste0('select time, ?, ? from measurements where time >= ? and time <= ? order by time asc')
  sql = sqlInterpolate(db, q, dbQuoteIdentifier(db, param_col),
                       dbQuoteIdentifier(db, param_fcol),
                       format(t1, tz = 'EST'), format(t2, tz = 'EST'))
  res = try(dbGetQuery(db, sql))
  if (inherits(res, 'try-error')) return(data.frame())
  res[, 1] = as.POSIXct(res[, 1], tz = 'EST')
  res
}

make_processing_plot = function(s, ds, m, t1, t2, plot_types, logt = F,
                                show_flagged = T) {
  if (is.null(ds) || m == '') return(NULL)
  ## get measurement info
  m_info = subset(config$channels, site == s & data_source == ds & name == m)
  if (nrow(m_info)) {
    ylabel = paste0(m, ' (', m_info$units, ')')
    has_raw = 'raw' %in% plot_types && !is_true(m_info$derived)
    has_processed = is_true(m_info$apply_processing) &
      'processed' %in% plot_types
    has_cal = is_true(m_info$has_calibration) &
      any(c('zero','span') %in% plot_types)
    has_ce = is_true(m_info$apply_ce) &
      'ce' %in% plot_types
    has_hourly = is_true(m_info$apply_processing) &
      'hourly' %in% plot_types
  } else {
    # if this channel isn't in the channel tables then only the raw data is
    # available
    ylabel = m
    has_raw = 'raw' %in% plot_types
    has_processed = F
    has_cal = F
    has_ce = F
    has_hourly = F
  }
  if (logt) ylabel = paste('Log', ylabel)

  # organize subplots
  n = 1
  plist = list()
  rel_heights = numeric()
  if (has_raw || has_processed) {
    df_list = list()
    if (has_raw && nrow(raw <- get_raw(s, ds, m, t1, t2))) {
      raw$label = 'raw'
      df_list$raw = raw
    }
    if (has_processed && nrow(processed <- get_processed(s, ds, m, t1, t2))) {
      processed$label = 'processed'
      df_list$processed = processed
    }
    if (length(df_list)) {
      meas_orig = do.call(rbind, df_list)
      names(meas_orig)[2:3] = c('value', 'flagged')
      if (logt) meas_orig$value = log(meas_orig$value)
      if (!show_flagged) meas_orig$value[meas_orig$flagged] = NA
      plist[[n]] = ggplot(meas_orig, aes(x = time, y = value, color = flagged,
                                         group = 1)) +
        geom_line(size = .2) +
        scale_color_manual(values = c('black', 'red')) +
        xlim(t1, t2) +
        facet_wrap(~ label, ncol = 1, scales = 'free_y',
                   strip.position = 'right') +
        xlab('Time (EST)') + ylab(ylabel)
      rel_heights = length(df_list)
      n = n + 1
    }
  }

  if (has_cal && nrow(cals <- get_cals(s, ds, m, t1, t2))) {
    cals$segment = 0
    breaks = as.POSIXct(numeric(), tz = 'EST', origin = '1970-01-01')
    labels = character()
    for (type in c('zero', 'span')) {
      if (type %in% plot_types) {
        zero_breaks = get_cal_breaks(s, ds, m, type, t1, t2)
        cals$segment[cals$label == type] =
          findInterval(cals$time[cals$label == type], zero_breaks)
        if (length(zero_breaks)) {
          breaks = c(breaks, zero_breaks)
          labels = c(labels, rep(type, length(zero_breaks)))
        }
      } else {
        cals = subset(cals, label != type)
      }
    }
    breaks_df = data.frame(breaks = breaks, label = labels)
    if (!show_flagged) cals$value[cals$flagged] = NA
    plist[[n]] = ggplot(cals, aes(x = time, y = value, color = flagged,
                                  group = interaction(filtered, segment))) +
      geom_point(aes(shape = filtered)) +
      geom_line(aes(linetype = filtered), color = 'black') +
      geom_vline(aes(xintercept = breaks), breaks_df, color = 'darkgray',
                 size = .3) +
      scale_color_manual(values = c('black', 'red')) +
      scale_shape_manual(values = c(19, NA)) +
      scale_linetype_manual(values = c('blank', 'solid')) +
      coord_cartesian(xlim = c(t1, t2)) +
      facet_wrap(~ label, ncol = 1, scales = 'free_y',
                 strip.position = 'right') +
      xlab('Time (EST)') + ylab('value')
    rel_heights = c(rel_heights, length(unique(cals$label)))
    n = n + 1
  }

  if (has_ce && nrow(ceffs <- get_ceffs(s, ds, m, t1, t2))) {
    if (!show_flagged) ceffs$value[ceffs$flagged] = NA
    plist[[n]] = ggplot(ceffs, aes(x = time, y = value, color = flagged,
                                   group = filtered)) +
      geom_point(aes(shape = filtered)) +
      geom_line(aes(linetype = filtered), color = 'black') +
      scale_color_manual(values = c('black', 'red')) +
      scale_shape_manual(values = c(19, NA)) +
      scale_linetype_manual(values = c('blank', 'solid')) +
      coord_cartesian(xlim = c(t1, t2)) +
      facet_wrap(~ label, ncol = 1, scales = 'free_y',
                 strip.position = 'right') +
      xlab('Time (EST)') + ylab('value')
    rel_heights = c(rel_heights, 1)
    n = n + 1
  }

  if (has_hourly && nrow(hourly <- get_hourly(s, ds, m, t1, t2))) {
    hourly$label = 'hourly'
    names(hourly)[2:3] = c('value', 'flag')
    if (logt) hourly$value = log(hourly$value)
    if (!show_flagged) hourly$value[startsWith(hourly$flag, 'M')] = NA
    plist[[n]] = ggplot(hourly, aes(x = time, y = value, color = flag, group = 1)) +
      geom_line() +
      xlim(t1, t2) +
      facet_wrap(~ label, ncol = 1, scales = 'free_y', strip.position = 'right') +
      xlab('Time (EST)') + ylab(ylabel)
    rel_heights = c(rel_heights, 1)
  }

  # remove bottom axis, except for bottom plot
  plist[seq_len(length(plist) - 1)] =
    lapply(head(plist, -1), function(x) {
      x + theme(axis.text.x = element_blank(), axis.ticks.x = element_blank(),
                axis.title.x=element_blank())
    })
  # add space for the bottom axis labels
  rel_heights = .94 * rel_heights / sum(rel_heights)
  rel_heights[length(rel_heights)] = rel_heights[length(rel_heights)] + .06
  plot_grid(plotlist = plist, align = "v", ncol = 1, rel_heights = rel_heights)
}

shinyServer(function(input, output) {
  output$data_sources = renderUI({
    site_data_sources = subset(config$dataloggers, site == input$site)
    selectInput('data_source', 'Data Source:', site_data_sources$name)
  })
  output$measurements = renderUI({
    selectInput('measurement', 'Measurement:',
                get_channels(input$site, input$data_source))
  })
  output$plots = renderPlot({
    ## to make sure the time zone is handled correctly
    date_range = as.POSIXct(as.character(input$dateRange),
                            tz = 'EST')
    make_processing_plot(input$site, input$data_source, input$measurement,
                         date_range[1], date_range[2], input$plotTypes,
                         input$log, input$showFlagged)
  },
  height = 700, res = 100)
})
ASRCsoft/atmoschem.datasets documentation built on Feb. 15, 2023, 9:26 a.m.