R/01_weather_era5.R

Defines functions era5.collect_weather era5.rename_era5_to_global_df era5.rename_era5_to_global era5.rename_global_to_era5 era5.corr_weather_vars

era5.corr_weather_vars <- function() {
  # dict[era5_var] = weather_variable
  list(
    "temp_min" = "air_temp_min",
    "temp_max" = "air_temp_max",
    "sp" = "atmos_pres",
    "wd" = "wd",
    "ws" = "ws",
    "total_precip" = "precip",
    # "humid" = "humidity", # Not valid
    "dewpoint_temp" = "dewpoint_temp",
    "pbl_min" = "pbl_min",
    "pbl_max" = "pbl_max"
  )
}


era5.rename_global_to_era5 <- function(global_var,
                                       only_keep_existing_vars = F) {
  corr <- era5.corr_weather_vars()
  corr_reverse <- as.list(setNames(names(corr), corr))
  x <- recode(global_var, !!!corr_reverse)
  if (only_keep_existing_vars) x <- x[x %in% names(corr)]
  x
}


era5.rename_era5_to_global <- function(era5_var) {
  corr_vars <- era5.corr_weather_vars()
  recode(era5_var, !!!corr_vars)
}


era5.rename_era5_to_global_df <- function(data) {
  data %>%
    rename_all(era5.rename_era5_to_global)
}


#' Adding ERA5 weather data to the weather data frame
#'
#' @param dates: a list of dates for which we want weather data
#' @param locations: a dataframe with location_id and geometry columns
#' @param vars: the list of variables we are interested in
#'
#' @return a data frame with location_id, geometry, date, temp_min, temp_max ...
#' @export
#'
#' @examples
era5.collect_weather <- function(location_dates,
                                 weather_vars,
                                 update = T) {
  dates <- location_dates %>%
    as.data.frame() %>%
    group_by(location_id) %>%
    dplyr::mutate(date = list(seq.Date(date(date_from), date(date_to), by = "1 day"))) %>%
    select(date) %>%
    tidyr::unnest(date) %>%
    pull(date) %>%
    unique()

  if(update){
    print("=== Refreshing ERA5 files ===")
    era5.refresh_files(dates)
    print("=== Done ===")  
  }
  
  locations_sf <- st_as_sf(location_dates %>%
    ungroup() %>%
    dplyr::select(location_id, geometry) %>%
    dplyr::distinct(location_id, .keep_all = T))


  dir_era5 <- era5.folder_era5()
  print(glue("Found {length(list.files(dir_era5, '\\\\.tif'))} tif files in ERA5 folder")) 
  weather_vars_era5 <- era5.rename_global_to_era5(weather_vars,
    only_keep_existing_vars = T
  )

  print(glue("=== Extracting ERA5 for {length(dates)} dates at {nrow(locations_sf)} locations ==="))

  coords <- terra::vect(locations_sf)
  era5_data <- pbapply::pblapply(dates, function(date) {
    tryCatch(
      {
        tif_file <- era5.date_to_filepaths(date, dir_era5)
        if (!file.exists(tif_file)) {
          return(NULL)
        }
        tif <- terra::rast(tif_file)
        # Only keep the variables we are interested in
        tif <- terra::subset(tif, names(tif) %in% weather_vars_era5)
        tb <- tibble("date" = date)
        extracted_values <- terra::extract(tif, coords)
        
        # Determine the intersection of the specified variables and weather_vars_era5
        temp_vars <- intersect(c("temp_min", "temp_max"), weather_vars_era5)
        pressure_vars <- intersect(c("sp"), weather_vars_era5)
        precip_vars <- intersect(c("total_precip"), weather_vars_era5)
        
        dplyr::bind_cols(location_id = coords$location_id,
                         tb,
                         as.data.frame(extracted_values) %>%
                           dplyr::select(-c(ID))) %>%
                           # K to C for temp_min and temp_max
          dplyr::mutate_at(all_of(temp_vars), ~ . - 273.15) %>%
          # Pa to millibar for atmos_pres
          dplyr::mutate_at(all_of(pressure_vars), ~ . / 1e2) %>%
          # m to mm for precip
          dplyr::mutate_at(all_of(precip_vars), ~ . * 1e3)
      },
      error = function(error) {
        return(NULL)
      }
    )
  }) %>%
    bind_rows()


  weather <- era5_data %>%
    era5.rename_era5_to_global_df() %>%
    select(c(
      "location_id", "date",
      intersect(names(.), weather_vars)
    )) %>%
    group_by(location_id) %>%
    tidyr::nest() %>%
    rename(weather = data)
  print("=== Done ===")

  return(weather)
}


