R/adapter_RW_RWB.R

Defines functions timeseries.hydro_service_RW_RWB .rw_fetch_field_visits .rw_pick_fieldvisit_table .rw_station_path stations.hydro_service_RW_RWB .rw_param_map timeseries_parameters.hydro_service_RW_RWB register_RW_RWB

# ==== Rwanda (RWB Water Portal) adapter ======================================
# Base: https://waterportal.rwb.rw
#
# Data access:
#   - Station list (HTML table):
#       https://waterportal.rwb.rw/data/surface_water
#     Columns (as of 2025-11): "Location name", "Identifier", "Location type",
#     "Longitude", "Latitude", "SRID".
#
#   - Per-station page with short, irregular field-visit data:
#       https://waterportal.rwb.rw/location_ng_info/{Identifier}
#     The field-visit table has columns like:
#       Date | Parameter | Unit | Value
#     with Parameter values such as "Discharge" (m^3/s) and "Stage" (m).
#
# Notes:
#   - This adapter ONLY scrapes the field-visit table.
#     It ignores the "Stage - Historical" / "Stage - Observers" download links.
#   - Time stamps are sub-daily instants; we keep them as instants
#     (no daily aggregation).
#   - HTML layout is brittle; if the portal changes, this may break.

# -----------------------------------------------------------------------------
# Registration
# -----------------------------------------------------------------------------

#' @keywords internal
#' @noRd
register_RW_RWB <- function() {
  # Register the Rwanda Water Resources Board water portal. The portal is
  # scraped without authentication and throttled to one request per second.
  polite_rate <- list(n = 1, period = 1)
  no_auth     <- list(type = "none")

  register_service_usage(
    provider_id   = "RW_RWB",
    provider_name = "Rwanda Water Resources Board (Water Portal)",
    country       = "Rwanda",
    base_url      = "https://waterportal.rwb.rw",
    rate_cfg      = polite_rate,
    auth          = no_auth
  )
}

#' @export
timeseries_parameters.hydro_service_RW_RWB <- function(x, ...) {
  # Parameters that can be requested from the station field-visit tables:
  # hydrometric quantities first, then water-quality variables.
  hydrometric <- c("water_discharge", "water_level", "water_temperature")
  water_quality <- c(
    "electrical_conductivity",
    "salinity",
    "total_dissolved_solids",
    "ph",
    "dissolved_oxygen_saturation",
    "total_suspended_solids",
    "turbidity"
  )
  c(hydrometric, water_quality)
}
# -----------------------------------------------------------------------------
# Parameter mapping
# -----------------------------------------------------------------------------

.rw_param_map <- function(parameter) {
  # Map a hydrodownloadR parameter name to the rows it selects in a station's
  # field-visit table.
  #
  # Returns a list with
  #   field_params  accepted values of the table's "Parameter" column;
  #   unit          accepted values of the table's "Unit" column. The FIRST
  #                 element is the canonical unit reported in the output, so
  #                 it must be a clean spelling.
  # .rw_fetch_field_visits() compares both after trimws() + tolower().
  #
  # Aborts (rlang::abort) for parameters this adapter does not support.
  switch(
    parameter,

    # hydrometric -------------------------------------------------------------
    water_discharge = list(
      field_params = c("Discharge", "Flow"),
      unit         = "m^3/s"
    ),

    water_level = list(
      field_params = c("Stage", "Water level", "Water Level"),
      unit         = "m"
    ),

    water_temperature = list(
      field_params = c("Water Temp", "Water temperature", "Temperature"),
      unit         = "\u00B0C"
    ),

    # water quality -----------------------------------------------------------
    electrical_conductivity = list(
      # portal: "Cond", uS/cm. Accept the micro sign (U+00B5), the Greek
      # small mu (U+03BC) and the ASCII "u" spelling: the two mu code points
      # never compare equal, even after tolower().
      field_params = c("Cond", "Conductivity", "Cond.", "Electric Cond"),
      unit         = c("\u00B5S/cm", "\u03BCS/cm", "us/cm")
    ),

    salinity = list(
      # portal: "Salinity", mg/l
      field_params = c("Salinity"),
      unit         = c("mg/l")
    ),

    total_dissolved_solids = list(
      # portal: "TDS", mg/l
      field_params = c("TDS", "Total Dissolved Solids"),
      unit         = c("mg/l")
    ),

    ph = list(
      # portal: "pH", "pH Units"
      field_params = c("pH", "pH value"),
      unit         = c("pH Units", "pH")
    ),

    dissolved_oxygen_saturation = list(
      # portal: "Dis Oxygen Sat", %
      field_params = c("Dis Oxygen Sat", "Dissolved Oxygen Sat", "DO Sat"),
      unit         = c("%")
    ),

    total_suspended_solids = list(
      # portal: "TSS", mg/l
      field_params = c("TSS", "Total Suspended Solids"),
      unit         = c("mg/l")
    ),

    turbidity = list(
      # portal: "Turbidity, Nephelom" with unit "_NTU". Still accept "_NTU",
      # but list plain "NTU" first so it is the unit reported downstream.
      field_params = c("Turbidity, Nephelom", "Turbidity"),
      unit         = c("NTU", "_NTU")
    ),

    # fallback ----------------------------------------------------------------
    rlang::abort(
      paste(
        "RW_RWB supports",
        "'water_discharge', 'water_level', 'water_temperature',",
        "'electrical_conductivity', 'salinity', 'total_dissolved_solids',",
        "'ph', 'dissolved_oxygen_saturation',",
        "'total_suspended_solids', 'turbidity'."
      )
    )
  )
}


