#!/usr/bin/env Rscript
# optparse may not be installed globally so look for renv directory before
# parsing cli args with optparse
args <- commandArgs(trailingOnly=TRUE)
renv_loc <- args == "--renv"
if (any(renv_loc)) {
renv_idx <- which(renv_loc)
if (length(args) > renv_idx) {
proj_dir <- renv::activate(args[renv_idx + 1])
message("activated renv directory ", proj_dir)
}
}
parser <- optparse::OptionParser(
usage = "usage: psd.R [options] db vct_dir",
description = "Create gridded particle size distribution data from VCT data"
)
parser <- optparse::add_option(parser, "--bins",
type = "integer", default = 85, metavar = "N",
help = "Number of bins along each dimension of the distribution [default %default]",
)
parser <- optparse::add_option(parser, "--dimensions",
type = "character", default = "fsc_small,pe,chl_small,Qc,diam",
help = "Comma-separated list of dimensions for grid. Qc will always be included regardless of this value. [default %default]"
)
parser <- optparse::add_option(parser, "--keep-outliers",
action = "store_true", default = FALSE,
help = "Don't remove 3 minute windows flagged as outliers [default %default]"
)
parser <- optparse::add_option(parser, "--no-data.table",
action = "store_true", default = FALSE,
help = "Don't use data.table for performance-critical aggregation [default %default]"
)
parser <- optparse::add_option(parser, "--only-full-counts",
action = "store_true", default = FALSE,
help = "Only produce counts for data at full time resolution [default %default]"
)
parser <- optparse::add_option(parser, "--out",
type = "character", default = "PSD", metavar = "FILE_BASE",
help = "Output file base path [default %default]",
)
parser <- optparse::add_option(parser, "--quantile",
type = "character", default = "2.5", metavar = "QUANTILE",
help = "Quantile. Choices are 2.5, 50, 97.5. [default %default]",
)
parser <- optparse::add_option(parser, "--remove-boundary-points",
action = "store_true", default = FALSE,
help = "Remove particles at the per-file boundary of fsc_small, chl_small, pe, diam, Qc [default %default]"
)
parser <- optparse::add_option(parser, "--renv",
type = "character", default = "", metavar = "dir",
help = "Optional renv directory to use. Requires the renv package."
)
parser <- optparse::add_option(parser, "--verbose",
action = "store_true", default = FALSE,
help = "Print extra diagnostic messages [default %default]"
)
parser <- optparse::add_option(parser, "--volume",
type = "integer", default = NULL, metavar = "VOLUME",
help = "Hard code a single volume value for abundance calculations, i.e. if stream pressure or file duration is unreliable [default %default]",
)
p <- optparse::parse_args2(parser)
if (length(p$args) < 2) {
optparse::print_help(parser)
quit(save="no")
} else {
db <- normalizePath(p$args[1])
vct_dir <- normalizePath(p$args[2])
bins <- p$options$bins
dimensions <- p$options$dimensions
keep_outliers <- p$options$keep_outliers
no_data.table <- p$options$no_data.table
only_full_counts <- p$options$only_full_counts
out <- p$options$out
quantile_ <- p$options$quantile
remove_boundary_points <- p$options$remove_boundary_points
verbose <- p$options$verbose
volume <- p$options$volume
if (!dir.exists(vct_dir) || !file.exists(db)) {
stop(paste0("vct_dir or db does not exist"))
}
}
message("using popcycle version ", packageVersion("popcycle"))
library(dplyr, warn.conflicts=FALSE)
dated_msg <- function(...) {
message(format(Sys.time(), "%Y-%m-%d %H:%M:%OS3"), ": ", ...)
}
# --------------------------
# Set up dimensions for grid
# --------------------------
possible_dimensions <- unlist(stringr::str_split("fsc_small,pe,chl_small,Qc,diam", "\\s*,\\s*"))
dimensions <- unlist(stringr::str_split(dimensions, "\\s*,\\s*"))
if (length(setdiff(dimensions, possible_dimensions)) > 0) {
stop(glue("Error: unknown dimensions {stringr::str_flatten(setdiff(dimensions, possible), ', ')}"))
}
# Always add Qc
if (! "Qc" %in% dimensions) {
dimensions[length(dimensions) + 1] <- "Qc"
}
dated_msg("Start")
message("Configuration:")
message("--------------")
message(paste0("db = ", db))
message(paste0("vct-dir = ", vct_dir))
message(paste0("bins = ", bins))
message(paste0("dimensions = ", stringr::str_flatten(dimensions, ", ")))
message(paste0("keep_outliers = ", keep_outliers))
message(paste0("no-data.table = ", no_data.table))
message(paste0("out = ", out))
message(paste0("quantile = ", quantile_))
message(paste0("remove-boundary-points = ", remove_boundary_points))
message(paste0("verbose = ", verbose))
message(paste0("volume = ", volume))
message("--------------")
# Create output directory tree
dir.create(dirname(out), recursive=T, showWarnings=F)
cruise <- popcycle::get_cruise(db)
vct_files <- list.files(vct_dir, "\\.parquet$", full.names=T)
meta_full <- popcycle::create_meta(db, as.numeric(quantile_))
meta <- meta_full[, c("date", "volume", "opp_evt_ratio", "flag")]
if (! is.null(volume)) {
dated_msg(glue::glue("setting volume to {volume} in meta table"))
meta$volume <- volume
}
dated_msg("Retrieving refractive index table")
refracs <- popcycle::read_refraction_csv()
refracs <- refracs[refracs$cruise == cruise, ]
dated_msg(paste0(capture.output(refracs), collapse="\n"))
refracs$cruise <- NULL
if (nrow(refracs) == 0) {
dated_msg(paste0("refractive index table has no entry for ", cruise, ", using mid for Pro/Syn, lwr for Pico/Croco/Beads/Unknown"))
refracs <- tibble::as_tibble(data.frame(
prochloro = "mid", synecho = "mid", croco = "lwr", picoeuk = "lwr", beads = "lwr", unknown = "lwr"
))
}
dated_msg(paste0(capture.output(refracs), collapse="\n"))
if (nrow(refracs) > 1) {
stop(paste0("refractive index table has multiple entries for ", cruise))
}
dated_msg("Retrieving influx calibration table")
calib_all <- popcycle::read_calib_csv()
calib <- calib_all[calib_all$cruise == cruise, ]
if (length(unique(calib$cruise)) == 0) {
dated_msg(paste0("calibration table has no entry for ", cruise, ", no calibration will be applied"))
calib <- NULL
}
dated_msg(paste0(capture.output(calib), collapse="\n"))
# Make the grid
grid <- popcycle::create_grid(bins, log_base=2, log_answers=FALSE)
grid_df <- tibble::tibble(fsc_small=grid$fsc_small, pe=grid$pe, chl_small=grid$chl_small, Qc=grid$Qc, diam=grid$diam)
# Subset down to dimensions from CLI
grid <- grid[dimensions]
grid_df <- grid_df[dimensions]
arrow::write_parquet(grid_df, paste0(out, ".grid.parquet"))
# ---------------------------------
# Create the full gridded data file
# ---------------------------------
if (!keep_outliers) {
ignore_dates <- meta %>% filter(flag != 0) %>% pull(date)
} else {
ignore_dates <- NULL
}
psd <- popcycle::create_PSD(
vct_files, quantile_, refracs, grid, log_base=NULL,
remove_boundary_points=remove_boundary_points, ignore_dates=ignore_dates,
use_data.table=!no_data.table, verbose=verbose
)
invisible(gc())
dated_msg("Full PSD dim = ", stringr::str_flatten(dim(psd), " "), ", MB = ", object.size(psd) / 2**20)
ptm <- proc.time()
arrow::write_parquet(psd, paste0(out, ".full.parquet"))
deltat <- proc.time() - ptm
dated_msg("Wrote full parquet in ", deltat[["elapsed"]], " seconds")
invisible(gc())
if (!only_full_counts) {
# -----------
# Hourly file
# -----------
# data.table multi-threading temporarily disabled for grouping
orig_threads <- data.table::getDTthreads()
data.table::setDTthreads(1)
hourly <- popcycle::group_psd_by_time(psd, time_expr="1 hours", use_data.table=!no_data.table)
data.table::setDTthreads(orig_threads)
psd <- tibble::as_tibble(psd)
invisible(gc())
# Add volume-normalized abundances to hourly data
if (!keep_outliers) {
meta <- meta %>% dplyr::filter(flag == 0) # remove flagged files
}
hourly_volumes <- popcycle::create_volume_table(meta, time_expr="1 hour")
hourly_psd <- popcycle::add_abundance(hourly, hourly_volumes, calib=calib)
dated_msg("Hourly PSD dim = ", stringr::str_flatten(dim(hourly_psd), " "), ", MB = ", object.size(hourly_psd) / 2**20)
ptm <- proc.time()
arrow::write_parquet(hourly_psd, paste0(out, ".hourly.parquet"))
deltat <- proc.time() - ptm
dated_msg("Wrote hourly parquet in ", deltat[["elapsed"]], " seconds")
ptm <- proc.time()
readr::write_csv(hourly_psd %>% dplyr::mutate(cruise=cruise) %>% dplyr::rename_with(tolower), paste0(out, ".hourly.csv.gz"))
deltat <- proc.time() - ptm
dated_msg("Wrote hourly CSV in ", deltat[["elapsed"]], " seconds")
ptm <- proc.time()
arrow::write_parquet(hourly_volumes %>% dplyr::select(date, volume_small, volume_large), paste0(out, ".hourly-volumes.parquet"))
deltat <- proc.time() - ptm
dated_msg("Wrote hourly volume parquet in ", deltat[["elapsed"]], " seconds")
}
dated_msg("Finished")
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.