knitr::opts_chunk$set(echo = TRUE)
library(dplyr)
library(ggplot2)

Goal

Greate a SQLite database from CATS PRHs. Given a deployment, find the corresponding NetCDF file on the external hard drive, create tibbles for the PRH and the deployment metadata, append to the database.

Connect to the database

cats_path <- "/Volumes/COPYCATSdat/CATS/"
db_path <- fs::path_join(c(cats_path, "cats.sqlite"))
cats_db <- DBI::dbConnect(RSQLite::SQLite(), db_path)
DBI::dbDisconnect(cats_db)

Find a NetCDF PRH

# Find the PRH NetCDF file
find_nc <- function(depid) {
  tag_path <- fs::dir_ls(fs::path_join(c(cats_path, "tag_data")), 
                         regexp = glue::glue("{depid}"))
  stopifnot(length(tag_path) == 1)
  nc_path <- fs::dir_ls(tag_path, regexp = ".nc$")
  stopifnot(length(nc_path) == 1)
  nc_path
}
# Test find_nc() on a known deployment
if (find_nc("bw180827-46") != "/Volumes/COPYCATSdat/CATS/tag_data/bw180827-46 (IOS_Monterey)/bw180827-46_prh10.nc") {
  stop("Error in find_nc()")
}

Import NetCDF PRH into database

# Read a PRH
# Creates an object of class `prh` (inherits from `tbl_df`)
# Has attributes for metadata (a data.frame) and sampling rate (e.g. 10 Hz)
# Trimmed to tag on time
read_prh_nc <- function(depid) {
  # Locate NetCDF
  nc_path <- find_nc(depid)
  stopifnot(length(nc_path) == 1)

  prh_nc <- ncdf4::nc_open(nc_path)
  metadata <- as_tibble(ncdf4::ncatt_get(prh_nc, 0))
  fs <- ncdf4::ncatt_get(prh_nc, "P", "sampling_rate")$value
  metadata$fs <- fs
  tz <- sprintf("Etc/GMT%+i", -metadata$dephist_device_tzone)

  # read variables
  dn <- ncdf4::ncvar_get(prh_nc, "DN")
  dt <- breath::dn_to_posix(dn, tz)
  p0 <- ncdf4::ncvar_get(prh_nc, "P")
  pitch0 <- ncdf4::ncvar_get(prh_nc, "pitch")
  roll0 <- ncdf4::ncvar_get(prh_nc, "roll")
  head0 <- ncdf4::ncvar_get(prh_nc, "head")
  speed0 <- ncdf4::ncvar_get(prh_nc, "speedJJ")
  speed0[is.nan(speed0)] <- NA
  A <- ncdf4::ncvar_get(prh_nc, "Aw")
  M <- ncdf4::ncvar_get(prh_nc, "Mw")
  G <- ncdf4::ncvar_get(prh_nc, "Gw")
  pos <- ncdf4::ncvar_get(prh_nc, "POS") %>% 
    as_tibble() %>% 
    purrr::set_names(c("secs", "lon", "lat"))

  # Create tibble (and smooth some variables)
  dep_start <- metadata$dephist_deploy_datetime_start %>% 
    lubridate::dmy_hms(tz = "UTC") %>% 
    lubridate::with_tz(tz)
  dep_end <- metadata$dephist_deploy_datetime_end %>% 
    lubridate::dmy_hms(tz = "UTC") %>% 
    lubridate::with_tz(tz)
  roll_mean <- function(x, n) {
    result <- RcppRoll::roll_mean(x, n, fill = NA)
    result[is.na(result)] <- x[is.na(result)]
    result
  }
  result <- tibble(dt, 
                   p0, pitch0, roll0, head0, speed0, 
                   A, M, G) %>%
    filter(between(dt, dep_start, dep_end)) %>% 
    mutate(secs = round(as.numeric(dt - dt[1], unit = "secs"), digits = 3)) %>%
    mutate_at(
      vars(p0:speed0),
      list(smooth = ~ roll_mean(.x, n = fs * 2))
    ) %>% 
    rename_at(
      vars(ends_with("smooth")),
      ~ substr(.x, 1, nchar(.x) - nchar("0_smooth"))
    ) %>% 
    left_join(pos, by = "secs") %>% 
    select(dt, secs, p:speed, lon:lat, A:G, p0:speed0)

  dimnames(result$A) <- list(NULL, c("x", "y", "z"))
  dimnames(result$M) <- list(NULL, c("x", "y", "z"))
  dimnames(result$G) <- list(NULL, c("x", "y", "z"))

  # Add metadata 
  attr(result, "class") <- c("prh", "data.frame")
  attr(result, "metadata") <- metadata

  result
}

