R/utils_API.R

Defines functions to_observations create_odata_res_version create_odata_obs_version data_frame_to_api_helper

data_frame_to_api_helper <- function(directory, config, metadata, data) {

  skip_resource <- check_null(metadata$skip_Resource)
  if (is.na(skip_resource)) {
    resource <- to_resource(config, metadata, directory)
    version_res <- getLatestVersion(location = list(collection = "PDS",
                                                    instance = "Covid-19",
                                                    table = "Resource_Metadata"),
                                    server = CONFIG$stats_odata_api$environment)
    writeDatastore(resource,
                   location = list(collection = "PDS",
                                   instance = "Covid-19",
                                   table = "Resource_Metadata"),
                   version = version_res,
                   server = CONFIG$stats_odata_api$environment)
  } else {
    metadata$ResourceID <- str_remove(metadata$ResourceID, "\\.\\d+") # Stripping off decimal id
  }

  observations <- to_observations(config, metadata, directory, data)
  observations <- observations[!is.na(observations$Value), ]
  version_obs <- getLatestVersion(location = list(collection = "PDS",
                                                  instance = "Covid-19",
                                                  table = "Observation"),
                                  server = CONFIG$stats_odata_api$environment)
  writeDatastore(observations,
                 location = list(collection = "PDS",
                                 instance = "Covid-19",
                                 table = "Observation"),
                 version = version_obs,
                 server = CONFIG$stats_odata_api$environment)
}

create_odata_obs_version <- function() {
  observation_dummy <- tibble(ResourceID = "xxxx",
                              Period = "1800-01-01",
                              Value = 0,
                              Unit = "xxxx",
                              Measure = "xxxx",
                              Multiplier = 0)

  writeDatastore(observation_dummy,
                 location = list(collection = "PDS",
                                 instance = "Covid-19",
                                 table = "Observation"),
                 server = CONFIG$stats_odata_api$environment)
}

create_odata_res_version <- function() {
  resource_dummy <- tibble(LatestDataTable = "xxxx",
                           ResourceID = "xxxx",
                           Subject = "xxxx",
                           Title = "xxxx",
                           Description = "xxxx",
                           Frequency = "xxxx",
                           Var1 = "xxxx",
                           Var2 = "xxxx",
                           Var3 = "xxxx",
                           Var4 = "xxxx",
                           Var5 = "xxxx",
                           Var6 = "xxxx",
                           Modified = "1800-01-01")

  writeDatastore(resource_dummy,
                 location = list(collection = "PDS",
                                 instance = "Covid-19",
                                 table = "Resource_Metadata"),
                 server = CONFIG$stats_odata_api$environment)
}

to_observations <- function(config, metadata, directory, data) {
  # the EPIC database will silently drop any rows with NA in these columns
  COMPULSORY_COLS <- c(
    "ResourceID", "Period", "Value", "Unit", "Measure", "Multiplier"
  )

  if (is.na(check_null(metadata$data_format_long))) {
    names(data) <- c("parameter", config$value_names)
    data <- reshape2::melt(data, id.vars = "parameter")
  }

  Observations <- tibble(
    ResourceID = rep(metadata$ResourceID, nrow(data)),
    Geo = get_label(data, check_null(metadata$Geo), nrow(data)),
    GeoUnit = rep(check_null(metadata$GeoUnit), nrow(data)),
    Duration = rep(check_null(metadata$Duration), nrow(data)),
    Period = get_period(data, config, metadata, directory, nrow(data)),
    Label1 = get_label(data, check_null(metadata$Label1), nrow(data)),
    Label2 = get_label(data, check_null(metadata$Label2), nrow(data)),
    Label3 = get_label(data, check_null(metadata$Label3), nrow(data)),
    Label4 = get_label(data, check_null(metadata$Label4), nrow(data)),
    Label5 = get_label(data, check_null(metadata$Label5), nrow(data)),
    Label6 = get_label(data, check_null(metadata$Label6), nrow(data)),
    Value = as.numeric(as.character(data$value)),
    Unit = rep(check_null(metadata$Unit), nrow(data)),
    Measure = rep(check_null(metadata$Measure), nrow(data)),
    NullReason = NA,
    Multiplier = rep(check_null(metadata$Multiplier), nrow(data)),
    Status = NA
  )

  Observations_na <- Observations %>%
    filter(if_any(all_of(COMPULSORY_COLS), ~ is.na(.)))
  if (nrow(Observations_na) > 0) {
    Observations_na_str <- paste(capture.output(print(Observations_na)),
                                 collapse = "\n")
    group_names_str <- glue::glue_collapse(unique(unlist(config$group_names)),
                                           sep = ",")
    warning(paste0(
      "There is ", nrow(Observations_na), " (out of ", nrow(data),
      ") rows with NA in EPIC-mandatory columns for:\n",
      ">  ", metadata$ResourceID, " (", metadata$Title, ")\n",
      ">> ", group_names_str, "\n",
      Observations_na_str
    ))
  }

  return(Observations)
}

