# R/validate-landings.R
#
# Defines function: validate_landings
# Documented in: validate_landings

#' Validate landings
#'
#' Downloads the preprocessed version of the data from cloud storage services and
#' validates a range of information so that it can be safely used for analysis.
#' By default the function uses the method of the median absolute deviation (MAD)
#' for outliers identification.
#'
#' The parameters needed in the config file are those required for
#' `preprocess_landings_step_1()` or `preprocess_landings_step_2()` and
#' `preprocess_metadata_tables()` combined, as well as the parameters needed
#' for outlier identification, that is `hrs`, `method` and `k`.
#'
#' @param log_threshold The logging level used as a threshold for the logger
#'   (for example `logger::DEBUG`); messages below this level are suppressed.
#' @inheritParams ingest_landings_v1v3
#' @inheritParams validate_surveys_time
#' @inheritParams validate_catch_price
#' @keywords workflow
#' @return No outputs. This function is used for its side effects.
#'
#' @importFrom rlang .data
#' @export
#'
validate_landings <- function(log_threshold = logger::DEBUG) {
  logger::log_threshold(log_threshold)

  # Load configuration, preprocessed metadata sheets, and the merged landings
  # (the "_weight" suffix selects the version with calculated catch weight)
  pars <- read_config()
  metadata <- get_preprocessed_sheets(pars)
  landings <- get_merged_landings(pars, "_weight")

  # read arguments for outliers identification; the `default` entries act as
  # fallbacks (via %||%) for the check-specific parameters used further below
  default_max_limit <- pars$validation$landings$default$max
  default_method <- pars$validation$landings$default$method
  default_k <- pars$validation$landings$default$k
  cook_dist <- pars$validation$landings$cook_dist

  # deployed_imeis <- get_deployed_imeis(metadata)
  # for now using all the deployed imeis
  deployed_imeis <- na.omit(metadata$devices$device_imei)

  logger::log_info("Validating IMEIs...")
  # Check each reported tracker IMEI against the list of deployed devices;
  # one alert row is produced per submission id
  imei_alerts <- landings$`trip_group/IMEI` %>%
    rlang::set_names(landings$`_id`) %>%
    purrr::imap(
      validate_this_imei,
      deployed_imeis
    ) %>%
    purrr::map_dfr(tibble::as_tibble)

  logger::log_info("Validating surveys trips...")
  surveys_time_alerts <- validate_surveys_time(
    data = landings,
    hrs = pars$validation$landings$survey_time$max_duration %||% default_max_limit,
    submission_delay = pars$validation$landings$survey_time$submission_delay
  )
  logger::log_info("Validating catches values...")
  # Split landings into "regular" observations and regularity alerts; the
  # price and catch-parameter checks run on the regular subset only
  regular_landings <- validate_landing_regularity(landings)
  regular_landings_data <- regular_landings$regular_landings
  regularity_alerts <- regular_landings$regularity_alerts
  surveys_price_alerts <- validate_catch_price(
    data = regular_landings_data,
    method = pars$validation$landings$prices$method %||% default_method,
    k = pars$validation$landings$prices$k %||% default_k
  )
  logger::log_info("Validating catches parameters...")
  surveys_catch_alerts <- validate_catch_params(
    regular_landings_data,
    k_ind = pars$validation$landings$catch$n_individuals$k
  )
  logger::log_info("Generating catches parameters bounds table...")
  # NOTE(review): `bounds_table` is computed but not used anywhere below in
  # this function — confirm whether it should be uploaded or removed
  bounds_table <- get_bounds_table(
    data = regular_landings_data,
    metadata_table = metadata,
    k_ind = pars$validation$landings$catch$n_individuals$k
  )
  # Combine price and catch alerts into price-per-weight alerts, excluding
  # submissions already flagged as non-regular
  price_weight_alerts <- validate_price_weight(
    catch_alerts = surveys_catch_alerts,
    price_alerts = surveys_price_alerts,
    non_regular_ids = regularity_alerts,
    cook_dist = cook_dist,
    price_weight_min = pars$validation$landings$price_per_weight$min_limit,
    price_weight_max = pars$validation$landings$price_per_weight$max_limit
  )
  # Categorical / metadata-driven consistency checks
  vessel_type_alerts <- validate_vessel_type(
    landings,
    metadata$vessel_types
  )
  gear_type_alerts <- validate_gear_type(
    landings,
    metadata$gear_types
  )
  site_alerts <- validate_sites(
    landings,
    metadata$stations, metadata$reporting_unit
  )
  n_fishers_alerts <- validate_n_fishers(
    landings,
    method = pars$validation$landings$n_fishers$method %||% default_method,
    k = pars$validation$landings$n_fishers$k %||% default_k
  )
  habitat_alerts <- validate_habitat(
    landings,
    metadata$habitat
  )
  mesh_alerts <- validate_mesh(landings,
    mesh_limit = pars$validation$landings$mesh
  )
  gleaners_alerts <- validate_gleaners(
    landings,
    method = default_method,
    k_gleaners = pars$validation$landings$gleaners$k
  )
  fuel_alerts <- validate_fuel(
    landings,
    method = default_method,
    k_fuel = pars$validation$landings$fuel$k
  )
  conservation_alerts <- validate_conservation(
    landings,
    metadata_conservation = metadata$conservation
  )
  happiness_alerts <- validate_happiness(
    landings
  )



  # CREATE VALIDATED OUTPUT -----------------------------------------------

  # Keep a submission-id lookup so the joined alerts can be matched back to
  # the raw landings table
  landings_ids <-
    landings %>%
    dplyr::select(
      submission_id = .data$`_id`
    ) %>%
    dplyr::mutate(submission_id = as.integer(.data$submission_id))

  logger::log_info("Renaming data fields")
  # Join every validated field (one table per check) by submission id, then
  # rename columns to the public schema of the validated-landings dataset
  validated_landings <-
    list(
      imei_alerts,
      surveys_time_alerts$validated_dates,
      surveys_time_alerts$validated_duration,
      price_weight_alerts,
      vessel_type_alerts,
      gear_type_alerts,
      site_alerts,
      n_fishers_alerts,
      habitat_alerts,
      mesh_alerts,
      gleaners_alerts,
      fuel_alerts,
      conservation_alerts,
      happiness_alerts
    ) %>%
    purrr::map(~ dplyr::select(.x, -alert_number)) %>%
    purrr::reduce(dplyr::left_join, by = "submission_id") %>%
    dplyr::left_join(landings_ids, by = "submission_id") %>%
    dplyr::mutate(
      # Rename fields inside the nested species-group tables as well
      species_group = purrr::map(
        .x = .data$species_group, .f = purrr::modify_at,
        .at = "length_individuals",
        purrr::map, dplyr::select,
        length = .data$mean_length,
        number_of_fish = .data$n_individuals,
        catch = .data$weight,
        .data$Selenium_mu:.data$Vitamin_A_mu
      ),
      species_group = purrr::map(
        .x = .data$species_group, .f = dplyr::select,
        catch_taxon = .data$species,
        catch_use = .data$food_or_sale,
        length_type = .data$length_type,
        length_frequency = .data$length_individuals
      )
    ) %>%
    dplyr::select(
      landing_id = .data$submission_id,
      landing_date = .data$date,
      tracker_imei = .data$imei,
      trip_length = .data$trip_length,
      landing_catch = .data$species_group,
      catch_price = .data$total_catch_value,
      landing_site = .data$station_name,
      municipality = .data$reporting_region,
      habitat = .data$habitat_type,
      tidyselect::starts_with("fisher_number"),
      gear = .data$gear_type,
      .data$mesh_size,
      propulsion_gear = .data$vessel_type,
      .data$n_gleaners,
      .data$fuel,
      catch_preservation = .data$conservation_place,
      .data$happiness
    )

  # Serialise the validated dataset and upload it to cloud storage
  validated_landings_filename <- paste(pars$surveys$merged_landings$file_prefix,
    "validated",
    sep = "_"
  ) %>%
    add_version(extension = "rds")
  readr::write_rds(
    x = validated_landings,
    file = validated_landings_filename,
    compress = "gz"
  )
  logger::log_info("Uploading {validated_landings_filename} to cloud storage")
  upload_cloud_file(
    file = validated_landings_filename,
    provider = pars$storage$google$key,
    options = pars$storage$google$options
  )
  # HANDLE FLAGS ------------------------------------------------------------

  # Collapse per-check alert numbers into a single "a-b-c" string per
  # submission; tables must be aligned by submission id before binding
  alerts <-
    list(
      imei_alerts,
      surveys_time_alerts$validated_dates,
      surveys_time_alerts$validated_duration,
      price_weight_alerts,
      vessel_type_alerts,
      gear_type_alerts,
      site_alerts,
      n_fishers_alerts,
      habitat_alerts,
      mesh_alerts,
      gleaners_alerts
    ) %>%
    purrr::map(~ dplyr::arrange(., .data$submission_id)) %>%
    purrr::map(~ dplyr::select(.x, alert_number, submission_id)) %>%
    dplyr::bind_cols() %>%
    dplyr::select(tidyselect::contains("alert")) %>%
    tidyr::unite(col = "alert", sep = "-", na.rm = TRUE)

  # Wrangle a bit landings, alerts and flags data frames to fit the workflow
  landings_info <-
    landings %>%
    dplyr::rename(
      submission_id = .data$`_id`,
      submission_date = .data$`_submission_time`
    ) %>%
    dplyr::mutate(
      submission_id = as.integer(.data$submission_id),
      submission_date = lubridate::as_date(.data$submission_date)
    ) %>%
    dplyr::select(.data$submission_id, .data$submission_date) %>%
    dplyr::arrange(.data$submission_id)

  ## Google sheets validation pipeline ##
  alerts_df <-
    dplyr::bind_cols(landings_info, alerts) %>%
    dplyr::arrange(.data$submission_date, .data$submission_id) %>%
    dplyr::mutate(
      submission_id = as.integer(.data$submission_id),
      # "0" marks submissions with no alerts
      alert = ifelse(.data$alert == "", "0", .data$alert),
      flag_date = lubridate::today("GMT"),
      validated = FALSE,
      comments = NA_character_,
      validated_when_ymd = NA_real_,
      validated_when_ymd = as.Date(.data$validated_when_ymd)
    ) %>%
    dplyr::select(
      .data$submission_id, .data$submission_date,
      .data$flag_date, .data$alert, .data$validated,
      .data$validated_when_ymd, .data$comments
    )

  logger::log_info("Authenticating for google drive")
  googlesheets4::gs4_auth(
    path = pars$storage$google$options$service_account_key,
    use_oob = TRUE
  )

  logger::log_info("Retrieving validation sheet and arrange by submission date")

  peskas_alerts <-
    googlesheets4::range_read(
      ss = pars$validation$google_sheets$sheet_id,
      sheet = pars$validation$google_sheets$flags_table,
      col_types = "iDDclDc"
    )

  logger::log_info("Upload backup validation sheet to GC")
  alerts_filename <-
    pars$validation$google_sheets$file_prefix %>%
    add_version(extension = "rds")
  readr::write_rds(
    x = peskas_alerts,
    file = alerts_filename,
    compress = "gz"
  )
  upload_cloud_file(
    file = alerts_filename,
    provider = pars$storage$google$key,
    options = pars$storage$google$options
  )

  # Split local alerts into submissions new to the remote sheet and those
  # already present
  new_flags_ids <- setdiff(alerts_df$submission_id, peskas_alerts$submission_id)
  new_flags_obs <- alerts_df %>% dplyr::filter(.data$submission_id %in% new_flags_ids)
  old_flags_df <- alerts_df %>% dplyr::filter(!.data$submission_id %in% new_flags_ids)

  # Sanity check: every remote row must have a local counterpart
  if (nrow(old_flags_df) < nrow(peskas_alerts)) {
    stop("The table is shorter than remote table")
  }

  logger::log_info("Updating flags table")

  # Merge local alerts with the remote sheet: keep the remote flag date when
  # the alert is unchanged (so manual validation history is preserved),
  # otherwise stamp today's date; validation status/comments come from remote
  sync_table <-
    dplyr::left_join(old_flags_df, peskas_alerts, by = "submission_id") %>%
    dplyr::arrange(
      dplyr::desc(.data$submission_date.x),
      dplyr::desc(.data$submission_id)
    ) %>%
    dplyr::transmute(
      submission_id = .data$submission_id,
      submission_date = .data$submission_date.x,
      flag_date = dplyr::case_when(
        .data$alert.x == .data$alert.y ~
          .data$flag_date.y, TRUE ~ .data$flag_date.x
      ),
      alert = .data$alert.x,
      validated = .data$validated.y,
      validated_when_ymd = .data$validated_when_ymd.y,
      comments = .data$comments.y
    ) %>%
    dplyr::ungroup()

  logger::log_info("New {nrow(new_flags_obs)} submissions flags to upload")
  if (nrow(new_flags_obs) > 0) {
    logger::log_info("Appending new {nrow(new_flags_obs)} flags")

    sync_table <- dplyr::bind_rows(sync_table, new_flags_obs)

    googlesheets4::sheet_write(
      data = sync_table,
      ss = pars$validation$google_sheets$sheet_id,
      sheet = pars$validation$google_sheets$flags_table
    )
  } else {
    logger::log_info("No new flags to append")
  }
}
# Download the validation tables from cloud storage.
#
# The object name is resolved from the Airtable validation table name and the
# preprocessing version recorded in the configuration; the file is downloaded
# to the working directory and read into memory.
#
# @param pars Configuration list (as returned by `read_config()`).
# @return The content of the validation rds file.
get_validation_tables <- function(pars) {
  validation_rds <- cloud_object_name(
    # `paste(x, sep = "_")` with a single argument was a no-op; pass the
    # prefix directly instead
    prefix = pars$validation$airtable$name,
    provider = pars$storage$google$key,
    extension = "rds",
    version = pars$validation$version$preprocess,
    options = pars$storage$google$options
  )
  logger::log_info("Downloading {validation_rds}...")
  download_cloud_file(
    name = validation_rds,
    provider = pars$storage$google$key,
    options = pars$storage$google$options
  )
  readr::read_rds(file = validation_rds)
}

