R/insert-measurement-results.R

Defines functions db_insert_results_m

Documented in db_insert_results_m

# input data frame: site, datetime, variable

#' Insert measurement data values to ODM2 database
#'
#' @param db database connection
#' @param datavalues data frame with columns "Timestamp" with
#'    YYYY-MM-DD H:M:S format, and column names corresponding to variable names
#' @param methodcode code for method used to collect data
#' @param variables
#' a named list of lists defining variable names, units, and columns in
#' datavalues data frame. Create with \code{\link{make_vars_list}} or see Details.
#' @param sampledmedium term from \href{http://vocabulary.odm2.org/medium}{
#' controlled vocabulary} for medium sampled eg. air or soil
#' @param processinglevel code for processing level. will be added to processinglevels table if new.
#'   defaults to "Raw data'.
#' @param actionby (optional) the person who performed the action
#' @param equipment_name (optional) the equipment used to collect the data
#' @param zlocation (optional) z location offset
#' @param zlocationunits (optional but required if zlocation is set) name of units of z location offset
#' @param ... parameters to pass to various db_describe_ functions
#' @param site_code_col name of column with site codes. defaults to "site"
#' @param time_aggregation_interval defaults to unknown
#' @param aggregationstatistic \href{http://vocabulary.odm2.org/aggregationstatistic}{
#' controlled vocabulary}, defaults to unknown
#' @param methodtypecv term from \href{http://vocabulary.odm2.org/methodtype}{
#' controlled vocabulary}, defaults to instrument deployment
#'
#' @return true if successful
#' @export
#' @family insert data functions

