knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)

Install Required Packages

In order to run the script you need a bunch of R packages all of which are are publicly available, either on CRAN (Comprehensive R Archive Network) or on our GitHub-Account.

The script described in the following requires the package kwb.monitoring and all its dependencies. All these packages are automatically installed by running the following code. Make sure to have none of the packages to be installed already loaded.

# Install devtools, if required, from CRAN
# install.packages("devtools")

# Install kwb.monitoring from GitHub
devtools::install_github("kwb-r/kwb.monitoring", dependencies = TRUE)

Load kwb.monitoring

We are now ready to start the actual script. First, you need to load the package kwb.monitoring and all the packages that it depends on:

library(kwb.monitoring)

Configuration

We need to start with a configuration section that defines how the script will behave, depending on the monitoring station of which data is being analysed. We are using three letter codes to identify a monitoring station. Let's assume that we have two monitoring stations: ALT and NEU (taken from KWB project OGRE where they represent "Altbau" and "Neubau"). At both stations at least the water level and the flow are measured.

The code section printed in the following defines different types of settings:

Define Helper Functions

as_utc <- function(x) as.POSIXct(x, tc = "UTC")

Define Event Settings

# eventSettings ----------------------------------------------------------------
eventSettings <- list(

  #Hthresholds = c(ALT = 0.07, NEU = 0.07), 
  #Qthresholds = c(ALT = 5, NEU = 2.5),

  ALT = rbind(

    data.frame(
      tBeg = as_utc("2014-03-27 10:04:00"), 
      tEnd = as_utc("2014-04-23 17:14:59"), 
      Hthreshold = 0.07, 
      Qthreshold = 2
    ),

    data.frame(
      tBeg = as_utc("2014-04-23 17:00:00"), 
      tEnd = as_utc("2014-04-23 21:01:00"), 
      Hthreshold = 0.07, 
      Qthreshold = 4
    )

    # you may continue...
    # , data.frame(...)
  ),

  NEU = rbind(
    data.frame(
      tBeg = as_utc("2014-01-01 00:00:00"), 
      tEnd = as_utc("2014-07-09 19:00:00"), 
      Hthreshold = 0.07, 
      Qthreshold = 2.5
    ),

    data.frame(
      tBeg = as_utc("2014-07-09 19:00:01"), 
      tEnd = as_utc("2014-07-09 22:00:00"), 
      Hthreshold = 0.07, 
      Qthreshold = 5
    )

    # you may continue...
    # , data.frame(...)
  )
)

Define Regression Settings

# regressionSettings -----------------------------------------------------------
# models: which model file is to be used within which time interval
# usage: in which time intervals are the correlated values to be used
# instead of the linearly interpolated values
regressionSettings <- list(

  # Which regression models are to be used within different time intervals?
  models = list(

    # ALT = rbind(
    #   data.frame(from = "2000-01-01", 
    #              to = "2020-01-01", 
    #              modelFile = "ALT_01_regressionModel.txt")
    #   ),
    # 
    # NEU = rbind(
    #   data.frame(from = "2014-03-01", 
    #              to = "2014-10-01", 
    #              modelFile = "NEU_01_regressionModel.txt"),
    #   
    #   data.frame(from = "2014-10-02", 
    #              to = "2015-02-05", 
    #              modelFile = "NEU_02_regressionMode.txt"),
    #   
    #   data.frame(from = "2015-02-06", 
    #              to = "2015-12-31", 
    #              modelFile = "March_to_october_NEU_regressionModel.txt")
    #   )
  ),

  # In which time intervals shall the regression models actually be applied?
  usage = list(

    ALT = rbind(
      data.frame(from = "2015-01-30", to = "2015-02-01")
    ),

    NEU = rbind(
      data.frame(from = "2014-04-18", to = "2014-06-13"),
      data.frame(from = "2014-07-09", to = "2014-08-31")
    )
  )
)

Define Time Intervals to be Removed

# intervalsToRemove ------------------------------------------------------------
intervalsToRemove <- list(

  ALT = list(
    lubridate::interval(
      lubridate::ymd_hm("2014-04-03 17:38"), 
      lubridate::ymd_hm("2014-04-03 18:12")
    ),
    lubridate::interval(
      lubridate::ymd_hm("2014-04-04 12:00"), 
      lubridate::ymd_hm("2014-04-04 13:10")
    )
  ),

  NEU = list(
    lubridate::interval(
      lubridate::ymd_hm("2014-04-18 10:58"), 
      lubridate::ymd_hm("2014-04-18 21:18")
    ),
    lubridate::interval(
      lubridate::ymd_hm("2014-04-21 12:46"), 
      lubridate::ymd_hm("2014-04-21 16:30")
    )
  )
) 