# PRH metadata accessors
get_fs <- function(prh) {
  stopifnot(is_prh(prh))
  attr(prh, "metadata")$fs
}
get_depid <- function(prh) {
  stopifnot(is_prh(prh))
  attr(prh, "metadata")$depid
}

# Import a PRH into the database
# Returns TRUE on success
import_prh <- function(depid, cats_db, overwrite = FALSE, verbose = FALSE) {
  futile.logger::flog.trace(glue::glue("Starting {depid}..."))
  stopifnot(is.character(depid) && length(depid) == 1)
  futile.logger::flog.trace(glue::glue("...reading NetCDF..."))
  prh <- read_prh_nc(depid)
  futile.logger::flog.trace(glue::glue("...done reading NetCDF..."))
  for (sensor in c("A", "M", "G")) {
    for (axis in 1:3) {
      newcol <- paste0(sensor, axis)
      prh[[newcol]] <- prh[[sensor]][, axis]
    }
    prh[[sensor]] <- NULL
  }
  prh$depid <- get_depid(prh)
  class(prh) <- "data.frame"

  futile.logger::flog.trace(glue::glue("...writing to database..."))

  # If database is empty, initiate with PRH
  if (length(DBI::dbListTables(cats_db)) == 0) {
    DBI::dbWriteTable(cats_db, "prh", prh)
    DBI::dbWriteTable(cats_db, "tag_guide", attr(prh, "metadata"))
  } else {
    # If deployment is already in database, either overwrite or stop
    db_depids <- DBI::dbGetQuery(cats_db, "SELECT depid FROM tag_guide")
    if (depid %in% db_depids) {
      if (overwrite) {
        DBI::dbGetQuery(
          cats_db, 
          glue::glue("DROP FROM prh WHERE depid = {depid}")
        )
        DBI::dbGetQuery(
          cats_db, 
          glue::glue("DROP FROM tag_guide WHERE depid = {depid}")
        )
        DBI::dbWriteTable(cats_db, "prh", prh, append = TRUE)
        DBI::dbWriteTable(cats_db, "tag_guide", attr(prh, "metadata"), append = TRUE)
      } else {
        stop("Deployment in database and overwrite = FALSE")
      }
    } else {
      # Otherwise, append to existing tables
      DBI::dbWriteTable(cats_db, "prh", prh, append = TRUE)
      DBI::dbWriteTable(cats_db, "tag_guide", attr(prh, "metadata"), append = TRUE)
    }
  }
  futile.logger::flog.trace(glue::glue("...done!"))

  TRUE
}