#' @examples
#' \dontrun{
#' # make database and data frame
#' db <- rodm2::create_sqlite(connect = TRUE)
#' mrv <- data.frame(Timestamp = "2018-06-27 13:55:00",
#' "vwc" = 68.3, site = "BB2")
#'
#' db_insert_results_m(db = db, datavalues = mrv,
#'     methodcode = "soilmoisture", site_code_col = "site",
#'     variables = list("Volumetric water content" = list("vwc"," Percent")),
#'     sampledmedium = "Soil")
#' }
#'
db_insert_results_m <- function(db,
                                datavalues,
                                methodcode,
                                site_code_col = "site",
                                variables,
                                sampledmedium,
                                processinglevel = "Raw data",
                                aggregationstatistic = "Unknown",
                                time_aggregation_interval = list(1, "Minute"),
                                methodtypecv = 'Instrument deployment',
                                actionby = NULL,
                                equipment_name = NULL,
                                zlocation = NULL,
                                zlocationunits = NULL, ...){

  if (!class(db) %in% c("SQLiteConnection", "PostgreSQLConnection")) {
    stop("sorry, only sqlite and postgres database connections are supported so far")}

  # # check that all variables are in variables table
  # vars_to_add <- setdiff(names(variables), rodm2::db_get_variables(db)[[1]])
  # for(newvar in vars_to_add){
  #   rodm2::db_describe_variable(db, "Unknown", newvar, newvar)
  # }

  # check type of database object
  if (class(db) == "SQLiteConnection"){

    # sampled medium in CV
    sampledmedium <- check_medium_cv()
    # methodtype in CV
    methodtypecv <- check_methodtype_cv(methodcode)
    # Potential New method handling
    methodcode <- handle_new_method(methodcode, methodtypecv)
    # Potential New site handling
    site_code <- handle_new_site(site_code)

    sql <- "INSERT or IGNORE into processinglevels (processinglevelcode) VALUES (:processinglevel)"
    sql <- RSQLite::dbSendQuery(db, sql)
    RSQLite::dbBind(sql, params = list(processinglevel = processinglevel))
    RSQLite::dbClearResult(res = sql)

    # check that all variables are in variables table
    for(newvar in names(variables)){
      sql <- "INSERT OR IGNORE into variables
      (variabletypecv, variablecode, variablenamecv, nodatavalue)
      VALUES
      (:variabletypecv, :variablecode, :variablenamecv, :nodatavalue)"
      sql <- RSQLite::dbSendQuery(db, sql)
      RSQLite::dbBind(sql, params = list(variabletypecv = "Unknown",
                                         variablecode = newvar,
                                         variablenamecv = newvar,
                                         nodatavalue = '-9999'))
      RSQLite::dbClearResult(res = sql)

      if(!is.null(variables[[newvar]]$nodatavalue)){
        sql <- RSQLite::dbSendQuery(db,
                                    "UPDATE variables SET nodatavalue = :nodatavalue
                                    WHERE variablecode = :variablecode")
        RSQLite::dbBind(sql, params = list(nodatavalue = variables[[newvar]]$nodatavalue,
                                           variablecode = newvar))
        RSQLite::dbClearResult(res = sql)
      }
      if(!is.null(variables[[newvar]]$variabletypecv)){
        sql <- RSQLite::dbSendQuery(db,
                                    "UPDATE variables SET variabletypecv = :variabletypecv
                                    WHERE variablecode = :variablecode")
        RSQLite::dbBind(sql, params = list(variabletypecv = variables[[newvar]]$variabletypecv,
                                           variablecode = newvar))
        RSQLite::dbClearResult(res = sql)
      }
      if(!is.null(variables[[newvar]]$variabledefinition)){
        sql <- RSQLite::dbSendQuery(db,
                                    "UPDATE variables SET variabledefinition = :variabledefinition
                                    WHERE variablecode = :variablecode")
        RSQLite::dbBind(sql, params = list(variabledefinition = variables[[newvar]]$variabledefinition,
                                           variablecode = newvar))
        RSQLite::dbClearResult(res = sql)
      }
      message(paste(newvar, "has been added to the Variables table."))
    }

    #######################################
    # for each ROW of MRV data frame

    db_insert_one_mrv <- function(mrv_id){
      sql1 <- RSQLite::dbSendQuery(db, 'INSERT into actions
                                   (actiontypecv, methodid, begindatetime, begindatetimeutcoffset)
                                   VALUES
                                   ("Instrument deployment",
                                   (SELECT methodid from methods WHERE methodcode = :method),
                                   DATETIME(:begindatetime), :begindatetimeutcoffset)')

      RSQLite::dbBind(sql1, params = list(method = methodcode,
                                          begindatetime = format(as.POSIXct(
                                            datavalues[["Timestamp"]][mrv_id]), "%Y-%m-%d %H:%M:%S"),
                                          begindatetimeutcoffset = as.integer(substr(
                                            format(as.POSIXct(
                                              datavalues[["Timestamp"]][mrv_id]), "%z"),
                                            1,3))))
      RSQLite::dbClearResult(res = sql1)
      newactionid <- as.integer(RSQLite::dbGetQuery(db, "SELECT LAST_INSERT_ROWID()"))

      ##################
      if(!is.null(actionby)){
        if(!(actionby %in% rodm2::db_get_people(db))){
          rodm2::db_describe_person(db, PersonFirstName = actionby, ...)
        }
        sql2 <- RSQLite::dbSendStatement(db, 'INSERT into actionby
                                         (actionid, affiliationid, isactionlead)
                                         VALUES
                                         (:newactionid,
                                         (SELECT affiliationid FROM affiliations
                                         WHERE personid = (SELECT personid FROM people
                                         WHERE personfirstname = :actionby)),
                                         "TRUE")')
        RSQLite::dbBind(sql2, params = list(newactionid = newactionid,
                                            actionby = actionby))
        RSQLite::dbClearResult(res = sql2)
      }
      ###########################
      if(!is.null(equipment_name)){
        # db describe equipment function
        if(!(equipment_name %in% rodm2::db_get_equipment(db))){
          rodm2::db_describe_equipment(db, equip_name = equipment_name, ...)
        }
        # add to equipment used
        sql2b <- RSQLite::dbSendStatement(db, 'INSERT into equipmentused (actionid, equipmentid)
                                          VALUES
                                          (:newactionid,
                                          (SELECT equipmentid FROM equipment
                                          WHERE equipmentcode = :equipmentcode))')
        RSQLite::dbBind(sql2b, params = list(newactionid = newactionid,
                                             equipmentcode = equipment_name))
        RSQLite::dbClearResult(res = sql2b)
      }
      ################################
      # insert new feature action
      sql3 <- RSQLite::dbSendStatement(db, 'INSERT into featureactions
                                       (actionid, samplingfeatureid)
                                       VALUES
                                       (:newactionid,
                                       (SELECT samplingfeatureid
                                       FROM samplingfeatures
                                       WHERE samplingfeaturecode = :site_code))')
      RSQLite::dbBind(sql3, params = list(newactionid = newactionid,
                                          site_code = datavalues[[site_code_col]][mrv_id]))
      RSQLite::dbClearResult(res = sql3)
      newfaid <- as.integer(DBI::dbGetQuery(db, "SELECT LAST_INSERT_ROWID()"))
      #################################

      # add result! new result for each variable

      newresultids <- c()

      for(j in names(variables)){
        sql4 <- RSQLite::dbSendStatement(db, 'INSERT into results
                                         (resultuuid, featureactionid, resulttypecv,
                                         variableid, unitsid, processinglevelid,
                                         sampledmediumcv, valuecount)
                                         VALUES
                                         (:uuid, :newfaid, :resulttypecv,
                                         (SELECT variableid FROM variables
                                         WHERE variablenamecv = :variablenamecv),
                                         (SELECT unitsid FROM units WHERE unitsname = :units),
                                         (SELECT processinglevelid FROM processinglevels
                                         WHERE processinglevelcode = :processinglevel),
                                         :sampledmedium, :valuecount)')
        RSQLite::dbBind(sql4, params = list(uuid = uuid::UUIDgenerate(),
                                            newfaid = newfaid,
                                            resulttypecv = 'Measurement',
                                            variablenamecv = j,
                                            units = variables[[j]][["units"]],
                                            processinglevel = processinglevel,
                                            sampledmedium = sampledmedium,
                                            valuecount = 1
        ))
        RSQLite::dbClearResult(res = sql4)
        newresultids <- append(newresultids, as.integer(DBI::dbGetQuery(db, "SELECT LAST_INSERT_ROWID()")))
      }
      names(newresultids) <- names(variables)
      #######################################

      # for each of the new results, insert into measurement results
      for(j in names(newresultids)){

        qualitycodecv <- dplyr::select(datavalues, variables[[j]][["qualitycodecol"]])

        if(is.null(variables[[j]][["qualitycodecol"]])){
          qualitycodecv <- "Unknown"
        }

        censorcodecv <- dplyr::select(datavalues, variables[[j]][["censorcodecol"]])

        if(is.null(variables[[j]][["censorcodecol"]])){
          censorcodecv <- "Unknown"
        }

        sql5 <- 'INSERT into measurementresults
        (resultid, aggregationstatisticcv, zlocation, zlocationunitsid,
        timeaggregationinterval, timeaggregationintervalunitsid, censorcodecv, qualitycodecv)
        VALUES (:resultid, :aggstatcv, :zlocation,
        (SELECT unitsid FROM units WHERE unitsname = :zlocationunits),
        :timeaggregationinterval,
        (SELECT unitsid FROM units WHERE unitsname = :timeaggregationintervalunits),
        :censorcodecv, :qualitycodecv)'
        sql5 <- RSQLite::dbSendStatement(db, sql5)
        RSQLite::dbBind(sql5, params = list(resultid = newresultids[[j]],
                                            aggstatcv = ifelse(is.null(aggregationstatistic),
                                                               "Unknown", aggregationstatistic),
                                            zlocation = ifelse(is.null(zlocation), "", zlocation),
                                            zlocationunits = ifelse(is.null(zlocationunits),
                                                                    "", zlocationunits),
                                            timeaggregationinterval = time_aggregation_interval[[1]],
                                            timeaggregationintervalunits = time_aggregation_interval[[2]],
                                            censorcodecv = ifelse(is.null(variables[[j]]$censorcodecol),
                                                                  "Unknown", variables[[j]]$censorcodecol),
                                            qualitycodecv = ifelse(is.null(variables[[j]]$qualitycodecol),
                                                                   "Unknown", variables[[j]]$qualitycodecol)
        ))
        RSQLite::dbClearResult(res = sql5)
      }
      ################################
      # then insert into measurementresultvalues
      for(j in names(newresultids)){
        # subset data values
        var_colname <- variables[[j]]$column
        # make data frame to append
        datavalues_var <- datavalues %>%
          dplyr::slice(mrv_id) %>%
          dplyr::select(Timestamp, var_colname) %>%
          dplyr::rename(valuedatetime = Timestamp,
                        datavalue = var_colname) %>%
          dplyr::mutate(valuedatetimeutcoffset = as.integer(substr(
            format(as.POSIXct(valuedatetime), "%z"),1,3)),
            valuedatetime = format(as.POSIXct(valuedatetime), "%Y-%m-%d %H:%M:%S"),
            resultid = as.integer(newresultids[[j]]))
        # append
        RSQLite::dbAppendTable(db, "measurementresultvalues", datavalues_var)
      }
    }

    purrr::map(1:nrow(datavalues), db_insert_one_mrv)
  }

  if (class(db) == "PostgreSQLConnection"){
    if(!processinglevel %in%
       DBI::dbGetQuery(db, "SELECT definition from odm2.processinglevels")$definition){
      sql <- "INSERT into odm2.processinglevels (processinglevelcode, defintion)
      VALUES (?processinglevel, ?processinglevel)"
      sql <- DBI::sqlInterpolate(db, sql, processinglevel = processinglevel)
      RPostgreSQL::dbGetQuery(db, sql)
    }

    vars_to_add <- setdiff(names(variables), rodm2::db_get_variables(db)[[1]])
    # check that all variables are in variables table
    for(newvar in vars_to_add){
      sql <- "INSERT into odm2.variables
      (variabletypecv, variablecode, variablenamecv, nodatavalue)
      VALUES
      (?variabletypecv, ?variablecode, ?variablenamecv, ?nodatavalue)"
      sql <- DBI::sqlInterpolate(db, sql,
                                 variabletypecv = "Unknown",
                                 variablecode = newvar,
                                 variablenamecv = newvar,
                                 nodatavalue = "-9999")
      RPostgreSQL::dbGetQuery(db, sql)

      if(!is.null(variables[[newvar]]$nodatavalue)){
        sql <- DBI::sqlInterpolate(db,
                                   "UPDATE odm2.variables SET nodatavalue = ?nodatavalue
                                   WHERE variablecode = ?variablecode",
                                   nodatavalue = variables[[newvar]]$nodatavalue,
                                   variablecode = newvar)
        RPostgreSQL::dbGetQuery(db, sql)
      }

      if(!is.null(variables[[newvar]]$variabletypecv)){
        sql <- DBI::sqlInterpolate(db,
                                   "UPDATE odm2.variables SET variabletypecv = ?variabletypecv
                                   WHERE variablecode = ?variablecode",
                                   variabletypecv = variables[[newvar]]$variabletypecv,
                                   variablecode = newvar)
        RPostgreSQL::dbGetQuery(db, sql)
      }

      if(!is.null(variables[[newvar]]$variabledefinition)){
        sql <- DBI::sqlInterpolate(db,
                                   "UPDATE odm2.variables SET variabledefinition = ?variabledefinition
                                   WHERE variablecode = ?variablecode",
                                   variabledefinition = variables[[newvar]]$variabledefinition,
                                   variablecode = newvar)
        RPostgreSQL::dbGetQuery(db, sql)
      }
      message(paste(newvar, "has been added to the Variables table."))
    }

    #######################################
    # for each ROW of MRV data frame
    for(mrv_id in 1:nrow(datavalues)){

    # db_insert_one_mrv_postgres <- function(mrv_id){
      sql1 <- DBI::sqlInterpolate(db,
                                  'WITH newact AS (
                                  INSERT into odm2.actions
                                  (actiontypecv, methodid, begindatetime,
                                  begindatetimeutcoffset)
                                  VALUES
                                  (?actiontype,
                                  (SELECT methodid FROM odm2.methods WHERE methodcode = ?method),
                                  ?begindatetime,
                                  ?begindatetimeutcoffset)
                                  RETURNING actionid)

                                  INSERT into odm2.featureactions
                                  (actionid, samplingfeatureid)
                                  VALUES
                                  ((SELECT newact.actionid FROM newact),
                                  (SELECT samplingfeatureid FROM odm2.samplingfeatures
                                  WHERE samplingfeaturecode = ?site_code))
                                  RETURNING actionid, featureactionid',

                                  actiontype = "Instrument deployment",
                                  method = methodcode,
                                  begindatetime = format(as.POSIXct(
                                    datavalues[["Timestamp"]][mrv_id]), "%Y-%m-%d %H:%M:%S"),
                                  begindatetimeutcoffset = as.integer(substr(
                                    format(as.POSIXct(
                                      datavalues[["Timestamp"]][mrv_id]), "%z"),
                                    1,3)
                                  ),
                                  site_code = datavalues[[site_code_col]][mrv_id])

      new_fa <- RPostgreSQL::dbGetQuery(db, sql1)
      newactionid <- new_fa$actionid
      newfaid <- new_fa$featureactionid

      ##################
      if(!is.null(actionby)){
        if(!(actionby %in% rodm2::db_get_people(db))){
          rodm2::db_describe_person(db, PersonFirstName = actionby, ...)
        }
        sql2 <- DBI::sqlInterpolate(db,
                                    'INSERT into odm2.actionby
                                    (actionid, affiliationid, isactionlead)
                                    VALUES
                                    (?newactionid,
                                    (SELECT affiliationid FROM odm2.affiliations
                                    WHERE personid =
                                    (SELECT personid FROM odm2.people WHERE personfirstname = ?actionby)),
                                    ?isactionlead)',
                                    newactionid = newactionid,
                                    actionby = actionby,
                                    isactionlead = "TRUE")
        RPostgreSQL::dbGetQuery(db, sql2)
      }

      if(!is.null(equipment_name)){
        # db describe equipment function
        if(!(equipment_name %in% rodm2::db_get_equipment(db))){
          rodm2::db_describe_equipment(db, equip_name = equipment_name, ...)
        }
        # add to equipment used
        sql2b <- DBI::sqlInterpolate(db, 'INSERT into odm2.equipmentused (actionid, equipmentid)
                                     VALUES
                                     (?newactionid,
                                     (SELECT equipmentid
                                     FROM odm2.equipment WHERE equipmentcode = ?equipmentcode))',
                                     newactionid = newactionid,
                                     equipmentcode = equipment_name)
        RPostgreSQL::dbGetQuery(db, sql2b)
      }

      ################################

      # add result! new result for each variable
      # for variables list, if no 'column', make col = name
      for(j in names(variables)){
        if(!"column" %in% names(variables[[j]])){
          variables[[j]][["column"]] <- j
        }
      }

      newresultids <- c()

      for(j in names(variables)){
        sql4 <- DBI::sqlInterpolate(db, 'INSERT into odm2.results
                                    (resultuuid, featureactionid, resulttypecv,
                                    variableid, unitsid, processinglevelid,
                                    sampledmediumcv, valuecount)
                                    VALUES
                                    (?uuid, ?newfaid, ?resulttypecv,
                                    (SELECT variableid FROM odm2.variables
                                    WHERE variablenamecv = ?variablenamecv),
                                    (SELECT unitsid FROM odm2.units WHERE unitsname = ?units),
                                    (SELECT processinglevelid FROM odm2.processinglevels
                                    WHERE definition = ?processinglevel),
                                    ?sampledmedium, ?valuecount)
                                    RETURNING resultid',
                                    uuid = uuid::UUIDgenerate(),
                                    newfaid = newfaid,
                                    resulttypecv = 'Measurement',
                                    variablenamecv = j,
                                    units = variables[[j]][["units"]],
                                    processinglevel = processinglevel,
                                    sampledmedium = sampledmedium,
                                    valuecount = 1)

        newresult <- RPostgreSQL::dbGetQuery(db, sql4)
        newresultids <- append(newresultids, as.integer(newresult))
      }
      names(newresultids) <- names(variables)
      #######################################

      # for each of the new results, insert into measurement results
      for(j in names(newresultids)){

        qualitycodecv <- dplyr::select(datavalues, variables[[j]][["qualitycodecol"]])

        if(is.null(variables[[j]][["qualitycodecol"]])){
          qualitycodecv <- "Unknown"
        }

        censorcodecv <- dplyr::select(datavalues, variables[[j]][["censorcodecol"]])

        if(is.null(variables[[j]][["censorcodecol"]])){
          censorcodecv <- "Unknown"
        }

        sql5 <- 'INSERT into odm2.measurementresults
        (resultid, aggregationstatisticcv,
        timeaggregationinterval, timeaggregationintervalunitsid, censorcodecv, qualitycodecv)
        VALUES (?resultid, ?aggstatcv,
        ?timeaggregationinterval,
        (SELECT unitsid FROM odm2.units WHERE unitsname = ?timeaggregationintervalunits),
        ?censorcodecv, ?qualitycodecv)'
        sql5 <- DBI::sqlInterpolate(db, sql5,
                                    resultid = newresultids[[j]],
                                    aggstatcv = ifelse(is.null(aggregationstatistic),
                                                       "Unknown", aggregationstatistic),
                                    timeaggregationinterval = time_aggregation_interval[[1]],
                                    timeaggregationintervalunits = time_aggregation_interval[[2]],
                                    censorcodecv = ifelse(is.null(variables[[j]]$censorcodecol),
                                                          "Unknown", variables[[j]]$censorcodecol),
                                    qualitycodecv = ifelse(is.null(variables[[j]]$qualitycodecol),
                                                           "Unknown", variables[[j]]$qualitycodecol)
        )
        RPostgreSQL::dbGetQuery(db, sql5)

        if(!is.null(zlocation)){
          sql <- DBI::sqlInterpolate(db,
                                     'UPDATE odm2.measurementresults
                                     SET zlocation = ?zlocation,
                                     zlocationunitsid = (SELECT unitsid FROM odm2.units WHERE unitsname = ?zlocationunits)
                                     WHERE resultid = ?resultid)',
                                     zlocation = zlocation,
                                     zlocationunits = zlocationunits, resultid = j)
          RPostgreSQL::dbGetQuery(db, sql)}
      }
      ################################
      # then insert into measurementresultvalues
      for(j in names(newresultids)){
        # subset data values
        var_colname <- variables[[j]]$column
        # make data frame to append
        datavalues_var <- datavalues %>%
          dplyr::slice(mrv_id) %>%
          dplyr::select(Timestamp, var_colname) %>%
          dplyr::rename(valuedatetime = Timestamp,
                        datavalue = var_colname) %>%
          dplyr::mutate(valuedatetimeutcoffset = as.integer(substr(
            format(as.POSIXct(valuedatetime), "%z"), 1,3)),
            valuedatetime = format(as.POSIXct(valuedatetime), "%Y-%m-%d %H:%M:%S"),
            resultid = as.integer(newresultids[[j]]))
        # append
        DBI::dbWriteTable(db, c("odm2", "measurementresultvalues"),
                          datavalues_var, row.names = FALSE,
                          overwrite = FALSE,
                          append = TRUE)
      }
    }

    # purrr::map(1:nrow(datavalues), db_insert_one_mrv_postgres)

  }


  }
khondula/rodm2 documentation built on Jan. 9, 2020, 1:48 p.m.