Create a Testing Environment

testdir <- system.file("extdata", package = "kwb.monitoring")
stopifnot(testdir != "")
rawdir <- kwb.utils::createDirectory(file.path(testdir, "rawdata"))

Configure the Script

# Main Configuration -----------------------------------------------------------
settings <- configure(

  # Where is the "root" directory to raw data?
  rawdir = rawdir,

  # which station?
  station = "ALT", # "NEU"

  # "left", "centre" or "right" 
  sampleEventMethod = "centre",

  # either "interpolate" or "predict"
  # note the regressionSettings -> predict is only used for the intervals set 
  # in "usage", otherwise interpolate will be used
  replaceMissingQMethod = "interpolate", 

  # which bottles are to be considered (read from the sampler file)?
  bottlesToConsider = NA, # NA means: all bottles
  #bottlesToConsider = 1:4,

  # which bottles are to be discarded (because they are not full)?
  bottlesToDiscard = NA,
  #bottlesToDiscard = 7,

  # sample volume (in mL) given to the bottle representing the time 
  # interval with highest flow volume
  Vbottle = 1600, 

  # maximum total volume for mixed sample, in mL
  Vmax = 5000,

  # What are the level thresholds (in m) that trigger the start of the sampler?
  Hthresholds = c(ALT=0.055, NEU=0.05),

  # What are the level thresholds (in l/s) that can define the start of an event?
  Qthresholds = c(ALT=3, NEU=2.5), 

  # What are the minimum volumes that are to be considered in the plots?
  Vthresholds = c(ALT=50, NEU=32), 

  # time step in hydraulic data
  #tstep.s = 120, still needed?  

  tstep.fill.s = 120,

  # separation of events (how long in seconds may Hthreshold be underrun 
  # within an event?)
  evtSepTime = 4*3600, # in seconds, here: 4 hours

  # separation of sampled events within one and the same sampler file.
  # If the difference between two sample times is greater than this time 
  # (in seconds) two sampled events are distinguished. Set to NA to prevent
  # splitting of sampled events.  
  sampleEventSeparationTime = 4*3600, # NA, # 3600

  # minimum duration (in minutes) of hydraulic events to be considered
  durationThreshold = 10, # min

  # separator to be used in output files (csv)
  outsep = ";",

  # decimal character to be used in output files (csv)
  outdec = ",",

  # time interval in seconds for rain data aggregation, e.g. 600 = 10 minutes  
  # NA = no aggregation of original rain data  
  rain.aggregation.interval = 600
)

# Set the path to a path dictionary file
dictionaryFile <- system.file("extdata/pathDictionary.txt", package = "kwb.monitoring")

# Read the path dictionary. The dictionary describes paths, file names and file
# name patterns in the form of a "grammar". By using the dictioanry keywords can
# be resolved to full paths.
dictionary <- pathDictionary(
  dictionaryFile = dictionaryFile,
  RAW_DIR = settings$rawdir,
  STATION = settings$station
)

# Assign the path dictionary to the main settings. 
settings$dictionary <- dictionary

# Make the event settings and regression settings part of the main settings
settings$event = eventSettings
settings$regression = regressionSettings

# Define the path to a .RData file containing rain data
rainDataFile <- file.path(settings$rawdir, "..", "RData", "data_rain.RData")

Provide Create a Testing Environment

# Create a test directory structure for raw data
flow_dir <- getOrCreatePath("FLOW_DIR", dictionary)

last_flow_dir <- kwb.utils::createDirectory(file.path(flow_dir, "20180601"))

file <- file.path(last_flow_dir, "flows.csv")

writeLines(con = file, c(
  "NIVUS",
  "CPU32",
  "Datum",
  "Fenster min",
  "Fenster max",
  paste0("skip_", 6:8),
  "Datum\tUhrzeit\tH\tv\tQ\tT",
  "01.06.2018\t00:00:00\t0.1\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:01:00\t0.2\t0.1\t0.1\t0.1",
  "01.06.2018\t00:02:00\t0.3\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:03:00\t0.2\t0.1\t0.1\t0.1",
  "01.06.2018\t00:04:00\t0.1\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:05:00\t0.0\t0.1\t0.1\t0.1"
))

