R/adapter_FR_HUBEAU.R

Defines functions .do_temp_chunk .do_hm_elab_chunk .do_hm_rt_chunk .fr_empty_ts timeseries.hydro_service_FR_HUBEAU stations.hydro_service_FR_HUBEAU .fr_try_req .fr_pick_col .fr_load_precomputed_meta .fr_site_cache_write .fr_site_cache_read .fr_site_cache_path .fr_cache_dir .fr_http_get_html .fr_html_extract_site_datum .fr_fetch_site_areas .fr_fetch_station_meta .fr_fetch_site_meta .fr_html_extract_station_gauge_zero .fr_html_extract_site_altitude .fr_html_extract_gauge_zero .fr_html_extract_area .fr_num_fr .fr_used_url .fr_parse_json .fr_param_map .fr_supported_params .fr_conv_lps_to_m3 .fr_conv_mm_to_cm timeseries_parameters.hydro_service_FR_HUBEAU register_FR_HUBEAU

# R/adapter_FR_HUBEAU.R
# France - Hub'Eau (Eaufrance) adapter
# Hydrometrie v2 + Temperature v1
# Docs (hydrometrie): https://hubeau.eaufrance.fr/page/api-hydrometrie
# Docs (temperature): https://hubeau.eaufrance.fr/page/api-temperature-continu
# Units/time: H=mm, Q=l/s (convert), UTC timestamps. [see docs]

# ---- registration ------------------------------------------------------------

#' Register the Hub'Eau (Eaufrance) provider with the service registry.
#'
#' Declares provider identity, base URL, a polite rate limit of
#' 3 requests per second, and no authentication.
#' @keywords internal
#' @noRd
register_FR_HUBEAU <- function() {
  cfg <- list(
    provider_id   = "FR_HUBEAU",
    provider_name = "Hub'Eau (Eaufrance) API",
    country       = "France",
    base_url      = "https://hubeau.eaufrance.fr",
    rate_cfg      = list(n = 3, period = 1),
    auth          = list(type = "none")
  )
  do.call(register_service_usage, cfg)
}

# Canonical parameter names this adapter serves, discoverable via the generic.
#' @export
timeseries_parameters.hydro_service_FR_HUBEAU <- function(x, ...) {
  params <- c("water_discharge", "water_level", "water_temperature")
  params
}

# ---- parameter mapping -------------------------------------------------------

# Canonical unit converter for water level: API millimetres -> centimetres.
.fr_conv_mm_to_cm <- function(x) {
  x / 10
}
# Canonical unit converter for discharge: API litres/second -> m^3/s.
.fr_conv_lps_to_m3 <- function(x) {
  x / 1000
}

# Supported canonical parameter names (used to build tidy error messages).
.fr_supported_params <- function() {
  c("water_discharge", "water_level", "water_temperature")
}

# Map a canonical parameter name to its Hub'Eau endpoint descriptor:
# API family, stations/timeseries paths, fixed query-string parts, the
# output unit, and the unit-conversion function. Unknown parameters raise
# an error that lists the supported names.
.fr_param_map <- function(parameter) {
  if (identical(parameter, "water_discharge")) {
    return(list(
      api           = "hydrometrie",
      stations_path = "/api/v2/hydrometrie/referentiel/stations",
      ts_path       = "/api/v2/hydrometrie/obs_elab",
      fixed_qs      = list(grandeur_hydro_elab = "QmnJ"),  # daily mean discharge
      unit          = "m^3/s",
      convert       = .fr_conv_lps_to_m3
    ))
  }

  if (identical(parameter, "water_level")) {
    return(list(
      api           = "hydrometrie",
      stations_path = "/api/v2/hydrometrie/referentiel/stations",
      ts_path       = "/api/v2/hydrometrie/observations_tr",  # realtime H
      fixed_qs      = list(grandeur_hydro = "H"),
      unit          = "cm",
      convert       = .fr_conv_mm_to_cm
    ))
  }

  if (identical(parameter, "water_temperature")) {
    return(list(
      api           = "temperature",
      stations_path = "/api/v1/temperature/station",
      ts_path       = "/api/v1/temperature/chronique",
      fixed_qs      = list(),
      unit          = "\u00B0C",
      convert       = function(x) x   # already degrees Celsius
    ))
  }

  # Unsupported parameter
  rlang::abort(paste0(
    "FR_HUBEAU supports ", paste(.fr_supported_params(), collapse = ", "), "."
  ))
}


# Decode an HTTP response body into an R object with jsonlite.
# Returns NULL (never errors) on empty bodies or unparseable JSON.
.fr_parse_json <- function(res) {
  body <- res$body
  txt <- tryCatch(
    if (is.raw(body)) rawToChar(body) else as.character(body),
    error = function(e) ""
  )
  if (!nzchar(txt)) return(NULL)
  # Mark (do not re-encode) the payload as UTF-8; Hub'Eau serves UTF-8 JSON.
  Encoding(txt) <- "UTF-8"
  # simplifyVector = TRUE turns arrays of objects into data frames.
  tryCatch(jsonlite::fromJSON(txt, simplifyVector = TRUE), error = function(e) NULL)
}

# Best-effort URL (perform_request may provide $url; else NA)
.fr_used_url <- function(res, req) {
  `%^%` <- function(a,b) if (!is.null(a) && !is.na(a) && nzchar(a)) a else b
  (res$url %^% req$url) %^% NA_character_
}

# --- number normalization (FR) -----------------------------------------------
# Parse one French- or English-formatted number ("1 234,56", "282,11 m",
# "1,234.56") into a double. Units/letters and NBSP variants are stripped;
# the decimal separator is inferred from the trailing separator pattern.
# Returns NA_real_ when nothing numeric can be extracted.
.fr_num_fr <- function(x) {
  if (is.null(x) || !length(x)) return(NA_real_)
  # Scalar contract: the scalar `if (!nzchar(x2))` below would error for
  # vectors (condition length > 1, R >= 4.2), so only the first element
  # is considered.
  x <- as.character(x)[[1L]]
  if (is.na(x)) return(NA_real_)

  # Normalize whitespace (incl. NBSP/thin NBSP)
  x <- gsub("\u202F|\u00A0", " ", x)
  x <- trimws(x)

  # Keep only digits, sign, and separators; drop all letters/symbols (km, m, etc.)
  x2 <- gsub("[^0-9,.-]", "", x)
  if (!nzchar(x2)) return(NA_real_)

  # Decide decimal separator by the last separator pattern
  if (grepl(",\\d{1,3}$", x2)) {
    # decimal comma: drop thousands dots, then turn the comma into a dot
    x2 <- gsub("\\.", "", x2)
    x2 <- sub(",", ".", x2)
  } else if (grepl("\\.\\d{1,3}$", x2)) {
    # decimal dot: drop thousands commas
    x2 <- gsub(",", "", x2)
  } else {
    # looks like an integer; drop any stray separators
    x2 <- gsub("[^0-9-]", "", x2)
  }

  suppressWarnings(as.numeric(x2))
}


