R/read_det_interpolate.R

Defines functions read_det_interpolate

Documented in read_det_interpolate

#' Read deterministic forecast files and interpolate to stations.
#'
#' `r lifecycle::badge("deprecated")`
#' This function was deprecated as \link{read_forecast} is much more flexible.
#'
#' @keywords internal
#'
#' @param start_date Date of the first forecast to be read in. Should be in
#'   YYYYMMDDhh format. Can be numeric or charcter.
#' @param end_date Date of the last forecast to be read in. Should be in
#'   YYYYMMDDhh format. Can be numeric or charcter.
#' @param det_model The name of the deterministic model. May be expressed as a
#'   vector if more than one model is wanted.
#' @param parameter The parameters to read as a character vector. For reading
#'   from vfld files, set to NULL to read all parameters, or set
#'   \code{veritcal_coordinate = NA} to only read surface parameters. See
#'   \code{show_harp_parameters} to get the possible parameters.
#' @param lead_time The lead times to read as a numeric vector.
#' @param by The time between forecasts. Should be a string of a number followed
#'   by a letter (the defualt is "6h"), where the letter gives the units - may
#'   be d for days, h for hours or m for minutes.
#' @param file_path The path for the forecast files to read. file_path will, in
#'   most cases, form part of the file template.
#' @param file_format The format of the files to read. Can be "vfld", "grib",
#'   "netcdf", "fa", or "fatar".
#' @param file_template The file template for the files to be read. For
#'   available built in templates see \code{\link{show_file_templates}}. If
#'   anything else is passed, it is returned unmodified, or with substitutions
#'   made for dynamic values. Available substitutions are {YYYY} for year,
#'   \{MM\} for 2 digit month with leading zero, \{M\} for month with no leading
#'   zero, and similarly \{DD\} or \{D\} for day, \{HH\} or \{H\} for hour,
#'   \{mm\} or \{m\} for minute. Also \{LDTx\} for lead time where x is the
#'   length of the string including leading zeros - can be omitted or 2, 3 or 4.
#'   Note that the full path to the file will always be file_path/template.
#' @param stations A data frame of stations with columns SID, lat, lon, elev. If
#'   this is supplied the forecasts are interpolated to these stations. In the
#'   case of vfld files, this is ignored and all stations found in the vfld are
#'   used. In the case of gridded files (e.g. grib, netcdf, FA), if no data
#'   frame of stations is passed a default list of stations is used. This list
#'   can be accessed via \code{station_list}.
#' @param correct_t2m A logical value to tell the function whether to height
#'   correct the 2m temperature forecast, if it is included in the
#'   \code{parameter} argument, from the model elevation to the observation
#'   elevation. The default is TRUE.
#' @param keep_model_t2m A logical value to tell the function whether to keep
#'   the original 2m temperature from the model as well as the height corrected
#'   values. If set to TRUE the this parameter gets the name "T2m_uncorrcted".
#'   The default is FALSE.
#' @param lapse_rate The lapse rate, in Kelvins per meter, to use when height
#'   correcting the 2m temperature. The defaul is 0.0065 K/m.
#' @param vertical_coordinate If upper air for multiple levels are to be read,
#'   the vertical coordinate of the data is given here. The default is
#'   "pressure", but can also be "model" for model levels, or "height" for
#'   height above ground /sea level.
#' @param clim_file The name of a file containing information about the model
#'   domain. Must include orography (surface geopotential), but can also include
#'   land-sea mask.
#' @param clim_format The file format of \code{clim_file}. Can be "grib", "fa",
#'   "fatar", or "netcdf".
#' @param interpolation_method The interpolation method to use. Available
#'   methods are "bilinear", "bicubic", or "nearest". The default is "nearest",
#'   which takes the nearest grid points to the stations.
#' @param use_mask A logical value to tell the function whether to include a
#'   land-sea mask in the interpolation. The land-sea mask must exist in either
#'   the \code{clim_file} or in the forecast files.
#' @param sqlite_path If not NULL, sqlite files are generated and written to the
#'   directory specified here.
#' @param sqlite_template The template for the filenames of the fctable files.
#'   See \code{\link{show_file_templates}} for available built in templates -
#'   for forecast sqlite files, these are templates beginning "fctable_". The
#'   default is "fctable_det".
#' @param sqlite_synchronous The synchronus setting for sqlite files. The
#'   defualt is "off", but could also be "normal", "full", or "extra". See
#'   \url{https://www.sqlite.org/pragma.html#pragma_synchronous} for more
#'   information.
#' @param sqlite_journal_mode The journal mode for the sqlite files. The default
#'   is "delete", but can also be "truncate", "persist", "memory", "wal", or
#'   "off". See \url{https://www.sqlite.org/pragma.html#pragma_journal_mode} for
#'   more information.
#' @param return_data A logical indicating whether to return the read data to
#'   the calling environment. The default is FALSE to avoid memory overload.
#' @param ... Extra options depending on file format.
#'
#' @return A tibble with columns det_model, fcdate, lead_time,
#'   member, SID, lat, lon, <parameter>.
#' @export
#'
read_det_interpolate <- function(
  start_date,
  end_date,
  det_model,
  parameter,
  lead_time            = seq(0, 48, 3),
  by                   = "6h",
  file_path            = "",
  file_format          = "vfld",
  file_template        = "vfld_det",
  stations             = NULL,
  correct_t2m          = TRUE,
  keep_model_t2m       = FALSE,
  lapse_rate           = 0.0065,
  vertical_coordinate  = c("pressure", "model", "height", NA),
  clim_file            = NULL,
  clim_format          = file_format,
  interpolation_method = "nearest",
  use_mask             = FALSE,
  sqlite_path          = NULL,
  sqlite_template      = "fctable_det",
  sqlite_synchronous   = c("off", "normal", "full", "extra"),
  sqlite_journal_mode  = c("delete", "truncate", "persist", "memory", "wal", "off"),
  return_data          = FALSE,
  ...
) {

  lifecycle::deprecate_stop(
    "0.1.0",
    "read_det_interpolate()",
    "read_forecast()"
  )

  if (any(is.na(vertical_coordinate))) vertical_coordinate <- as.character(vertical_coordinate)
  vertical_coordinate <- match.arg(vertical_coordinate)
  sqlite_synchronous  <- match.arg(sqlite_synchronous)
  sqlite_journal_mode <- match.arg(sqlite_journal_mode)

  # Loop over dates to prevent excessive data volumes in memory

  all_dates <- seq_dates(start_date, end_date, by)

  if (return_data) {
    function_output <- list()
    list_counter    <- 0
  }

  # initialise interpolation weights
  # if no clim file given, use something from data_files
  # find first existing file (if none: give an error)
  # use that to get domain
  # TODO: maybe for GRIB, we would want to pass a FA climfile for initialisation?
  #       so should we use the same file_format?

  if (is.null(stations)) {
    warning(
      "No stations specified. Default station list used.",
      call.      = FALSE,
      immediate. = TRUE
    )
    stations <- get("station_list")
  }

  if (!is.null(clim_file)) {
    message("Initialising interpolation.")
    init <- initialise_interpolation(
      file_name    = clim_file,
      file_format = clim_format,
      correct_t2m = correct_t2m,
      method      = interpolation_method,
      use_mask    = use_mask,
      stations    = stations
    )
  } else {
    init <- list(stations = stations)
    if (is.element("T2m", parameter) && correct_t2m && file_format != "vfld") {
      warning(
        "'correct_t2m = TRUE' but no 'clim_file' specified. Cannot height correct 2m temperature.",
        call.      = FALSE,
        immediate. = TRUE
      )
      correct_t2m <- FALSE
    }
  }

  for (fcst_date in all_dates) {

    if (return_data) list_counter <- list_counter + 1

    # Get the file names NEEED TO TEST FROM HERE!

    message("Generating file names.")

    data_files <- purrr::map2(
      det_model, file_template,
      ~ get_filenames(
        file_path      = file_path,
        start_date     = fcst_date,
        end_date       = fcst_date,
        by             = by,
        parameter      = parameter,
        det_model      = .x,
        lead_time      = lead_time,
        file_template  = .y,
        filenames_only = FALSE
      )
    ) %>%
      dplyr::bind_rows()


    # if init$weights == NULL (no clim file), run init with the first (existing) file name
    # you could do this in an explicit loop, but that may be inefficient
    if (is.null(init$weights) && ! file_format %in% c("vfld", "netcdf")) {
      # NOTE: even if there is no "topo" or "lsm" field, we will should able to derive the domain
      ff <- file.exists(data_files$file_name)
      if (any(ff)) {
        message("Initialising interpolation from forecast file.")
        init <- initialise_interpolation(
          file_name    = data_files$file_name[which(ff)[1]],
          file_format = file_format,
          correct_t2m = correct_t2m,
          method      = interpolation_method,
          use_mask    = use_mask,
          stations    = stations
        )
      }
    }

    # Get the data
    message("Reading data.")

    read_function <- get(paste("read", file_format, "interpolate", sep = "_"))
    forecast_data <- data_files %>%
      dplyr::mutate(
        fcdate = str_datetime_to_unixtime(.data$fcdate)
      ) %>%
      dplyr::mutate(validdate = .data$fcdate + .data$lead_time * 3600) %>%
      dplyr::group_by(.data$file_name)
    if (tidyr_new_interface()) {
      forecast_data <- tidyr::nest(forecast_data, metadata = -tidyr::one_of("file_name"))
    } else {
      forecast_data <- tidyr::nest(forecast_data, .key = "metadata")
    }
    forecast_data <- forecast_data %>%
      dplyr::mutate(
        forecast = purrr::map2(
          .data$file_name,
          .data$metadata,
          function(x, y) read_function(
            file_name           = x,
            parameter           = parameter,
            lead_time           = y$lead_time,
            vertical_coordinate = vertical_coordinate,
            init                = init,
            method              = interpolation_method,
            ...
          )
        )
      )

    # Remove empty rows and join the forecast data to the metadata

    param_units <- purrr::map_dfr(forecast_data$forecast, "units") %>%
      dplyr::distinct()

    message("Joining data.")

    forecast_data <- forecast_data %>%
      dplyr::mutate(forecast = purrr::map(.data$forecast, "fcst_data")) %>%
      dplyr::filter(purrr::map_lgl(.data$forecast, ~ !is.null(.x)))

    if (nrow(forecast_data) < 1) next

    forecast_data <- purrr::map2_df(
      forecast_data$metadata,
      forecast_data$forecast,
      dplyr::inner_join,
      by = "lead_time"
    ) %>%
      tidyr::drop_na(.data$forecast)

    # Put data into tidy long format and correct T2m if required

    sqlite_params <- unique(forecast_data$parameter)

    if (any(tolower(sqlite_params) == "t2m") && correct_t2m) {

      t2m_df <- forecast_data %>%
        dplyr::filter(tolower(.data$parameter) == "t2m") %>%
        dplyr::inner_join(init$stations, by = "SID", suffix = c("", ".station")) %>%
        dplyr::mutate(
          forecast = .data$forecast + lapse_rate * (.data$model_elevation - .data$elev)
        ) %>%
        dplyr::filter(.data$elev > -9999) %>%
        dplyr::select(
          -dplyr::contains(".station"),
          -.data$elev,
          -.data$name
        )

      if (keep_model_t2m) {
        forecast_data <- forecast_data %>%
          dplyr::mutate(
            parameter = dplyr::case_when(
              tolower(parameter) == "t2m" ~ paste0(.data$parameter, "_uncorrected"),
              TRUE                        ~ .data$parameter
            )
          )
        t2m_param   <- paste0(param_units$parameter[tolower(param_units$parameter) == "t2m"], "_uncorrected")
        t2m_units   <- param_units$units[tolower(param_units$parameter) == "t2m"]
        param_units <- rbind(param_units, c(t2m_param, t2m_units))
      } else {
        forecast_data <- forecast_data %>%
          dplyr::filter(tolower(.data$parameter) != "t2m")
      }

      forecast_data <- dplyr::bind_rows(forecast_data, t2m_df)

    }

    forecast_data <- forecast_data %>%
      tidyr::drop_na(.data$forecast) %>%
      dplyr::left_join(param_units, by = "parameter")

    # If sqlite_path is passed write data to sqlite files

    if (!is.null(sqlite_path)) {

      message("Preparing data to write.")
      group_cols <- c("fcdate", "parameter", "lead_time", "det_model")
      if (tidyr_new_interface()) {
        sqlite_data <- tidyr::nest(forecast_data, data = -tidyr::one_of(group_cols))
      } else {
        sqlite_data <- forecast_data %>%
          dplyr::group_by(!!!rlang::syms(group_cols)) %>%
          tidyr::nest()
      }
      sqlite_data <- sqlite_data %>%
        dplyr::mutate(
          file_path = sqlite_path,
          YYYY      = unixtime_to_str_datetime(.data$fcdate, lubridate::year),
          MM        = formatC(unixtime_to_str_datetime(.data$fcdate, lubridate::month), width = 2, flag = "0"),
          HH        = formatC(unixtime_to_str_datetime(.data$fcdate, lubridate::hour), width = 2, flag = "0"),
          LDT3      = formatC(lead_time, width = 3, flag = "0")
        )

      sqlite_data <- sqlite_data %>%
        dplyr::mutate(
          file_name = as.vector(glue::glue_data(sqlite_data, get_template(sqlite_template)))
        )
      if (tidyr_new_interface()) {
        sqlite_data <- tidyr::unnest(sqlite_data, tidyr::one_of("data"))
      } else {
        sqlite_data <- tidyr::unnest(sqlite_data)
      }

      sqlite_primary_key <- c("fcdate", "leadtime", "SID")

      fixed_trasmute_cols <- c("SID", "lat", "lon")
      if (is.element("model_elevation", colnames(forecast_data))) {
        fixed_trasmute_cols <- c(fixed_trasmute_cols, "model_elevation")
      }

      vertical_coordinate_colnames <- c("p", "ml", "z")
      vertical_coordinate_col      <- which(vertical_coordinate_colnames %in% colnames(forecast_data))
      if (length(vertical_coordinate_col) > 0) {
        fixed_trasmute_cols <- c(fixed_trasmute_cols, vertical_coordinate_colnames[vertical_coordinate_col])
        sqlite_primary_key  <- c(sqlite_primary_key, vertical_coordinate_colnames[vertical_coordinate_col])
      }

      fixed_trasmute_cols <- rlang::syms(fixed_trasmute_cols)

      sqlite_data <- sqlite_data %>%
        dplyr::transmute(
          !!! fixed_trasmute_cols,
          fcdate    = as.integer(.data$fcdate),
          leadtime  = .data$lead_time,
          validdate = as.integer(.data$validdate),
          member    = paste(.data$det_model, "det", sep = "_"),
          .data$forecast,
          .data$parameter,
          .data$units,
          .data$file_name
        ) %>%
        dplyr::group_by(.data$file_name)
      if (tidyr_new_interface()) {
        sqlite_data <- tidyr::nest(sqlite_data, data = -tidyr::one_of("file_name"))
      } else {
        sqlite_data <- tidyr::nest(sqlite_data)
      }

      purrr::walk2(
        sqlite_data$data,
        sqlite_data$file_name,
        write_fctable_to_sqlite,
        primary_key  = sqlite_primary_key,
        synchronous  = sqlite_synchronous,
        journal_mode = sqlite_journal_mode
      )

    }

    if (return_data) {
      if (is.element("member", names(forecast_data))) {
        function_output[[list_counter]] <- dplyr::select(forecast_data, -.data$member)
      } else {
        function_output[[list_counter]] <- forecast_data
      }
    }
  }
  if (return_data) dplyr::bind_rows(function_output)

}
andrew-MET/harpIO documentation built on March 7, 2024, 7:48 p.m.