to_resource <- function(config, metadata, directory) {
  Resource <- tibble(
    LatestDataTable = "",
    ResourceID = metadata$ResourceID,
    Subject = metadata$Subject,
    Title = metadata$Title,
    Description = check_null(metadata$Description),
    Notes = check_null(metadata$Notes),
    Caveats = check_null(metadata$Caveats),
    Source = check_null(metadata$Source),
    SourceURL = check_null(metadata$SourceURL),
    Modified = as.Date(file.info(paste0(directory, config$filename))$mtime, tz = "NZ"),
    Frequency = check_null(metadata$Frequency),
    Var1 = check_null(metadata$Var1),
    Var2 = check_null(metadata$Var2),
    Var3 = check_null(metadata$Var3),
    Var4 = check_null(metadata$Var4),
    Var5 = check_null(metadata$Var5),
    Var6 = check_null(metadata$Var6)
  )
  return(Resource)
}


get_api_df <- function(table_name, ResourceIDs, extra_filter = "") {
  #' Query either of the Covid-19Indicators tables (Resources or Observations).
  #'
  #' The API has limit on the length of URL it can accept. This function uses
  #' a (trial-and-error) limit on the number of IDs to meet this constraint and
  #' sends multiple request (split on ResourceIDs) if necessary.
  #'
  #' TODO: change so it checks the length of URL to determine number of requests
  VALID_TABLE_VALS <- c("Resources", "Observations")
  if (!(table_name %in% VALID_TABLE_VALS)) {
    stop(paste(
      table, "is not a valid 'table_name' argument for get_api_df.",
      "Valid values are:", paste(VALID_TABLE_VALS, collapse = ", ")
    ))
  }
  NUM_IDS_PER_REQUEST <- 150
  num_requests <- ceiling(length(ResourceIDs) / NUM_IDS_PER_REQUEST)

  table_df <- NULL
  for (i in 1:num_requests) {
    ids_subset <- ResourceIDs[(150 * (i - 1)):(min(150 * i, length(ResourceIDs)))]
    ids_subset_joined <- paste(shQuote(ids_subset), collapse = ",")

    odata_url <-
      paste0(
        ODATA_URL,
        "Covid-19Indicators/", table_name,
        "?$filter=(ResourceID in (", ids_subset_joined, "))",
        extra_filter
      ) %>%
      URLencode()

    while (!is.null(odata_url)) {
      response <- odata_url %>%
        httr::GET(httr::add_headers("Ocp-Apim-Subscription-Key" = ODATA_TOKEN))
      if (response$status_code != 200) {
        warning_msg <- paste0(
          "API ERROR\n",
          "  Status code: ", response$status_code, "\n",
          "  Message: ", http_status(response)$message, "\n",
          "  Category: ", http_status(response)$category, "\n",
          "  Request URL: ", odata_url, "\n"
        )
        warning(warning_msg)
        return(NULL)
      }
      response_content <- response %>%
        httr::content(as = "text", encoding = "UTF-8") %>%
        jsonlite::fromJSON(flatten = TRUE)

      table_df <- rbind(table_df, response_content$value)
      odata_url <- response_content$'@odata.nextLink'
    }
  }
  return(table_df)
}


