R/adapter_BR_ANA.R

Defines functions timeseries.hydro_service_BR_ANA .br_ana_parse_qa_matrix .br_ana_pick_datetime .br_ana_parse_monthly_matrix stations.hydro_service_BR_ANA .stations_br_ana .br_ana_key_data_fim .br_ana_key_data_ini .br_ana_key_tipo .br_ana_key_estacao .br_inventory_meta .br_as_flag .br_norm_names .br_pick_first .br_num .br_read_mdb_table .br_mdb_connect .br_fix_chr_utf8 .br_ana_resolve_mdb .br_ana_seed_path .br_ana_cache_file .br_ana_cache_dir .br_ana_perform_with_refresh .br_ana_authed_request .br_ana_get_token .br_ana_creds .br_ana_token_stale .br_param_map timeseries_parameters.hydro_service_BR_ANA register_BR_ANA

Documented in stations.hydro_service_BR_ANA

# R/adapter_BR_ANA.R
# Brazil - ANA (HidroWeb) adapter
# Auth via SNIRH header-based endpoint; downloads via hidroweb REST.
# Token is supplied as Bearer to the download endpoints.
#
# Requires: {httr2}, {cli}, {utils}, {tibble}, {dplyr}, {tidyr}, {jsonlite},
#           {lubridate}, {odbc}, {DBI}, {readr}, {rappdirs}, {ratelimitr}, {progress}
# Uses package helpers: register_service_usage(), build_request(), resolve_dates(), to_ascii()

# =============================================================================
# Registration
# =============================================================================

#' Register the BR_ANA provider with the package service registry.
#'
#' Declares the HidroWeb base URL, the request rate limit, and the
#' environment variables used for bearer authentication.
#' @keywords internal
#' @noRd
register_BR_ANA <- function() {
  auth_cfg <- list(
    type   = "bearer",
    env_id = "ANA_IDENTIFICADOR",
    env_pw = "ANA_SENHA"
  )
  register_service_usage(
    provider_id   = "BR_ANA",
    provider_name = "Brazil - ANA HidroWeb",
    country       = "Brazil",
    base_url      = "https://www.ana.gov.br/hidrowebservice",
    rate_cfg      = list(n = 3, period = 1),
    auth          = auth_cfg
  )
}

#' List the timeseries parameters supported by the BR_ANA adapter.
#' @export
timeseries_parameters.hydro_service_BR_ANA <- function(x, ...) {
  # Discharge and stage come from daily matrices; quality from the QA endpoint.
  supported <- c("water_discharge", "water_level", "water_quality")
  supported
}

# -----------------------------------------------------------------------------
# parameter -> endpoint map
# -----------------------------------------------------------------------------
.br_param_map <- function(parameter) {
  # Static table of HidroWeb endpoints keyed by canonical parameter name.
  endpoints <- list(
    water_discharge = list(
      path       = "EstacoesTelemetricas/HidroSerieVazao/v1",
      unit       = "m3/s",
      value_keys = c("Media", "media", "Valor", "valor")
    ),
    water_level = list(
      path       = "EstacoesTelemetricas/HidroSerieCotas/v1",
      unit       = "cm",
      value_keys = c("Media", "media", "Valor", "valor")
    ),
    water_quality = list(
      path       = "EstacoesTelemetricas/HidroSerieQA/v1",
      unit       = NA_character_,   # per-analyte units vary; resolved row-wise
      value_keys = character(0)     # not used by QA matrix parser
    )
  )
  entry <- endpoints[[parameter]]
  if (is.null(entry)) {
    stop("Unknown parameter for BR_ANA: ", parameter)
  }
  entry
}

# -----------------------------------------------------------------------------
# Token state & helpers (single implementation)
# -----------------------------------------------------------------------------
# Session-scoped token store. The exists()/is.environment() guard means that
# re-sourcing this file does not wipe a token already issued in this session.
if (!exists(".br_ana_state", inherits = FALSE) || !is.environment(.br_ana_state)) {
  .br_ana_state <- new.env(parent = emptyenv())
}
# Slots used: $token (character) and $issued_at (numeric epoch seconds).
# NOTE(review): assigning NULL to an already-NULL slot is a no-op; these two
# lines serve only to document the slot names.
if (is.null(.br_ana_state$token))     .br_ana_state$token     <- NULL
if (is.null(.br_ana_state$issued_at)) .br_ana_state$issued_at <- NULL

.br_ana_token_stale <- function(max_age = 600) { # 10 minutes
  # A token is stale when it is missing, empty, lacks an issue timestamp,
  # or is older than `max_age` seconds.
  token  <- .br_ana_state$token
  issued <- .br_ana_state$issued_at
  if (is.null(token) || !nzchar(token) || is.null(issued)) {
    return(TRUE)
  }
  age_seconds <- as.numeric(Sys.time()) - as.numeric(issued)
  age_seconds > max_age
}

.br_ana_creds <- function(...) {
  # Credential resolution order: explicit arguments > options() > env vars.
  supplied <- list(...)
  id <- supplied$identificador %||%
    getOption("ANA_IDENTIFICADOR", NULL) %||%
    Sys.getenv("ANA_IDENTIFICADOR", "")
  pw <- supplied$senha %||%
    getOption("ANA_SENHA", NULL) %||%
    Sys.getenv("ANA_SENHA", "")
  list(id = trimws(id), pw = trimws(pw))
}

