R/adapter_BE_WAL.R

Defines functions timeseries.hydro_service_BE_WAL .wal_empty_ts .wal_discover_ts_map .wal_perform_with_retry stations.hydro_service_BE_WAL .wal_parse_header_matrix0 .wal_param_map timeseries_parameters.hydro_service_BE_WAL register_BE_WAL

# ==== Belgium (Wallonia, SPW Hydrom\u00E9trie - KiWIS) adapter =====================
# Provider: BE_WAL
# Base URL: https://hydrometrie.wallonie.be/services/KiWIS/KiWIS
# Scope: water_discharge (daily), water_level (daily)
# Groups: Q = 7256919, H = 7255151
# Notes:
# - datasource=4 (tabular JSON: columns + data matrix)
# - vertical_datum: for water_level, the per-station "station_gauge_datum_unit"
#   value from the station list (NA when absent); NA for water_discharge
# - polite rate limit: ~1 req / 5 s, with retry/backoff for 429/5xx
# - batching + progress for value fetches
# - helpers expected from core: build_request(), perform_request(), col_or_null(),
#   normalize_utf8(), resolve_dates(), `%||%`

# ---- registration ------------------------------------------------------------

#' Register the BE_WAL (Wallonia, SPW Hydrometrie) provider.
#' @keywords internal
#' @noRd
register_BE_WAL <- function() {
  # Assemble the provider descriptor once, then hand it to the registry.
  cfg <- list(
    provider_id   = "BE_WAL",
    provider_name = "SPW Hydrom\u00E9trie (Wallonia, KiWIS)",
    country       = "Belgium",
    base_url      = "https://hydrometrie.wallonie.be/services/KiWIS/KiWIS",
    rate_cfg      = list(n = 1, period = 5),  # polite: 1 req / 5 s
    auth          = list(type = "none")
  )
  do.call(register_service_usage, cfg)
}

#' @export
timeseries_parameters.hydro_service_BE_WAL <- function(x, ...) {
  # Parameters this adapter exposes (daily discharge and water level).
  supported <- c("water_discharge", "water_level")
  supported
}

# ---- parameter mapping -------------------------------------------------------

# Map a public parameter name onto the KiWIS codes used by SPW Hydrometrie.
# Returns list(par = KiWIS parameter letter, unit, group_id = timeseries
# group); aborts for any parameter this adapter does not support.
.wal_param_map <- function(parameter) {
  switch(parameter,
    "water_discharge" = list(par = "Q", unit = "m^3/s", group_id = "7256919"),
    "water_level"     = list(par = "H", unit = "m",     group_id = "7255151"),
    # switch() falls through to this default for any other string
    rlang::abort("BE_WAL supports only 'water_discharge' and 'water_level'.")
  )
}


# Parse a KiWIS datasource=0 payload: a 2D array whose first row holds the
# column names and whose remaining rows hold the data. Returns an empty
# tibble when the payload has no dimensions or fewer than two rows.
.wal_parse_header_matrix0 <- function(js) {
  if (is.null(dim(js)) || nrow(js) < 2) {
    return(tibble::tibble())
  }
  header <- as.character(js[1, , drop = TRUE])
  body   <- js[-1, , drop = FALSE]
  parsed <- as.data.frame(body, stringsAsFactors = FALSE,
                          check.names = FALSE, optional = TRUE)
  names(parsed) <- header
  tibble::as_tibble(parsed, .name_repair = "minimal")
}