# --- HTML extractors (prefer xml2; regex fallback) ---------------------------
# Drainage area (km^2) from a hydro.eaufrance.fr site fiche page.
# Returns NA_real_ when the label or a parsable value is not found.
.fr_html_extract_area <- function(html_txt) {
  if (!nzchar(html_txt)) return(NA_real_)
  if (requireNamespace("xml2", quietly = TRUE)) {
    doc <- tryCatch(xml2::read_html(html_txt, encoding = "UTF-8"), error = function(e) NULL)
    if (!is.null(doc)) {
      node <- xml2::xml_find_first(
        doc,
        "//span[normalize-space(.)='Surface de bassin versant topographique']/following-sibling::span[1]"
      )
      # xml_find_first() returns an `xml_missing` object (never NULL) when
      # nothing matches, so test the class rather than is.null().
      if (!inherits(node, "xml_missing")) {
        valtxt <- xml2::xml_text(node, trim = TRUE)
        # "\u00B2" is an R string escape producing the literal superscript-two
        # character. PCRE has no \uXXXX escape, so the previous regex-level
        # "\\u00B2" was invalid; a fixed-string replacement is all we need.
        valtxt_ascii <- gsub("\u00B2", "2", valtxt, fixed = TRUE)
        valtxt_num   <- sub("\\s*km\\s*2\\s*$", "", valtxt_ascii, perl = TRUE)
        num <- .fr_num_fr(valtxt_num)
        if (!is.na(num)) return(num)
      }
    }
  }

  # Regex fallback over raw HTML: label ... <span> value km² (or "km2").
  # NB: the superscript-two must be embedded as the character itself
  # (R escape \u00B2), not as a PCRE \u escape.
  rx <- "(?s)Surface de bassin versant topographique.*?<span>\\s*([0-9][0-9\\s.,\u00A0\u202F]*)\\s*km(?:\\s*\u00B2|2)"
  m  <- regexec(rx, html_txt, perl = TRUE)
  g  <- regmatches(html_txt, m)
  if (length(g) && length(g[[1]]) >= 2) return(.fr_num_fr(g[[1]][2]))
  NA_real_
}

# Gauge-zero altitude ("Cote du z\u00E9ro d'\u00E9chelle", metres) from a
# hydro.eaufrance.fr fiche page. Returns NA_real_ when not found.
.fr_html_extract_gauge_zero <- function(html_txt) {
  if (!nzchar(html_txt)) return(NA_real_)
  if (requireNamespace("xml2", quietly = TRUE)) {
    doc <- tryCatch(xml2::read_html(html_txt, encoding = "UTF-8"), error = function(e) NULL)
    if (!is.null(doc)) {
      # xml2 resolves &#039; to an apostrophe ' in text()
      node <- xml2::xml_find_first(
        doc,
        "//span[normalize-space(.)=\"Cote du z\u00E9ro d'\u00E9chelle\"]/following-sibling::span[1]"
      )
      # xml_find_first() returns `xml_missing` (never NULL) on no match;
      # the previous !is.null(node) check was always TRUE.
      if (!inherits(node, "xml_missing")) {
        valtxt <- xml2::xml_text(node, trim = TRUE)
        # value typically like "12,34 m" or "12.34 m" - strip the unit suffix
        valtxt <- sub("\\s*m\\s*$", "", valtxt)
        num <- .fr_num_fr(valtxt)
        if (!is.na(num)) return(num)
      }
    }
  }
  # regex fallback - accept either literal ' or &#039;
  rx <- "(?s)Cote du z[\u00E9e]ro d(?:'|&#039;)\u00E9chelle.*?<span>\\s*([0-9][0-9\\s.,\u00A0\u202F]*)\\s*m"
  m  <- regexec(rx, html_txt, perl = TRUE)
  g  <- regmatches(html_txt, m)
  if (length(g) && length(g[[1]]) >= 2) return(.fr_num_fr(g[[1]][2]))
  NA_real_
}

# Prefer xml2 DOM; fallback to regex. All return numeric meters.

# Site altitude from a site fiche: exact label "Altitude" first, then a few
# label variants previously seen in the wild. Returns NA_real_ if not found.
.fr_html_extract_site_altitude <- function(html_txt) {
  if (!nzchar(html_txt)) return(NA_real_)
  if (requireNamespace("xml2", quietly = TRUE)) {
    doc <- tryCatch(xml2::read_html(html_txt, encoding = "UTF-8"), error = function(e) NULL)
    if (!is.null(doc)) {
      # 1) exact "Altitude"
      node <- xml2::xml_find_first(doc, "//span[normalize-space(.)='Altitude']/following-sibling::span[1]")
      # xml_find_first() returns `xml_missing` (never NULL) on no match;
      # the previous !is.null() checks were always TRUE.
      if (!inherits(node, "xml_missing")) {
        val <- xml2::xml_text(node, trim = TRUE)
        val <- sub("\\s*m\\s*$", "", val)
        num <- .fr_num_fr(val)
        if (!is.na(num)) return(num)
      }
      # 2) fallback labels previously seen on some fiches
      labels <- c("Altitude du site", "Altitude du rep\u00E8re", "Altitude du rep\u00E8re g\u00E9od\u00E9sique")
      for (lb in labels) {
        node2 <- xml2::xml_find_first(doc, sprintf("//span[normalize-space(.)='%s']/following-sibling::span[1]", lb))
        if (!inherits(node2, "xml_missing")) {
          val <- xml2::xml_text(node2, trim = TRUE)
          val <- sub("\\s*m\\s*$", "", val)
          num <- .fr_num_fr(val)
          if (!is.na(num)) return(num)
        }
      }
    }
  }
  # Regex fallback over raw HTML
  rx <- "(?s)Altitude[^<]*?</span>\\s*<span>\\s*([0-9][0-9\\s.,\u00A0\u202F]*)\\s*m"
  m  <- regexec(rx, html_txt, perl = TRUE)
  g  <- regmatches(html_txt, m)
  if (length(g) && length(g[[1]]) >= 2) return(.fr_num_fr(g[[1]][2]))
  NA_real_
}

# Gauge-zero altitude ("Cote du z\u00E9ro d'\u00E9chelle", metres) from a
# station fiche page on hydro.eaufrance.fr. Returns NA_real_ when the field
# is absent, empty, or marked "Non renseign\u00E9".
.fr_html_extract_station_gauge_zero <- function(html_txt) {
  if (!nzchar(html_txt)) return(NA_real_)

  # Preferred path: DOM lookup via xml2 (optional dependency).
  if (requireNamespace("xml2", quietly = TRUE)) {
    doc <- tryCatch(xml2::read_html(html_txt, encoding = "UTF-8"), error = function(e) NULL)
    if (!is.null(doc)) {
      # Label span -> its immediate following value span. NBSP and narrow
      # NBSP are normalized to plain spaces inside the XPath itself.
      xp <- "//span[contains(@class,'identity__label') and normalize-space(translate(., '\u00A0\u202F', '  '))=\"Cote du z\u00E9ro d'\u00E9chelle\"]/following-sibling::span[1]"
      value_node <- xml2::xml_find_first(doc, xp)
      if (!is.null(value_node) && !inherits(value_node, "xml_missing")) {
        raw <- xml2::xml_text(value_node, trim = TRUE)
        # Empty cell or an explicit "not filled in" marker -> missing.
        if (!nzchar(raw) || grepl("Non renseign", raw, ignore.case = TRUE)) return(NA_real_)
        # Values look like "282,11 m" or "282 m"; strip the unit suffix.
        parsed <- .fr_num_fr(sub("\\s*m\\s*$", "", raw))
        return(if (is.na(parsed)) NA_real_ else parsed)
      }
    }
  }

  # Fallback: regex over raw HTML, restricted to the label's sibling span.
  pat <- "(?is)Cote du z[\u00E9e]ro d(?:'|&#039;)\u00E9chelle\\s*</span>\\s*<span>\\s*([0-9][0-9\\s.,\u00A0\u202F]*)\\s*m\\s*</span>"
  hit <- regmatches(html_txt, regexec(pat, html_txt, perl = TRUE))
  if (length(hit) && length(hit[[1]]) >= 2) {
    parsed <- .fr_num_fr(hit[[1]][2])
    return(if (is.na(parsed)) NA_real_ else parsed)
  }

  NA_real_
}