# Obtain (and cache) a HidroWeb bearer token.
#
# Returns the cached session token unless it is stale or `force = TRUE`.
# Credentials are resolved via `.br_ana_creds()`; the SNIRH OAuth endpoint
# authenticates through custom request headers.
#
# @param x     A `hydro_service` object for BR_ANA.
# @param force TRUE to discard the cached token and re-authenticate.
# @param ...   Passed to `.br_ana_creds()` (identificador=, senha=).
# @return The token string; also stored in `.br_ana_state`.
.br_ana_get_token <- function(x, force = FALSE, ...) {
  if (!force && !.br_ana_token_stale()) return(.br_ana_state$token)

  creds <- .br_ana_creds(...)
  if (!nzchar(creds$id) || !nzchar(creds$pw)) {
    cli::cli_abort(c(
      "x" = "ANA HidroWeb requires credentials.",
      "i" = "Set ANA_IDENTIFICADOR / ANA_SENHA or pass identificador= / senha=."
    ))
  }

  req <- build_request(
    x, path = "EstacoesTelemetricas/OAUth/v1",
    headers = list(Identificador = creds$id, Senha = creds$pw, accept = "*/*")
  ) |>
    httr2::req_error(is_error = function(resp) httr2::resp_status(resp) >= 400) |>
    httr2::req_timeout(30)

  resp <- httr2::req_perform(req)
  js   <- httr2::resp_body_json(resp, simplifyVector = TRUE)

  # Known token field spellings across API versions.
  token <- js$items$tokenautenticacao %||% js$token %||% js$access_token
  # FIX: `token` is NULL when none of the candidate fields exist; the previous
  # bare `!nzchar(token)` then raised "argument is of length zero" instead of
  # the intended diagnostic. Guard NULL/empty explicitly and take the first
  # element so a length > 1 field cannot break the scalar `if` below.
  if (is.null(token) || length(token) == 0L || !nzchar(token[[1]])) {
    cli::cli_abort("Auth OK but no token field returned.")
  }
  token <- token[[1]]

  .br_ana_state$token     <- token
  .br_ana_state$issued_at <- as.numeric(Sys.time())
  token
}

# Build an authenticated request with a retry policy for transient failures.
.br_ana_authed_request <- function(x, path, query) {
  # Make sure a valid token exists before attaching it.
  invisible(.br_ana_get_token(x))
  token <- .br_ana_state$token

  # Query keys carry Portuguese diacritics built at runtime; force UTF-8 names.
  names(query) <- enc2utf8(names(query))

  is_transient <- function(resp) {
    # Raw errors and common transient HTTP statuses are retryable;
    # 401 is deliberately excluded (handled by the refresh wrapper).
    if (inherits(resp, "error")) return(TRUE)
    httr2::resp_status(resp) %in% c(408, 429, 500, 502, 503, 504)
  }

  req <- build_request(
    x, path = path, query = query,
    headers = list(Authorization = paste("Bearer", token))
  )
  req <- httr2::req_timeout(req, 60)
  httr2::req_retry(
    req,
    is_transient = is_transient,
    max_tries    = 5,
    backoff      = ~ 1.5 ^ (.x - 1)
  )
}

# Perform a request; on HTTP 401 (or a raised error) refresh the token once
# and retry a single time.
.br_ana_perform_with_refresh <- function(x, path, query) {
  first <- try(
    httr2::req_perform(.br_ana_authed_request(x, path, query)),
    silent = TRUE
  )

  got_response <- !inherits(first, "try-error")
  if (got_response && httr2::resp_status(first) != 401) {
    return(first)
  }

  # Either an expired token (401) or a hard failure: force a fresh token
  # and give the request exactly one more attempt.
  invisible(.br_ana_get_token(x, force = TRUE))
  retry_req <- .br_ana_authed_request(x, path, query)
  if (got_response) {
    httr2::req_perform(retry_req)
  } else {
    try(httr2::req_perform(retry_req), silent = FALSE)
  }
}

# -----------------------------------------------------------------------------
# Cache helpers (stations)
# -----------------------------------------------------------------------------
.br_ana_cache_dir <- function() {
  # Prefer the OS user cache directory; fall back to a session temp dir
  # when rappdirs is unavailable or fails.
  root <- tryCatch(rappdirs::user_cache_dir(), error = function(e) NULL)
  if (is.null(root)) {
    root <- file.path(tempdir(), "hydrodownloadR_cache")
  }
  target <- file.path(root, "hydrodownloadR", "BR_ANA")
  dir.create(target, recursive = TRUE, showWarnings = FALSE)
  target
}
.br_ana_cache_file <- function(cache_dir = .br_ana_cache_dir()) {
  # RDS file holding the cached station catalogue.
  file.path(cache_dir, "BR_ANA_stations.rds")
}
.br_ana_seed_path <- function() {
  # Resolve the owning package name; default when not running installed.
  pkg_name <- tryCatch(utils::packageName(), error = function(e) NULL)
  if (is.null(pkg_name)) pkg_name <- "hydrodownloadR"
  # Empty string when the seed file is not shipped/installed.
  system.file("extdata", "BR_ANA_stations_seed.rds",
              package = pkg_name, mustWork = FALSE)
}

# -----------------------------------------------------------------------------
# ZIP/MDB resolution & readers (for stations())
# -----------------------------------------------------------------------------
.br_ana_resolve_mdb <- function(zip_or_mdb, dest_dir = "data-raw/BR_ANA") {
  # Accept either a ready .mdb or a SNIRH inventory .zip containing one,
  # and return the normalized path to the .mdb.
  if (!file.exists(zip_or_mdb)) cli::cli_abort("Path does not exist: {zip_or_mdb}")

  ext <- tolower(tools::file_ext(zip_or_mdb))
  if (ext == "mdb") {
    return(normalizePath(zip_or_mdb, winslash = "/", mustWork = TRUE))
  }
  if (ext == "zip") {
    dir.create(dest_dir, recursive = TRUE, showWarnings = FALSE)
    utils::unzip(zip_or_mdb, exdir = dest_dir)
    # Use the first extracted .mdb found anywhere under dest_dir.
    found <- list.files(dest_dir, pattern = "\\.mdb$",
                        recursive = TRUE, full.names = TRUE)
    if (!length(found)) {
      cli::cli_abort("No .mdb found after unzipping {basename(zip_or_mdb)}.")
    }
    return(normalizePath(found[1], winslash = "/", mustWork = TRUE))
  }
  cli::cli_abort("Unsupported file type: {ext}. Provide a .zip or .mdb.")
}

