# Nothing
#' @name writeWorkflowJobDataResource
#' @aliases writeWorkflowJobDataResource
#' @title writeWorkflowJobDataResource
#' @description \code{writeWorkflowJobDataResource} allows to transform datasource into different formats
#'
#' @usage writeWorkflowJobDataResource(entity, config, obj,
#' useFeatures, resourcename, useUploadSource,
#' createIndexes, overwrite, append, chunk.size,
#' type)
#'
#' @param entity a entity object as read by \code{geoflow_entity}
#' @param config a configuration object as read by \code{initWorkflow}
#' @param obj a sf file
#' @param useFeatures a boolean condition to define if features must be attach to obj file
#' @param resourcename name of data input
#' @param useUploadSource a boolean condition to define if resourcename is same as uploadSource information
#' @param createIndexes a boolean condition for possibility to create indexes for each column
#' @param overwrite a boolean condition for writing to DB. Default is \code{TRUE}
#' @param append a boolean condition for appending to DB existing table. Default is \code{FALSE}
#' @param chunk.size an object of class \code{integer} giving the size of chunks to apply for DB upload.
#' Default is equal to \code{0L}, meaning DB upload will be done without chunking.
#' @param type format to convert into. Formats supported: shp, csv, gpkg, parquet, dbtable
#'
#' @author Alexandre Bennici, \email{bennicialexandre@@gmail.com}
#' @export
#'
writeWorkflowJobDataResource <- function(entity, config, obj=NULL,
                                         useFeatures=FALSE, resourcename=NULL, useUploadSource=FALSE,
                                         createIndexes=FALSE, overwrite=TRUE, append = FALSE, chunk.size = 0L,
                                         type = c("shp", "csv", "gpkg", "parquet", "dbtable")){
  config$logger$INFO("------------------------------------------------------------------------------")
  config$logger$INFO("Write data resource start")

  #resolve the object to write: explicit 'obj' or the entity data features
  if(is.null(obj) && !useFeatures){
    errMsg <- "Error: specify at least an object or useFeatures = TRUE"
    config$logger$ERROR(errMsg)
    stop(errMsg)
  }
  if(useFeatures){
    #NOTE: useFeatures takes precedence over an explicitly passed 'obj'
    config$logger$INFO("No object specified, use entity$data$features by default")
    obj <- entity$data$features
  }

  #resolve the target resource name: explicit 'resourcename' or the entity upload source
  if(is.null(resourcename) && !useUploadSource){
    errMsg <- "Error: specify a resourcename or useUploadSource = TRUE"
    config$logger$ERROR(errMsg)
    stop(errMsg)
  }
  if(useUploadSource){
    config$logger$INFO("No resourcename specified, use entity$data$uploadSource by default")
    resourcename <- entity$data$uploadSource[[1]]
  }

  #validate 'type': keep only the first element so that the default argument
  #(a length-5 vector) resolves to "shp" instead of raising a condition-length
  #error in the 'if' below (conditions of length > 1 are an error since R 4.2)
  type <- type[1]
  if(!type %in% c("shp","dbtable","csv","gpkg", "parquet")){
    errMsg <- "Error: unrecognized type, specify a type at this list : 'csv','shp','dbtable','gpkg', 'parquet'"
    config$logger$ERROR(errMsg)
    stop(errMsg)
  }

  switch(type,
    "csv" = {
      config$logger$INFO("Format type: %s", type)
      #NOTE(review): st_write is called without delete_layer/append here, so a
      #rerun over an existing CSV may fail - confirm whether overwrite is wanted
      sf::st_write(obj = obj, paste0("./data/",resourcename,".csv"))
      config$logger$INFO("write csv file to data job directory")
    },
    "shp" = {
      #strip any file extension from the resource name
      resourcename_parts <- unlist(strsplit(resourcename, "\\."))
      if(length(resourcename_parts)>1) resourcename <- resourcename_parts[1]
      config$logger$INFO("Format type: %s", type)
      sf::st_write(obj = obj, paste0("./data/",resourcename,".shp"), delete_layer = TRUE)
      config$logger$INFO("write shp file to data job directory")
      #bundle the shapefile side-car files (.shp, .shx, .dbf, .prj, ...) into one zip
      zip::zipr(zipfile = file.path(getwd(), "data", paste0(resourcename, ".zip")),
                files = list.files(path = file.path(getwd(), "data"), pattern = resourcename, full.names = TRUE))
      config$logger$INFO("zip datafiles for server export")
      if(useFeatures){
        config$logger$INFO("object use to data features")
        #re-read the shapefile and rename the geometry column to 'the_geom'
        #so downstream steps find the geometry under that name
        df <- sf::st_read(paste0("./data/",resourcename,".shp"), quiet = TRUE)
        if(attr(df, "sf_column") == "geometry"){
          df$the_geom <- sf::st_geometry(df)
          attr(df, "sf_column") <- "the_geom"
          df$geometry <- NULL
        }
        entity$data$features <- df
      }
    },
    "gpkg" = {
      #strip any file extension from the resource name
      resourcename_parts <- unlist(strsplit(resourcename, "\\."))
      if(length(resourcename_parts)>1) resourcename <- resourcename_parts[1]
      config$logger$INFO("Format type: %s", type)
      #keep the sf geometry column name as GeoPackage layer geometry name
      geom_colname <- colnames(obj)[vapply(colnames(obj), function(x){inherits(obj[[x]], "sfc")}, logical(1))][1]
      sf::st_write(obj = obj, dsn = paste0("./data/",resourcename,".gpkg"), layer_options = paste0("GEOMETRY_NAME=",geom_colname))
      config$logger$INFO("write gpkg file to data job directory")
      zip::zipr(zipfile = file.path(getwd(), "data", paste0(resourcename, ".zip")),
                files = list.files(path = file.path(getwd(), "data"), pattern = resourcename, full.names = TRUE))
      config$logger$INFO("zip datafiles for server export")
    },
    "parquet" = {
      #strip any file extension from the resource name
      resourcename_parts <- unlist(strsplit(resourcename, "\\."))
      if(length(resourcename_parts)>1) resourcename <- resourcename_parts[1]
      config$logger$INFO("Format type: %s", type)
      sfarrow::st_write_parquet(obj = obj, dsn = paste0("./data/",resourcename,".parquet"))
      #fixed log message: this branch writes parquet, not gpkg
      config$logger$INFO("write parquet file to data job directory")
    },
    "dbtable" = {
      if(is.null(config$software$output$dbi)){
        errMsg <- "Error: no dbi output detected, branch one in the config"
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      config$logger$INFO("Format type: %s", type)
      #NOTE(review): 'resourcename' is interpolated into SQL statements unescaped;
      #if it can come from untrusted metadata, consider DBI::dbQuoteIdentifier
      if(overwrite){
        config$logger$INFO("Overwrite is 'true', try to drop table %s", resourcename)
        drop_sql <- sprintf("DROP TABLE IF EXISTS %s", resourcename)
        try(DBI::dbExecute(config$software$output$dbi, drop_sql), silent = TRUE)
      }
      if(inherits(obj, "sf")){
        #srid: NULL when the object has no CRS at all, NA when the CRS has no EPSG code
        srid <- sf::st_crs(obj, parameters = TRUE)$epsg
        if(is.null(srid)){
          #no CRS at all: default to EPSG:4326
          srid <- 4326
          sf::st_crs(obj) <- srid
        }
        #sf upload
        if(chunk.size > 0){
          chunks <- split(obj, ceiling(seq_len(nrow(obj))/chunk.size))
          config$logger$INFO("Upload DB data by chunks [nb of chunks: %s]", length(chunks))
          #1st chunk with overwrite param applied
          config$logger$INFO("Upload data to DB: chunk 1 of %s", length(chunks))
          sf::st_write(obj = chunks[[1]], dsn = config$software$output$dbi, layer = resourcename,
                       layer_options = paste0('OVERWRITE=',ifelse(overwrite,'YES','NO')),
                       append = append)
          #then append the remaining chunks (never overwriting)
          if(length(chunks)>1){
            for(chunk_idx in 2:length(chunks)){
              config$logger$INFO("Upload data to DB: chunk %s of %s", chunk_idx, length(chunks))
              sf::st_write(obj = chunks[[chunk_idx]], dsn = config$software$output$dbi, layer = resourcename,
                           append = TRUE)
            }
          }
        }else{
          #no chunking
          config$logger$INFO("Upload data to DB as single chunk")
          sf::st_write(obj = obj, dsn = config$software$output$dbi, layer = resourcename,
                       layer_options = paste0('OVERWRITE=',ifelse(overwrite,'YES','NO')),
                       append = append)
        }
        #enforce srid/geometry type in geometry_columns (skipped when no EPSG code is known)
        geometryName <- attr(obj, "sf_column")
        geometryType <- unlist(strsplit(class(sf::st_geometry(obj))[1], "sfc_"))[2]
        if(!is.na(srid)){
          alter_sql <- sprintf("ALTER TABLE %s DROP CONSTRAINT enforce_srid_the_geom;", resourcename)
          try(DBI::dbExecute(config$software$output$dbi, alter_sql), silent = TRUE)
          alter_sql <- sprintf("UPDATE %s SET %s = ST_SetSRID(%s, %s);", resourcename, geometryName, geometryName, srid)
          try(DBI::dbExecute(config$software$output$dbi, alter_sql), silent = TRUE)
          alter_sql <- sprintf("ALTER TABLE %s ADD CONSTRAINT enforce_srid_the_geom CHECK (st_srid(%s) = (%s));", resourcename, geometryName, srid)
          try(DBI::dbExecute(config$software$output$dbi, alter_sql), silent = TRUE)
          alter_sql <- sprintf("alter table %s alter column %s type geometry(%s, %s);",
                               resourcename, geometryName, geometryType, srid)
          try(DBI::dbExecute(config$software$output$dbi, alter_sql), silent = TRUE)
        }
        #create index for each column except geometry
        if(createIndexes){
          for (column_name in setdiff(names(obj), geometryName)){
            config$logger$INFO("Drop index for column : %s", column_name)
            drop_index_sql <- sprintf("DROP INDEX %s_%s_idx;", resourcename, column_name)
            try(DBI::dbExecute(config$software$output$dbi, drop_index_sql), silent = TRUE)
          }
          for (column_name in setdiff(names(obj), geometryName)){
            config$logger$INFO("Indexes created for column : %s", column_name)
            create_index_sql <- sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", resourcename, column_name, resourcename, column_name)
            DBI::dbExecute(config$software$output$dbi, create_index_sql)
          }
        }
      }else{
        #plain data.frame upload
        if(chunk.size > 0){
          chunks <- split(obj, ceiling(seq_len(nrow(obj))/chunk.size))
          config$logger$INFO("Upload DB data by chunks [nb of chunks: %s]", length(chunks))
          #1st chunk with overwrite param applied
          config$logger$INFO("Upload data to DB: chunk 1 of %s", length(chunks))
          DBI::dbWriteTable(conn = config$software$output$dbi, name = resourcename, value = chunks[[1]],
                            overwrite = overwrite, append = append)
          #then append the remaining chunks (never overwriting)
          if(length(chunks)>1){
            for(chunk_idx in 2:length(chunks)){
              config$logger$INFO("Upload data to DB: chunk %s of %s", chunk_idx, length(chunks))
              DBI::dbWriteTable(conn = config$software$output$dbi, name = resourcename, value = chunks[[chunk_idx]], append = TRUE)
            }
          }
        }else{
          #no chunking
          config$logger$INFO("Upload data to DB as single chunk")
          if(config$software$output$dbi_config$parameters$drv == "PostgreSQL"){
            config$logger$INFO("-> Using RPostgreSQL enhanced methods")
            #simple hack to create the table with proper data types:
            #write a single row, then empty the table before the bulk COPY
            DBI::dbWriteTable(conn = config$software$output$dbi, name = resourcename, value = obj[1,],
                              overwrite = overwrite, append = append)
            DBI::dbSendQuery(conn = config$software$output$dbi, sprintf("delete from %s", resourcename))
            requireNamespace("RPostgreSQL")
            #serialize everything as character for COPY, with 'NA' as the NULL marker
            obj[is.na(obj)] <- "NA"
            for(col in colnames(obj)) if(!is(obj[,col],"character")) obj[,col] <- as(obj[,col],"character")
            sql <- paste0("COPY ", resourcename, "(",
                          paste0(names(obj), collapse=", "), ") FROM STDIN NULL 'NA' ")
            RPostgreSQL::postgresqlpqExec(config$software$output$dbi, sql)
            RPostgreSQL::postgresqlCopyInDataframe(config$software$output$dbi, obj)
            rs <- RPostgreSQL::postgresqlgetResult(config$software$output$dbi)
            #drop the 'row.names' column dbWriteTable may have created
            onerec <- DBI::dbGetQuery(conn = config$software$output$dbi, sprintf("select * from %s limit 1", resourcename))
            if("row.names" %in% colnames(onerec)){#hack, find better solution
              DBI::dbSendQuery(conn = config$software$output$dbi, sprintf("ALTER TABLE %s DROP COLUMN \"row.names\"", resourcename))
            }
          }else{
            config$logger$INFO("-> Using standard DBI")
            DBI::dbWriteTable(conn = config$software$output$dbi, name = resourcename, value = obj,
                              overwrite = overwrite, append = append)
          }
        }
        #create index for each column
        if(createIndexes){
          for (column_name in names(obj)){
            config$logger$INFO("Drop index for column : %s", column_name)
            drop_index_sql <- sprintf("DROP INDEX %s_%s_idx;", resourcename, column_name)
            try(DBI::dbExecute(config$software$output$dbi, drop_index_sql), silent = TRUE)
          }
          for (column_name in names(obj)){
            config$logger$INFO("Create index for column : %s", column_name)
            create_index_sql <- sprintf("CREATE INDEX %s_%s_idx ON %s (%s);", resourcename, column_name, resourcename, column_name)
            DBI::dbExecute(config$software$output$dbi, create_index_sql)
          }
        }
      }
    }
  )
  config$logger$INFO("Write data resource end")
  config$logger$INFO("------------------------------------------------------------------------------")
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.