get_period <- function(data, config, metadata, directory, len) {
  config_data_type <- check_null(config$data_type)
  metadata_data_type <- check_null(metadata$data_type)
  metadata_period <- check_null(metadata$Period)
  # Some surveys require peroid to be a hard coded date
  if (!is.na(as.Date(as.character(metadata_period), format = "%d/%m/%Y"))) {
    return(rep(metadata_period, len))
  }
  # trade period is spread across two columns
  else if (!is.na(metadata_period) & metadata_period == "trade_period") {
    return(paste0(data$parameter, "-", data$variable))
  }
  # some config barcharts are actually timeseries
  else if (metadata_data_type == "TimeSeries" & !is.na(metadata_data_type)) {
    return(data$parameter)
  }

  else if (config_data_type == "BarChart" & !is.na(config_data_type)) {
    update_date <- as.Date(file.info(paste0(directory, config$filename))$mtime, tz = "NZ")
    return(rep(update_date, len))
  }
  return(data$parameter)
}


get_api_query_str <- function(indicator, group_name) {
  #' Return appropriate URL-encoded query for Stats OData API
  #'
  #' Note: returned value does not have domain (odata_url) attached
  group_name <- group_name %>%
    stringr::str_replace("'", "''") %>%  # escape single quotes
    stringr::str_replace("&", "%26")     # escape ampersand, more work done below

  if (!is.null(indicator$group_names)) {
    # ESFS indicators have more dimensions than other indicators
    esfs_filter <- dplyr::if_else(
      !is.null(indicator$esfs_indicator),
      paste0(" and ", indicator$esfs_indicator, " eq ",
             sQuote(indicator$indicator_name)),
      ""
    )

    query_str <- paste0(
      "Covid-19Indicators/Observations",
      "?$filter=ResourceID eq ", sQuote(indicator$api_resource_id),
      " and ", indicator$group_names, " eq ", sQuote(group_name),
      esfs_filter,
      "&$select=", indicator$parameter, ",", indicator$value_names, ",Value",
      "&$orderby=", indicator$parameter, ",", indicator$value_names
    )
  } else if (!is.null(indicator$value_names)) {
    grp_idx <- which(sapply(indicator$groups, function(x) x$name) == group_name)
    if (length(indicator$groups) > 1 && length(grp_idx) == 1) {
      id_in_group_level <- !is.null(indicator$groups[[grp_idx]]$api_resource_id)
    } else {
      id_in_group_level <- FALSE
    }
    group_filter <- dplyr::if_else(
      group_name != "undefined_name" && !id_in_group_level,
      paste0(" and ", indicator$value_names, " eq ", sQuote(group_name)),
      ""
    )

    query_str <- paste0(
      "Covid-19Indicators/Observations",
      "?$filter=ResourceID eq ", sQuote(indicator$api_resource_id),
      group_filter,
      "&$select=", indicator$parameter, ",", indicator$value_names, ",Value",
      "&$orderby=", indicator$parameter, ",", indicator$value_names
    )
  } else {
    query_str <- paste0(
      "Covid-19Indicators/Observations",
      "?$filter=ResourceID eq ", sQuote(indicator$api_resource_id),
      "&$select=", indicator$parameter, ",Value",
      "&$orderby=", indicator$parameter
    )
  }
  # Has to undo the url-encoding of the manually inserted '%26' from above
  url_encoded_query_str <- query_str %>%
    URLencode(repeated = TRUE) %>%
    stringr::str_replace("%2526", "%26")
  return(url_encoded_query_str)
}