.br_fix_chr_utf8 <- function(df, try_from = c("UTF-8","CP1252","latin1")) {
  # Re-encode every character column of `df` to UTF-8, trying candidate
  # source encodings in order and finally falling back to byte-escaped CP1252.
  chr_cols <- vapply(df, is.character, logical(1))
  if (!any(chr_cols)) return(df)

  reencode <- function(col) {
    # Already valid UTF-8: just normalise the declared encoding.
    if (all(enc2utf8(col) == col, na.rm = TRUE)) return(enc2utf8(col))
    for (src in try_from) {
      converted <- suppressWarnings(iconv(col, from = src, to = "UTF-8"))
      # Accept the first candidate encoding that introduces no new NAs.
      if (!any(is.na(converted) & !is.na(col))) return(converted)
    }
    # Last resort: keep unconvertible bytes as <xx> escapes.
    suppressWarnings(iconv(col, from = "CP1252", to = "UTF-8", sub = "byte"))
  }

  df[chr_cols] <- lapply(df[chr_cols], reencode)
  df
}

.br_mdb_connect <- function(mdb_path) {
  # Candidate ODBC driver names, most common first (Windows Access drivers,
  # then mdbtools variants). Returns a live connection, or the last
  # try-error annotated with installation guidance.
  driver_names <- c(
    "{Microsoft Access Driver (*.mdb, *.accdb)}",
    "Microsoft Access Driver (*.mdb, *.accdb)",
    "MDBTools","MDBTools Driver","MDBTools ODBC",
    "Driver do Microsoft Access (*.mdb)","Access"
  )
  db_path <- normalizePath(mdb_path, winslash = "/")
  failure <- NULL
  for (driver in driver_names) {
    attempt <- try(DBI::dbConnect(
      odbc::odbc(),
      .connection_string = sprintf("Driver=%s;Dbq=%s;", driver, db_path),
      encoding = "windows-1252",
      timeout  = 10
    ), silent = TRUE)
    if (!inherits(attempt, "try-error")) return(attempt)
    failure <- attempt
  }
  # No driver worked: hand the last error back with remediation advice.
  attr(failure, "hydro_msg") <- paste(
    "No usable ODBC driver for Access found.",
    "Install 'Microsoft Access Database Engine 2016 (x64)' or ensure 'mdbtools' is available."
  )
  failure
}

.br_read_mdb_table <- function(mdb_path, table) {
  # Primary path: ODBC connection via any available Access driver.
  con <- .br_mdb_connect(mdb_path)
  if (!inherits(con, "try-error")) {
    on.exit(try(DBI::dbDisconnect(con), silent = TRUE), add = TRUE)
    return(DBI::dbReadTable(con, table))
  }

  # Fallback path: shell out to mdb-export and read the CSV it writes.
  if (nzchar(Sys.which("mdb-export"))) {
    csv_path <- file.path(tempdir(), paste0(table, ".csv"))
    run <- try(
      system2("mdb-export", c(shQuote(mdb_path), shQuote(table)),
              stdout = csv_path, stderr = TRUE),
      silent = TRUE
    )
    if (!inherits(run, "try-error") && file.exists(csv_path)) {
      return(readr::read_csv(csv_path, show_col_types = FALSE, progress = FALSE))
    }
  }

  # Both strategies failed: explain the available remedies.
  driver_hint <- attr(con, "hydro_msg")
  cli::cli_abort(c(
    "x" = sprintf("Could not read table '%s' from MDB.", table),
    if (!is.null(driver_hint)) c("!" = driver_hint) else NULL,
    ">" = "Option A: Install Microsoft Access Database Engine (x64).",
    ">" = "Option B: Install 'mdbtools' for CSV fallback."
  ))
}

# -----------------------------------------------------------------------------
# Small utils
# -----------------------------------------------------------------------------
.br_num <- function(x) {
  # Coerce to numeric, treating "," as the decimal separator.
  if (is.numeric(x)) {
    return(x)
  }
  dotted <- gsub(",", ".", as.character(x), fixed = TRUE)
  suppressWarnings(as.numeric(dotted))
}
.br_pick_first <- function(df, candidates) {
  # First candidate that is an actual column of `df`; NA when none match.
  present <- candidates[candidates %in% names(df)]
  present[1]
}
.br_norm_names <- function(df) {
  # Transliterate column names to plain ASCII and collapse punctuation runs
  # to single spaces; names that fail transliteration are kept unchanged.
  original <- names(df)
  ascii <- iconv(enc2utf8(original), from = "UTF-8", to = "ASCII//TRANSLIT")
  failed <- is.na(ascii)
  ascii[failed] <- original[failed]
  cleaned <- gsub("[^A-Za-z0-9 ]", " ", ascii, perl = TRUE)
  cleaned <- gsub("\\s+", " ", cleaned, perl = TRUE)
  names(df) <- trimws(cleaned)
  df
}
.br_as_flag <- function(x) {
  # Map ANA "station type" fields onto a logical capability flag;
  # NA inputs always become FALSE.
  if (is.logical(x)) {
    return(replace(x, is.na(x), FALSE))
  }
  if (is.numeric(x)) {
    return(replace(x > 0, is.na(x), FALSE))
  }
  # Digit-only character vectors behave like numeric codes.
  if (is.character(x) && all(grepl("^[0-9]*$", x[!is.na(x)]))) {
    codes <- suppressWarnings(as.numeric(x))
    return(replace(codes > 0, is.na(codes), FALSE))
  }
  # Otherwise flag descriptions mentioning a relevant measurement keyword.
  lowered <- tolower(paste0(x))
  matched <- grepl("vaza|desc|liquid|qual|cota|nivel", lowered, perl = TRUE)
  replace(matched, is.na(x), FALSE)
}
.br_inventory_meta <- function(zip_or_mdb, mdb_path) {
  # Derive the inventory date from the "InventarioDD_MM_YYYY" file-name
  # pattern; fall back to the MDB's modification time when absent.
  fname   <- basename(zip_or_mdb %||% mdb_path)
  capture <- regmatches(
    fname,
    regexec("Inventario(\\d{2})_(\\d{2})_(\\d{4})", fname, ignore.case = TRUE)
  )[[1]]
  inv_date <- if (length(capture) == 4) {
    as.Date(sprintf("%s-%s-%s", capture[4], capture[3], capture[2]))
  } else {
    as.Date(file.info(mdb_path)$mtime)
  }
  list(
    inventory_date = inv_date,
    mdb_path       = normalizePath(mdb_path, winslash = "/", mustWork = FALSE),
    mdb_size       = unname(file.info(mdb_path)$size)
  )
}