#' @export
stations.hydro_service_BE_WAL <- function(x, ...) {
  # Fetch the station catalogue from KiWIS via getStationList (datasource=0,
  # header-first matrix). Custom attributes (e.g. CATCHMENT_SIZE) come back
  # through ca_sta; an empty ca_sta_returnfields means "all available".
  q <- list(
    service    = "kisters",
    type       = "queryServices",
    request    = "getStationList",
    datasource = 0,
    format     = "json",
    # fixed: removed the duplicated "station_no" entry and added the standard
    # fields the parser below actually reads (river_name, station_gauge_datum,
    # station_gauge_datum_unit); without them those columns came back missing.
    returnfields = paste(
      "station_no", "station_name", "station_latitude", "station_longitude",
      "river_name", "station_gauge_datum", "station_gauge_datum_unit",
      "site_name", "ca_sta",
      sep = ","
    ),
    # leave empty to return all available station custom attributes
    ca_sta_returnfields = ""
  )

  req  <- build_request(x, path = "", query = q)
  resp <- perform_request(req)

  # datasource=0 returns a header-first 2D array/matrix
  js <- httr2::resp_body_json(resp, simplifyVector = TRUE)
  df <- .wal_parse_header_matrix0(js)

  # Extract fields (be tolerant to slight naming diffs)
  st_id_raw  <- col_or_null(df, "station_no") %||% col_or_null(df, "station_id")
  st_name    <- normalize_utf8(col_or_null(df, "station_name"))
  # consistency: normalize river names the same way as station names
  riv        <- normalize_utf8(col_or_null(df, "river_name"))
  lat_chr    <- col_or_null(df, "station_latitude")
  lon_chr    <- col_or_null(df, "station_longitude")
  area_km2   <- parse_area_km2(col_or_null(df, "CATCHMENT_SIZE"))
  alt_chr    <- col_or_null(df, "station_gauge_datum")
  vdatum_raw <- col_or_null(df, "station_gauge_datum_unit")

  # Empty strings -> NA, then numeric
  to_num <- function(z) suppressWarnings(as.numeric(ifelse(is.na(z) | z == "", NA, z)))
  lat <- to_num(lat_chr)
  lon <- to_num(lon_chr)
  alt <- to_num(alt_chr)

  out <- tibble::tibble(
    country       = x$country,
    provider_id   = x$provider_id,
    provider_name = x$provider_name,
    station_id    = as.character(st_id_raw),
    station_name  = st_name,
    station_name_ascii  = to_ascii(st_name),
    river         = riv,
    river_ascii   = to_ascii(riv),
    lat           = lat,  # EPSG:4326
    lon           = lon,  # EPSG:4326
    area          = area_km2,
    altitude      = alt,
    vertical_datum = vdatum_raw
  )

  # Keep only stations that have coordinates AND a non-empty river name
  dplyr::filter(out, !is.na(.data$lat), !is.na(.data$lon),
                !is.na(.data$river), nzchar(.data$river))
}


# ---- retry + discovery helpers ----------------------------------------------

# Perform `req` with retry: up to 6 tries, exponential backoff capped at
# 120 s plus a little jitter, retrying on HTTP 429 and any 5xx status.
.wal_perform_with_retry <- function(req) {
  transient <- function(resp) {
    status <- httr2::resp_status(resp)
    status == 429L || (status >= 500L && status < 600L)
  }
  retried <- httr2::req_retry(
    req,
    max_tries    = 6,
    backoff      = ~ min(120, 2^.x) + stats::runif(1, 0, 0.5),
    is_transient = transient
  )
  httr2::req_perform(retried)
}