# --- Site meta fetcher (area, gauge zero, site altitude) ---------------------
# Speedups: curl multi (waves), bounded concurrency, cache w/ TTL
#
# Scrapes https://www.hydro.eaufrance.fr/sitehydro/{code}/fiche for every
# requested site and returns a tibble with (at most) one row per code_site:
#   code_site, area (km^2), altitude_site (m), vertical_datum_site (label).
# Requests run in "waves" of up to `max_conns` parallel curl connections,
# paced by rate$period (+ random jitter) between waves. 403 responses can be
# retried one-by-one afterwards. Successful rows are cached on disk with a
# TTL so repeated calls stay cheap.
#
# @param code_sites character site codes (deduplicated, NAs dropped)
# @param rate list(n, period); period also paces the waves
# @param max_conns parallel connections per wave (default min(rate$n, 6))
# @param use_cache read/write the on-disk cache (.fr_site_cache_read/_write)?
# @param cache_ttl_days cached rows older than this are re-fetched
# @param sequential_retry_on_403 gently retry 403-blocked pages sequentially
# @param jitter_sec upper bound of the random extra sleep between waves
# @param warmup hit the homepage once first to populate the cookie jar
.fr_fetch_site_meta <- function(code_sites,
                                rate = list(n = 3, period = 1),
                                max_conns = NULL,
                                use_cache = TRUE,
                                cache_ttl_days = 90,
                                sequential_retry_on_403 = TRUE,
                                jitter_sec = 0.25,
                                warmup = TRUE) {
  code_sites <- unique(stats::na.omit(as.character(code_sites)))
  if (!length(code_sites)) {
    return(tibble::tibble(code_site = character(), area = double(), altitude_site = double()))
  }

  # ----- cache gate -----------------------------------------------------------
  # A cached row is reusable when it was fetched within the TTL and both
  # numeric fields parsed successfully; everything else gets re-fetched.
  cache <- if (use_cache) .fr_site_cache_read() else tibble::tibble(code_site=character(), area=double(), altitude_site=double(), fetched_at=as.POSIXct(character()))
  ttl_cutoff <- Sys.time() - cache_ttl_days*24*3600
  in_cache <- cache$code_site %in% code_sites
  # NOTE(review): rows with NA fetched_at would make `fresh` NA, and the NA
  # index below would inject NA rows into cached_ok - confirm the cache
  # writer never stores NA fetched_at.
  fresh <- in_cache & cache$fetched_at >= ttl_cutoff & !is.na(cache$area) & !is.na(cache$altitude_site)
  cached_ok <- cache[in_cache & fresh, c("code_site","area","altitude_site")]
  need_ids  <- setdiff(code_sites, cached_ok$code_site)
  # NOTE(review): this all-cached early return lacks the vertical_datum_site
  # column that the main return path below includes - callers must tolerate
  # both shapes (or the schema should be unified).
  if (!length(need_ids)) return(dplyr::arrange(cached_ok, match(cached_ok$code_site, code_sites)))

  # ----- concurrency + headers + cookies -------------------------------------
  if (is.null(max_conns)) max_conns <- max(1L, min(rate$n, 6L))  # be extra polite
  # One "wave" per chunk of at most max_conns site codes.
  chunks <- split(need_ids, ceiling(seq_along(need_ids) / max_conns))

  # Browser-like UA (with a package marker) + shared cookie jar; the site
  # tends to 403 bare clients.
  ua  <- sprintf("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) hydrodownloadR/%s Safari/537.36",
                 as.character(utils::packageVersion("curl")))
  jar <- file.path(.fr_cache_dir(), "hydro_eaufrance.cookies.txt")

  # warm-up: set cookies once per run (homepage)
  if (warmup) {
    invisible(.fr_http_get_html("https://www.hydro.eaufrance.fr/", ua, jar))
    Sys.sleep(0.3)
  }

  pb <- progress::progress_bar$new(
    total = length(need_ids),
    format = "FR_HUBEAU site-meta [:bar] :current/:total (:percent) :eta",
    clear = FALSE, width = 70
  )

  # Parse one fiche page into a single-row tibble (extractors return NA on
  # failure, so a bad page still yields a row).
  parse_one <- function(html_txt, cs) {
    tibble::tibble(
      code_site     = cs,
      area          = .fr_html_extract_area(html_txt),
      altitude_site = .fr_html_extract_site_altitude(html_txt),
      vertical_datum_site = .fr_html_extract_site_datum(html_txt),
      fetched_at    = Sys.time()
    )
  }

  fetched_rows <- list()
  failed_403   <- character()

  # One wave per chunk: schedule every URL on a fresh pool, run the pool,
  # then sleep before starting the next wave.
  for (chunk in chunks) {
    pool <- curl::new_pool()
    res_chunk  <- vector("list", length(chunk))
    code_chunk <- as.character(chunk)
    status_vec <- integer(length(chunk))

    for (i in seq_along(code_chunk)) {
      cs   <- code_chunk[[i]]
      url  <- sprintf("https://www.hydro.eaufrance.fr/sitehydro/%s/fiche", cs)
      h    <- curl::new_handle()
      curl::handle_setopt(h,
                          cookiefile = jar, cookiejar = jar,
                          http_version = 2L, timeout = 20
      )
      curl::handle_setheaders(h,
                              "User-Agent" = ua,
                              "Accept" = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                              "Accept-Language" = "fr-FR,fr;q=0.9,en;q=0.5",
                              "Referer" = "https://www.hydro.eaufrance.fr/"
      )

      # done/fail callbacks capture (i, cs) via immediately-invoked factories
      # so each async response writes into its own slot of res_chunk /
      # status_vec via <<-.
      curl::curl_fetch_multi(
        url,
        done = (function(ix, cs_id) {
          force(ix); force(cs_id)
          function(res) {
            status_vec[[ix]] <<- res$status_code
            html <- tryCatch(rawToChar(res$content), error=function(e) "")
            Encoding(html) <- "UTF-8"
            res_chunk[[ix]] <<- tryCatch(parse_one(html, cs_id),
                                         error=function(e) tibble::tibble(code_site = cs_id, area=NA_real_, altitude_site=NA_real_, fetched_at=Sys.time()))
            pb$tick()
          }
        })(i, cs),
        fail = (function(ix, cs_id) {
          force(ix); force(cs_id)
          function(err) {
            # Transport-level failure: status 0, all-NA row.
            status_vec[[ix]] <<- 0L
            res_chunk[[ix]] <<- tibble::tibble(code_site = cs_id, area=NA_real_, altitude_site=NA_real_, fetched_at=Sys.time())
            pb$tick()
          }
        })(i, cs),
        pool = pool,
        handle = h
      )
    }

    # Block until every transfer in this wave has finished.
    curl::multi_run(pool = pool)

    # record any 403s from this wave
    if (any(status_vec == 403L, na.rm = TRUE)) {
      failed_403 <- c(failed_403, code_chunk[which(status_vec == 403L)])
    }

    # bind_rows fills the vertical_datum_site column with NA for failure rows
    fetched_rows[[length(fetched_rows)+1L]] <- dplyr::bind_rows(res_chunk)

    # pacing: respect n/period and add a tiny jitter
    Sys.sleep(rate$period + stats::runif(1, 0, jitter_sec))
  }

  new_tbl <- dplyr::bind_rows(fetched_rows)

  # ----- sequential retry for 403s (very gentle) ------------------------------
  if (sequential_retry_on_403 && length(failed_403)) {
    for (cs in unique(failed_403)) {
      Sys.sleep(rate$period + stats::runif(1, 0, jitter_sec))
      url <- sprintf("https://www.hydro.eaufrance.fr/sitehydro/%s/fiche", cs)
      got <- .fr_http_get_html(url, ua, jar)
      if (!is.na(got$status) && got$status == 200L) {
        row <- tryCatch(parse_one(got$html, cs),
                        error=function(e) tibble::tibble(code_site = cs, area=NA_real_, altitude_site=NA_real_, fetched_at=Sys.time()))
        # replace the NA row for this cs
        idx <- which(new_tbl$code_site == cs)
        if (length(idx)) new_tbl[idx[1], ] <- row else new_tbl <- dplyr::bind_rows(new_tbl, row)
      }
    }
  }

  # ----- cache update and ordered return --------------------------------------
  # Merge new rows over the cache, keep the newest fetched_at per code_site,
  # persist, and use the merged table as the return universe so previously
  # cached sites are included too.
  if (use_cache) {
    merged <- dplyr::bind_rows(cache, new_tbl)
    merged <- merged[order(merged$code_site, merged$fetched_at, decreasing = TRUE), ]
    merged <- merged[!duplicated(merged$code_site), ]
    .fr_site_cache_write(merged)
    out <- merged
  } else {
    out <- new_tbl
  }

  # Restrict to the requested sites, in input order.
  out <- out[out$code_site %in% code_sites,
             c("code_site","area","altitude_site","vertical_datum_site")]
  dplyr::arrange(out, match(out$code_site, code_sites))
}