# -----------------------------------------------------------------------------
# ASCII-safe builders for ANA query keys (no literal diacritics in source)
# -----------------------------------------------------------------------------
.br_ana_key_estacao <- function() {
  # Builds the HidroWeb key "Codigo da Estacao" (with accents) from code
  # points so the source file itself stays pure ASCII.
  o_acute <- intToUtf8(0x00F3)
  c_cedil <- intToUtf8(0x00E7)
  a_tilde <- intToUtf8(0x00E3)
  paste0("C", o_acute, "digo da Esta", c_cedil, a_tilde, "o")
}
# Remaining HidroWeb query keys are plain ASCII literals.
.br_ana_key_tipo <- function() {
  "Tipo Filtro Data"
}
.br_ana_key_data_ini <- function() {
  "Data Inicial (yyyy-MM-dd)"
}
.br_ana_key_data_fim <- function() {
  "Data Final (yyyy-MM-dd)"
}

# -----------------------------------------------------------------------------
# Stations() - internal worker
# -----------------------------------------------------------------------------
# Build or load the ANA station catalogue.
#
# Resolution order when `update = FALSE`:
#   1. cached RDS in `cache_dir`;
#   2. seed RDS shipped with the package (copied into the cache);
#   3. rebuild from a user-supplied SNIRH inventory (.zip/.mdb) — requires
#      `zip_or_mdb`, otherwise aborts with download instructions.
# With `update = TRUE` steps 1-2 are skipped and a rebuild is forced.
#
# @param x          A `hydro_service` object for BR_ANA.
# @param zip_or_mdb Path to `InventarioDD_MM_YYYY.zip` or `.mdb`.
# @param dest_dir   Unzip destination for the inventory archive.
# @param cache_dir  Directory holding the cached catalogue RDS.
# @param update     TRUE forces a rebuild from `zip_or_mdb`.
# @return Tibble of station metadata; attributes `inventory_date`,
#   `source_mdb` and `source_size` describe the source inventory.
.stations_br_ana <- function(x,
                             zip_or_mdb = NULL,
                             dest_dir   = "data-raw/BR_ANA",
                             cache_dir  = .br_ana_cache_dir(),
                             update     = FALSE) {
  stopifnot(inherits(x, "hydro_service"))

  cache_file <- .br_ana_cache_file(cache_dir)

  # 1) cache-first
  if (!isTRUE(update) && file.exists(cache_file)) {
    cli::cli_inform(c(
      "i" = "Loaded ANA station catalogue from cache: {cache_file}",
      ">" = "To refresh, download the latest 'InventarioDD_MM_YYYY.zip' from:",
      " " = "   https://www.snirh.gov.br/hidroweb/download",
      " " = "then call stations(..., zip_or_mdb = 'path/to/Inventario*.zip', update = TRUE)."
    ))
    return(readRDS(cache_file))
  }

  # 2) seed-first (one-time initial catalogue you ship)
  seed <- .br_ana_seed_path()
  if (!isTRUE(update) && file.exists(seed) && !file.exists(cache_file)) {
    dir.create(cache_dir, recursive = TRUE, showWarnings = FALSE)
    file.copy(seed, cache_file, overwrite = TRUE)
    cli::cli_inform(c(
      "v" = "Installed initial ANA station catalogue from package seed.",
      " " = "Cache: {cache_file}",
      ">" = "To refresh with current data, download 'InventarioDD_MM_YYYY.zip' and call",
      " " = "   stations(x, zip_or_mdb = '/path/to/Inventario*.zip', update = TRUE)"
    ))
    return(readRDS(cache_file))
  }

  # 3) need a file to build
  if (is.null(zip_or_mdb) || !nzchar(zip_or_mdb)) {
    cli::cli_abort(c(
      "x" = "No cached/seeded catalogue available and no file provided.",
      ">" = "Download the latest 'InventarioDD_MM_YYYY.zip' from:",
      " " = "   https://www.snirh.gov.br/hidroweb/download",
      " " = "Then call: stations(x, zip_or_mdb = '/path/to/Inventario*.zip', update = TRUE)"
    ))
  }

  mdb_path <- .br_ana_resolve_mdb(zip_or_mdb, dest_dir = dest_dir)

  # Read the two relevant inventory tables, then normalise encodings and
  # column names so the spelling-based matching below is stable.
  est <- .br_read_mdb_table(mdb_path, "Estacao")
  rio <- .br_read_mdb_table(mdb_path, "Rio")
  est <- .br_fix_chr_utf8(est); rio <- .br_fix_chr_utf8(rio)
  est <- .br_norm_names(est);   rio <- .br_norm_names(rio)

  # Candidate spellings for each required Estacao column — inventory
  # layouts vary across releases.
  c_Codigo    <- c("Codigo","Codigo Estacao","CodigoEstacao","Cod Estacao")
  c_Nome      <- c("Nome","Nome Estacao","NomeEstacao")
  c_RioCodigo <- c("RioCodigo","Codigo Rio","Cod Rio","CodigoRio")
  c_Lat       <- c("Latitude","Lat")
  c_Lon       <- c("Longitude","Long","Lon")
  c_Alt       <- c("Altitude","Cota","Alt")
  c_Area      <- c("AreaDrenagem","Area Drenagem","Area Dren","Area de Drenagem")
  c_TDesc     <- c("TipoEstacaoDescLiquida","Tipo Estacao Desc Liquida","Tipo Descarga Liquida")
  c_Level     <- c("TipoEstacaoEscala")
  c_TQual     <- c("TipoEstacaoQualAgua","Tipo Estacao Qual Agua","Tipo Qualidade Agua")

  # Resolve each logical column to the first matching spelling (NA if none).
  colE <- list(
    Codigo        = .br_pick_first(est, c_Codigo),
    Nome          = .br_pick_first(est, c_Nome),
    RioCodigo     = .br_pick_first(est, c_RioCodigo),
    Latitude      = .br_pick_first(est, c_Lat),
    Longitude     = .br_pick_first(est, c_Lon),
    Altitude      = .br_pick_first(est, c_Alt),
    AreaDren      = .br_pick_first(est, c_Area),
    TipoDesc      = .br_pick_first(est, c_TDesc),
    TipoEscala    = .br_pick_first(est, c_Level),
    TipoQual      = .br_pick_first(est, c_TQual)
  )
  # Abort when any required column could not be located.
  if (any(vapply(colE, is.na, logical(1)))) {
    miss <- names(colE)[is.na(unlist(colE))]
    cli::cli_abort(c("x" = "Some required Estacao columns not found.",
                     "!" = paste("Missing:", paste(miss, collapse = ", "))))
  }

  # NOTE(review): the abort above fires when ANY colE entry is NA, so the
  # is.na() guards on the Tipo* columns below appear unreachable — confirm
  # before relying on the NA_character_ fallbacks.
  est2 <- tibble::tibble(
    Codigo       = est[[colE$Codigo]],
    Nome         = est[[colE$Nome]],
    `Rio Codigo` = est[[colE$RioCodigo]],
    Latitude     = .br_num(est[[colE$Latitude]]),
    Longitude    = .br_num(est[[colE$Longitude]]),
    Altitude     = .br_num(est[[colE$Altitude]]),
    AreaDrenagem = .br_num(est[[colE$AreaDren]]),
    TipoEstacaoDescLiquida = if (!is.na(colE$TipoDesc))  as.character(est[[colE$TipoDesc]])  else NA_character_,
    TipoEstacaoEscala      = if (!is.na(colE$TipoEscala))as.character(est[[colE$TipoEscala]])else NA_character_,
    TipoEstacaoQualAgua    = if (!is.na(colE$TipoQual))  as.character(est[[colE$TipoQual]])  else NA_character_
  )

  # Same treatment for the Rio lookup table (code -> river name).
  cR_Codigo <- c("Codigo","Codigo Rio","Cod Rio","CodigoRio")
  cR_Nome   <- c("Nome","Nome Rio","NomeRio")
  colR <- list(
    Codigo = .br_pick_first(rio, cR_Codigo),
    Nome   = .br_pick_first(rio, cR_Nome)
  )
  if (any(vapply(colR, is.na, logical(1)))) {
    miss <- names(colR)[is.na(unlist(colR))]
    cli::cli_abort(c("x" = "Some required Rio columns not found.",
                     "!" = paste("Missing:", paste(miss, collapse = ", "))))
  }

  rio2 <- tibble::tibble(
    `Rio Codigo` = rio[[colR$Codigo]],
    RioNome      = rio[[colR$Nome]]
  )

  # Join river names, derive capability flags, and null out coordinates
  # falling outside Brazil's plausible lat/lon bounding box.
  st <- est2 |>
    dplyr::left_join(rio2, by = "Rio Codigo") |>
    dplyr::mutate(
      has_discharge = .br_as_flag(TipoEstacaoDescLiquida),
      has_level     = .br_as_flag(TipoEstacaoEscala),
      has_quality   = .br_as_flag(TipoEstacaoQualAgua),
      Latitude      = dplyr::if_else(Latitude  >= -35 & Latitude  <= 7,  Latitude,  NA_real_),
      Longitude     = dplyr::if_else(Longitude <= -28 & Longitude >= -75, Longitude, NA_real_),
      station_name_ascii = to_ascii(Nome),
      river_ascii        = to_ascii(RioNome),
      country            = x$country,
      provider_id        = x$provider_id,
      provider_name      = x$provider_name
    ) |>
    dplyr::relocate(country, provider_id, provider_name, .before = Codigo) |>
    dplyr::relocate(RioNome, .after = "Rio Codigo") |>
    dplyr::distinct(Codigo, .keep_all = TRUE)

  # Attach provenance metadata derived from the inventory file name/mtime.
  meta <- .br_inventory_meta(zip_or_mdb, mdb_path)
  attr(st, "inventory_date") <- meta$inventory_date
  attr(st, "source_mdb")     <- meta$mdb_path
  attr(st, "source_size")    <- meta$mdb_size

  # Reshape to the package-wide station schema (station_id, lat/lon, flags).
  st_final <- st |>
    dplyr::mutate(
      country       = x$country,
      provider_id   = x$provider_id,
      provider_name = x$provider_name,
      code          = as.character(Codigo),
      station_final = trimws(as.character(Nome)),
      river_final   = trimws(as.character(RioNome)),
      lat           = Latitude,
      lon           = Longitude,
      area_num      = AreaDrenagem,
      alt0          = suppressWarnings(as.numeric(Altitude))
    ) |>
    dplyr::transmute(
      country,
      provider_id,
      provider_name,
      station_id         = code,
      station_name       = station_final,
      station_name_ascii = to_ascii(station_final),
      river              = river_final,
      river_ascii        = to_ascii(river_final),
      lat,
      lon,
      area               = area_num,
      altitude           = alt0,
      has_discharge,
      has_level,
      has_quality
    ) |>
    dplyr::distinct(station_id, .keep_all = TRUE)

  # Carry provenance attributes over (transmute drops them).
  attributes(st_final)[c("inventory_date","source_mdb","source_size")] <-
    attributes(st)[c("inventory_date","source_mdb","source_size")]

  # Persist the rebuilt catalogue for subsequent cache-first loads.
  dir.create(cache_dir, recursive = TRUE, showWarnings = FALSE)
  saveRDS(st_final, cache_file)
  cli::cli_inform(c(
    "v" = "Station catalogue updated to {cache_file}",
    if (!is.null(attr(st_final, "inventory_date")))
      sprintf("   Inventory date: %s  |  N stations: %s",
              format(attr(st_final, "inventory_date")), nrow(st_final))
  ))
  st_final
}