kwb.utils::createDirectory(getOrCreatePath("SAMPLE_DIR", dictionary))

User defined functions

Before we start, we need to define some helper functions. The following functions are defined:

# read_hydraulics --------------------------------------------------------------
read_hydraulics <- function(settings)
{
  flowDirectory <- getOrCreatePath("FLOW_DIR", dictionary = settings$dictionary)

  flowSubdirCurrent <- getCurrentFlowSubDirectory(flowDirectory)

  csv <- getOrCreatePath(
    "FLOW_CSV", 
    dictionary = settings$dictionary, 
    FLOW_SUBDIR_CURRENT = flowSubdirCurrent
  )

  hydraulics <- kwb.logger::readLogger_NIVUS_PCM4_2(kwb.utils::safePath(csv))

  renames <- list(
    myDateTime = "DateTime",
    Fuellstand_m = "H",
    Geschw_m_s = "v",
    Durchfluss_l_s = "Q",
    T_.C = "T"
  )

  columnNames <- as.character(renames)

  hydraulics <- kwb.utils::selectColumns(
    kwb.utils::renameColumns(hydraulics, renames), 
    columnNames
  )

  hydraulics$DateTime <- as_utc(hydraulics$DateTime)

  hydraulics
  ### data frame with columns "DateTime" (POSIXct, UTC), "H",
  ### "v", "Q", "T"
}

# getCurrentFlowSubDirectory ---------------------------------------------------
getCurrentFlowSubDirectory <- function
(
  flowDirectory
)
{
  subDirectories <- dir(flowDirectory)

  patternSubdir <- "^\\d{8}$"
  unexpected <- grep(patternSubdir, subDirectories, value=TRUE, invert=TRUE)

  if (! kwb.utils::isNullOrEmpty(unexpected)) {
    stop(
      sprintf("There are unexpected files/folders in \"%s\": %s\n%s\n",
              flowDirectory, paste(kwb.utils::hsQuoteChr(unexpected)), 
              "Only folders of type 'YYYYMMDD' expected!")
    )
  }

  currentSubdir <- sort(setdiff(subDirectories, unexpected), decreasing=TRUE)[1]

  if (is.na(currentSubdir)) {
    warning("There is no subdirectory 'YYYYMMDD' in ", flowDirectory)
  }
  else {
    cat("Most recent flow sub-directory:", currentSubdir, "\n")
  }  

  return(currentSubdir)
}

# readSamplerFile --------------------------------------------------------------
readSamplerFile <- function 
(
  samplerFile, bottlesToConsider, siteCode = NA
)  
{
  cat(sprintf("Reading sample data from \"%s\"... ", basename(samplerFile)))

  if (containsNulString(samplerFile)) {
    cat("skipped!\n")
    warning(paste(
      sprintf(
        "File \"%s\" contains null string and is skipped!",
        basename(samplerFile)),
      "Remove first two bytes of the file!"))
    sampleDataExtended <- NULL
  }
  else {    
    sampleData <- kwb.logger::readLogger_SIGMA_SD900(samplerFile)
    cat("ok.\n")
    sampleSite <- attr(sampleData, "metadata")$SITE_ID

    if (!is.na(siteCode) && sampleSite != siteCode) {
      stop(sprintf("The SITE_ID indicated in \"%s\" is not \"%s\" as expected!",
                   sampleSite, siteCode))
    }

    sampleDataExtended <- cbind(
      samplerFile = basename(samplerFile), 
      sampleData, 
      stringsAsFactors = FALSE
    )
  }

  # filter for relevant bottles
  if (!is.null(bottlesToConsider) & 
        ! all(is.na(bottlesToConsider))) {
    cat("Filtering for bottles", commaCollapsed(bottlesToConsider), "... ")
    indices <- which(sampleDataExtended$bottle %in% bottlesToConsider)
    sampleDataExtended <- sampleDataExtended[indices, ]
    cat("ok.\n")
  }

  kwb.utils::renameColumns(sampleDataExtended, list(myDateTime = "sampleTime"))
}