# Download the preprocessed landings data from cloud storage.
#
# Resolves the versioned object name from the configuration, fetches the file
# locally, and reads it into memory.
#
# @param pars Configuration list (as returned by `read_config()`).
# @return The content of the preprocessed landings rds file.
get_preprocessed_landings <- function(pars) {
  file_prefix <- paste(pars$surveys$landings$file_prefix, "preprocessed", sep = "_")
  landings_rds <- cloud_object_name(
    prefix = file_prefix,
    provider = pars$storage$google$key,
    extension = "rds",
    version = pars$surveys$landings$version$preprocess,
    options = pars$storage$google$options
  )
  logger::log_info("Downloading {landings_rds}...")
  download_cloud_file(
    name = landings_rds,
    provider = pars$storage$google$key,
    options = pars$storage$google$options
  )
  readr::read_rds(file = landings_rds)
}

# Download the preprocessed metadata tables from cloud storage.
#
# Builds the object name from the Airtable metadata name, downloads the rds
# file, and loads it into memory.
#
# @param pars Configuration list (as returned by `read_config()`).
# @return The content of the preprocessed metadata rds file.
get_preprocessed_metadata <- function(pars) {
  storage_key <- pars$storage$google$key
  storage_options <- pars$storage$google$options
  metadata_rds <- cloud_object_name(
    prefix = paste(pars$metadata$airtable$name, "preprocessed", sep = "_"),
    provider = storage_key,
    extension = "rds",
    options = storage_options
  )
  logger::log_info("Downloading {metadata_rds}...")
  download_cloud_file(
    name = metadata_rds,
    provider = storage_key,
    options = storage_options
  )
  readr::read_rds(file = metadata_rds)
}

#' Download merged landings
#'
#' Retrieves the merged landings dataset (validated surveys landings combined
#' with PDS trips) from cloud storage and loads it into memory.
#'
#' @param pars Configuration file.
#' @param suffix A character indicating dataframe version. Use "_weight" to download
#' version with calculated catch weight.
#'
#' @return A dataframe.
#' @export
get_merged_landings <- function(pars, suffix = "") {
  object_prefix <- paste0(pars$surveys$merged_landings$file_prefix, suffix)
  # exact_match avoids accidentally picking up objects that merely share
  # the prefix (e.g. the "_weight" variant when suffix is empty)
  landings_rds <- cloud_object_name(
    prefix = object_prefix,
    provider = pars$storage$google$key,
    extension = "rds",
    version = pars$surveys$merged_landings$version,
    options = pars$storage$google$options,
    exact_match = TRUE
  )
  logger::log_info("Downloading {landings_rds}...")
  download_cloud_file(
    name = landings_rds,
    provider = pars$storage$google$key,
    options = pars$storage$google$options
  )
  readr::read_rds(file = landings_rds)
}
# WorldFishCenter/peskas.timor.data.pipeline documentation built on April 14, 2025, 1:47 p.m.