#' ANA stations (Brazil) - cache-first with optional update from inventory
#'
#' Loads the cached ANA station catalogue (if present) or rebuilds it from a
#' locally downloaded SNIRH inventory (`InventarioDD_MM_YYYY.zip` / `.mdb`)
#' when `update = TRUE`.
#'
#' @param x A `hydro_service` created with `hydro_service("BR_ANA")`.
#' @param ... Named arguments:
#' \itemize{
#'   \item \code{zip_or_mdb}: path to \code{InventarioDD_MM_YYYY.zip} or \code{.mdb}
#'   \item \code{dest_dir}: unzip destination (default: \code{"data-raw/BR_ANA"})
#'   \item \code{cache_dir}: cache dir for RDS (default: user cache)
#'   \item \code{update}: \code{TRUE} to rebuild from provided inventory
#' }
#' @return A tibble with ANA station metadata.
#' @export
stations.hydro_service_BR_ANA <- function(x, ...) {
  # Unpack optional arguments, falling back to the documented defaults,
  # then delegate to the internal worker.
  opts <- rlang::list2(...)
  .stations_br_ana(
    x,
    zip_or_mdb = opts$zip_or_mdb %||% NULL,
    dest_dir   = opts$dest_dir %||% "data-raw/BR_ANA",
    cache_dir  = opts$cache_dir %||% .br_ana_cache_dir(),
    update     = isTRUE(opts$update %||% FALSE)
  )
}

