#' Generate Lookup table for Aggregated Nextwork
#' @param gpkg to the aggregated network. The look up table will be added here as a layer called "lookup_table"
#' @param flowpath_name the name of the flowpath layer in gpkg
#' @param refactored_fabric the gpkg for the corresponding VPU reference fabric
#' @return data.frame
#' @export
#' @importFrom sf read_sf st_drop_geometry write_sf
#' @importFrom dplyr mutate select full_join
#' @importFrom tidyr unnest
generate_lookup_table = function(gpkg = NULL,
flowpath_name = "aggregate_flowpaths",
refactored_fabric = NULL){
if(is.null(gpkg) | is.null(refactored_fabric)){
stop("hydrofabrics must be provided.")
}
lu = read_sf(gpkg, flowpath_name) |>
st_drop_geometry() |>
select(aggregated_flowpath_ID = id,
toid = toid,
member_COMID = member_comid,
aggregated_divide_ID = realized_catchment,
) |>
mutate(member_COMID = strsplit(member_COMID, ",")) |>
unnest(col = 'member_COMID') |>
full_join(read_sf(refactored_fabric, "lookup_table"),
by = "member_COMID") |>
left_join(
select(read_sf(gpkg, "nexus"),
toid = id,
outlet_type = type),
by = "toid") |>
select(-toid)
write_sf(lu, gpkg, "lookup_table")
return(lu)
}
describe_hydrofabric = function(network_list, verbose = TRUE){
xx = sapply(network_list, nrow)
if(xx[[1]] != xx[[2]]){
hyaggregate_log( "WARN",
glue("{xx[1]} {names(xx)[1]} vs. {xx[2]} {names(xx)[2]}"),
verbose)
return(FALSE)
} else {
hyaggregate_log( "INFO",
glue("{xx[1]} features"),
verbose)
return(FALSE)
}
}
#' HyAggregate logging shorthand
#'
#' @param level
#' @param message
#' @param emit
#' @return log message
#' @export
#' @importFrom logger log_level
hyaggregate_log = function(level, message, emit = TRUE){ if(emit){ log_level(level, message) } }
drop_extra_features = function(network_list, verbose){
network_list$flowpaths = network_list$flowpaths[!duplicated(network_list$flowpaths), ]
network_list$catchments = network_list$catchments[!duplicated(network_list$catchments), ]
cond = describe_hydrofabric(network_list, verbose)
if(!cond){
bad_fps = filter(network_list$flowpaths, !id %in% network_list$catchments$id)$id
if(length(bad_fps) > 0) {
hyaggregate_log("WARN", glue("Removing flowpath(s): {paste(bad_fps, collapse = ', ')}"), emit = verbose)
}
bad_cats = filter(network_list$catchments, !id %in% network_list$flowpaths$id)$id
if(length(bad_cats) > 0) {
hyaggregate_log("WARN", glue("Removing catchment(s): {paste(bad_cats, collapse = ', ')}"), emit = verbose)
}
network_list = list(flowpaths = filter(network_list$flowpaths, id %in% network_list$catchments$id),
catchments = filter(network_list$catchments, id %in% network_list$flowpaths$id))
}
return(network_list)
}
#' Read Catchments and Flowpaths from Geopackage
#' Convienence function for reading two layers into a list
#' @param gpkg path to geopackage
#' @param catchment_name name of catchment layer
#' @param flowpath_name name of flowpath layer
#' @param crs desired CRS, if NULL they stay as read
#' @param vebose should message be emitted?
#' @return list
#' @export
read_hydrofabric_package = function(gpkg,
catchment_name = NULL,
flowpath_name = NULL,
crs = NULL,
verbose = TRUE){
hyaggregate_log("INFO", glue("\n--- Read in data from {gpkg} ---\n"), verbose)
if(is.null(flowpath_name)){
flowpath_name = grep("flowpath", st_layers(gpkg)$name, value = TRUE)
hyaggregate_log("INFO", glue("Reading flowpaths from: {flowpath_name}"), verbose)
}
if(is.null(catchment_name)){
catchment_name = grep("divide", st_layers(gpkg)$name, value = TRUE)
hyaggregate_log("INFO", glue("Reading flowpaths from: {catchment_name}"), verbose)
}
out = list()
if(layer_exists(gpkg, catchment_name)){
cat = read_sf(gpkg, catchment_name)
if(is.null(crs)){ cat = st_transform(cat, crs) }
out[["catchments"]] <- cat
}
if(layer_exists(gpkg, flowpath_name)){
fl = read_sf(gpkg, flowpath_name)
if(is.null(crs)){ fl = st_transform(fl, crs) }
out[["flowpaths"]] <- fl
}
return(out)
}
write_hydrofabric_package = function(network_list,
outfile,
catchment_name,
flowpath_name,
verbose){
hyaggregate_log("SUCCESS", glue("Writing {flowpath_name} & {catchment_name} to {outfile}"), verbose)
write_sf(network_list$flowpaths, outfile, flowpath_name)
write_sf(network_list$catchments, outfile, catchment_name)
}
#' Flush existing ID prefixes
#' Given a data object and column, remove a prefix and adjoining "-"
#' @param input input data object
#' @param col column to remove prefix from
#' @return data object with updated column
#' @export
flush_prefix = function(input, col) {
for (i in col) {
input[[i]] = as.numeric(gsub(".*-", "", input[[i]]))
}
input
}
#' Add hydrosequence
#' @param flowpaths sf object (LINESTRING)
#' @return sf object
#' @export
#' @importFrom nhdplusTools get_sorted rename_geometry
add_hydroseq = function(flowpaths) {
flowpaths$terminalID = NULL
flowpaths$terminalid = NULL
flowpaths$hydroseq = NULL
flowpaths$toid = ifelse(is.na(flowpaths$toid), 0, flowpaths$toid)
topo = get_sorted(st_drop_geometry(select(flowpaths, id, toid)), split = FALSE)
gc()
topo['hydroseq'] = 1:nrow(topo)
left_join(flowpaths, select(topo, id, hydroseq), by = "id")
}
#' Add Measures to Flowlines and Catchments
#' @param flowpaths sf object (LINESTRING)
#' @param cat sf object (POLYGON)
#' @return list
#' @export
#' @importFrom dplyr select left_join
#' @importFrom sf st_drop_geometry
#' @importFrom nhdplusTools rename_geometry
add_measures = function(flowpaths, cat) {
flowpaths$lengthkm = add_lengthkm(flowpaths)
cat$areasqkm = add_areasqkm(cat)
flowpaths$areasqkm = NULL
flowpaths = left_join(flowpaths,
select(st_drop_geometry(cat), id, areasqkm),
by = "id")
list(flowpaths = rename_geometry(flowpaths, "geometry"),
catchments = rename_geometry(cat, "geometry"))
}
#' @title Flowpaths to linestrings
#' @description Takes an input list of flowpaths object and converts
#' all possible features to LINESTRING
#' @param fl a LINESTRING/MULTILINESTRING `sf` flowlines object
#' @return a LINESTRING `sf` flowlines object
#' @export
#' @importFrom sf st_geometry_type st_geometry st_line_merge
#' @importFrom dplyr bind_rows
flowpaths_to_linestrings = function(fl){
bool = (st_geometry_type(sf::st_geometry(fl)) == "MULTILINESTRING")
multis = fl[bool, ]
if(nrow(multis) > 0){
sf::st_geometry(multis) = st_line_merge(sf::st_geometry(multis))
}
singles = fl[!bool, ]
bind_rows(multis, singles)
}
#' Compute length in kilometers
#' @param x LINESTRING sf object
#' @return numeric vector
#' @export
#' @importFrom units set_units drop_units
#' @importFrom sf st_length
add_lengthkm = function (x) { drop_units(units::set_units(st_length(x), "km")) }
#' Compute area in square kilometers
#' @param x POLYGON sf object
#' @return numeric vector
#' @export
#' @importFrom units set_units drop_units
#' @importFrom sf st_area
add_areasqkm = function (x) { drop_units(set_units(st_area(x), "km2")) }
#' Aggregate Sets by Index Table
#' @param flowpaths LINESTRING flowpaths
#' @param cat POLYGON catchments
#' @param index_table index table to aggregate with
#' @return a list of catchments and flowpaths that have been validated
#' @export
#' @importFrom dplyr group_by mutate slice_max ungroup select left_join everything filter bind_rows rename `%>%`
#' @importFrom sf st_as_sf
#' @importFrom nhdplusTools rename_geometry get_sorted
#'
aggregate_sets = function(flowpaths, cat, index_table) {
set_topo = index_table %>%
group_by(set) %>%
mutate(member_comid = paste(.data$member_comid, collapse = ","),
type = paste(.data$type[!is.na(.data$type)], collapse = ","),
type = ifelse(.data$type == "", NA, .data$type),
value = paste(.data$value[!is.na(.data$value)], collapse = ","),
value = ifelse(.data$value == "", NA, .data$value)
) %>%
slice_max(hydroseq) %>%
ungroup() %>%
select(set, id = toid, levelpathid, member_comid, type, value) %>%
left_join(select(index_table, toset = set, id), by = "id") %>%
select(-id)
####
single_flowpaths = filter(index_table, n == 1) %>%
left_join(flowpaths, by = "id") %>%
st_as_sf() %>%
select(set) %>%
rename_geometry("geometry")
flowpaths_out = filter(index_table, n > 1) %>%
left_join(flowpaths, by = "id") %>%
st_as_sf() %>%
select(set) %>%
filter(!sf::st_is_empty(.)) %>%
geos_union_linestring_hyaggregate('set') %>%
rename_geometry("geometry") %>%
bind_rows(single_flowpaths) %>%
select(set) %>%
left_join(set_topo, by = "set") %>%
rename(id = set, toid = toset)
####
single_catchments = filter(index_table, n == 1) %>%
left_join(cat, by = "id") %>%
st_as_sf() %>%
select(set) %>%
rename_geometry("geometry")
catchments_out = filter(index_table, n != 1) %>%
left_join(cat, by = "id") %>%
st_as_sf() %>%
select(set) %>%
filter(!sf::st_is_empty(.)) %>%
geos_union_polygon_hyaggregate('set') %>%
rename_geometry("geometry") %>%
bind_rows(single_catchments) %>%
select(set) %>%
left_join(set_topo, by = "set") %>%
rename(id = set, toid = toset)
catchments_out$toid = ifelse(is.na(catchments_out$toid), 0, catchments_out$toid)
list(flowpaths = flowpaths_out, catchments = catchments_out)
}
#' @title Fast LINESTRING union
#' @description Wayyyy faster then either data.table, or sf based line merging
#' @param lines lines to merge
#' @param ID ID to merge over
#' @return LINESTRING sf object
#' @export
#' @importFrom terra aggregate vect
#' @importFrom dplyr select
#' @importFrom sf st_as_sf
geos_union_linestring_hyaggregate = function (lines, ID) {
aggregate(vect(lines), by = eval(ID)) %>%
st_as_sf() %>%
select(!!ID) %>%
flowpaths_to_linestrings()
}
#' @title Fast POLYGON Union
#' @description This is significantly faster then sf::st_union or summarize
#' @param poly sf POLYGON object
#' @param ID the column name over which to union geometries
#' @return sf object
#' @export
#' @importFrom terra aggregate vect makeValid
#' @importFrom dplyr select
#' @importFrom sf st_as_sf st_collection_extract st_geometry_type st_make_valid
geos_union_polygon_hyaggregate = function(poly, ID) {
poly = makeValid(vect(poly)) %>%
aggregate(by = eval(ID)) %>%
st_as_sf() %>%
select(!!ID)
if (any(grepl("COLLECTION", st_geometry_type(poly)))) {
poly = st_collection_extract(poly, "POLYGON")
}
return(poly)
}
#' Enforces area and length grouping
#' @description This function takes a vector of area's and length's and returns a
#' grouping vector that enforces the grouping of lengths and areas less then defined thresholds
#' @param l a vector of lengths
#' @param a a vector of areas
#' @param lthres a minimum length that must be achieved
#' @param athres a minimum length that must be achieved
#' @return a vector of length(a) containing grouping indexes
#' @export
agg_length_area <- function(l, a, lthres, athres) {
ids = 1:length(l)
if(length(ids) != 1){
if(!is.null(lthres)){
for (i in 1:(length(l)-1)) {
if (l[i] < lthres) {
ids[(i+1):length(l)] = ids[(i+1):length(l)] - 1
l[i+1] = l[i] + l[i+1]
l[i] = l[i+1]
a[i+1] = a[i] + a[i+1]
a[i] = a[i+1]
}
}
}
if(!is.null(athres)){
for (i in 1:(length(a)-1)) {
if (a[i] < athres) {
ids[(i+1):length(a)] = ids[(i+1):length(a)] - 1
a[i+1] = a[i] + a[i+1]
a[i] = a[i+1]
}
}
}
if(is.null(athres)){ athres = 0 }
if(is.null(lthres)){ lthres = 0 }
if(a[length(a)] < athres | l[length(l)] < lthres){
ids[length(ids)] = pmax(1, ids[length(ids)] - 1)
}
}
return (ids)
}
splitAt <- function(x, pos) unname(split(x, cumsum(seq_along(x) %in% (pos +1)) ))
#' Index a Vector by cumulative Sum
#'
#' @param a a vector of values
#' @param athres the ideal size of each index. Cummulative sums will get as close to this value without exceeding it
#' @return a vector of length(a)
#' @export
assign_id = function(a, athres
#, amin
){
cumsum <- 0
group <- 1
result <- numeric()
for (i in 1:length(a)) {
cumsum <- cumsum + a[i]
if (cumsum > athres) {
group <- group + 1
cumsum <- a[i]
}
result = c(result, group)
}
# if(a[1] < amin & length(a) > 1){
# result[1] = result[2]
# }
#
# if(a[length(a)] < amin & length(a) > 1){
# result[length(result)] = result[length(result) - 1]
# }
return (result)
}
#' Cumulative sum area grouping
#' @description This function takes a vector of areas and lengths and returns a
#' index vector that combines them towards an ideal aggregate area (athres). While enforcing a minimum area (amin) and length (lmin).
#' Additionally, this function can take a set of indexes to exclude over which the network cannot be aggregated.
#' @param areas a vector of areas
#' @param lengths a vector of lengths
#' @param exclude a vector of equal length to areas and lengths. Any non NA value will be used to enforce an aggregation break
#' @param ideal_size_sqkm a vector of areas
#' @param amin a threshold, or target, cumulative size
#' @param lmin a threshold, or target, cumulative size
#' @return a vector of length(areas) containing grouping indexes
#' @export
cs_group <- function(areas, lengths, exclude, ideal_size_sqkm, amin, lmin) {
areas[is.na(areas)] = 0
lengths[is.na(lengths)] = 0
if(length(areas) == 1){ return(1) }
break_index = which(!is.na(exclude))
if(length(break_index) != 0){
sub_areas = splitAt(areas, break_index)
sub_lengths = splitAt(lengths, break_index)
#splitAt(index_table$id, break_index)
} else {
sub_areas = list(areas)
sub_lengths = list(lengths)
}
if(all(lengths(sub_areas) != lengths(sub_areas))){
stop("Yuck~")
}
o1 = lapply(sub_areas, assign_id, athres = ideal_size_sqkm)
#lengths(o1) == lengths(sub_areas)
o2 = lapply(1:length(sub_areas), function(i) { pinch_sides( x = sub_areas[[i]], ind = o1[[i]], thres = amin) })
#lengths(o2) == lengths(sub_areas)
o3 = lapply(1:length(sub_lengths), function(i) { pinch_sides( x = sub_lengths[[i]], ind = o2[[i]], thres = lmin) })
#lengths(o3) == lengths(sub_areas)
o4 = lapply(1:length(sub_areas), function(i) { middle_massage(x = sub_areas[[i]], ind = o3[[i]], thres = amin) })
#lengths(o4) == lengths(sub_areas)
o5 = lapply(1:length(sub_lengths), function(i) { middle_massage(x = sub_lengths[[i]], ind = o4[[i]], thres = lmin) })
#lengths(o5) == lengths(sub_areas)
for(i in 1:length(o5)){ o5[[i]] = o5[[i]] + 1e9*i }
unlist(o5)
}
#' Re-index the edges of vector by threshold
#' Merge the outside edges of a vector if they are less then the provides threshold.
#' @param x vector of values
#' @param ind current index values
#' @param thres threshold to evaluate x
#' @return a vector of length(x) containing grouping indexes
#' @export
pinch_sides = function(x, ind, thres){
# i = 2
# x = sub_areas[[i]]; ind = o[[i]]
tmp_areas = unlist(lapply(split(x, ind), sum))
if(length(tmp_areas) == 1){ return(ind) }
#
n = as.numeric(names(tmp_areas))
if(tmp_areas[1] < thres){
names(tmp_areas)[1] = names(tmp_areas[2])
}
if(tmp_areas[length(tmp_areas)] < thres){
names(tmp_areas)[length(tmp_areas)] = names(tmp_areas[length(tmp_areas) - 1])
}
n2 = as.numeric(names(tmp_areas))
n2[match(ind, n)]
}
#' Re-index the interior of vector by threshold
#' Merge the interior values of a vector if they are less then the provided threshold.
#' Merging will look "up" and "down" the vector and merge into the smaller of the two.
#' @param x vector of values
#' @param ind current index values
#' @param thres threshold to evaluate x
#' @return a vector of length(x) containing grouping indexes
#' @export
middle_massage = function(x, ind, thres){
# i = 5
# x = sub_areas[[i]]; ind = o2[[i]]
tmp_areas = unlist(lapply(split(x, ind), sum))
if(length(tmp_areas) == 1){ return(ind) }
n = as.numeric(names(tmp_areas))
if(any(tmp_areas < thres)){
tmp = which(tmp_areas < thres)
for(j in 1:length(tmp)){
base = as.numeric(tmp[j])
edges = c(base - 1, base + 1)
becomes = names(which.min(tmp_areas[edges]))
names(tmp_areas)[base] = becomes
}
}
n2 = as.numeric(names(tmp_areas))
n2[match(ind, n)]
}
#' Check Network Validity
#' **INTERNAL** function that validates a flowpath and catchment network
#' @param term_cut cutoff integer to define terminal IDs
#' @return a list containing flowline and catchment `sf` objects
#' @export
#' @importFrom dplyr mutate select left_join
#' @importFrom sf st_drop_geometry
check_network_validity <- function(network_list,
term_cut = 1e9,
check = TRUE){
flowpaths = network_list$flowpaths
cat = network_list$catchments
names(flowpaths) = tolower(names(flowpaths))
names(cat) = tolower(names(cat))
flowpaths$toid = ifelse(is.na(flowpaths$toid), 0, flowpaths$toid)
DAG = network_is_dag(flowpaths)
CONNECTION = sum(!(flowpaths$toid %in% flowpaths$id | flowpaths$toid > term_cut | flowpaths$toid == 0)) == 0
if(!check){ return(list(flowpaths = fl, catchments = cat))}
if(all(DAG, CONNECTION)){
flowpaths = mutate(flowpaths, lengthkm = add_lengthkm(flowpaths))
if(!is.null(cat)){
cat = mutate(cat, areasqkm = add_areasqkm(cat)) %>%
select(.data$id, .data$areasqkm)
if('areasqkm' %in% names(flowpaths)){
flowpaths = flowpaths %>%
select(-.data$areasqkm) %>%
left_join(st_drop_geometry(cat), by = "id")
} else {
flowpaths = left_join(flowpaths, st_drop_geometry(cat), by = "id")
}
}
return(list(flowpaths = flowpaths, catchments = cat))
} else {
if(!DAG){ stop("Network is not a graph.")}
if(!CONNECTION){stop("All toIDs are not present in network")}
return(NULL)
}
}
#' Check if network is DAG
#' Checks in a `sf` flowline network is a DAG (Directed acyclic graph).
#' @param fl a LINESTRING `sf` flowlines object
#' @param ID the name of the ID column in `fl`
#' @param toID the name of the toID column in `fl`
#' @return boolean
#' @export
#' @importFrom igraph graph_from_data_frame is.dag
#' @importFrom sf st_drop_geometry
#' @importFrom dplyr select
network_is_dag = function(fl, ID = "id", toID = "toid"){
st_drop_geometry(select(fl, !!ID, !!toID)) %>%
graph_from_data_frame(directed = TRUE) %>%
is.dag()
}
add_grid_mapping = function(gpkg = NULL,
catchment_name = "aggregate_divides",
template = '/Users/mjohnson/Downloads/AORC-OWP_2012063021z.nc4',
grid_name = NULL,
add_to_gpkg = TRUE,
log = TRUE
){
if(!is.logical(log)){
log_appender(appender_file(log))
verbose = TRUE
} else {
log_appender(appender_console)
verbose = log
}
out = weight_grid(rast(template),
geom = sf::read_sf(gpkg, catchment_name),
ID = "id",
progress = verbose)
if(add_to_gpkg){
if(is.null(grid_name)){ stop("To write this file to a gpkg, a `grid_name` must be provided ...") }
write_sf(out, gpkg, grid_name)
} else{
return(out)
}
hyaggregate_log("INFO", glue('Build AORC weight grid from {template}'), verbose)
log_appender(appender_console)
}
#' Check if a geopackage and layer exists
#' This function checks if a layer exists in a geopackage
#' @param gpkg path to geopackage
#' @param name name of layer to check
#' @return logical
#' @export
#' @importFrom sf st_layers
layer_exists = function(gpkg, name){
if(!file.exists(gpkg)){ return(FALSE) }
if(name %in% st_layers(gpkg)$name){
return(TRUE)
} else {
return(FALSE)
}
}
#' Extract nexus locations for Reference POIs
#'
#' @param gpkg a reference hydrofabric gpkg
#' @param type the type of desired POIs
#' @param verbose should messages be emitted?
#' @return data.frame with ID, type columns
#' @export
#' @importFrom sf read_sf st_drop_geometry
#' @importFrom dplyr select mutate_at vars filter mutate group_by summarize
#' @importFrom logger log_info
#' @importFrom tidyr pivot_longer
nexus_from_poi = function(gpkg,
type = c('HUC12', 'Gages', 'TE', 'NID', 'WBIn', 'WBOut'),
verbose = TRUE){
# Will either be inferred - or - reference... use "unique" ID (comid facepalm).
valid_types = c('HUC12', 'Gages', 'TE', 'NID', 'WBIn', 'WBOut', "Conf", "Term", "Elev", "Travel", "Con")
if(!all(type %in% valid_types)){
bad_ids = type[!which(type %in% valid_types)]
stop(bad_ids, " are not valid POI types. Only ", paste(valid_types, collapse = ", "), " are valid")
}
nexus_locations = read_sf(gpkg, "mapped_POIs") |>
st_drop_geometry() |>
select(ID, paste0("Type_", type)) %>%
mutate_at(vars(matches("Type_")), as.character) |>
pivot_longer(-ID) |>
filter(!is.na(value)) |>
mutate(name = gsub("Type_", "", name)) |>
group_by(ID) %>%
summarize(type = paste(name, collapse = ", "),
value = paste(value, collapse = ", "))
hyaggregate_log("INFO", "{nrow(nexus_locations)} distinct POIs found.", verbose)
nexus_locations
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.