surface_group_level_info <- function(indicator, group_name) {
  #' Translates raw indicator definition (from json element) to more use-able
  #' object by surfacing group-level info to top-level.
  #'
  #' e.g. If API ID is specified within groups, the ID corresponding to
  #'      `group_name` will be placed at top-level
  if (length(indicator$groups) <= 1) {
    # No (proper) groups
    return(indicator)
  }
  grp_idx <- which(sapply(indicator$groups, function(x) x$name) == group_name)
  if (length(grp_idx) == 0) {
    return(indicator)
  }

  if (!is.null(indicator$groups[[grp_idx]]$api_resource_id)) {
    indicator$api_resource_id <- indicator$groups[[grp_idx]]$api_resource_id
  }
  if (!is.null(indicator$groups[[grp_idx]]$parameter)) {
    indicator$parameter <- indicator$groups[[grp_idx]]$parameter
  }
  if (!is.null(indicator$groups[[grp_idx]]$value_names)) {
    indicator$value_names <- indicator$groups[[grp_idx]]$value_names
  }
  return(indicator)
}


# If the indicator config specifies parameter then time series parameter is added as the label
# If the indicator config specifies variable then variable dimension is added to the label
# Else return the config label for all observations
get_label <- function(data, label, len) {
  if (label == "parameter" & !is.na(label)) {
    return(data$parameter)
  }
  else if (label == "variable" & !is.na(label)) {
    return(data$variable)
  }
  else if (label == "new_source" & !is.na(label)) {
    return(data$new_source)
  }
  else if (label == "series" & !is.na(label)) {
    return(data$series)
  }
  return(rep(label, len))
}

check_null <- function(value) {
  if (is.null(value)) {
    return(NA)
  }
  return(value)
}

writeDatastore <- function(data, location, version = NULL, server = "prd") {
  if (server == "uat") {
    baseURL <- "https://epl-uat/statsnz-epl-data/api/v1/collections/"
  }
  else{
    baseURL <- "https://epl-prd/statsnz-epl-data/api/v1/collections/"
  }
  # POST /api/v1/collections/{collectionCode}/{collectionInstanceCode}/datasets/{tableName}
  if (is.null(version)) {
    theUrl <- paste0(baseURL,
                     location$collection,
                     "/",
                     location$instance,
                     "/datasets/",
                     location$table)
  } else {
    # add versions.
    theUrl <- paste0(baseURL,
                     location$collection,
                     "/",
                     location$instance,
                     "/datasets/",
                     location$table,
                     "/versions/",
                     version)
  }
  result <- httr::POST(url = theUrl,
                       httr::use_proxy(""),
                       httr::config(http_version = 2L),
                       httr::config(ssl_verifypeer = 0L),
                       httr::authenticate("", "", type = "gssnegotiate"),
                       httr::content_type_json(),
                       body = toJSON(data), encode = "raw" )
  # boolean success code
  if (http_error(result)) {
    errorMessage <- httr::content(result, "text", encoding = "UTF-8")
    message(errorMessage)
  } else {
    errorMessage <- "success"
  }
  errorMessage
}


#' @title Get latest version
#' @description Gets the latest version of a datastore table and adds one
#'
#' @param location   The datastore location, a list with collecton, instance and table
#' @param server     prd or uat
#'
#' @return version   version number
#'
#' @export
getLatestVersion <- function(location, server = "prd") {
  # set-up version to write to
  # GET statsnz-epl-metadata/api/v1/collections/{collection}/{collectionInstance}/tables/{tableName}/versions
  if (server == "uat") {
    baseURL <- "https://epl-uat/statsnz-epl-metadata/api/v1/collections/"
  }
  else {
    baseURL <- "https://epl-prd/statsnz-epl-metadata/api/v1/collections/"
  }
  theUrl <- paste0( baseURL,
                    location$collection,
                    "/",
                    location$instance,
                    "/tables/",
                    location$table,
                    "/versions")
  # get current version
  result <- httr::GET(url = theUrl,
                      httr::use_proxy(""),
                      httr::config(http_version = 2L),
                      httr::config(ssl_verifypeer = 0L),
                      httr::authenticate("", "", type = "gssnegotiate"))

  version <- jsonlite::fromJSON(httr::content(result, "text", encoding = "UTF-8"))
  if (length(version) > 0) {
    version <- version$VersionNumber[1]
  } else{
    version <- 0
  }
  version
}
xaviermiles/portalLite documentation built on Jan. 28, 2022, 9:10 a.m.