# -----------------------------------------------------------------------------
# Monthly matrix > daily parser (Vazao_01.._31, Cota_01.._31 [+ _Status])
# -----------------------------------------------------------------------------
# Convert a HidroWeb monthly matrix (one row per month, one column per day)
# into a daily long table of (timestamp, value, quality_code).
#
# Value columns are named `Vazao_01`..`Vazao_31` (discharge) or
# `Cota_01`..`Cota_31` (level), optionally paired with `<prefix>_NN_Status`
# quality columns.
#
# @param df        Flattened response items from the Vazao/Cotas endpoints.
# @param parameter "water_discharge" or "water_level" (selects the prefix).
# @param tz        Time zone applied to the emitted midnight timestamps.
# @return Tibble with columns timestamp/value/quality_code; empty tibble
#   when the expected day or base-date columns are absent.
.br_ana_parse_monthly_matrix <- function(df, parameter = c("water_discharge","water_level"), tz = "UTC") {
  parameter <- match.arg(parameter)
  prefix <- if (parameter == "water_discharge") "Vazao" else "Cota"

  # Day-of-month value columns and their optional status twins.
  day_cols    <- grep(paste0("^", prefix, "_\\d{2}$"), names(df), value = TRUE)
  status_cols <- grep(paste0("^", prefix, "_\\d{2}_Status$"), names(df), value = TRUE)
  if (!length(day_cols)) return(tibble::tibble())

  # Column carrying the month's base date (first matching spelling wins).
  base_col <- NULL
  for (cand in c("Data_Hora_Dado","DataHoraDado","DataLeitura","Data","data")) {
    if (cand %in% names(df)) { base_col <- cand; break }
  }
  if (is.null(base_col)) return(tibble::tibble())

  keep_cols <- c(base_col, day_cols, status_cols)
  d0 <- tibble::as_tibble(df[keep_cols])

  # Long format: one row per (month, day) with the raw character value.
  d_val <- tidyr::pivot_longer(
    d0, cols = tidyselect::all_of(day_cols),
    names_to = "day_key", values_to = "value_chr"
  )
  d_val$day <- as.integer(sub("^.*_(\\d{2})$", "\\1", d_val$day_key))

  if (length(status_cols)) {
    # Pivot the status columns the same way and attach by (month, day).
    # NOTE(review): the join key includes `base_col`; duplicate base dates
    # in `df` would fan out here — presumably the API returns one row per
    # month, verify against live responses.
    d_sta <- tidyr::pivot_longer(
      d0, cols = tidyselect::all_of(status_cols),
      names_to = "status_key", values_to = "quality_code"
    )
    d_sta$day <- as.integer(sub("^.*_(\\d{2})_Status$", "\\1", d_sta$status_key))
    d_val <- dplyr::left_join(
      d_val,
      d_sta[, c(base_col, "day", "quality_code")],
      by = c(base_col, "day")
    )
  } else {
    d_val$quality_code <- NA_character_
  }

  # Day N maps to base date + (N - 1); values are stamped at midnight `tz`.
  # NOTE(review): day columns beyond the month's length (e.g. Vazao_31 in a
  # 30-day month) land in the following month — presumably those cells are
  # NA upstream and get dropped by the filter below; confirm.
  base_dates <- as.Date(substr(d_val[[base_col]], 1, 10))
  d_val$date <- base_dates + (d_val$day - 1L)
  d_val$timestamp <- as.POSIXct(paste0(format(d_val$date, "%Y-%m-%d"), " 00:00:00"), tz = tz)
  # Decimal comma -> dot before numeric coercion; non-numeric becomes NA.
  d_val$value <- suppressWarnings(as.numeric(gsub(",", ".", d_val$value_chr, fixed = TRUE)))

  # Keep only real observations.
  dplyr::transmute(d_val, timestamp, value, quality_code) |>
    dplyr::filter(!is.na(timestamp), !is.na(value))
}

# -----------------------------------------------------------------------------
# QA parser (wide matrix > long sub-parameters)
# -----------------------------------------------------------------------------
.br_ana_pick_datetime <- function(df) {
  # Known datetime column spellings across HidroWeb endpoints, best first.
  preferred <- c(
    "Data_Hora_Dado","DataHoraDado",
    "dataLeitura","DataLeitura","data_leitura",
    "data","Data","dataHora","DataHora",
    "dataLeituraUTC","DataLeituraUTC","timestamp","datetime","time"
  )
  hits <- preferred[preferred %in% names(df)]
  # `hits[1]` is NA_character_ when nothing matched.
  hits[1]
}