# Resolve the KiWIS ts_id for every station in the parameter's timeseries
# group. Returns tibble(station_id, ts_id), optionally restricted to the
# supplied `stations` vector, with NA/empty ids removed and duplicates dropped.
.wal_discover_ts_map <- function(x, stations, parameter) {
  pm <- .wal_param_map(parameter)
  q <- list(service = "kisters", type = "queryServices",
            request = "getTimeseriesList",
            timeseriesgroup_id = pm$group_id, datasource = 0, format = "json",
            returnfields = "station_no,ts_id")
  req  <- build_request(x, path = "", query = q)
  resp <- perform_request(req)
  js   <- httr2::resp_body_json(resp, simplifyVector = TRUE)

  # fixed: previously indexed js[1, ] before any guard, which errored on a
  # dimensionless or empty payload. Reuse the shared header-first parser,
  # which safely yields an empty tibble when js is not a >= 2-row matrix.
  df <- .wal_parse_header_matrix0(js)
  if (!nrow(df)) {
    return(tibble::tibble(station_id = character(), ts_id = character()))
  }

  out <- tibble::tibble(
    station_id = as.character(col_or_null(df, "station_no")),
    ts_id      = as.character(col_or_null(df, "ts_id"))
  )
  out <- out[!is.na(out$station_id) & !is.na(out$ts_id) & nzchar(out$ts_id), , drop = FALSE]

  if (!is.null(stations) && length(stations)) {
    out <- dplyr::filter(out, .data$station_id %in% unique(as.character(stations)))
  }
  dplyr::distinct(out, .data$station_id, .data$ts_id, .keep_all = TRUE)
}

# Zero-row tibble sharing the timeseries() result schema. `parameter` and
# `unit` are accepted for call-site symmetry but unused: every column of an
# empty result is length zero regardless of their values.
.wal_empty_ts <- function(x, parameter, unit) {
  chr0 <- character(0)
  tibble::tibble(
    country        = x$country,
    provider_id    = x$provider_id,
    provider_name  = x$provider_name,
    station_id     = chr0,
    parameter      = chr0,
    timestamp      = as.POSIXct(chr0, tz = "UTC"),
    value          = numeric(0),
    unit           = chr0,
    vertical_datum = chr0,
    quality_code   = chr0,
    quality_name   = chr0,
    quality_description = chr0,
    source_url     = chr0
  )
}

# ---- timeseries() ------------------------------------------------------------