# Scrape the gauge-zero altitude (m) from /stationhydro/{id}/fiche, one
# rate-limited GET per station, with a progress bar.
# Returns tibble(station_id, altitude_station); NA on any failure.
.fr_fetch_station_meta <- function(station_ids, rate = list(n = 3, period = 1)) {
  ids <- unique(stats::na.omit(as.character(station_ids)))
  if (length(ids) == 0L) {
    return(tibble::tibble(station_id = character(), altitude_station = double()))
  }

  # Throttled GET wrapper.
  fetch <- ratelimitr::limit_rate(
    function(u) httr::GET(u, httr::timeout(20)),
    rate = do.call(ratelimitr::rate, rate)
  )

  bar <- progress::progress_bar$new(
    total = length(ids),
    format = "FR_HUBEAU station-meta [:bar] :current/:total (:percent) :eta",
    clear = FALSE, width = 70
  )

  one_station <- function(sid) {
    on.exit(bar$tick(), add = TRUE)
    resp <- tryCatch(
      fetch(sprintf("https://www.hydro.eaufrance.fr/stationhydro/%s/fiche", sid)),
      error = function(e) NULL
    )
    if (is.null(resp) || httr::http_error(resp)) {
      return(tibble::tibble(station_id = sid, altitude_station = NA_real_))
    }
    page <- tryCatch(httr::content(resp, as = "text", encoding = "UTF-8"),
                     error = function(e) "")
    tibble::tibble(
      station_id       = sid,
      altitude_station = .fr_html_extract_station_gauge_zero(page) # metres
    )
  }

  dplyr::bind_rows(lapply(ids, one_station))
}



# Fetch drainage areas (km^2) for a vector of code_site values, one
# rate-limited GET per fiche page, with a progress bar.
# Returns tibble(code_site, area); NA area on any failure.
.fr_fetch_site_areas <- function(code_sites, rate = list(n = 3, period = 1)) {
  sites <- unique(stats::na.omit(code_sites))
  if (length(sites) == 0L) return(tibble::tibble(code_site = character(), area = double()))

  # Throttled GET wrapper.
  throttled_get <- ratelimitr::limit_rate(
    function(u) httr::GET(u, httr::timeout(20)),
    rate = do.call(ratelimitr::rate, rate)
  )

  bar <- progress::progress_bar$new(
    total = length(sites),
    format = "FR_HUBEAU area [:bar] :current/:total (:percent) :eta",
    clear = FALSE, width = 70
  )

  rows <- lapply(sites, function(cs) {
    on.exit(bar$tick(), add = TRUE)
    resp <- tryCatch(
      throttled_get(sprintf("https://www.hydro.eaufrance.fr/sitehydro/%s/fiche", cs)),
      error = function(e) NULL
    )
    if (is.null(resp) || httr::http_error(resp)) {
      return(tibble::tibble(code_site = cs, area = NA_real_))
    }
    page <- tryCatch(httr::content(resp, as = "text", encoding = "UTF-8"),
                     error = function(e) "")
    tibble::tibble(code_site = cs, area = .fr_html_extract_area(page))
  })

  dplyr::bind_rows(rows)
}

# Site-level vertical datum: label after "Syst\u00E8me altim\u00E9trique"
#
# Returns the datum label as a character scalar (e.g. a national levelling
# system name), or NA_character_ when the field cannot be located. Tries
# three DOM strategies of decreasing strictness, then a raw-HTML regex.
.fr_html_extract_site_datum <- function(html_txt) {
  if (!nzchar(html_txt)) return(NA_character_)
  if (requireNamespace("xml2", quietly = TRUE)) {
    doc <- tryCatch(xml2::read_html(html_txt, encoding = "UTF-8"), error = function(e) NULL)
    if (!is.null(doc)) {
      # Attempt 1: exact match after converting NBSP + NNBSP to regular space
      xp1 <- "//span[normalize-space(translate(., '\u00A0\u202F', '  '))='Syst\u00E8me altim\u00E9trique']/following-sibling::span[1]"
      node <- xml2::xml_find_first(doc, xp1)
      if (!inherits(node, "xml_missing") && !is.null(node)) {
        val <- xml2::xml_text(node, trim = TRUE)
        if (nzchar(val)) return(val)
      }

      # Attempt 2: looser 'contains' on the label, then take the 2nd span in the field
      xp2 <- "//div[contains(@class,'identity__field')][span[contains(translate(., '\u00A0\u202F', '  '), 'Syst\u00E8me') and contains(., 'altim')]]/span[position()=2]"
      node <- xml2::xml_find_first(doc, xp2)
      if (!inherits(node, "xml_missing") && !is.null(node)) {
        val <- xml2::xml_text(node, trim = TRUE)
        if (nzchar(val)) return(val)
      }

      # Attempt 3: R-side scan over all label/value span pairs, normalizing
      # whitespace before comparing the label text.
      labs <- xml2::xml_find_all(doc, "//div[contains(@class,'identity__field')]/span[1]")
      if (length(labs)) {
        lab_txt <- xml2::xml_text(labs, trim = TRUE)
        lab_norm <- gsub("[\u00A0\u202F]", " ", lab_txt)       # NBSP/NNBSP -> space
        lab_norm <- gsub("\\s+", " ", lab_norm)                # collapse
        hit <- which(lab_norm == "Syst\u00E8me altim\u00E9trique")
        if (length(hit)) {
          sib <- xml2::xml_find_first(labs[[hit[1]]], "following-sibling::span[1]")
          # NOTE(review): if `sib` is xml_missing, xml_text() yields NA and
          # nzchar(NA) is TRUE, so NA_character_ would be returned here -
          # same value as the final fallback, but worth confirming.
          val <- xml2::xml_text(sib, trim = TRUE)
          if (nzchar(val)) return(val)
        }
      }
    }
  }

  # Regex fallback over raw HTML (label </span> <span> value </span>)
  rx <- "(?is)Syst[\u00E8e]me\\s*altim[\u00E9e]trique\\s*</span>\\s*<span>\\s*([^<]+?)\\s*</span>"
  m  <- regexec(rx, html_txt, perl = TRUE)
  g  <- regmatches(html_txt, m)
  if (length(g) && length(g[[1]]) >= 2) return(trimws(g[[1]][2]))
  NA_character_
}