# Parse the HidroWeb water-quality (QA) wide matrix into long form.
#
# Analyte columns are named "<analyteId>_<name>[_<unit>]", optionally paired
# with "<analyteId>_Status" quality columns. Each analyte/timestamp pair
# becomes one output row.
#
# @param df Flattened response items from the HidroSerieQA endpoint.
# @param tz Time zone used when parsing the datetime column.
# @return Tibble with timestamp/sub_parameter/value/unit/quality_code;
#   empty tibble when input is empty or no datetime column is found.
.br_ana_parse_qa_matrix <- function(df, tz = "UTC") {
  if (!NROW(df)) return(tibble::tibble())
  d <- tibble::as_tibble(df)
  # Row id lets values re-attach to their timestamps after pivoting.
  d$.rowid <- seq_len(nrow(d))

  dt_col <- .br_ana_pick_datetime(d)
  if (is.na(dt_col)) return(tibble::tibble())
  d$timestamp <- suppressWarnings(lubridate::as_datetime(d[[dt_col]], tz = tz))

  # All analyte columns start with a numeric id followed by "_".
  vals <- tidyr::pivot_longer(
    d,
    cols      = tidyselect::matches("^\\d+_"),
    names_to  = "key",
    values_to = "value_chr"
  )

  # Split status rows (quality codes) from value rows.
  vals$is_status <- grepl("_Status$", vals$key)
  status_df <- vals[vals$is_status, c(".rowid", "key", "value_chr")]
  if (NROW(status_df)) {
    status_df$analyte_id   <- sub("^([0-9]+)_Status$", "\\1", status_df$key)
    status_df$quality_code <- status_df$value_chr
    status_df <- status_df[, c(".rowid", "analyte_id", "quality_code")]
  }

  vals <- vals[!vals$is_status, , drop = FALSE]
  if (!NROW(vals)) return(tibble::tibble())

  vals$analyte_id <- sub("^([0-9]+)_.+$", "\\1", vals$key)

  # Attach quality codes by (row, analyte).
  # NOTE(review): `status_df` is always assigned above, so the exists()
  # check is redundant (always TRUE); kept as-is.
  if (exists("status_df") && NROW(status_df)) {
    vals <- dplyr::left_join(vals, status_df, by = c(".rowid", "analyte_id"))
  } else {
    vals$quality_code <- NA_character_
  }

  # Heuristic: the trailing "_token" of a key may encode the unit.
  last_token <- sub("^.*_", "", vals$key)
  looks_like_unit <- function(tok) {
    # NOTE(review): the alternation includes very short tokens ("c", "n",
    # "b", "m", "02", ...) that could match non-unit suffixes — presumably
    # tuned to the actual QA column names; verify against live responses.
    grepl(paste0(
      "^(mgl|ugl|mg_pt_col|ufc_100ml|nmp_100ml|us_cm(_a_\\d+c)?|ntu|",
      "perc_saturacao|c|m3s|po4|02|al|ba|cd|cu|cl|n|b|cn|ml|m)$"
    ), tolower(tok))
  }
  unit_guess <- ifelse(looks_like_unit(last_token), last_token, NA_character_)

  # Sub-parameter = key minus the analyte-id prefix and any unit suffix,
  # with underscores rendered as spaces.
  sp_raw <- sub("^\\d+_", "", vals$key)
  sub_parameter <- ifelse(
    is.na(unit_guess),
    sp_raw,
    sub(paste0("_", unit_guess, "$"), "", sp_raw)
  )
  sub_parameter <- gsub("_", " ", sub_parameter, fixed = TRUE)

  # Decimal comma -> dot before numeric coercion.
  vals$value <- suppressWarnings(as.numeric(gsub(",", ".", vals$value_chr, fixed = TRUE)))

  out <- tibble::tibble(
    timestamp     = d$timestamp[match(vals$.rowid, d$.rowid)],
    sub_parameter = sub_parameter,
    value         = vals$value,
    unit          = unit_guess,
    quality_code  = vals$quality_code
  ) |>
    dplyr::filter(!is.na(timestamp), !is.na(value))

  out
}

# =============================================================================
# S3: timeseries()
# =============================================================================

