Nothing
#' @name initWorkflow
#' @aliases initWorkflow
#' @title initWorkflow
#' @description \code{initWorkflow} initializes a workflow from a JSON/YAML configuration file:
#' it sets up the job directory, the workflow profile (id, mode, environment, options), the
#' software connections, the dictionary and registers, the metadata contacts/entities and the
#' actions to trigger.
#'
#' @usage initWorkflow(file, dir, jobDirPath, handleMetadata, session)
#'
#' @param file a JSON or YAML configuration file
#' @param dir a directory where to execute the workflow.
#' @param jobDirPath a directory set-up for the job. Default is \code{NULL} means it will be created
#' during initialization of the workflow, otherwise the path provided will be used.
#' @param handleMetadata Default is \code{TRUE}. Metadata contacts/entities will be handled.
#' If set to \code{FALSE}, they will not be handled (the dictionary, if configured, is loaded in
#' any case). This is used for example in geoflow Shiny app where we want to initialize config
#' without handling metadata to inherit software connections and test dynamically the metadata validity.
#' @param session a \pkg{shiny} session object (optional) to run geoflow in a \pkg{shiny} context
#'
#' @return the workflow configuration, as a \code{list} holding among others the workflow
#' \code{profile}, the \code{software} clients (\code{input}/\code{output}), the \code{registers},
#' the loaded \code{metadata$content}, the \code{actions} to trigger, the \code{job} directory path,
#' and the accessors \code{getDictionary()}, \code{getEntities()} and \code{getContacts()}.
#'
#' @author Emmanuel Blondel, \email{emmanuel.blondel1@@gmail.com}
#' @export
#'
initWorkflow <- function(file, dir, jobDirPath = NULL, handleMetadata = TRUE, session = NULL){
  wd <- getwd()
  #restore the caller working directory on exit (sourced scripts may change it)
  on.exit(setwd(wd), add = TRUE)
  #optional shiny session object
  if(!is.null(session) && !is(session, "ShinySession")){
    stop("The 'session' argument should specify an object of class 'ShinySession'")
  }

  #file/config
  file <- tools::file_path_as_absolute(file)
  config <- NULL
  config_ext <- NULL
  switch(
    mime::guess_type(file),
    "application/json" = {
      config <- jsonlite::read_json(file)
      config_ext <- "json"
    },
    "text/yaml" = {
      config <- yaml::read_yaml(file)
      config_ext <- "yml"
    },
    stop("Configuration file should be a valid JSON or YAML file")
  )
  #keep the source
  config$src <- file
  config$src_config <- config
  #workflow loggers
  config <- add_config_logger(config)
  if(config$verbose){
    cat("Session info\n")
    config$logger$separator("=")
    print(sessionInfo())
    cat(paste("* RAM:", round(benchmarkme::get_ram()/1e9,1),"GB\n"))
    cat(paste("* CPU:", benchmarkme::get_cpu()$no_of_cores,"cores\n"))
    config$logger$separator("=")
    cat("Workflow initialization\n")
    config$logger$separator("=")
    config$logger$INFO("Init Workflow configuration")
  }
  config_file <- config$src

  #working dir (where jobs will be created)
  config$root <- dirname(file)
  if(is.null(config$wd)) config$wd <- tools::file_path_as_absolute(dir)
  if(is.null(jobDirPath)) jobDirPath <- initWorkflowJob(dir = dir)
  config$job <- jobDirPath
  config$logger$INFO("Workflow job directory: %s", jobDirPath)
  #copy the configuration file into the job directory as 'job.<ext>' (no setwd needed)
  job_config_file <- paste0("job.", config_ext)
  file.copy(from = config_file, to = file.path(jobDirPath, job_config_file), overwrite = TRUE)

  #profile
  profile <- NULL
  if(!is.null(config$profile)){
    config$logger$INFO("Creating workflow profile...")
    profile <- geoflow_profile$new()
    #identifier
    if(!is.null(config$profile$id)){
      profile$setId(config$profile$id)
    }else{
      config$logger$WARN("Configuration file TO UPDATE: 'id' should be defined in profile!")
      profile$setId(config$id)
    }
    config$logger$INFO("Workflow ID: %s", profile$id)
    #other workflow metadata
    if(!is.null(config$profile$name)){
      profile$setName(config$profile$name)
      config$logger$INFO("Workflow name: %s", profile$name)
    }
    if(!is.null(config$profile$project)){
      profile$setProject(config$profile$project)
      config$logger$INFO("Workflow project: %s", profile$project)
    }
    if(!is.null(config$profile$organization)){
      profile$setOrganization(config$profile$organization)
      config$logger$INFO("Workflow organization: %s", profile$organization)
    }
    for(logo in config$profile$logos) profile$addLogo(logo)

    #workflow mode (legacy configurations declare it at root level)
    cfg_mode <- config$profile$mode
    if(is.null(cfg_mode)){
      config$logger$WARN("Configuration file TO UPDATE: 'mode' should be defined in profile!")
      cfg_mode <- config$mode
    }
    if(!is.null(cfg_mode)){
      allowedModes <- c("raw","entity")
      if(!(cfg_mode %in% allowedModes)) {
        errMsg <- sprintf("The workflow '%s' mode is incorrect. Allowed values are [%s]",
                          cfg_mode, paste(allowedModes, collapse=","))
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      profile$mode <- cfg_mode
    }else{
      config$logger$WARN("No workflow mode specified, 'raw' mode specified by default!")
      profile$mode <- "raw"
    }

    #environment
    env_file <- config$profile$environment$file
    if(!is.null(env_file)){
      config$logger$INFO("Loading environment from env file '%s'", basename(env_file))
      env_vars_before <- as.list(Sys.getenv())
      config$session_env <- env_vars_before
      loaded <- try(dotenv::load_dot_env(file = env_file))
      if(is(loaded,"try-error")){
        errMsg <- sprintf("Error while trying to load environment from env file '%s'", basename(env_file))
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      env_vars_after <- as.list(Sys.getenv())
      #report only variables added or modified by the env file; compare by name since
      #setdiff() on lists compares values only and drops the names needed below
      env_var_names <- Filter(function(env_var_name){
        !identical(env_vars_after[[env_var_name]], env_vars_before[[env_var_name]])
      }, names(env_vars_after))
      config$logger$INFO("Workflow environment:")
      hide_env_vars <- c("PASSWORD", "PWD", "TOKEN")
      if(!is.null(config$profile$environment$hide_env_vars)) hide_env_vars <- unlist(config$profile$environment$hide_env_vars)
      for(env_var_name in env_var_names){
        env_var_value <- env_vars_after[[env_var_name]]
        #mask values of variables whose name matches any of the sensitive patterns
        if(any(vapply(hide_env_vars, grepl, logical(1), x = env_var_name))) env_var_value <- "**********"
        config$logger$INFO("* %s = %s", env_var_name, env_var_value)
      }
    }

    #options (legacy configurations declare them at root level)
    cfg_options <- config$profile$options
    if(is.null(cfg_options)){
      config$logger$WARN("Configuration file TO UPDATE: 'options' should be defined in profile!")
      cfg_options <- config$options
    }
    config$logger$INFO("Setting geoflow global options...")
    config$profile$options <- cfg_options
    if(!is.null(cfg_options$line_separator)){
      config$logger$INFO("Setting option 'line_separator' to '%s'", cfg_options$line_separator)
      set_line_separator(cfg_options$line_separator)
    }
    for(option_name in names(cfg_options)){
      profile$setOption(option_name, cfg_options[[option_name]])
    }
  }

  #session_wd
  config$session_wd <- getwd()

  #load source scripts
  #--------------------
  source_scripts <- config$dependencies$scripts
  if(length(source_scripts)>0){
    config$logger$INFO("Loading R scripts...")
    for(script in source_scripts){
      config$logger$INFO("Loading R script '%s'...", script)
      source(script)
    }
  }

  #load environment
  config$logger$INFO("Load workflow environment")
  config <- load_workflow_environment(config, session)
  #set profile (R6), keeping the raw profile configuration aside
  config$profile_config <- config$profile
  if(!is.null(profile)) config$profile <- profile

  #software components
  if(!is.null(config$software)){
    supportedSoftware <- list_software(raw = TRUE)
    supported_software_types <- sapply(supportedSoftware, function(x){x$software_type})
    software_configs <- config$software
    config$software <- list(input = list(), output = list())
    for(software in software_configs){
      if(is.null(software$id)){
        errMsg <- "Sofware 'id' is missing. Please make sure to give an id to all declared software"
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      if(is.null(software$type)){
        errMsg <- "Sofware 'type' is missing. Please make sure to specify a type ('input' or 'output') to all declared software"
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      if(!(software$type %in% c("input","output"))){
        errMsg <- sprintf("Incorrect type value ('%s') for software '%s'", software$type, software$id)
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      #embedded software or custom?
      embeddedSoftware <- is.null(software$handler)
      if(embeddedSoftware){
        if(is.null(software$software_type)){
          errMsg <- sprintf("The 'software_type' is missing for software '%s'", software$id)
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        if(!(software$software_type %in% supported_software_types)){
          errMsg <- sprintf("Embedded Software type '%s' not supported by geoflow. Check the list of embedded software with R code: list_software()", software$software_type)
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
      }
      client <- NULL
      if(embeddedSoftware){
        target_software <- supportedSoftware[[match(software$software_type, supported_software_types)]]
        config$logger$INFO("Configuring %s software '%s' (%s)", software$type, software$id, software$software_type)
        target_software$setId(software$id)
        target_software$setType(software$type)
        if(!is.null(software$parameters)) target_software$setParameters(software$parameters)
        #check software dependencies
        target_software$checkPackages()
        #get handler instance
        client <- target_software$getHandlerInstance()
        software$actions <- target_software$actions
      }else{
        #NOTE: the handler expression comes from the configuration file and is evaluated as R code
        client_handler <- try(eval(parse(text = software$handler)))
        if(is(client_handler,"try-error")){
          errMsg <- sprintf("Error while evaluating software handler '%s'", software$handler)
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        #parameters are not printed: they may hold credentials
        client_params <- unlist(software[names(software)!="handler"])
        client <- client_handler(client_params)
      }
      #NOTE(review): input software are checked for duplicates by 'id' but stored below by
      #'software_type' - confirm which key is intended for input software
      registered_key <- switch(software$type, "input" = software$id, "output" = software$software_type)
      if(!is.null(config$software[[software$type]][[registered_key]])){
        if(software$type=="input") errMsg <- sprintf("An input software with id '%s' has been already declared!", software$id)
        if(software$type=="output") errMsg <- sprintf("An output software with software type '%s' has been already declared!", software$software_type)
        config$logger$ERROR(errMsg)
        stop(errMsg)
      }
      #store the client (or the software config in case the handler has no return)
      config$software[[software$type]][[software$software_type]] <- if(!is.null(client)) client else software
      config$software[[software$type]][[paste(software$software_type,"config",sep="_")]] <- software
    }
  }

  if(length(config$registers)==0) config$registers <- list()
  config_registers <- config$registers #store eventual config$registers

  #dictionary (loaded whatever handleMetadata is, its registers feed config$registers)
  if(!is.null(config$metadata)){
    if(is.null(config$metadata$content)) config$metadata$content <- list()
    cfg_md_dictionary <- config$metadata$dictionary
    if(!is.null(cfg_md_dictionary)){
      #dictionary handlers may be declared as an array, or (legacy) as a single object
      if(length(names(cfg_md_dictionary))>0){
        config$metadata$dictionary <- list(config$metadata$dictionary)
        cfg_md_dictionary <- config$metadata$dictionary
      }
      #collating data structures (feature types) from handlers
      config$logger$INFO("Loading dictionary data structures...")
      config$src_dictionary <- list()
      dicts <- lapply(cfg_md_dictionary, function(x){
        config$logger$INFO("Loading data structure definitions from '%s' [with '%s' handler]...",
                           x$source, x$handler)
        md_dict_handler <- loadMetadataHandler(config, x, type = "dictionary")
        if(md_dict_handler$status == "deprecated"){
          config$logger$WARN(sprintf("Dictionary handler '%s' is deprecated. Notes: %s",
                                     md_dict_handler$id, ifelse(nzchar(md_dict_handler$notes), md_dict_handler$notes, "-")))
        }
        config$logger$INFO("Execute handler to load dictionary data structures...")
        dict <- md_dict_handler$fun(
          handler = md_dict_handler,
          source = x$source,
          config = config
        )
        if(!is(dict, "geoflow_dictionary")){
          errMsg <- "The output of the dictionary handler should return an object of class 'geoflow_dictionary'"
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        return(dict)
      })
      #keep source dictionaries part of the config (from the 'source' attribute set by handlers)
      config$src_dictionary <- Filter(Negate(is.null), lapply(dicts, attr, "source"))
      #build single top-level dictionary (first occurrence of an id wins)
      dictionary <- geoflow_dictionary$new()
      for(dict in dicts){
        for(ft in dict$featuretypes){
          if(!ft$id %in% sapply(dictionary$featuretypes,function(x){x$id})) dictionary$addFeatureType(ft)
        }
        for(reg in dict$registers){
          if(!reg$id %in% sapply(dictionary$registers,function(x){x$id})) dictionary$addRegister(reg)
        }
      }
      config$logger$INFO("Successfuly fetched dictionary !")
      config$metadata$content$dictionary <- dictionary
      config$registers <- dictionary$getRegisters()
      if(length(config$registers)==0) config$registers <- list()
    }
  }

  #registers
  #registers can be configured either through config or through dictionnary
  if(!is.null(config_registers)){
    fetched_registers <- list()
    if(length(config_registers)>0){
      for(reg in config_registers){
        register_to_fetch <- NULL
        isCustom <- !is.null(reg$script)
        if(!isCustom){
          if(is.null(reg$id)){
            errMsg <- "An 'register' should have an id. Please check your configuration file. In case of a custom register, the id should be the function name."
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          available_registers <- list_registers(raw=TRUE)
          available_register_ids <- sapply(available_registers, function(x){x$id})
          if(!(reg$id %in% available_register_ids)){
            errMsg <- sprintf("The register '%s' is not among available geoflow registers", reg$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          #pick the register matching the requested id (not merely the first available one)
          register_to_fetch <- available_registers[[match(reg$id, available_register_ids)]]
        }else{
          source(reg$script)
          #NOTE: the register id comes from the configuration file and is evaluated as R code
          customfun <- try(eval(parse(text = reg$id)))
          if(is(customfun,"try-error")){
            errMsg <- sprintf("Error while trying to evaluate custom function '%s", reg$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          if(!is(customfun,"function")){
            errMsg <- sprintf("'%s' is not a function!", reg$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          register_to_fetch <- geoflow_register$new(
            id = reg$id,
            def = reg$def,
            fun = customfun
          )
        }
        if(!is.null(register_to_fetch)) register_to_fetch$fetch(config)
        if(!(reg$id %in% sapply(fetched_registers, function(x){x$id}))){
          fetched_registers <- c(fetched_registers, register_to_fetch)
        }
      }
      #registers coming from the dictionary are kept; raw register configs are replaced
      if(all(vapply(config$registers, function(x){inherits(x, "geoflow_register")}, logical(1)))){
        config$registers <- c(config$registers, fetched_registers)
      }else{
        config$registers <- fetched_registers
      }
    }
  }

  #metadata elements
  if(handleMetadata && !is.null(config$metadata)){
    config$logger$INFO("Loading metadata elements...")
    if(is.null(config$metadata$content)) config$metadata$content <- list()

    #metadata contacts
    cfg_md_contacts <- config$metadata$contacts
    if(!is.null(cfg_md_contacts)){
      #contact handlers may be declared as an array, or (legacy) as a single object
      if(length(names(cfg_md_contacts))>0){
        config$metadata$contacts <- list(config$metadata$contacts)
        cfg_md_contacts <- config$metadata$contacts
      }
      #collating contacts from contact handlers
      config$logger$INFO("Loading metadata contacts...")
      config$src_contacts <- list()
      contacts_by_handler <- lapply(cfg_md_contacts, function(x){
        config$logger$INFO("Loading metadata contacts from '%s' [with '%s' handler]...",
                           x$source, x$handler)
        md_contact_handler <- loadMetadataHandler(config, x, type = "contacts")
        if(md_contact_handler$status == "deprecated"){
          config$logger$WARN(sprintf("Contact handler '%s' is deprecated. Notes: %s",
                                     md_contact_handler$id, ifelse(nzchar(md_contact_handler$notes), md_contact_handler$notes, "-")))
        }
        config$logger$INFO("Execute contact handler to load contacts...")
        contacts <- md_contact_handler$fun(
          handler = md_contact_handler,
          source = x$source,
          config = config
        )
        if(!is(contacts, "list") || !all(vapply(contacts, is, logical(1), "geoflow_contact"))){
          errMsg <- "The output of the contacts handler should return a list of objects of class 'geoflow_entity_contact'"
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        return(contacts)
      })
      #keep source contacts part of the config (read before c() drops the attributes)
      config$src_contacts <- Filter(Negate(is.null), lapply(contacts_by_handler, attr, "source"))
      contacts <- do.call("c", contacts_by_handler)
      config$logger$INFO("Successfuly fetched %s contacts!",length(contacts))
      config$metadata$content$contacts <- contacts
      config$logger$INFO("Successfuly loaded %s contacts!",length(contacts))
    }

    #metadata entities
    cfg_md_entities <- config$metadata$entities
    if(!is.null(cfg_md_entities)){
      #entity handlers may be declared as an array, or (legacy) as a single object
      if(length(names(cfg_md_entities))>0){
        config$metadata$entities <- list(config$metadata$entities)
        cfg_md_entities <- config$metadata$entities
      }
      #collating entities from entity handlers
      config$logger$INFO("Loading metadata entities...")
      config$src_entities <- list()
      entities_by_handler <- lapply(cfg_md_entities, function(x){
        config$logger$INFO("Loading metadata entities from '%s' [with '%s' handler]...",
                           x$source, x$handler)
        md_entity_handler <- loadMetadataHandler(config, x, type = "entities")
        if(md_entity_handler$status == "deprecated"){
          config$logger$WARN("Entity handler '%s' is deprecated. Notes: %s",
                             md_entity_handler$id, ifelse(nzchar(md_entity_handler$notes), md_entity_handler$notes, "-"))
        }
        config$logger$INFO("Execute handler to load entities...")
        entities <- md_entity_handler$fun(
          handler = md_entity_handler,
          source = x$source,
          config = config
        )
        if(!is(entities, "list") || !all(vapply(entities, is, logical(1), "geoflow_entity"))){
          errMsg <- "The output of the entities handler should return a list of objects of class 'geoflow_entity'"
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        return(entities)
      })
      #keep source entities part of the config (read before c() drops the attributes)
      config$src_entities <- Filter(Negate(is.null), lapply(entities_by_handler, attr, "source"))
      entities <- do.call("c", entities_by_handler)
      config$logger$INFO("Successfuly fetched %s entities!",length(entities))

      if(!is.null(config$metadata$content$contacts)){
        config$logger$INFO("Enrich metadata entities from directory of contacts")
        directory_of_contacts <- config$metadata$content$contacts
        #enrich entity contacts from contacts directory
        entities <- lapply(entities, function(entity){
          newentity <- entity$clone()
          #resolve each entity contact against the directory, keeping the entity-specific role;
          #contacts that cannot be resolved are dropped
          newentity$contacts <- lapply(entity$contacts, function(contact){
            newcontact <- NULL
            if(is(contact,"geoflow_contact")){
              id <- contact$identifiers[["id"]]
              role <- contact$role
              if(!is.null(id)) if(!is.na(id)){
                contact_from_directory <- directory_of_contacts[vapply(directory_of_contacts, function(x){id %in% x$identifiers}, logical(1))]
                if(length(contact_from_directory)>0){
                  if(length(contact_from_directory)>1 && length(unique(sapply(contact_from_directory, function(x){x$role})))>1){
                    config$logger$WARN("Warning: 2 contacts identified with same id/role! Check your contacts")
                  }
                  #deep clone so that the shared directory contact is not mutated
                  newcontact <- contact_from_directory[[1]]$clone(deep=TRUE)
                  newcontact$setIdentifier(key = "id", id)
                  newcontact$setRole(role)
                }else{
                  config$logger$WARN(sprintf("Warning: contact %s is not registered in directory! Contact will be ignored!", id))
                }
              }
            }
            return(newcontact)
          })
          newentity$contacts <- newentity$contacts[!vapply(newentity$contacts, is.null, logical(1))]

          #we look at data provenance
          if(!is.null(entity$provenance) && is(entity$provenance, "geoflow_provenance")){
            newprov <- entity$provenance$clone()
            processes <- entity$provenance$processes
            if(length(processes)>0){
              processors <- Filter(function(x){
                is.character(x$role) && isTRUE(startsWith(x$role, "processor"))
              }, entity$contacts)
              newprov$processes <- lapply(seq_along(processes), function(i){
                newprocess <- processes[[i]]$clone()
                for(processor in processors){
                  #NOTE(review): "[i]$" is a regex character class (role ends with one of the
                  #digits of i), not the literal suffix "[i]" - confirm the intended role syntax
                  processor_to_add <- regexpr(paste0("[",i,"]$"), processor$role) > 0 || processor$role == "processor"
                  if(processor_to_add){
                    processor_from_directory <- directory_of_contacts[vapply(directory_of_contacts, function(x){any(processor$identifiers %in% x$identifiers)}, logical(1))]
                    if(length(processor_from_directory)>0){
                      #deep clone so that the shared directory contact is not mutated
                      new_processor <- processor_from_directory[[1]]$clone(deep=TRUE)
                      new_processor$setIdentifier(key = "id", processor$identifiers[["id"]])
                      new_processor$setRole("processor")
                      newprocess$addProcessor(new_processor)
                    }
                  }
                }
                return(newprocess)
              })
            }
            newentity$setProvenance(newprov)
          }
          return(newentity)
        })
      }
      config$metadata$content$entities <- entities
      config$logger$INFO("Successfuly loaded %s entities!",length(entities))
    }
  }

  #accessors to metadata elements (looked up in this function's final 'config')
  config$getDictionary <- function(){
    return(config$metadata$content$dictionary)
  }
  config$getEntities <- function(){
    return(config$metadata$content$entities)
  }
  config$getContacts <- function(){
    return(config$metadata$content$contacts)
  }

  #Actions
  if(!is.null(config$actions)){
    config$actions <- lapply(config$actions,function(action){
      if(!action$run) return(NULL)
      action_to_trigger <- NULL
      isCustom <- !is.null(action$script)
      if(!isCustom){
        if(is.null(action$id)){
          errMsg <- "An 'action' should have an id. Please check your configuration file. In case of a custom action, the id should be the function name."
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        #we try to find it among embedded actions
        available_actions <- list_actions(raw=TRUE)
        available_action_ids <- sapply(available_actions, function(x){x$id})
        if(!(action$id %in% available_action_ids)){
          stop(sprintf("The action '%s' is not among available geoflow actions", action$id))
        }
        action_to_trigger <- .geoflow$actions[sapply(.geoflow$actions, function(x){x$id==action$id})][[1]]$clone(deep=TRUE)
        #check action dependencies
        action_to_trigger$checkPackages()
        #options
        if(length(action$options)>0) if(!all(names(action$options) %in% names(action_to_trigger$available_options))){
          errMsg <- sprintf("Option(s) [%s] invalid for action '%s'", paste0(setdiff(names(action$options), names(action_to_trigger$available_options)), collapse=","), action$id)
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }
        action_to_trigger$options <- action$options
      }else{
        if(isTRUE(config$profile$mode == "entity")){
          #the custom script is expected to evaluate to the action function
          customfun <- try(source(get_config_resource_path(config, action$script))$value)
          if(is(customfun,"try-error")){
            errMsg <- sprintf("Error while trying to evaluate custom function'%s", action$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          if(!is(customfun,"function")){
            errMsg <- sprintf("'%s' is not a function!", action$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          funparams <- unlist(names(formals(customfun)))
          if(!("action" %in% funparams)){
            config$logger$WARN(sprintf("Action '%s' - Custom action arguments: [%s]", action$id, paste(funparams, collapse=",")))
            errMsg <- sprintf("Missing parameter 'action' in function '%s'", action$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          if(!("entity" %in% funparams)){
            config$logger$WARN(sprintf("Custom action arguments: [%s]", paste(funparams, collapse=",")))
            errMsg <- sprintf("Missing parameter 'entity' in function '%s'", action$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          if(!("config" %in% funparams)){
            config$logger$WARN(sprintf("Custom action arguments: [%s]", paste(funparams, collapse=",")))
            errMsg <- sprintf("Missing parameter 'config' in function '%s'", action$id)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
          action_to_trigger <- geoflow_action$new(
            id = action$id,
            type = action$type,
            def = action$def,
            fun = customfun,
            options = action$options
          )
        }else if(isTRUE(config$profile$mode == "raw")){
          action_to_trigger <- geoflow_action$new(
            id = action$script,
            type = action$type,
            def = action$def,
            script = action$script,
            options = action$options
          )
        }
      }
      return(action_to_trigger)
    })
    config$actions <- config$actions[!vapply(config$actions, is.null, logical(1))]
  }

  #create job sub-directories targeted by actions (unlist() drops actions without job target)
  job_targets <- unlist(lapply(config$actions, function(x){
    if(isTRUE(x$target == "job")) x$target_dir
  }))
  directories <- unique(job_targets)
  directories <- directories[!is.na(directories)]
  for(directory in directories){
    dir_name <- file.path(config$job, directory)
    #check existence inside the job directory, not relative to the current working directory
    if(!dir.exists(dir_name)){
      config$logger$INFO("Creating '%s' job directory: %s",directory, dir_name)
      dir.create(dir_name, recursive = TRUE)
    }
  }

  #in raw mode, scripts are copied into the job directory and referenced by their base name
  if(isTRUE(config$profile$mode == "raw")){
    config$logger$INFO("Copying raw action scripts to job directory")
    for(i in seq_along(config$actions)){
      action <- config$actions[[i]]
      config$logger$INFO("Copying %s ...", action$script)
      file.copy(from = file.path(config$wd, action$script), to = jobDirPath)
      config$actions[[i]]$script <- basename(action$script)
    }
  }
  return(config)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.