# -----------------------------------------------------------------------------
# Stations (S3 method)
# -----------------------------------------------------------------------------

#' @export
stations.hydro_service_RW_RWB <- function(x, ...) {
  # Scrape the surface-water station list (first HTML table on
  # /data/surface_water) into the standard station tibble. Rows without a
  # parseable latitude are dropped. Aborts if the page has no table or the
  # table lacks the identifier / coordinate columns.
  req  <- build_request(x, path = "/data/surface_water")
  resp <- perform_request(req)

  html   <- xml2::read_html(httr2::resp_body_string(resp))
  tables <- rvest::html_elements(html, "table")
  if (!length(tables)) {
    rlang::abort("RW_RWB: no table found on /data/surface_water (layout change?).")
  }

  raw_tbl <- rvest::html_table(tables[[1]], fill = TRUE)
  names(raw_tbl) <- trimws(names(raw_tbl))

  # Tolerate a few alternative header spellings.
  loc_name <- col_or_null(raw_tbl, "Location name") %||%
    col_or_null(raw_tbl, "Location") %||%
    col_or_null(raw_tbl, "Name")
  identifier <- col_or_null(raw_tbl, "Identifier") %||%
    col_or_null(raw_tbl, "ID") %||%
    col_or_null(raw_tbl, "Code")
  lon <- col_or_null(raw_tbl, "Longitude")
  lat <- col_or_null(raw_tbl, "Latitude")

  # A NULL column would make tibble() fail with an opaque size error;
  # report which required headers are missing instead.
  missing_cols <- c(
    Identifier = is.null(identifier),
    Longitude  = is.null(lon),
    Latitude   = is.null(lat)
  )
  if (any(missing_cols)) {
    rlang::abort(paste0(
      "RW_RWB: station table on /data/surface_water is missing column(s): ",
      paste(names(missing_cols)[missing_cols], collapse = ", "),
      " (layout change?)."
    ))
  }

  # The station name is optional; fall back to NA when no name column exists.
  station_name <- if (is.null(loc_name)) {
    rep(NA_character_, length(identifier))
  } else {
    normalize_utf8(loc_name)
  }

  out <- tibble::tibble(
    country       = x$country,
    provider_id   = x$provider_id,
    provider_name = x$provider_name,
    station_id    = as.character(identifier),
    station_name  = station_name,
    river         = NA_character_,
    lat           = suppressWarnings(as.numeric(lat)),
    lon           = suppressWarnings(as.numeric(lon)),
    area          = NA_real_,
    altitude      = NA_real_
  )

  # Keep only stations with a usable latitude; return the result visibly.
  out[!is.na(out$lat), ]
}

# -----------------------------------------------------------------------------
# Internal helpers for time series
# -----------------------------------------------------------------------------

.rw_station_path <- function(station_id) {
  # Relative path "location_ng_info/<id>" for one station. The id is
  # trimmed, internal whitespace runs are collapsed to a single space, and
  # the result is percent-encoded so it is safe as one URL path segment.
  normalized <- gsub("\\s+", " ", trimws(as.character(station_id)))
  paste0("location_ng_info/", curl::curl_escape(normalized))
}