era5.date_to_filepath <- function(date, extension = "tiff") {
  fname <- paste0("era5_", strftime(as.Date(date), "%Y-%m-%d"), ".", extension)
  file.path(era5.folder_era5(), fname)
}

era5.folder_era5 <- function() {
  dir_era5 <- utils.get_env("DIR_ERA5")
  if (dir_era5 == "") {
    stop("Missing DIR_ERA5 folder")
  }
  return(dir_era5)
}

extract_date <- function(fname) {
  return(strsplit(strsplit(fname, split = "[_]")[[1]][2], split = "[.]")[[1]][1])
}

era5.processed_dates <- function() {
  dir_era5 <- era5.folder_era5()
  ds <- list.files(
    path = dir_era5,
    pattern = "*.tif",
    full.names = F,
    recursive = TRUE
  )
  dates <- unique(lubridate::ymd(lapply(as.list(ds), extract_date)))
  # dates <- unique(lapply(as.list(ds), extract_date))
  date <- dates[!is.na(dates)]
  return(c(date))
}

era5.processable_dates <- function(dates) {
  # Returns list of dates (in dates argument) that are available on CDS
  # All dates are available on CDS (few exceptions, but no easy way to get in advance)
  return(dates[dates < lubridate::today() - 3])
}


#' Build global daily tif files with temp_min, temp_max, temp_mean, RH_min, RH_max etc.
#' And result files are saved in dir_era5
#'
#' @param date
#' @param cache_rs
#'
#' @return
#' @export
#'
#' @examples
era5.process_date <- function(date, force_redownload_nc = F, remove_nc = T, min_layers=24, time_out=NULL) {
  tryCatch(
    {
      era5_long_vars <- c(
        "boundary_layer_height",
        "10m_u_component_of_wind",
        "10m_v_component_of_wind",
        "2m_temperature",
        "2m_dewpoint_temperature",
        "surface_pressure",
        "total_precipitation"
      )
      
      if(date > ymd('2024-11-05')){ # after 2024-11-05, the file extension is zip
        fname <- paste0("era5_", date, ".zip")
      } else {
        fname <- paste0("era5_", date, ".nc")
      }
      
      path <- file.path(era5.folder_era5(), fname)
      
      # Downlad nc file if need be
      file_ncs <- era5.download_nc(force=force_redownload_nc,
                       date=date,
                       era5_long_vars=era5_long_vars,
                       folder=era5.folder_era5(),
                       min_layers=min_layers,
                       time_out=time_out)
      
      
      # file_ncs could be either one general nc or two ncs: instant and accum
      # if two, then tp is in accum and others are in instant
      
      fpath_accum <- ifelse(length(file_ncs)==1, file_ncs, file_ncs[grep("accum", file_ncs)])
      fpath_instant <- ifelse(length(file_ncs)==1, file_ncs, file_ncs[grep("instant", file_ncs)])
      layers_dict <- list()

      
      calc_raster <- function(fpath, var, fun) {
        brick <-raster::brick(fpath, var = var)
        if(raster::nlayers(brick) < min_layers){
          stop("Not enough layers")
        }
        raster::calc(brick, fun = fun, na.rm = T)
      }

      layers_dict["dewpoint_temp"] <- calc_raster(fpath_instant, "d2m", mean)
      layers_dict["pbl"] <- calc_raster(fpath_instant, "blh", mean)
      layers_dict["pbl_min"] <- calc_raster(fpath_instant, "blh", min)
      layers_dict["pbl_max"] <- calc_raster(fpath_instant, "blh", max)
      layers_dict["temp"] <- calc_raster(fpath_instant, "t2m", mean)
      layers_dict["temp_max"] <- calc_raster(fpath_instant, "t2m", max)
      layers_dict["temp_min"] <- calc_raster(fpath_instant, "t2m", min)

      # Wind direction and wind speed calculation
      # We do it hour by hour and then average wd and ws
      layer_names <- terra::rast(fpath_instant) %>% names()
      
      
      u10_lyrs <- grep("u10_", layer_names, value=T) %>% sort()
      v10_lyrs <- grep("v10_", layer_names, value=T) %>% sort()
  
      uvs <- terra::rast(fpath_instant, lyrs=c(u10_lyrs, v10_lyrs))
      
      uv2wdws <- function(uv) {
        degrees <- function(radians) 180 * radians / pi
        mathdegs <- degrees(atan2(uv[[2]][], uv[[1]][]))
        wdcalc <- case_when(mathdegs > 0 ~ mathdegs, T ~mathdegs + 360)
        wd <- case_when(wdcalc < 270 ~ 270 - wdcalc, T ~270 - wdcalc + 360)
        ws <- sqrt(uv[[1]][]^2 + uv[[2]][]^2)
        
        uv[] <- cbind(wd, ws)
        names(uv) <- c('wd' ,'ws')
        return(uv)
      }

      uvs2wdws <- function(uvs){
        
        suffixes <- gsub("u10_", "", grep("u10_", names(uvs), value=T))
        # For each hour
        wdwss <- pbapply::pblapply(suffixes, function(suffix){
          uv2wdws(uvs[[c(paste0("u10_", suffix), paste0("v10_", suffix))]])
        })
        
        # Average per day
        wdwss = terra::rast(wdwss)
        ws <- terra::app(wdwss['^ws$'], mean)
        
        # For wind direction, we sum u and v across all hours and take the resulting direction
        wd <- uv2wdws(terra::rast(list(terra::app(uvs['u10'], sum), terra::app(uvs['v10'], sum))))['wd']
        
        return(list(wd=wd, ws=ws))
      }
      
      wdws <- uvs2wdws(uvs)

      layers_dict["wd"] <- raster::raster(wdws$wd)
      layers_dict["ws"] <- raster::raster(wdws$ws)

      # # Humidity calculation: https://confluence.ecmwf.int/pages/viewpage.action?pageId=171411214
      # Rdry <- 287.0597 # Constant for dry air J kg−1 K−1
      # Rvap <- 461.5250 # Constant for water vapor J kg−1 K−1
      # # a1, a3, a4, and T0:
      # # Parameters for Teten's formula according to Buck (1981)
      # # Buck, A. L. (1981). New equations for computing vapor pressure and
      # # enhancement factor. J. Appl. Meteorol., 20, 1527–1532
      # a1 <- 611.21 # units: Pa
      # a3 <- 17.502 # units: K
      # a4 <- 32.19 # units: K
      # T0 <- 273.16 # units: K
      # 
      # # Written by Stefanos. Hubert: doesn't seem to give decent values
      # # let's use dewpoint temp instead
      # humidity_fun <- function(t2msp) {
      #   E <- exp(17.502 * ((t2msp[1] - 273.16) / (t2msp[1] - 32.19)))
      #   huss <- (0.6219808 * E) / (t2msp[2] - ((1 - Rdry / Rvap) * E))
      #   # ps-((1-Rdry/Rvap)*E)
      #   return(huss)
      # }
      # huss <- raster::calc(t2msp, humidity_fun)
      # layers_dict["humid"] <- huss

      sp <- calc_raster(fpath_instant, "sp", mean)
      tp <- calc_raster(fpath_accum, "tp", sum) 
      t2m <- layers_dict["temp"]
      t2msp <- raster::stack(c(t2m, sp))

      layers_dict["sp"] <- sp
      layers_dict["total_precip"] <- tp

      output_path <- file.path(era5.folder_era5(),
                               paste0("era5_", date, ".tiff"))

      tif <- raster::stack(layers_dict)
      names(tif) <- names(layers_dict)

      terra_ras <- terra::rast(tif)
      t <- terra::writeRaster(terra_ras, output_path, overwrite = TRUE)
      if(remove_nc){
        sapply(file_ncs, file.remove)
      } 
    },
    error = function(error) {
      # QUICKFIX try print as well
      print(paste("Failed for date", date, error))
      warning(paste("Failed for date", date, error))
      return(NULL)
    }
  )
}