# Single cookie-aware GET of an HTML page via curl.
# Returns list(status = <int, NA on transport failure>, html = <string, "" on failure>).
.fr_http_get_html <- function(url, ua, jar, timeout = 20) {
  handle <- curl::new_handle()
  curl::handle_setopt(handle,
                      cookiefile = jar, cookiejar = jar,
                      http_version = 2L,   # prefer HTTP/2 when available
                      timeout = timeout)
  curl::handle_setheaders(handle,
                          "User-Agent" = ua,
                          "Accept" = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                          "Accept-Language" = "fr-FR,fr;q=0.9,en;q=0.5",
                          "Referer" = "https://www.hydro.eaufrance.fr/")

  resp <- tryCatch(curl::curl_fetch_memory(url, handle = handle), error = function(e) NULL)
  if (is.null(resp)) return(list(status = NA_integer_, html = ""))

  body <- tryCatch(rawToChar(resp$content), error = function(e) "")
  Encoding(body) <- "UTF-8"   # mark as UTF-8 without re-encoding
  list(status = resp$status_code, html = body)
}


# ---- simple cache helpers ----------------------------------------------------
# Per-user cache directory for this package; created on first use.
.fr_cache_dir <- function() {
  path <- tools::R_user_dir("hydrodownloadR", "cache")
  if (!dir.exists(path)) {
    dir.create(path, recursive = TRUE, showWarnings = FALSE)
  }
  path
}
# Location of the on-disk site-metadata cache file.
.fr_site_cache_path <- function() {
  file.path(.fr_cache_dir(), "fr_hubeau_site_meta.rds")
}

# Read the cached site-meta table from disk; returns an empty prototype
# tibble when the cache file is absent or unreadable.
.fr_site_cache_read <- function() {
  empty_cache <- function() {
    tibble::tibble(code_site = character(), area = double(),
                   altitude_site = double(), fetched_at = as.POSIXct(character()))
  }
  path <- .fr_site_cache_path()
  if (!file.exists(path)) return(empty_cache())
  cached <- tryCatch(readRDS(path), error = function(e) NULL)
  if (is.null(cached)) empty_cache() else tibble::as_tibble(cached)
}
# Persist the site-meta cache, keeping one row per code_site.
.fr_site_cache_write <- function(tbl) {
  # Sort newest fetched_at first, then drop later duplicates of each site.
  ord <- order(tbl$code_site, tbl$fetched_at, decreasing = TRUE)
  deduped <- tbl[ord, ]
  deduped <- deduped[!duplicated(deduped$code_site), ]
  saveRDS(deduped, .fr_site_cache_path())
  invisible(deduped)
}

# Load the precomputed station-metadata table shipped with the package.
#
# Returns list(tab = tibble, date = character date or NA), or NULL when no
# precomputed data can be located. Sources are tried in order:
#   1. the lazy-loaded `fr_hubeau_meta` dataset in the package namespace,
#   2. data/fr_hubeau_meta.rda on disk (installed tree first, then a
#      source-tree-relative path for dev-mode runs).
# NOTE(review): relies on `%||%` being in scope (rlang or a local helper) -
# confirm the package imports it.
.fr_load_precomputed_meta <- function() {
  # 1) try lazy-loaded object (when installed/loaded)
  meta <- get0("fr_hubeau_meta", envir = asNamespace("hydrodownloadR"), ifnotfound = NULL)
  if (!is.null(meta)) {
    return(list(
      tab  = tibble::as_tibble(meta),
      # Prefer the explicit metadata_date attribute; fall back to the most
      # recent retrieved_at value, or NA if that also fails.
      date = attr(meta, "metadata_date", exact = TRUE) %||%
        tryCatch(as.character(as.Date(max(meta$retrieved_at, na.rm = TRUE))), error = function(e) NA_character_)
    ))
  }

  # 2) try loading the .rda directly if it exists in this source tree
  fp <- file.path(system.file(package = "hydrodownloadR"), "data", "fr_hubeau_meta.rda")
  if (!nzchar(fp) || !file.exists(fp)) {
    # fallback for dev mode: when running from source, system.file() can be empty.
    # Look on disk relative to project root:
    fp <- "data/fr_hubeau_meta.rda"
  }
  if (file.exists(fp)) {
    # Load into an isolated environment so we do not clobber anything.
    env <- new.env(parent = emptyenv())
    try(load(fp, envir = env), silent = TRUE)
    meta <- get0("fr_hubeau_meta", envir = env, ifnotfound = NULL)
    if (!is.null(meta)) {
      return(list(
        tab  = tibble::as_tibble(meta),
        date = attr(meta, "metadata_date", exact = TRUE) %||%
          tryCatch(as.character(as.Date(max(meta$retrieved_at, na.rm = TRUE))), error = function(e) NA_character_)
      ))
    }
  }

  # 3) last resort: no precomputed metadata available
  NULL
}

# First name from `choices` that is a column of `df`; NULL when none match.
.fr_pick_col <- function(df, choices) {
  hits <- intersect(choices, names(df))
  if (length(hits) > 0) hits[[1L]] else NULL
}

# Call `do_req` with exponential backoff + jitter. Up to `retries` silent
# attempts; a final extra attempt is made outside the loop so the last
# error (if any) propagates to the caller.
.fr_try_req <- function(do_req, retries = 3, base_sleep = 0.5) {
  attempt <- 0L
  while (attempt < retries) {
    attempt <- attempt + 1L
    out <- try(do_req(), silent = TRUE)
    if (!inherits(out, "try-error")) return(out)
    Sys.sleep(base_sleep * 2^(attempt - 1L) + stats::runif(1, 0, 0.2))
  }
  do_req() # last attempt; let errors surface
}


# ---- stations() --------------------------------------------------------------