.rw_pick_fieldvisit_table <- function(html) {
  # Return the field-visit table from a parsed station page, or NULL.
  #
  # Selection order:
  #   1. the first non-empty table whose headers include "Date" plus at least
  #      one of "Parameter", "Unit" or "Value";
  #   2. otherwise the first non-empty table with a "Date" header.
  # Headers are compared after trimws(), and the returned table has trimmed
  # names. Tables that rvest fails to parse are skipped.
  table_nodes <- rvest::html_elements(html, "table")
  if (length(table_nodes) == 0) {
    return(NULL)
  }

  parse_node <- function(node) {
    parsed <- try(rvest::html_table(node, fill = TRUE), silent = TRUE)
    if (inherits(parsed, "try-error") || NROW(parsed) == 0) NULL else parsed
  }
  candidates <- Filter(Negate(is.null), lapply(table_nodes, parse_node))
  if (length(candidates) == 0) {
    return(NULL)
  }

  headers  <- lapply(candidates, function(df) trimws(names(df)))
  has_date <- vapply(headers, function(h) "Date" %in% h, logical(1))
  has_data <- vapply(
    headers,
    function(h) any(c("Parameter", "Unit", "Value") %in% h),
    logical(1)
  )

  chosen <- which(has_date & has_data)
  if (length(chosen) == 0) {
    chosen <- which(has_date)
  }
  if (length(chosen) == 0) {
    return(NULL)
  }

  picked <- candidates[[chosen[1]]]
  names(picked) <- trimws(names(picked))
  picked
}

.rw_fetch_field_visits <- function(x,
                                   station_id,
                                   parameter,
                                   pm,
                                   rng,
                                   mode) {
  # Fetch one station page and return its field-visit rows for `parameter`.
  #
  # Arguments:
  #   x          RW_RWB service object (base_url, country, provider_* used).
  #   station_id station identifier as listed by stations().
  #   parameter  hydrodownloadR parameter name, copied into the output.
  #   pm         list from .rw_param_map(): accepted `field_params` and
  #              `unit` spellings; the first unit is the reported unit.
  #   rng        list with start_date / end_date, used when mode == "range".
  #   mode       "complete" or "range".
  #
  # Returns a tibble with one row per matching, parseable timestamp, or an
  # empty tibble. Missing pages (HTTP 404/410) are skipped silently; other
  # request / parse failures emit a warning rather than an error, so one bad
  # station does not abort a multi-station download.
  path <- .rw_station_path(station_id)

  req <- httr2::request(x$base_url) |>
    httr2::req_user_agent(
      "hydrodownloadR (+https://github.com/your-org/hydrodownloadR)"
    ) |>
    httr2::req_url_path_append(path) |>
    # httr2 turns 4xx/5xx into errors by default, which would make the
    # status handling below unreachable; return them as responses instead.
    httr2::req_error(is_error = function(resp) FALSE)

  resp <- tryCatch(perform_request(req), error = function(e) e)
  if (inherits(resp, "error")) {
    rlang::warn(paste0(
      "RW_RWB: request failed for station ", station_id, ": ",
      conditionMessage(resp)
    ))
    return(tibble::tibble())
  }

  status <- httr2::resp_status(resp)
  if (status >= 400) {
    # Unknown or retired station: nothing to report.
    if (status %in% c(404L, 410L)) {
      return(tibble::tibble())
    }
    rlang::warn(paste0(
      "RW_RWB: HTTP ", status, " for station ", station_id,
      " - skipping."
    ))
    return(tibble::tibble())
  }

  html <- tryCatch(
    xml2::read_html(httr2::resp_body_string(resp)),
    error = function(e) NULL
  )
  if (is.null(html)) {
    rlang::warn(paste0("RW_RWB: HTML parse failed for station ", station_id))
    return(tibble::tibble())
  }

  tbl <- .rw_pick_fieldvisit_table(html)
  if (is.null(tbl)) {
    # No field-visit table; nothing to return.
    return(tibble::tibble())
  }

  date_raw  <- col_or_null(tbl, "Date")
  param_raw <- col_or_null(tbl, "Parameter")
  unit_raw  <- col_or_null(tbl, "Unit")
  value_raw <- col_or_null(tbl, "Value")
  if (is.null(date_raw) || is.null(value_raw)) {
    return(tibble::tibble())
  }

  # Case- and whitespace-insensitive comparison key.
  norm <- function(v) tolower(trimws(as.character(v)))

  keep <- rep(TRUE, length(date_raw))
  # NOTE(review): without a "Parameter" column every row is attributed to
  # `parameter`; confirm this lenient fallback is intended.
  if (!is.null(param_raw)) {
    keep <- keep & norm(param_raw) %in% norm(pm$field_params)
  }
  if (!is.null(pm$unit) && !is.null(unit_raw)) {
    keep <- keep & norm(unit_raw) %in% norm(pm$unit)
  }
  if (!any(keep)) {
    return(tibble::tibble())
  }

  # as_datetime() parses into UTC, so the range bounds below are UTC too.
  ts_parsed <- suppressWarnings(lubridate::as_datetime(date_raw[keep]))
  value_use <- value_raw[keep]

  keep2 <- !is.na(ts_parsed)
  if (identical(mode, "range")) {
    # Whole-day window [start_date 00:00, end_date + 1 day 00:00) in UTC.
    t_start <- lubridate::as_datetime(as.Date(rng$start_date))
    t_end   <- lubridate::as_datetime(as.Date(rng$end_date) + 1)
    keep2 <- keep2 & ts_parsed >= t_start & ts_parsed < t_end
  }
  if (!any(keep2)) {
    return(tibble::tibble())
  }

  tibble::tibble(
    country       = x$country,
    provider_id   = x$provider_id,
    provider_name = x$provider_name,
    station_id    = as.character(station_id),
    parameter     = parameter,
    timestamp     = ts_parsed[keep2],
    value         = suppressWarnings(as.numeric(value_use[keep2])),
    # pm$unit may list several accepted spellings, but tibble() only
    # recycles length-1 vectors: report the canonical (first) one.
    unit          = if (is.null(pm$unit)) NA_character_ else pm$unit[[1]],
    quality_code  = NA_character_,
    source_url    = paste0(x$base_url, "/", path)
  )
}