#' Generate the missing files for indicated dates
#'
#' @param dates
#'
#' @return
#' @export
#'
#' @examples
era5.refresh_files <- function(dates) {
  processed <- era5.processed_dates()
  processable <- era5.processable_dates(dates)
  to_process <- processable[!processable %in% processed]

  cache_rs <- NULL
  for (i in seq_along(to_process)) {
    d <- to_process[i]
    print(paste("Processing", d))
    era5.process_date(d)
  }
}

era5.date_to_filepaths <- function(date, dir_era5) {
  # returns the path to the daily file for date `date`
  return(file.path(dir_era5, paste0("era5_", date, ".tiff")))
}

era5.reprocess_files <- function(date_from='2015-01-01', date_to=lubridate::today(), force = F){
  
  dates <- seq.Date(as.Date(date_from), as.Date(date_to), by = "1 day")
  dir_era5 <- era5.folder_era5()
  
  n_layers <- pbapply::pbsapply(dates, function(date){
    tif_file <- era5.date_to_filepaths(date, dir_era5)
    if (!file.exists(tif_file)) {
      return(0)
    }
    tif <- terra::rast(tif_file)
    return(length(names(tif)))
  })

  dates_to_process <- dates[force | n_layers < 11]
  pbapply::pblapply(dates_to_process, era5.process_date, remove_nc = T, time_out=3600)
  
}