# rainGaugesCorrelatedWithStation ----------------------------------------------
rainGaugesCorrelatedWithStation <- function 
### selects gauges after the result of the volume correlation test
(
  station = NULL
)
{
  x <- list(
    ALT = c("ReiI", "BlnXI", "BlnIX", "BlnX"),
    NEU = c("Wit", "ReiI", "BlnIX")    
  )

  if (!is.null(station)) {
    x <- x[[station]]
  } 

  x
  ### list (one list element per monitoring site) of character vectors 
  ### representing rain gauge names
}

# processSampleEvent -----------------------------------------------------------
processSampleEvent <- function
(
  hydraulicData, 
  settings, 
  events, 
  eventsAndStat, 
  sampleEventIndex = -1, 
  to.pdf = TRUE,
  verbose = FALSE
) 
{  
  stopifnot(length(sampleEventIndex) == 1)

  # Filter for sampler event by index
  fileName <- getByPositiveOrNegativeIndex(
    elements = sort(unique(events$samplerEvents$samplerFile)), 
    index = sampleEventIndex
  ) 

  sampleInformation <- filterSampleEventsForFilename(
    events = events, 
    fileName = fileName
  )  

  samplerEvent <- sampleInformation$samplerEvents

  if (verbose) {
    printSampleInformation(sampleInformation)
  }

  saveSampleInformation(sampleInformation, settings, sampleFile=samplerEvent$samplerFile)

  # find "merged" event(s) containing the "sampler event"
  mergedEventNumber <- indicesOfEventsContainingEvent(eventsAndStat$merged, samplerEvent)

  stopifnot(length(mergedEventNumber) == 1)

  mergedEventAndStat <- eventsAndStat$merged[mergedEventNumber, ]

  # select subset of data within the sampler event
  hydraulicSubset <- hsGetEvent(hydraulicData, events = mergedEventAndStat, 1)

  if (kwb.utils::isNullOrEmpty(hydraulicSubset)) {
    warning("There is no hydraulic data for this event:\n", 
            formatEvent(mergedEventAndStat))
    return()
  }

  # generate a column containing the bottle number for each timestep  
  hydraulicSubset$bottle <- hsEventNumber(
    hydraulicSubset$DateTime, 
    events = sampleInformation$bottleEvents,
    eventNumbers=sampleInformation$bottleEvents$bottle
  )

  # generate a column containing the cumulative volume
  hydraulicSubset$Vcum_m3 <- cumsum(hydraulicSubset$Q)*settings$tstep.fill.s/1000

  # write interpolated data to files for "quality assurance"
  eventName <- sampleLogFileToSampleName(samplerEvent$samplerFile)

  writeCsvToPathFromDictionary(
    dataFrame = hydraulicSubset, 
    key = "SAMPLED_EVENT_CSV_HYDRAULICS", 
    SAMPLED_EVENT_NAME = eventName, 
    settings = settings, 
    open.directory = FALSE
  )

  # calculate sum of flows per bottle  
  volumeCompositeSample <- calculateVolumeCompositeSample(  
    hydraulicSubset, settings
  )

  writeCsvToPathFromDictionary(
    dataFrame = addSumRow(volumeCompositeSample), 
    key = "SAMPLED_EVENT_CSV_COMPOSITE", 
    SAMPLED_EVENT_NAME = eventName,
    settings = settings, 
    open.directory = FALSE
  )

  plot_sampled_event(
    hydraulicData = hydraulicData,
    settings = settings, 
    sampleInformation = sampleInformation, 
    mergedEventAndStat = mergedEventAndStat,
    volumeCompositeSample = volumeCompositeSample,
    to.pdf = to.pdf
  )  
}

Script I: Data Visualisation and Composite Sample Calculation

After having done all the configuration above you may run the main script that is printed in the following. The following steps of which some are optional, are performed:

And here comes the script:

# Read last available hydraulic data (required if station changed)
# "-10d" = last 10 days
hydraulicData.raw <- kwb.base::selectTimeInterval(
  read_hydraulics(settings), width = "-30d"
)

writeCsvToPathFromDictionary(
  hydraulicData.raw, key = "HYDRAULIC_DATA_RAW", settings, 
  open.directory = FALSE
)

# Show overview plots (complete and daily), optional
showOverview(
  hydraulicData.raw, settings, Qmax = 20, Hmax = 35, to.pdf = TRUE, 
  save.pdf = TRUE
)

