db_writing <- function(){
  # Load environment variables that may define the database URL
  try(dotenv::load_dot_env(file = ".env"), silent = TRUE)
  try(readRenviron(".Renviron"), silent = TRUE)
  db_url <- Sys.getenv("CREA_DB_URL")
  if(db_url == ""){
    stop("Missing database URL. Please define CREA_DB_URL in your environment")
  }
  db <- dbx::dbxConnect(url = db_url)
  return(db)
}
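# Example usage of db_writing (illustrative; assumes CREA_DB_URL is set in the
# environment, .env or .Renviron):
#   db <- db_writing()
#   dbx::dbxSelect(db, "SELECT 1")
#   dbx::dbxDisconnect(db)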
create_new_process_id <- function(preferred_id=NULL){
  id_0 <- if(!is.null(preferred_id)){
    preferred_id
  } else {
    paste0('process_', format(Sys.time(), "%Y%m%d_%H%M%S"))
  }
  existing_ids <- processes() %>%
    dplyr::distinct(id) %>%
    dplyr::pull(id)
  # Append a version suffix until the id is unique
  id <- id_0
  v <- 2
  while(id %in% existing_ids){
    id <- paste0(id_0, "_v", v)
    v <- v + 1
  }
  return(id)
}
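# Example of create_new_process_id (illustrative): if "my_process" is already
# taken, create_new_process_id("my_process") returns "my_process_v2", then
# "my_process_v3" on the next call, and so on.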
#' Retrieve or create a process in the database
#'
#' After processing data, you may want to upload it to the CREA database. To do
#' so, you need a process record that defines what processing the measurements
#' went through. This function either retrieves an existing process matching the
#' given parameters or creates a new one.
#'
#' @param filter JSON definition of the filter used
#' @param agg_spatial JSON definition of the spatial aggregation
#' @param agg_temp JSON definition of the temporal aggregation
#' @param deweather JSON definition of the deweathering applied
#' @param preferred_name preferred id for the process if a new one is created
#' @param processes existing processes to match against; fetched from the
#'   database if NULL
#'
#' @return id (string) of the process found or created
#' @export
#'
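#' @examples
#' # Illustrative only: requires database access, and the JSON strings below
#' # are hypothetical parameter definitions, not values from a real process
#' \dontrun{
#' process_id <- retrieve_or_create_process(
#'   filter = '{"pollutant":["pm25"]}',
#'   agg_spatial = 'null',
#'   agg_temp = '{"period":"day"}',
#'   deweather = 'null'
#' )
#' }
#'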
retrieve_or_create_process <- function(filter, agg_spatial, agg_temp, deweather,
                                       preferred_name=NULL, processes=NULL){
  if(is.null(processes)){
    processes <- processes()
  }

  # Parse a JSON string, mapping NA to NA and 'null' to NULL
  safe_fromJSON <- function(x){
    if(is.na(x)){ return(NA) }
    if(x == 'null'){ return(NULL) }
    jsonlite::fromJSON(x)
  }

  # Test whether two named lists are equal, ignoring order
  equal_lists <- function(l1, l2) {
    if(is.null(l1) && length(l2) == 1 && is.null(l2[[1]])){ return(TRUE) }
    if(is.null(l2) && length(l1) == 1 && is.null(l1[[1]])){ return(TRUE) }
    length(setdiff(l1, l2)) + length(setdiff(l2, l1)) == 0
  }
  # Check whether an existing process matches the given parameters
  p <- processes %>%
    dplyr::mutate(
      filter = purrr::map(filter, safe_fromJSON),
      agg_spatial = purrr::map(agg_spatial, safe_fromJSON),
      agg_temp = purrr::map(agg_temp, safe_fromJSON),
      deweather = purrr::map(deweather, safe_fromJSON)) %>%
    dplyr::mutate(
      # !! forces safe_fromJSON to be applied to the function arguments,
      # not to the like-named columns
      filter_equal = purrr::map_lgl(filter, equal_lists, l2 = !!safe_fromJSON(filter)),
      agg_spatial_equal = purrr::map_lgl(agg_spatial, equal_lists, l2 = !!safe_fromJSON(agg_spatial)),
      agg_temp_equal = purrr::map_lgl(agg_temp, equal_lists, l2 = !!safe_fromJSON(agg_temp)),
      deweather_equal = purrr::map_lgl(deweather, equal_lists, l2 = !!safe_fromJSON(deweather))) %>%
    dplyr::filter(
      filter_equal & agg_spatial_equal & agg_temp_equal & deweather_equal
    ) %>%
    dplyr::select(id, filter, agg_spatial, agg_temp, deweather)
  if(nrow(p) == 0){
    # No matching process found: create a new one
    db <- db_writing()
    id <- create_new_process_id(preferred_name)
    p <- tibble::tibble(id = id, filter = filter, agg_spatial = agg_spatial,
                        agg_temp = agg_temp, deweather = deweather)
    dbx::dbxInsert(db, "processes", p)
    dbx::dbxDisconnect(db)
    return(id)
  }else if(nrow(p) == 1){
    return(p$id[1])
  }else{
    warning("Found several processes matching the given parameters. This should never happen. Returning the first one nonetheless")
    return(p$id[1])
  }
}
#' Upsert processed measurements in the CREA database (legacy version)
#'
#' @param meas dataframe of measurements with columns date, poll, unit,
#'   location_id, process_id, source and value
#'
#' @return called for its side effect of upserting measurements
#' @export
#'
#' @examples
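#' # Illustrative values only; requires a writable CREA database
#' \dontrun{
#' meas <- tibble::tibble(
#'   date = as.Date("2020-01-01"),
#'   poll = "pm25",
#'   unit = "ug/m3",
#'   location_id = "station_001",
#'   process_id = "process_example",
#'   source = "example",
#'   value = 42
#' )
#' upsert_meas_old(meas)
#' }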
upsert_meas_old <- function(meas){
  meas <- meas %>% dplyr::rename(pollutant = poll)
  required_cols <- c("date","pollutant","unit","location_id","process_id","source","value")
  if(!all(required_cols %in% colnames(meas))){
    stop(paste("Missing columns:", paste(setdiff(required_cols, colnames(meas)), collapse = ", ")))
  }
  db <- db_writing()
  tryCatch({
    # Upload in chunks to avoid SSL EOF errors on large payloads
    ms <- split(meas, (seq(nrow(meas)) - 1) %/% 100000)
    upload_chunk <- function(m){
      dbx::dbxUpsert(db, "measurements_new",
                     m %>% dplyr::select(dplyr::all_of(required_cols)),
                     where_cols = c('date', 'pollutant', 'unit', 'process_id', 'location_id', 'source'))
    }
    lapply(ms, upload_chunk)
  }, error = function(err){
    dbx::dbxDisconnect(db)
    stop(paste("Upserting failed:", err))
  })
  dbx::dbxDisconnect(db)
}
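#' Upsert processed measurements in the CREA database
#'
#' Writes the measurements to a temporary table, then upserts them into the
#' measurements table with a single INSERT ... ON CONFLICT DO UPDATE statement.
#' Unlike upsert_meas_old, this version includes a variable column in the
#' conflict definition.
#'
#' @param meas dataframe of measurements with columns date, poll, unit,
#'   location_id, process_id, source, variable and value
#'
#' @return called for its side effect of upserting measurements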
upsert_meas <- function(meas){
  meas <- meas %>% dplyr::rename(pollutant = poll)
  required_cols <- c("date","pollutant","unit","location_id","process_id","source","variable","value")
  conflict_cols <- c("date","pollutant","unit","location_id","process_id","source","variable")
  if(!all(required_cols %in% colnames(meas))){
    stop(paste("Missing columns:", paste(setdiff(required_cols, colnames(meas)), collapse = ", ")))
  }
  db <- db_writing()
  tryCatch({
    # Upload the whole dataframe into a temporary table
    table.name <- paste0("temp_meas_", substr(uuid::UUIDgenerate(), 1, 8))
    RPostgres::dbWriteTable(
      conn = db,
      name = table.name,
      value = meas,
      row.names = FALSE,
      overwrite = FALSE,
      append = FALSE,
      field.types = NULL,
      temporary = TRUE,
      copy = TRUE
    )
    # Upsert from the temporary table into measurements in a single statement
    query <- sprintf(
      "INSERT INTO measurements(%s)
      SELECT %s FROM %s
      ON CONFLICT (%s)
      DO UPDATE SET value = EXCLUDED.value;",
      paste(required_cols, collapse = ","),
      paste(required_cols, collapse = ","),
      table.name,
      paste(conflict_cols, collapse = ",")
    )
    dbx::dbxExecute(db, query)
    RPostgres::dbRemoveTable(db, table.name) # Not sure this is required
  }, error = function(err){
    dbx::dbxDisconnect(db)
    stop(paste("Upserting failed:", err))
  })
  dbx::dbxDisconnect(db)
}
#' #' Upsert locations in crea database
#' OBSOLETE
#' #'
#' #' @param meas
#' #'
#' #' @return
#' #' @export
#' #'
#' #' @examples
#' upsert_locations <- function(locs){
#'
#'   required_cols <- c("id","name","city","country","timezone","source","geometry","type")
#'   if(!all(required_cols %in% colnames(locs))){
#'     stop(paste("Missing columns ", paste(setdiff(required_cols, colnames(locs)))))
#'   }
#'
#'   db <- db_writing()
#'   tryCatch({
#'     dbx::dbxUpsert(db, "locations", locs %>% dplyr::select(all_of(required_cols)),
#'                    where_cols=c('id'))
#'   }, error=function(err){
#'     dbx::dbxDisconnect(db)
#'     stop(paste("Upserting failed:",err))
#'   })
#'   dbx::dbxDisconnect(db)
#' }