#' @export
timeseries.hydro_service_BE_WAL <- function(x,
                                            parameter = c("water_discharge","water_level"),
                                            stations = NULL,
                                            start_date = NULL, end_date = NULL,
                                            mode = c("complete","range"),
                                            exclude_quality = NULL,
                                            ...) {
  # Download values for one parameter from the Wallonia KiWIS endpoint.
  # Steps: (1) list stations (also used for the per-station vertical-datum
  # lookup), (2) map station_id -> ts_id for the parameter's group,
  # (3) fetch values per ts_id with retry + polite rate limiting.
  parameter <- match.arg(parameter)
  mode      <- match.arg(mode)
  rng       <- resolve_dates(mode, start_date, end_date)
  pm        <- .wal_param_map(parameter)

  start_utc <- if (inherits(rng$start, "POSIXt")) rng$start else as.POSIXct(rng$start, tz = "UTC")
  end_utc   <- if (inherits(rng$end,   "POSIXt")) rng$end   else as.POSIXct(rng$end,   tz = "UTC")
  end_utc   <- end_utc + 24*3600 - 1  # include full end day (23:59:59)

  # 1) pull stations (for vd_map and default station universe)
  st_all <- stations.hydro_service_BE_WAL(x)

  # if user passed stations, restrict to them (and keep only those we know)
  if (is.null(stations) || !length(stations)) {
    st <- st_all
  } else {
    stations <- unique(as.character(stations))
    st <- dplyr::filter(st_all, .data$station_id %in% stations)
  }

  # safety: if nothing left, return empty
  if (!nrow(st)) return(.wal_empty_ts(x, parameter, pm$unit))

  # per-station vertical datum lookup (named vector)
  vd_map <- rlang::set_names(st$vertical_datum, st$station_id)

  # 2) discover ts_ids for chosen parameter
  map_df   <- .wal_discover_ts_map(x, unique(st$station_id), parameter)
  df_valid <- dplyr::filter(map_df, !is.na(.data$ts_id), nzchar(.data$ts_id))
  if (!nrow(df_valid)) return(.wal_empty_ts(x, parameter, pm$unit))

  st_by_ts <- df_valid$station_id; names(st_by_ts) <- df_valid$ts_id
  ts_ids   <- unname(df_valid$ts_id)

  # 3) worker to fetch a single ts_id
  fetch_vals <- function(tsid) {
    st_id <- st_by_ts[[tsid]] %||% NA_character_
    q <- list(
      service      = "kisters",
      type         = "queryServices",
      request      = "getTimeseriesValues",
      datasource   = 0,
      format       = "json",
      ts_id        = tsid,
      from         = format(start_utc, "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"),
      to           = format(end_utc,   "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"),
      returnfields = "Timestamp,Value,Quality Code,Quality Code Name,Quality Code Description"
    )

    req  <- build_request(x, path = "", query = q)
    # fixed: the request was previously performed twice back-to-back, doubling
    # server load and defeating the polite rate limit; once (with retry) is enough
    resp <- .wal_perform_with_retry(req)
    js   <- httr2::resp_body_json(resp, simplifyVector = TRUE)

    # ---- Parse values robustly (datasource=0 or columns+data) ----------------
    parse_values <- function(js) {
      # Case A: KiWIS columns+data
      if (!is.null(js$columns) && !is.null(js$data)) {
        mat <- if (is.list(js$data) && length(js$data) == 1 && is.matrix(js$data[[1]])) js$data[[1]]
        else if (is.matrix(js$data)) js$data else as.matrix(js$data)
        if (is.null(dim(mat)) || nrow(mat) == 0) return(tibble::tibble())
        df <- as.data.frame(mat, stringsAsFactors = FALSE, check.names = FALSE)
        cols <- js$columns
        if (is.character(cols) && length(cols) == 1L) cols <- strsplit(cols, "\\s*,\\s*")[[1]]
        names(df) <- if (is.character(cols) && length(cols) == ncol(df)) cols else paste0("V", seq_len(ncol(df)))
        return(tibble::as_tibble(df, .name_repair = "minimal"))
      }

      # Case B: datasource=0 - header-first 2D array/matrix
      if (!is.null(dim(js)) && nrow(js) >= 2) {
        hdr <- as.character(js[1, , drop = TRUE])
        dat <- js[-1, , drop = FALSE]
        df  <- as.data.frame(dat, stringsAsFactors = FALSE, check.names = FALSE, optional = TRUE)
        names(df) <- hdr
        return(tibble::as_tibble(df, .name_repair = "minimal"))
      }

      tibble::tibble()
    }

    df <- parse_values(js)
    if (!nrow(df)) return(.wal_empty_ts(x, parameter, pm$unit))

    # Column name helpers (case/space tolerant)
    pick_col <- function(df, candidates) {
      # candidates: vector of preferred names (ordered)
      nms <- names(df)
      for (cand in candidates) {
        hit <- which(tolower(trimws(nms)) == tolower(trimws(cand)))
        if (length(hit)) return(nms[hit[1]])
      }
      NULL
    }

    # 1) Timestamp
    ts_col <- pick_col(df, c("Timestamp","Time","Date","Datetime","Date Time"))
    if (is.null(ts_col)) {
      # If no timestamp column, nothing useful to return
      return(.wal_empty_ts(x, parameter, pm$unit))
    }
    ts_raw <- df[[ts_col]]

    # 2) Value - prefer explicit "Value", else first non-timestamp numeric-ish column
    val_col <- pick_col(df, c("Value","value"))
    if (is.null(val_col)) {
      # find first column that is not the timestamp and looks numeric
      other_cols <- setdiff(names(df), ts_col)
      # try to coerce each candidate and pick the first with at least one numeric
      val_col <- NULL
      for (cname in other_cols) {
        vv <- suppressWarnings(as.numeric(df[[cname]]))
        if (any(!is.na(vv))) { val_col <- cname; break }
      }
    }

    # If still no value column or only 1 column total, bail safely
    if (is.null(val_col) || ncol(df) < 2) return(.wal_empty_ts(x, parameter, pm$unit))

    # 3) Quality fields (optional)
    qc_col <- pick_col(df, c("Quality Code","quality","Quality"))
    qn_col <- pick_col(df, c("Quality Code Name","quality_name"))
    qd_col <- pick_col(df, c("Quality Code Description","quality_description"))

    # normalize + filter window: strip "T", fractional seconds and any UTC offset
    ts_chr   <- gsub("T"," ", as.character(ts_raw), fixed = TRUE)
    ts_chr   <- sub("\\..*$", "", ts_chr)
    ts_chr   <- sub(" [+-]\\d{2}:?\\d{2}$", "", ts_chr)
    ts_posix <- suppressWarnings(as.POSIXct(ts_chr, format = "%Y-%m-%d %H:%M:%S", tz = "UTC"))

    val_num <- suppressWarnings(as.numeric(df[[val_col]]))
    qc_chr  <- if (!is.null(qc_col)) as.character(df[[qc_col]]) else rep(NA_character_, length(ts_posix))
    qn_chr  <- if (!is.null(qn_col)) as.character(df[[qn_col]]) else rep(NA_character_, length(ts_posix))
    qd_chr  <- if (!is.null(qd_col)) as.character(df[[qd_col]]) else rep(NA_character_, length(ts_posix))

    keep <- !is.na(ts_posix) & ts_posix >= start_utc & ts_posix <= end_utc
    if (!is.null(exclude_quality)) keep <- keep & !(qc_chr %in% exclude_quality)

    if (!any(keep)) return(.wal_empty_ts(x, parameter, pm$unit))

    # station-specific vertical datum mapping.
    # fixed: dplyr::if_else() evaluated both branches, so the previous
    # vd_map[[st_id]] errored whenever st_id was NA or unknown (even for
    # water_discharge). A scalar if/else plus a `[` lookup yields NA instead.
    vdatum <- if (identical(parameter, "water_level")) {
      unname(vd_map[st_id])
    } else {
      NA_character_
    }

    tibble::tibble(
      country        = x$country,
      provider_id    = x$provider_id,
      provider_name  = x$provider_name,
      station_id     = st_id,
      parameter      = parameter,
      timestamp      = ts_posix[keep],
      value          = val_num[keep],
      unit           = pm$unit,
      vertical_datum = vdatum,
      quality_code   = qc_chr[keep],
      quality_name   = qn_chr[keep],
      quality_description = qd_chr[keep],
      source_url     = paste0(
        x$base_url,
        "?service=kisters&type=queryServices&request=getTimeseriesValues",
        "&ts_id=", utils::URLencode(tsid),
        "&format=json&datasource=0",
        "&returnfields=", utils::URLencode("Timestamp,Value,Quality Code,Quality Code Name,Quality Code Description"),
        "&from=", utils::URLencode(q$from),
        "&to=",   utils::URLencode(q$to)
      )
    )
  }

  # batching with progress + polite rate limiting (1 req / 5 s, plus jitter)
  idx <- seq_along(ts_ids)
  pb  <- progress::progress_bar$new(format="values    [:bar] :current/:total (:percent) eta: :eta",
                                    total=length(idx))
  rl_vals  <- ratelimitr::limit_rate(
    function(i){ on.exit(Sys.sleep(5 + stats::runif(1,0,0.5)), add=TRUE); fetch_vals(ts_ids[[i]]) },
    rate = ratelimitr::rate(n=1, period=5)
  )

  res <- lapply(idx, function(i){ pb$tick(); rl_vals(i) }) |> dplyr::bind_rows()
  if (!nrow(res)) return(.wal_empty_ts(x, parameter, pm$unit))
  dplyr::arrange(res, .data$station_id, .data$timestamp)
}

Try the hydrodownloadR package in your browser

Any scripts or data that you put into this service are public.

hydrodownloadR documentation built on Feb. 25, 2026, 5:08 p.m.