#' Download ANA HidroWeb time series
#'
#' Fetches daily discharge/level matrices or the water-quality matrix for
#' the requested stations. Requests are sliced by calendar year and rate
#' limited according to `x$rate_cfg`.
#'
#' @param x A `hydro_service` object for BR_ANA.
#' @param parameter One of "water_discharge", "water_level", "water_quality".
#' @param stations Optional vector of station ids; defaults to every station
#'   in the cached catalogue.
#' @param start_date,end_date Optional bounds, interpreted by `resolve_dates()`.
#' @param mode "complete" or "range", passed to `resolve_dates()`.
#' @param ... Unused.
#' @return A tibble of observations sorted by station and timestamp
#'   (possibly empty).
#' @export
timeseries.hydro_service_BR_ANA <- function(x,
                                            parameter = c("water_discharge","water_level","water_quality"),
                                            stations = NULL,
                                            start_date = NULL, end_date = NULL,
                                            mode = c("complete","range"),
                                            ...) {
  stopifnot(inherits(x, "hydro_service"))
  parameter <- match.arg(parameter)
  mode      <- match.arg(mode)

  rng  <- resolve_dates(mode, start_date, end_date)
  pmap <- .br_param_map(parameter)

  # Defensive guard: abort loudly if the parameter/endpoint mapping drifts.
  .check_endpoint <- function(parameter, path) {
    ok <- (parameter == "water_discharge" && grepl("HidroSerieVazao", path)) ||
      (parameter == "water_level"     && grepl("HidroSerieCotas", path))  ||
      (parameter == "water_quality"   && grepl("HidroSerieQA",    path))
    if (!ok) {
      cli::cli_abort(c(
        "x" = "Endpoint/parameter mismatch",
        "!" = "parameter = {parameter}",
        "!" = "mapped path = {path}",
        "i" = "Check `.br_param_map()` mappings."
      ))
    }
  }
  .check_endpoint(parameter, pmap$path)

  # Default to the full cached catalogue when no stations were given.
  station_vec <- stations %||% {
    st <- stations.hydro_service_BR_ANA(x)
    st$station_id
  }
  station_vec <- unique(as.character(station_vec))
  if (!length(station_vec)) return(tibble::tibble())

  # Split [a, b] into per-calendar-year [start, end] slices.
  .year_slices <- function(a, b) {
    a <- as.Date(a); b <- as.Date(b)
    yrs <- seq.int(as.integer(format(a, "%Y")), as.integer(format(b, "%Y")))
    lapply(yrs, function(y) {
      c(max(as.Date(sprintf("%04d-01-01", y)), a),
        min(as.Date(sprintf("%04d-12-31", y)), b))
    })
  }

  # Unwrap the payload: records are nested under `items` when present.
  .items <- function(resp) {
    js <- httr2::resp_body_json(resp, simplifyVector = TRUE)
    if (is.list(js) && !is.null(js$items)) js$items else js
  }

  out_list <- vector("list", length(station_vec))
  pb <- progress::progress_bar$new(total = length(station_vec))

  FILTER_TYPE <- "DATA_LEITURA"  # hard-coded
  TZ_LOCAL    <- "UTC"           # hard-coded

  # Per-station fetcher, throttled with the provider's rate configuration.
  rate_fun <- ratelimitr::limit_rate(
    function(st_id) {
      res_station <- list()

      for (sl in .year_slices(rng$start_date, rng$end_date)) {

        # Build ANA query with ASCII-safe key names
        q <- list(
          estacao   = suppressWarnings(as.integer(st_id)),
          tipo      = FILTER_TYPE,
          data_ini  = format(sl[1], "%Y-%m-%d"),
          data_fim  = format(sl[2], "%Y-%m-%d")
        )
        # Replace the placeholder names with the diacritic keys ANA expects.
        names(q) <- c(
          .br_ana_key_estacao(),
          .br_ana_key_tipo(),
          .br_ana_key_data_ini(),
          .br_ana_key_data_fim()
        )

        # Perform with auto-refresh on 401
        resp <- try(.br_ana_perform_with_refresh(x, pmap$path, q), silent = TRUE)
        if (inherits(resp, "try-error")) {
          # optional: message and continue
          cli::cli_inform(c("!" = sprintf("ANA slice failed for station %s (%s..%s) - skipping.",
                                          st_id, format(sl[1]), format(sl[2]))))
          next
        }
        # 404 means no data for this station/slice: skip silently.
        if (httr2::resp_status(resp) == 404) next

        items <- .items(resp)
        if (is.null(items) || (is.list(items) && length(items) == 0L)) next

        # Flatten nested JSON columns into a rectangular frame.
        df <- try(tibble::as_tibble(jsonlite::flatten(items)), silent = TRUE)
        if (inherits(df, "try-error") || !nrow(df)) next

        # ----- water quality branch -----
        if (parameter == "water_quality") {
          qa <- .br_ana_parse_qa_matrix(df, tz = TZ_LOCAL)
          if (nrow(qa)) {
            res_station[[length(res_station) + 1L]] <- tibble::tibble(
              country       = x$country,
              provider_id   = x$provider_id,
              provider_name = x$provider_name,
              station_id    = st_id,
              parameter     = "water_quality",
              sub_parameter = qa$sub_parameter,
              timestamp     = qa$timestamp,
              value         = qa$value,
              unit          = qa$unit,
              quality_code  = qa$quality_code,
              source_url    = paste0(x$base_url, "/", pmap$path)
            )
          }
          next
        }

        # ----- discharge / level branch -----
        # Preferred path: the monthly day-matrix layout.
        daily <- .br_ana_parse_monthly_matrix(df, parameter = parameter, tz = TZ_LOCAL)
        if (nrow(daily)) {
          res_station[[length(res_station) + 1L]] <- tibble::tibble(
            country       = x$country,
            provider_id   = x$provider_id,
            provider_name = x$provider_name,
            station_id    = st_id,
            parameter     = parameter,
            timestamp     = daily$timestamp,
            value         = daily$value,
            unit          = pmap$unit,
            quality_code  = daily$quality_code,
            source_url    = paste0(x$base_url, "/", pmap$path)
          )
          next
        }

        # fallback: point-wise
        # Used when the response is a flat list of records rather than the
        # monthly matrix; pick a datetime column and a value column by name.
        dt_col <- .br_ana_pick_datetime(df)
        ts_parsed <- if (!is.na(dt_col)) {
          suppressWarnings(lubridate::as_datetime(df[[dt_col]], tz = TZ_LOCAL))
        } else {
          as.POSIXct(NA_character_, tz = TZ_LOCAL)
        }

        # First matching value-column spelling wins (see .br_param_map()).
        val <- {
          outv <- rep(NA_real_, nrow(df))
          for (k in pmap$value_keys) if (k %in% names(df)) { outv <- suppressWarnings(as.numeric(df[[k]])); break }
          outv
        }
        # First matching quality-flag spelling, else NA.
        qf  <- {
          cands <- intersect(c("qualidade","quality","qualityFlag","status","flag"), names(df))
          if (length(cands)) as.character(df[[cands[1]]]) else rep(NA_character_, nrow(df))
        }

        # Restrict to the current year slice.
        keep <- !is.na(ts_parsed) &
          as.Date(ts_parsed) >= as.Date(sl[1]) &
          as.Date(ts_parsed) <= as.Date(sl[2])
        if (!any(keep, na.rm = TRUE)) next

        res_station[[length(res_station) + 1L]] <- tibble::tibble(
          country       = x$country,
          provider_id   = x$provider_id,
          provider_name = x$provider_name,
          station_id    = st_id,
          parameter     = parameter,
          timestamp     = ts_parsed[keep],
          value         = suppressWarnings(as.numeric(val[keep])),
          unit          = pmap$unit,
          quality_code  = qf[keep],
          source_url    = paste0(x$base_url, "/", pmap$path)
        )
      }

      if (length(res_station)) dplyr::bind_rows(res_station) else tibble::tibble()
    },
    rate = ratelimitr::rate(n = x$rate_cfg$n, period = x$rate_cfg$period)
  )

  # Sequential dispatch; pacing is enforced by the rate limiter.
  # NOTE(review): pb$tick() fires before the fetch, so the bar counts
  # started stations rather than completed ones.
  for (i in seq_along(station_vec)) {
    pb$tick()
    out_list[[i]] <- rate_fun(station_vec[[i]])
  }

  out <- dplyr::bind_rows(out_list)
  if (!nrow(out)) return(out)

  # Final clamp to the requested range and a stable sort order.
  out |>
    dplyr::filter(
      !is.na(timestamp),
      as.Date(timestamp) >= as.Date(rng$start_date),
      as.Date(timestamp) <= as.Date(rng$end_date)
    ) |>
    dplyr::arrange(station_id, timestamp)
}

Try the hydrodownloadR package in your browser

Any scripts or data that you put into this service are public.

hydrodownloadR documentation built on Feb. 25, 2026, 5:08 p.m.