#' Station referential for FR_HUBEAU
#'
#' Downloads the Hub'Eau hydrometry and/or temperature station referentials,
#' merges them into one row per station_id, and attaches area/altitude
#' metadata: from a precomputed dataset shipped with the package when
#' available (fast path), or by scraping hydro.eaufrance.fr fiche pages
#' (slow path, forced with `update = TRUE`).
#'
#' @param x FR_HUBEAU service object (consumed by build_request()).
#' @param update Force a live metadata refresh instead of precomputed data.
#' @param verbose Emit status messages.
#' @param include_hydrometry,include_temperature Toggle each referential.
#' @param ... Unused.
#' @return A tibble with identity columns, lat/lon, area (km2), the three
#'   altitude variants (API referential, site fiche, gauge zero) and has_*
#'   capability flags. Metadata provenance is attached as attributes
#'   "fr_hubeau_meta_date" / "fr_hubeau_meta_source".
#' @export
stations.hydro_service_FR_HUBEAU <- function(
    x,
    update = FALSE,
    verbose = TRUE,
    include_hydrometry   = TRUE,
    include_temperature  = TRUE,
    ...
) {
  hm_path <- "/api/v2/hydrometrie/referentiel/stations"
  tp_path <- "/api/v1/temperature/station"

  # Page through a referential endpoint until the payload stops reporting a
  # `next` page (or a page comes back empty); all pages are row-bound.
  get_paged <- function(path, page_size = 10000L) {
    out <- list(); page <- 1L
    repeat {
      req <- build_request(x, path = path, query = list(format = "json", size = page_size, page = page))
      res <- perform_request(req)
      payload <- .fr_parse_json(res)
      dat <- if (!is.null(payload)) payload[["data"]] else NULL
      if (is.null(dat) || !length(dat)) break
      out[[length(out) + 1L]] <- dat
      page <- page + 1L
      if (is.null(payload[["next"]])) break
    }
    if (!length(out)) return(NULL)
    dplyr::bind_rows(out)
  }

  # pull referentials (optionally)
  hm <- if (include_hydrometry)  tibble::as_tibble(get_paged(hm_path) %||% tibble::tibble()) else tibble::tibble()
  tp <- if (include_temperature) tibble::as_tibble(get_paged(tp_path) %||% tibble::tibble()) else tibble::tibble()

  # Hydrometry rows. NOTE(review): the original note hints the API altitude
  # may be in mm and need conversion to m, but no conversion is applied
  # here - confirm the unit of altitude_ref_alti_station against the API docs.
  hm_tbl <- if (nrow(hm)) tibble::tibble(
    code_site    = col_or_null(hm, "code_site"),
    station_id   = col_or_null(hm, "code_station"),
    station_name = normalize_utf8(col_or_null(hm, "libelle_site")),
    river        = normalize_utf8(col_or_null(hm, "libelle_cours_eau")),
    lat          = suppressWarnings(as.numeric(col_or_null(hm, "latitude_station"))),
    lon          = suppressWarnings(as.numeric(col_or_null(hm, "longitude_station"))),
    altitude_api = suppressWarnings(as.numeric(col_or_null(hm, "altitude_ref_alti_station"))),
    area         = NA_real_,
    source_api   = "hydrometrie",
    # Capability flags are assumed from the API family, not checked per station.
    has_discharge   = TRUE,
    has_level       = TRUE,
    has_temperature = FALSE
  ) else tibble::tibble()

  # temperature rows
  tp_tbl <- if (nrow(tp)) tibble::tibble(
    code_site    = col_or_null(tp, "code_site"),
    station_id   = col_or_null(tp, "code_station"),
    station_name = normalize_utf8(col_or_null(tp, "libelle_station")),
    river        = normalize_utf8(col_or_null(tp, "libelle_cours_eau")),
    lat          = suppressWarnings(as.numeric(col_or_null(tp, "latitude"))),
    lon          = suppressWarnings(as.numeric(col_or_null(tp, "longitude"))),
    altitude_api = suppressWarnings(as.numeric(col_or_null(tp, "altitude"))),
    area         = NA_real_,
    source_api   = "temperature",
    has_discharge   = FALSE,
    has_level       = FALSE,
    has_temperature = TRUE
  ) else tibble::tibble()

  both <- dplyr::bind_rows(hm_tbl, tp_tbl)

  # Nothing from either referential: return an empty, fully-typed tibble.
  if (!nrow(both)) {
    return(tibble::tibble(
      country = x$country, provider_id = x$provider_id, provider_name = x$provider_name,
      station_id=character(), station_name=character(), station_name_ascii=character(),
      river=character(), river_ascii=character(),
      lat=double(), lon=double(), area=double(),
      altitude_api=double(), altitude_site=double(), altitude_station=double(),
      source_api=character(), has_discharge=logical(), has_level=logical(), has_temperature=logical()
    ))
  }

  # dedupe by station_id (defensive - hydrometry vs temperature are disjoint in practice)
  both <- dplyr::distinct(both, .data$station_id, .keep_all = TRUE)

  # ----------------------- precomputed meta (fast path) -----------------------
  # Join the packaged metadata table; suffixed .x/.y columns from the join
  # are coalesced below, preferring the precomputed (.y) values.
  if (!isTRUE(update)) {
    meta <- .fr_load_precomputed_meta()
    if (!is.null(meta)) {
      if (isTRUE(verbose)) {
        # NOTE(review): packageStartupMessage() outside .onAttach() is
        # unconventional; message()/rlang::inform() is the usual choice.
        packageStartupMessage(sprintf(
          "FR_HUBEAU: using precomputed station metadata (built %s). For the freshest values, call stations(..., update = TRUE).",
          meta$date %||% "unknown date"
        ))
      }
      both <- dplyr::left_join(
        both,
        dplyr::select(meta$tab, code_site, station_id, area, altitude_api, altitude_site, altitude_station),
        by = c("code_site", "station_id"),
        multiple = "first", relationship = "many-to-one"
      )

      # coalesce area (prefer precomputed if present)
      if (all(c("area.x","area.y") %in% names(both))) {
        both$area <- dplyr::coalesce(both$area.y, both$area.x)
        both <- dplyr::select(both, -dplyr::any_of(c("area.x","area.y")))
      }

      # coalesce altitude_api if we happen to have both
      if (any(c("altitude_api.x","altitude_api.y") %in% names(both))) {
        both$altitude_api <- dplyr::coalesce(both$altitude_api.x, both$altitude_api.y)
        both <- dplyr::select(both, -dplyr::any_of(c("altitude_api.x","altitude_api.y")))
      }

      attr(both, "fr_hubeau_meta_date")   <- meta$date
      attr(both, "fr_hubeau_meta_source") <- "precomputed"
    } else if (isTRUE(verbose)) {
      packageStartupMessage("FR_HUBEAU: no precomputed metadata found; may fetch live if update=TRUE.")
    }
  }

  # ------------------------- live meta (slow path) ----------------------------
  # Scrape when explicitly requested, or when the fast path could not supply
  # the altitude columns at all.
  need_live <- isTRUE(update) ||
    !all(c("altitude_site","altitude_station") %in% names(both))

  if (need_live) {
    # Site-level scrape (area + site altitude)
    site_meta <- .fr_fetch_site_meta(
      stats::na.omit(both$code_site),
      rate = list(n = 3, period = 1),
      max_conns = 3, use_cache = TRUE, cache_ttl_days = 90
    )

    if (nrow(site_meta)) {
      both <- dplyr::left_join(
        both, site_meta, by = "code_site",
        multiple = "first", relationship = "many-to-one"
      )
      # coalesce area again if join created .x/.y
      if (all(c("area.x","area.y") %in% names(both))) {
        both$area <- dplyr::coalesce(both$area.y, both$area.x)
        both <- dplyr::select(both, -dplyr::any_of(c("area.x","area.y")))
      }
    } else {
      # Ensure the column exists even when nothing was scraped.
      both$altitude_site <- both$altitude_site %||% NA_real_
    }

    # Station-level scrape (gauge-zero altitude)
    st_meta <- .fr_fetch_station_meta(stats::na.omit(both$station_id))
    if (nrow(st_meta)) {
      both <- dplyr::left_join(
        both, st_meta, by = "station_id",
        multiple = "first", relationship = "many-to-one"
      )
    } else {
      both$altitude_station <- both$altitude_station %||% NA_real_
    }

    if (isTRUE(verbose)) {
      packageStartupMessage("FR_HUBEAU: fetched live metadata (slow path).")
    }
    attr(both, "fr_hubeau_meta_date")   <- as.character(Sys.Date())
    attr(both, "fr_hubeau_meta_source") <- "live"
  }

  # --------------------------- finishing touches ------------------------------
  both$station_name_ascii <- to_ascii(normalize_utf8(both$station_name))
  both$river_ascii        <- to_ascii(normalize_utf8(both$river))

  # Assemble the stable output schema.
  out <- tibble::tibble(
    country            = "FR",
    provider_id        = "FR_HUBEAU",
    provider_name      = "France - Hub'Eau (Eaufrance) API",
    station_id         = both$station_id,
    station_name       = both$station_name,
    station_name_ascii = both$station_name_ascii,
    river              = both$river,
    river_ascii        = both$river_ascii,
    lat                = both$lat,
    lon                = both$lon,
    area               = both$area,             # km2 when available
    altitude_station   = both$altitude_station, # gauge zero (m)
    altitude_api       = both$altitude_api,     # API referential (m)
    altitude_site      = both$altitude_site,    # site fiche (m)
    source_api         = both$source_api,
    has_discharge      = both$has_discharge,
    has_level          = both$has_level,
    has_temperature    = both$has_temperature
  )

  # propagate meta attributes to the returned tibble if we set them on `both`
  for (nm in c("fr_hubeau_meta_date","fr_hubeau_meta_source")) {
    v <- attr(both, nm, exact = TRUE)
    if (!is.null(v)) attr(out, nm) <- v
  }

  out
}