#' Download nc file(s) from ECMWFR for a single date
#' NOTE: it can return one or two nc files depending on the date
#'  (they started splitting between instant and accum in November 2024)
#'
#' @param force 
#' @param date 
#' @param folder 
#' @param era5_long_vars 
#' @param min_layers 
#' @param time_out 
#'
#' @return
#' @export
#'
#' @examples
era5.download_nc <- function(force,
                              date,
                              folder,
                              era5_long_vars,
                              min_layers=NULL,
                              time_out=NULL
                             )
{
  
  
  # Files could take several format: one .nc or two (instant & accum) depending on the date
  file_path_glob <- paste0("era5_", date, ".*\\.nc")
  files_found <- list.files(folder, pattern = file_path_glob, full.names = T)
  do_download <- force | (length(files_found)==0)

  
  # Check if existing files are complete or proper nc files (some may actually be zip)
  if(!is.null(min_layers) & length(files_found) >0){
    for(f in files_found){
      tryCatch({
        n_layers <- suppressWarnings(raster::nlayers(raster::brick(f)))
        if(n_layers < min_layers){
          message(glue("Redownloading {f} as it seems incomplete"))
          do_download <- do_download | TRUE
        }    
      }, error=function(e){
        message(glue("Redownloading {f} as it seems corrupted"))
        do_download <<-TRUE
      })
    }
  }
  
  if(!do_download){
    return(files_found)
  }
  
  if(do_download){
    year <- strsplit(as.character(date), split = "[-]")[[1]][1]
    month <- strsplit(as.character(date), split = "[-]")[[1]][2]
    day <- strsplit(as.character(date), split = "[-]")[[1]][3]
    request <- list(
      dataset_short_name = "reanalysis-era5-single-levels",
      product_type = "reanalysis",
      format = "netcdf",
      variable = era5_long_vars,
      year = year,
      month = month,
      day = day,
      time = c(
        "00:00", "01:00", "02:00", "03:00", "04:00", "05:00", "06:00", "07:00", "08:00",
        "09:00", "10:00", "11:00", "12:00", "13:00", "14:00", "15:00", "16:00", "17:00",
        "18:00", "19:00", "20:00", "21:00", "22:00", "23:00"
      ),
      # area is specified as N, W, S, E
      area = c(90, -180, -90, 180),
      target = paste0("era5_", date)
    )
    
    if (!"ecmwfr" %in% keyring::keyring_list()$keyring) {
      keyring::keyring_create(keyring = "ecmwfr", password = utils.get_env("KEYRING_PASSWORD"))
    }
    
    try(keyring::keyring_unlock(keyring = "ecmwfr", password = utils.get_env("KEYRING_PASSWORD")))
    
    ecmwfr::wf_set_key(
      user = utils.get_env("CDS_UID", error_if_not_found = T),
      key = utils.get_env("CDS_TOKEN", error_if_not_found = T)
    )
    
    # Timeout: short if recent date, long otherwise
    # as it is likely to be unavailable
    if(is.null(time_out)){
      if(lubridate::today() - as.Date(date) < 5){
        time_out <- 300
      } else {
        time_out <- 900
      }  
    }
 
    file_path <- ecmwfr::wf_request(
      user = utils.get_env("CDS_UID"),
      request = request,
      transfer = TRUE,
      path = folder,
      time_out = time_out,
      verbose = TRUE
    )
    
    # Check if it is a zip
    is_zip <- tryCatch({
      unzip(file_path, list=T)
      T
    }, error=function(e){
      return(FALSE)
    })
    
    is_nc <- tryCatch({
      ncdf4::nc_open(file_path)
      T
    }, error=function(e){
      return(FALSE)
    })
    
    if(!xor(is_nc, is_zip)){
      stop("ERA5 file is neither zip or nc.")
    }
    
    if(is_nc){
      # Append .nc
      file_path_new <- paste0(file_path, ".nc")
      file.rename(
        file_path,
        file_path_new
      )
      return(file_path_new)
    }
    
    if(is_zip){
      # Unzip
      file_names <- unzip(file_path, list=T)$Name
      to_new <- function(x) gsub("data_stream-oper_stepType", paste0("era5_", date), x)
      file_names_new <- sapply(file_names, to_new, USE.NAMES = F)
      file_paths_new <- file.path(folder, file_names_new)
      # file_names_new <- 
      unzip(file_path, exdir=folder)
      file.remove(file_path)
      sapply(file_names, function(f){
        file.rename(
          file.path(folder, f),
          file.path(folder, to_new(f))
        )
      })
      return(file_paths_new)
    }
  }
}
energyandcleanair/covid_impact documentation built on Jan. 17, 2025, 12:38 a.m.