# Prepare hydraulic data (fill gaps, interpolate/predict)
hydraulicData.all <- validateAndFillHydraulicData(
  hydraulicData = hydraulicData.raw, 
  settings = settings
)

# Write data to file (optional)
writeCsvToPathFromDictionary(
  hydraulicData.all, key = "HYDRAULIC_DATA_VAL", settings, 
  open.directory = FALSE
)

hydraulicData <- removeIntervals(
  hydraulicData.all, 
  intervals = intervalsToRemove[[settings$station]]
)

# Build different kinds of events (hydraulic, sample, merged), required
events <- getAllTypesOfEvents(
  hydraulicData, settings, FUN.readSamplerFile = readSamplerFile
)

# Plot overview of event limits (optional)
# plotEventOverview(events[c("hydraulic", "samplerEvents", "merged")], settings)
# 
# # Write events to files (optional)
# writeCsvToPathFromDictionary(events$samplingEvents, "SAMPLING_EVENTS", settings, 
#                              open.directory = FALSE)
# writeCsvToPathFromDictionary(events$bottleEvents, "BOTTLE_EVENTS", settings, 
#                              open.directory = FALSE)
# writeCsvToPathFromDictionary(events$samplerEvents, "SAMPLER_EVENTS", settings, 
#                              open.directory = FALSE)
# 
# # Add statistics to hydraulic events
# eventsAndStat <- addStatisticsToEvents(events, hydraulicData.all)
# 
# # Write event including statistics to file (optional)
# writeCsvToPathFromDictionary(
#   eventsAndStat$hydraulic, "HYDRAULIC_EVENTS", settings, 
#   open.directory = FALSE
# )
# 
# # Load rain data if available
# if (file.exists(rainDataFile)) {
#   load(rainDataFile)
# } else {
#   rainData <- NULL #(if you dont want to show rain data)    
# }
# 
# seriesNames <- rainGaugesCorrelatedWithStation(station = settings$station)
# 
# hydraulicEvents <- eventsAndStat$hydraulic
# 
# Vt <- settings$Vthresholds[settings$station]
# 
# hydraulicEvents <- kwb.utils::renameColumns(
#   hydraulicEvents, list(event = "eventNumber")
# )
# 
# # Select events with a minimum volume
# selected <- !is.na(hydraulicEvents$V.m3) & hydraulicEvents$V.m3 > Vt
# events.to.plot <- hydraulicEvents[selected,]
# 
# # plot hydraulic events with rain
# plot_hydraulic_events(
#   hydraulicData = hydraulicData, 
#   settings = settings, 
#   eventsAndStat = events.to.plot,
#   to.pdf = TRUE, 
#   rainData = rainData, 
#   gauges = seriesNames[1:3]
# )  
# 
# # Analyse one sampler event given by its number ("sampleEventIndex")
# # according to the order in the sample log files. By giving a negative index
# # you may choose events from the end (-1 means "last", -2 one before last,
# # etc.)
# processSampleEvent(
#   hydraulicData, settings, events, eventsAndStat, sampleEventIndex = -2, 
#   to.pdf = FALSE
# )

Script II: Finding and storing H/Q-regression models

The following script helps you to generate a regression model between water level and discharge values. Raw data are loaded into hydraulicData.raw. Data of time intervals that have been assessed as "invalid" (defined in intervalsToRemove, see above) are removed. The resulting timeseries of H and Q are displayed as well as a plot showing H over Q. You are provided a simple user interface to further manipulate the selection of values to be used to calculate a regression model.

# Find H-Q regression models ---------------------------------------------------
if (FALSE) 
{ 
  # Read last available hydraulic data (required if station changed)
  # "-10d" = last 10 days
  hydraulicData.raw <- kwb.base::selectTimeInterval(read_hydraulics(settings), width="-30d")

  #Remove bad Intervals
  hydraulicData <- removeIntervals(
    hydraulicData.raw, 
    intervals = intervalsToRemove[[settings$station]]
  )

  # run "regression finder" with a subset of the raw data
  selectIntervalsForCorrelation(kwb.base::selectTimeInterval(hydraulicData, width="-30d"), settings)

  # the model last applied is now available in the variable "regresssionState"
  saveRegressionModel(regressionState$model, settings)
}


KWB-R/kwb.monitoring documentation built on May 17, 2019, 1:06 p.m.