#' @export
timeseries.hydro_service_FR_HUBEAU <- function(x,
                                              parameter   = c("water_discharge","water_level","water_temperature"),
                                              stations    = NULL,
                                              start_date  = NULL,
                                              end_date    = NULL,
                                              mode        = c("complete","range"),
                                              exclude_quality = NULL,                 # kept for API parity; not used here
                                              max_stations = getOption("hydrodownloadR.max_stations", 500L),
                                              ...) {
  # Download one canonical parameter's time series from Hub'Eau for the
  # requested stations, dispatching to the realtime / daily-elaborated /
  # temperature runner depending on the mapped API family.
  # Returns a long tibble, or a typed zero-row tibble when nothing matched.
  parameter <- match.arg(parameter)
  mode      <- match.arg(mode)
  rng       <- resolve_dates(mode, start_date, end_date)  # list(start, end)
  pm        <- .fr_param_map(parameter)                   # api family, path, unit, converter

  # --- station universe -------------------------------------------------------
  st_all <- stations.hydro_service_FR_HUBEAU(x)
  if (nrow(st_all) == 0) {
    return(.fr_empty_ts(x, parameter, pm$unit))
  }

  if (is.null(stations) || !length(stations)) {
    # No explicit selection: cap the universe to keep run time bounded.
    if (nrow(st_all) > max_stations) {
      warning(sprintf(
        "FR_HUBEAU: %d stations available; limiting to first %d. Pass 'stations=' or increase 'max_stations=' (or set options(hydrodownloadR.max_stations=...)).",
        nrow(st_all), as.integer(max_stations)
      ))
    }
    st <- utils::head(st_all, max_stations)
  } else {
    stations <- unique(as.character(stations))
    st  <- dplyr::filter(st_all, .data$station_id %in% stations)
    bad <- setdiff(stations, st$station_id)   # requested but unknown ids
    if (length(bad) > 0) {
      warning(
        "FR_HUBEAU: dropping unknown station ids: ",
        paste(utils::head(bad, 5L), collapse = ", "),
        if (length(bad) > 5) sprintf(" (+%d more)", length(bad) - 5) else ""
      )
    }
  }
  if (!nrow(st)) return(.fr_empty_ts(x, parameter, pm$unit))

  # --- chunking + progress (tick by stations, not chunks) --------------------
  chunk_size <- getOption("hydrodownloadR.chunk_size", 50L)
  chunks <- split(st$station_id, ceiling(seq_along(st$station_id) / chunk_size))
  pb <- progress::progress_bar$new(
    total = nrow(st),
    format = "FR_HUBEAU [:bar] :current/:total (:percent) :eta",
    clear = FALSE, width = 70
  )

  # --- per-call rate override (fallback to x$rate_cfg) -----------------------
  dots     <- list(...)
  rate_cfg <- dots$rate_cfg %||% getOption("hydrodownloadR.rate_cfg") %||% x$rate_cfg
  limiter  <- ratelimitr::limit_rate(
    function(req) perform_request(req),
    rate = do.call(ratelimitr::rate, rate_cfg)
  )

  # --- runners per API family -------------------------------------------------
  results <- lapply(chunks, function(ids) {
    # tick by number of stations in this chunk (runs even if the runner errors)
    on.exit(pb$tick(length(ids)), add = TRUE)
    if (pm$api == "hydrometrie") {
      if (identical(pm$ts_path, "/api/v2/hydrometrie/observations_tr")) {
        .do_hm_rt_chunk(x, ids, pm, rng, limiter)     # realtime water level
      } else {
        .do_hm_elab_chunk(x, ids, pm, rng, limiter)   # daily elaborated discharge
      }
    } else {
      .do_temp_chunk(x, ids, pm, rng, limiter)        # continuous temperature
    }
  })

  out <- dplyr::bind_rows(results)
  if (!nrow(out)) return(.fr_empty_ts(x, parameter, pm$unit))
  out
}

# ---------- helpers: empty tibble ---------------------------------------------
.fr_empty_ts <- function(x, parameter, unit) {
  # Typed zero-row result so downstream dplyr::bind_rows() keeps a stable
  # schema. Scalar columns (country, parameter, unit, quality_code, ...)
  # recycle to length zero against the zero-length vector columns.
  no_chr <- character(0)
  tibble::tibble(
    country        = "FR",
    provider_id    = "FR_HUBEAU",
    provider_name  = "France - Hub'Eau (Eaufrance) API",
    station_id     = no_chr,
    parameter      = parameter,
    timestamp      = as.POSIXct(no_chr, tz = "UTC"),
    value          = numeric(0),
    unit           = unit,
    quality_code   = NA_character_,
    vertical_datum = NA_character_,
    source_url     = no_chr
  )
}

# ---------- hydrometrie realtime (H) with cursor ------------------------------
.do_hm_rt_chunk <- function(x, ids, pm, rng, limiter) {
  # Fetch realtime water-level observations per station, following the API's
  # 'next' cursor for pagination, and normalize to the canonical schema.

  # Optional fallback for vertical datum from precomputed meta (if present):
  # named character vector, station_id -> vertical_datum_site.
  vdatum_fallback <- local({
    meta <- .fr_load_precomputed_meta()
    if (is.null(meta) || !"vertical_datum_site" %in% names(meta$tab)) {
      structure(character(), names = character())
    } else {
      tab <- meta$tab
      stats::setNames(as.character(tab$vertical_datum_site), as.character(tab$station_id))
    }
  })

  fetch_one <- function(st_id) {
    q    <- c(list(code_entite = st_id, size = 5000L), pm$fixed_qs)
    req  <- build_request(x, path = pm$ts_path, query = q)
    res  <- limiter(req)
    url_used <- .fr_used_url(res, req)

    p     <- .fr_parse_json(res)
    dat   <- if (!is.null(p)) p[["data"]] else NULL
    pages <- list()                       # renamed: don't mask base::all()
    if (length(dat)) pages[[1]] <- dat
    cur <- if (!is.null(p)) p[["next"]] else NULL

    # Follow 'next' verbatim (avoid double-encoding)
    while (!is.null(cur) && nzchar(cur)) {
      res2 <- .fr_try_req(function() httr::GET(cur, httr::timeout(20)))   # GET the next URL as-is
      p2   <- tryCatch(jsonlite::fromJSON(httr::content(res2, as = "text", encoding = "UTF-8"), simplifyVector = TRUE),
                       error = function(e) NULL)
      d2   <- if (!is.null(p2)) p2[["data"]] else NULL
      if (!length(d2)) break
      pages[[length(pages) + 1L]] <- d2
      cur <- p2[["next"]]
    }

    if (!length(pages)) return(NULL)
    df <- tibble::as_tibble(dplyr::bind_rows(pages))

    # Keep full time (H:M:S); API timestamps are UTC ("...Z")
    df <- dplyr::mutate(
      df,
      ts  = as.POSIXct(.data$date_obs, format = "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"),
      val = suppressWarnings(as.numeric(.data$resultat_obs))
    )
    # Range bounds must be interpreted in UTC too, matching df$ts
    df <- df[!is.na(df$ts) &
               df$ts >= as.POSIXct(rng$start, tz = "UTC") &
               df$ts <= as.POSIXct(rng$end,   tz = "UTC"), , drop = FALSE]
    if (!nrow(df)) return(NULL)

    # Quality label may be absent from the payload; fall back to NA so the
    # result tibble always carries the column
    q_fr <- col_or_null(df, "libelle_qualification_obs") %||% NA_character_

    # Vertical datum only for water level (H)
    vd_col  <- "code_systeme_alti_serie"
    vd_site <- unname(vdatum_fallback[st_id])
    if (!length(vd_site)) vd_site <- NA_character_
    vdatum  <- if (vd_col %in% names(df)) as.character(df[[vd_col]]) else rep(vd_site, nrow(df))

    tibble::tibble(
      country        = x$country,
      provider_id    = x$provider_id,
      provider_name  = x$provider_name,
      station_id     = df$code_station %||% st_id,
      parameter      = "water_level",
      timestamp      = df$ts,
      value          = pm$convert(df$val),   # mm -> cm
      unit           = pm$unit,              # "cm"
      quality_code   = q_fr,                 # keep FR label (as agreed)
      vertical_datum = vdatum,
      source_url     = url_used
    )
  }

  dplyr::bind_rows(lapply(ids, fetch_one))
}