test_import_prh <- function() {
  # Create temporary DB
  cats_path <- "/Volumes/COPYCATSdat/CATS/"
  db_path <- fs::file_temp(tmp_dir = cats_path, ext = "sqlite")
  cats_db <- DBI::dbConnect(RSQLite::SQLite(), db_path)

  # Import two PRHs
  bw_depid <- "bw180827-46"
  import_prh(bw_depid, cats_db)
  bb_depid <- "bb190302-52"
  import_prh(bb_depid, cats_db)

  # Check table names and number of rows
  db_tblnames <- DBI::dbListTables(cats_db)
  stopifnot(all(db_tblnames == c("prh", "tag_guide")))
  prh_nrows <- DBI::dbGetQuery(cats_db, "SELECT COUNT(depid) FROM prh")[[1]]
  stopifnot(prh_nrows == 3141027)
  tagguide_nrows <- DBI::dbGetQuery(
    cats_db, 
    "SELECT COUNT(depid) FROM tag_guide"
  )[[1]]
  stopifnot(tagguide_nrows == 2)

  # Re-import bw_depid without overwrite
  reimport_err <- tryCatch({
    import_prh(bw_depid, cats_db, overwrite = FALSE)
    FALSE
  }, error = function(e) TRUE)
  browser()
  stopifnot(reimport_err)

  # Re-import bw_depid with overwrite (number of rows shouldn't change)
  import_prh(bw_depid, cats_db, overwrite = TRUE)  
  prh_nrows <- DBI::dbGetQuery(cats_db, "SELECT COUNT(depid) FROM prh")[[1]]
  stopifnot(prh_nrows == 3141027)
  tagguide_nrows <- DBI::dbGetQuery(
    cats_db, 
    "SELECT COUNT(depid) FROM tag_guide"
  )[[1]]
  stopifnot(tagguide_nrows == 2)

  # Disconnect and delete temporary DB
  DBI::dbDisconnect(cats_db)
  fs::file_delete(db_path)

  print("Success!")
}

Run it

I picked out 19 candidate deployments for breath analysis (5 each bb, mn, and bw; 4 bp) that are all at least 6 hours long and most more than 24 hours. Import all 19 into the database.

candidates <- c("bb190302-52", "bb190309-52", "bb180304-45", "bb190224-52",
                "bb190226-56", "mn190228-42", "mn190227-44", "mn170908-44",
                "mn180227-43", "mn170817-50", "bp170907-41b", "bp180526-45",
                "bp180526-44", "bp180526-42", "bw180827-46", "bw170813-44",
                "bw170814-40", "bw180905-42", "bw180905-53")
futile.logger::flog.threshold(futile.logger::TRACE)
cats_db <- DBI::dbConnect(RSQLite::SQLite(), db_path)
import_results <- purrr::map_dfr(candidates, function(depid) {
  success <- tryCatch({
    import_prh(depid, cats_db, overwrite = FALSE, verbose = TRUE)
  },
  error = function(e) FALSE)
  tibble(depid, success)
})
DBI::dbDisconnect(cats_db)
futile.logger::flog.threshold(futile.logger::INFO)

Database to R

Reading PRHs back from the database into R.

read_prh_db <- function(depid, cats_db) {
  metadata <- tbl(cats_db, "tag_guide") %>% 
    filter(depid == !!depid) %>% 
    collect()
  if (nrow(metadata) == 0)
    stop(paste("depid not found:", depid))
  prh <- tbl(cats_db, "prh") %>% 
    filter(depid == !!depid) %>% 
    collect()

  # Matrix columns
  prh$A <- with(prh, as.matrix(cbind(A1, A2, A3)))
  prh$M <- with(prh, as.matrix(cbind(M1, M2, M3)))
  prh$G <- with(prh, as.matrix(cbind(G1, G2, G3)))
  dimnames(prh$A) <- list(NULL, c("x", "y", "z"))
  dimnames(prh$M) <- list(NULL, c("x", "y", "z"))
  dimnames(prh$G) <- list(NULL, c("x", "y", "z"))

  # POSIX column
  prh$dt <- as.POSIXct(
    prh$dt, 
    origin = "1970-01-01", 
    tz = sprintf("Etc/GMT%+d", -metadata$dephist_device_tzone)
  )

  prh <- select(prh, dt, secs, p:speed, lon:lat, A:G, p0:speed0)

  attr(prh, "class") <- c("prh", "data.frame")
  attr(prh, "metadata") <- metadata
  prh
}

# test it out
cats_db <- DBI::dbConnect(RSQLite::SQLite(), db_path)
mn190228_42_db <- read_prh_db("mn190228-42", cats_db)
DBI::dbDisconnect(cats_db)
mn190228_42_nc <- read_prh_nc("mn190228-42")
all.equal(mn190228_42_db, mn190228_42_nc)


FlukeAndFeather/breath documentation built on April 25, 2020, 3:15 a.m.