# -----------------------------------------------------------------------------
# Time series (S3 method)
# -----------------------------------------------------------------------------

#' @export
timeseries.hydro_service_RW_RWB <- function(x,
                                            parameter = c(
                                              "water_discharge",
                                              "water_level",
                                              "water_temperature",
                                              "electrical_conductivity",
                                              "salinity",
                                              "total_dissolved_solids",
                                              "ph",
                                              "dissolved_oxygen_saturation",
                                              "total_suspended_solids",
                                              "turbidity"
                                            ),
                                            stations    = NULL,
                                            start_date  = NULL,
                                            end_date    = NULL,
                                            mode        = c("complete", "range"),
                                            exclude_quality = NULL,
                                            ...) {
  # Download field-visit observations of a single `parameter` for
  # `stations` (every listed station when NULL), rate-limited per
  # x$rate_cfg. `exclude_quality` is accepted for interface compatibility
  # but unused: this adapter always reports quality_code = NA.
  parameter <- match.arg(parameter)
  mode      <- match.arg(mode)
  rng       <- resolve_dates(mode, start_date, end_date)
  pm        <- .rw_param_map(parameter)

  station_vec <- if (is.null(stations)) {
    stations.hydro_service_RW_RWB(x)$station_id
  } else {
    stations
  }

  # Drop missing / blank ids: they cannot name a station page and would only
  # produce pointless requests.
  station_vec <- unique(as.character(station_vec))
  station_vec <- station_vec[!is.na(station_vec) & nzchar(trimws(station_vec))]
  if (length(station_vec) == 0) {
    return(tibble::tibble())
  }

  batches <- chunk_vec(station_vec, 20L)

  pb <- progress::progress_bar$new(
    total  = length(batches),
    format = "RW_RWB [:bar] :current/:total (:percent) eta: :eta"
  )

  fetch_one <- function(st_id) {
    .rw_fetch_field_visits(
      x          = x,
      station_id = st_id,
      parameter  = parameter,
      pm         = pm,
      rng        = rng,
      mode       = mode
    )
  }

  limited <- ratelimitr::limit_rate(
    fetch_one,
    rate = ratelimitr::rate(n = x$rate_cfg$n, period = x$rate_cfg$period)
  )

  out <- lapply(batches, function(batch) {
    res <- dplyr::bind_rows(lapply(batch, limited))
    # Tick once the batch is done so the bar reflects completed work.
    pb$tick()
    res
  })

  dplyr::bind_rows(out)
}

Try the hydrodownloadR package in your browser

Any scripts or data that you put into this service are public.

hydrodownloadR documentation built on Feb. 25, 2026, 5:08 p.m.