# ---------- hydrometrie daily elaborated (QmnJ) -------------------------------
.do_hm_elab_chunk <- function(x, ids, pm, rng, limiter) {
  # Fetch daily elaborated observations (discharge, e.g. QmJ) per station.
  # Pagination is done by date window: when a full page comes back we resume
  # from the day after the last date returned.
  fetch_one <- function(st_id) {
    size      <- 20000L                                # page size requested
    acc       <- list()
    end_fmt   <- format(as.Date(rng$end), "%Y-%m-%d")  # loop-invariant
    next_date <- as.Date(rng$start)

    repeat {
      q <- c(
        list(
          code_entite         = st_id,
          size                = size,
          date_debut_obs_elab = format(next_date, "%Y-%m-%d"),
          date_fin_obs_elab   = end_fmt
        ),
        pm$fixed_qs
      )

      req <- build_request(x, path = pm$ts_path, query = q)
      res <- .fr_try_req(function() limiter(req))
      url_used <- .fr_used_url(res, req)

      p   <- .fr_parse_json(res)
      dat <- if (!is.null(p)) p[["data"]] else NULL
      if (is.null(dat) || !length(dat)) break

      df <- tibble::as_tibble(dat)
      n  <- nrow(df)
      if (!n) break

      acc[[length(acc) + 1L]] <- df

      if (n == size) {
        # Full page: advance from the last date returned (daily data, so a
        # single station has at most one row per date)
        last_date <- tryCatch(as.Date(df$date_obs_elab[n]), error = function(e) NA)
        if (is.na(last_date)) break
        next_date <- last_date + 1
        next
      } else {
        break
      }
    }

    if (!length(acc)) return(NULL)
    out <- dplyr::bind_rows(acc)

    out <- dplyr::mutate(
      out,
      ts  = as.POSIXct(.data$date_obs_elab, tz = "UTC"),
      val = suppressWarnings(as.numeric(.data$resultat_obs_elab))
    )
    # Range bounds must be interpreted in UTC too, matching out$ts
    out <- out[!is.na(out$ts) &
                 out$ts >= as.POSIXct(rng$start, tz = "UTC") &
                 out$ts <= as.POSIXct(rng$end,   tz = "UTC"), , drop = FALSE]
    if (!nrow(out)) return(NULL)

    # Dedup boundary if we overlapped a day at split
    out <- dplyr::distinct(out, .data$ts, .keep_all = TRUE)

    tibble::tibble(
      country        = x$country,
      provider_id    = x$provider_id,
      provider_name  = x$provider_name,
      station_id     = out$code_station %||% st_id,
      parameter      = "water_discharge",
      timestamp      = out$ts,
      value          = pm$convert(out$val),     # l/s -> m^3/s
      unit           = pm$unit,                 # "m^3/s"
      # quality label may be absent; fall back to NA for a stable schema
      quality_code   = col_or_null(out, "libelle_qualification") %||% NA_character_,
      vertical_datum = NA_character_,
      source_url     = url_used
    )
  }

  dplyr::bind_rows(lapply(ids, fetch_one))
}

# ---------- temperature (v1) --------------------------------------------------
.do_temp_chunk <- function(x, ids, pm, rng, limiter) {
  # Fetch continuous water-temperature measurements (Hub'Eau temperature v1)
  # per station and normalize to the canonical timeseries schema.
  # NOTE(review): a single request of size 20000 is issued per station; rows
  # beyond that page are not fetched -- confirm whether pagination is needed.
  fetch_one <- function(st_id) {
    q <- list(
      code_station = st_id,
      date_debut   = format(as.Date(rng$start), "%Y-%m-%d"),
      date_fin     = format(as.Date(rng$end),   "%Y-%m-%d"),
      size         = 20000L
    )
    req <- build_request(x, path = pm$ts_path, query = q)
    res <- .fr_try_req(function() limiter(req))
    url_used <- .fr_used_url(res, req)

    p   <- .fr_parse_json(res)
    dat <- if (!is.null(p)) p[["data"]] else NULL
    if (is.null(dat) || !length(dat)) return(NULL)

    df <- tibble::as_tibble(dat)

    # build POSIXct timestamp (UTC) from available fields
    if ("date_mesure" %in% names(df) && any(grepl("T", df$date_mesure, fixed = TRUE))) {
      # ISO8601 already present, e.g. "2009-08-10T00:12:00Z"
      ts <- as.POSIXct(df$date_mesure, format = "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
    } else {
      # Combine date + time columns: "date_mesure_temp" + "heure_mesure_temp"
      d <- as.character(df[["date_mesure_temp"]] %||% NA_character_)
      h <- as.character(df[["heure_mesure_temp"]] %||% NA_character_)

      # default missing/empty times to midnight
      h[is.na(h) | !nzchar(h)] <- "00:00:00"

      # some APIs return "H:MM:SS" -> left-pad to "HH:MM:SS"
      h <- sub("^([0-9]):", "0\\1:", h)

      ts <- as.POSIXct(paste(d, h), format = "%Y-%m-%d %H:%M:%S", tz = "UTC")
    }

    df <- df %>%
      dplyr::mutate(
        ts  = ts,
        val = suppressWarnings(as.numeric(.data$resultat))
      ) %>%
      dplyr::filter(
        !is.na(.data$ts),
        .data$ts >= as.POSIXct(rng$start, tz = "UTC"),
        .data$ts <= as.POSIXct(rng$end,   tz = "UTC")
      )

    if (!nrow(df)) return(NULL)

    tibble::tibble(
      country        = x$country,
      provider_id    = x$provider_id,
      provider_name  = x$provider_name,
      station_id     = df$code_station %||% st_id,
      parameter      = "water_temperature",
      timestamp      = df$ts,
      value          = pm$convert(df$val),
      unit           = pm$unit,
      # guard against a missing quality column (matches the other runners)
      quality_code   = col_or_null(df, "libelle_qualification") %||% NA_character_,
      vertical_datum = NA_character_,
      source_url     = url_used
    )
  }

  dplyr::bind_rows(lapply(ids, fetch_one))
}

Try the hydrodownloadR package in your browser

Any scripts or data that you put into this service are public.

hydrodownloadR documentation built on Feb. 25, 2026, 5:08 p.m.