R/executeWorkflowJob.R

Defines functions executeWorkflowJob

Documented in executeWorkflowJob

#' @name executeWorkflowJob
#' @aliases executeWorkflowJob
#' @title executeWorkflowJob
#' @description \code{executeWorkflowJob} allows to execute a workflow job.
#' It first runs the 'onstart' actions of the input/output software, then:
#' \itemize{
#'   \item in profile mode \code{"entity"}, iterates over the metadata entities,
#'   enriches each of them (data, identifiers, relations, formats, subjects,
#'   metadata, vocabularies), runs the entity data actions and the global
#'   workflow actions, then exports PIDs and writes copies of the source tables
#'   and of the processed entities as CSV files in the job directory;
#'   \item in profile mode \code{"raw"}, evaluates the script of each action.
#' }
#' It finally runs the 'onend' actions of the input/output software.
#'
#' @usage executeWorkflowJob(config, jobdir, queue, monitor)
#'
#' @param config a configuration object as read by \code{initWorkflow}
#' @param jobdir the Job directory. Optional, by default \code{config$job}.
#' @param queue an \pkg{ipc} queue to use geoflow in \pkg{geoflow-shiny}. It is
#'   only forwarded to \code{monitor}.
#' @param monitor a monitor function to increase progress bar. When not
#'   \code{NULL}, it is called after each executed action as
#'   \code{monitor(step, config, entity, action, queue)}, \code{step} being the
#'   cumulated progress in percent.
#' @return Called for its side effects (actions run, files written in the job
#'   directory, logging); the return value is not meant to be used.
#'
#' @author Emmanuel Blondel, \email{emmanuel.blondel1@@gmail.com}
#' @export
#'
executeWorkflowJob <- function(config, jobdir = NULL, queue = NULL, monitor = NULL){

  #entities are processed from their own job sub-directories (setwd), so the
  #caller's working directory is restored on exit, including on error
  wd <- getwd()
  on.exit(setwd(wd), add = TRUE)

  if(is.null(jobdir)) jobdir <- config$job

  if(config$verbose){
    config$logger$separator("=")
    cat("Workflow Execution\n")
    config$logger$separator("=")
    config$logger$INFO("Executing workflow job...")
  }

  #options
  #returns the global profile option 'name', or 'default' when it is not set
  getProfileOption <- function(name, default){
    value <- config$profile$options[[name]]
    if(is.null(value)) default else value
  }
  #'skipFileDownload' is the deprecated name of 'skipDataDownload': it is still
  #honoured, but an explicit 'skipDataDownload' takes precedence over it
  skipDataDownload <- FALSE
  if(!is.null(config$profile$options[["skipFileDownload"]])){
    config$logger$WARN("Global option 'skipFileDownload' is deprecated, use 'skipDataDownload' instead!")
    skipDataDownload <- config$profile$options[["skipFileDownload"]]
  }
  skipDataDownload <- getProfileOption("skipDataDownload", skipDataDownload)
  skipEnrichWithDatatypes <- getProfileOption("skipEnrichWithDatatypes", FALSE)
  skipEnrichWithData <- getProfileOption("skipEnrichWithData", FALSE)
  skipEnrichWithDataSubjects <- getProfileOption("skipEnrichWithDataSubjects", FALSE)
  dataSubjectsToExclude <- getProfileOption("dataSubjectsToExclude", c())

  #runs, for each software of type 'softwareType' ("input"/"output"), the action
  #'actionType' ("onstart"/"onend") declared in its companion '<name>_config' entry
  runSoftwareActions <- function(config, softwareType, actionType){
    software_list <- config$software[[softwareType]]
    if(length(software_list)>0){
      software_names <- names(software_list)
      software_names <- software_names[!endsWith(software_names, "_config")]
      for(software_name in software_names){
        software <- software_list[[software_name]]
        software_cfg <- software_list[[paste0(software_name, "_config")]]
        if(actionType %in% names(software_cfg$actions)){
          config$logger$INFO("Executing %s software action '%s'", softwareType, actionType)
          software_cfg$actions[[actionType]](config, software, software_cfg)
        }
      }
    }
  }

  #Actions onstart
  config$logger$separator("-")
  config$logger$INFO("Executing software actions 'onstart' ...")
  runSoftwareActions(config, "input", "onstart")
  runSoftwareActions(config, "output", "onstart")

  #workflow actions
  config$logger$separator("-")
  config$logger$INFO("Executing entity actions ...")
  actions <- config$actions
  if(length(actions)==0){
    config$logger$WARN("No actions enabled for this workflow!")
  }else{
    config$logger$INFO("Workflow mode: %s", config$profile$mode)
    config$logger$INFO("Workflow with %s actions", length(actions))
    for(i in seq_along(actions)) config$logger$INFO("Action %s: %s", i, actions[[i]]$id)
  }

  #logical vector flagging the workflow actions identified by 'id'
  isAction <- function(id){
    vapply(actions, function(x){ isTRUE(x$id == id) }, logical(1))
  }

  if(config$profile$mode == "entity"){

    #execute entity-based actions
    entities <- config$metadata$content$entities
    if(!is.null(entities)){

      #TODO refactor actions in case they have specific behaviors to triggered early before entity iterator (eg. Zenodo cleaning)
      withZenodo <- any(isAction("zen4R-deposit-record"))

      #cleaning in case Zenodo action is enabled with clean properties
      if(withZenodo){
        ZENODO_CONFIG <- config$software$output$zenodo_config
        clean <- ZENODO_CONFIG$properties$clean
        #clean$run is NULL as well when no 'clean' property is configured
        if(!is.null(clean$run) && as.logical(clean$run)){
          config$logger$INFO("Zenodo action 'clean' activated: deleting deposits prior to new deposits!")
          if(is.null(clean$query) && length(clean$doi)==0 && length(clean$community)==0){
            config$logger$INFO("Zenodo: no query or list of DOIs specified for cleaning: cleaning all draft deposits")
            config$software$output$zenodo$deleteRecords()
          }else{
            #selective cleaning by Zenodo ElasticSearch query
            if(!is.null(clean$query)){
              config$logger$INFO("Zenodo: cleaning draft deposits based on query '%s'",clean$query)
              config$software$output$zenodo$deleteRecords(q = clean$query)
            }
            #selective cleaning by prereserved DOI(s)
            if(!is.null(clean$doi)){
              config$logger$INFO("Zenodo: cleaning draft deposits with prereserved DOIs [%s]", paste(clean$doi, collapse=","))
              invisible(lapply(clean$doi, config$software$output$zenodo$deleteRecordByDOI))
            }
            #selective cleaning by community(ies)
            if(!is.null(clean$community)){
              config$logger$INFO("Zenodo: cleaning draft deposits in communities [%s]", paste(clean$community, collapse=","))
              invisible(lapply(clean$community, function(community){
                config$software$output$zenodo$deleteRecords(q = sprintf("communities:%s", community))
              }))
            }
          }
          config$logger$INFO("Zenodo: sleeping 5 seconds...")
          Sys.sleep(5)
        }
      }

      #geosapi publication: the target GeoServer workspace is checked for every
      #entity, since an entity may override the configured workspace
      geosapi <- isAction("geosapi-publish-ogc-services")
      withGeosapi <- any(geosapi)

      #checks that the GeoServer workspace targeted by 'entity' exists, creating it
      #when the geosapi action option 'createWorkspace' allows it; stops otherwise
      checkGeoserverWorkspace <- function(entity){
        createWorkspace <- actions[geosapi][[1]]$getOption("createWorkspace")

        GS <- config$software$output$geoserver
        if(is.null(GS)){
          errMsg <- "This action requires a GeoServer software to be declared in the configuration"
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }

        GS_CONFIG <- config$software$output$geoserver_config
        workspace <- GS_CONFIG$properties$workspace
        if(!is.null(entity$data$workspaces$geoserver)) workspace <- entity$data$workspaces$geoserver
        if(is.null(workspace)){
          errMsg <- "The geoserver configuration requires a workspace for publishing action"
          config$logger$ERROR(errMsg)
          stop(errMsg)
        }

        #an existing workspace is used as is
        ws <- GS$getWorkspace(workspace)
        if(length(ws)==0){
          if(createWorkspace){
            created <- GS$createWorkspace(workspace, paste0("http://",workspace))
            if(created){
              config$logger$INFO("Successful Geoserver '%s' workspace creation", workspace)
            }else{
              errMsg <- "Error during Geoserver workspace creation. Aborting 'geosapi' action!"
              config$logger$ERROR(errMsg)
              stop(errMsg)
            }
          }else{
            errMsg <- sprintf("Workspace '%s' does not exist and createWorkspace option = FALSE, please verify in config that the workspace already exists or set createWorkspace = TRUE to create it",workspace)
            config$logger$ERROR(errMsg)
            stop(errMsg)
          }
        }
        invisible(NULL)
      }

      #Monitor incrementation: one step per (entity, global action) pair plus one
      #per entity data action
      nb_step <- length(entities) * length(actions) +
        sum(vapply(entities, function(entity){ length(entity$data$actions) }, integer(1)))
      inc_step <- 1/nb_step*100
      step <- 0

      for(entity in entities){

        #create entity jobdir and work in it
        entity$prepareEntityJobDir(config, jobdir)
        setwd(entity$getEntityJobDirPath(config, jobdir))
        config$logger$INFO("Entity working directory: %s", getwd())

        #enrich metadata with dynamic properties
        if(!is.null(entity$data)){
          #data features/coverages
          if(!skipDataDownload){
            config$logger$INFO("SkipDataDownload is false: copying and fetching data...")
            #we copy data to job data dir (for data files)
            entity$copyDataToJobDir(config, jobdir)
            #enrich with data types
            if(!skipEnrichWithDatatypes) entity$enrichWithDatatypes(config, jobdir)
            #vector data: we enrich entity with features
            #control is added in case of entity already enriched with features/coverages (when loaded from custom R entity handlers)
            if(!skipEnrichWithData && is.null(entity$data$features) && is.null(entity$data$coverages)){
              entity$enrichWithData(config, jobdir)
            }

            setwd(entity$getEntityJobDirPath(config, jobdir)) #make sure we are in entity jobdir
            #we check if the source and upload are both different file format (csv,shp,gpkg) and process automatically to conversion from source to upload type
            entity$prepareFeaturesToUpload(config)
          }else{
            config$logger$INFO("SkipDataDownload is true:")
            if(!skipEnrichWithData){
              config$logger$INFO("SkipEnrichWithData is false: Fetching spatial coverage from data (for DB sources only)...")
              #alternative behaviors in case we don't download data, applies to DB only
              entity$enrichSpatialCoverageFromDB(config)
            }else{
              config$logger$INFO("SkipEnrichWithData is true")
            }
          }

          entity$copyStylesToJobDir(config)
          #extra identifiers to use in entity identification/actions
          entity$enrichWithIdentifiers(config)
          #data relations (eg. geosapi & OGC data protocol online resources)
          entity$enrichWithRelations(config)
          entity$enrichWithFormats(config)
          if(!skipEnrichWithDataSubjects) entity$enrichWithSubjects(config, exclusions = dataSubjectsToExclude)
        }

        #enrich entities with metadata (other properties) and vocabularies
        entity$enrichWithMetadata(config)
        entity$enrichWithVocabularies(config)

        #run sequence of entity data actions (if any)
        if(!is.null(entity$data)) {
          if(entity$data$run){
            if(length(entity$data$actions)>0){
              for(i in seq_along(entity$data$actions)){
                entity_action <- entity$data$actions[[i]]
                config$logger$INFO("Executing entity data action %s: '%s' ('%s')", i, entity_action$id, entity_action$script)
                entity_action$run(entity, config)
                #monitor local action
                step <- step + inc_step
                config$logger$INFO("WORKFLOW PROGRESS : ACTION '%s' of ENTITY '%s' ... %s %%",entity_action$id,entity$identifiers[["id"]],step)
                if(!is.null(monitor)) monitor(step=step,config=config,entity=entity,action=entity_action,queue=queue)
              }
              #we trigger entity enrichment (in case entity data action involved modification of entity),
              #with the same config argument as the first enrichment above
              entity$enrichWithMetadata(config)
            }
          }else{
            config$logger$INFO("Execution of entity data actions is disabled")
          }
        }

        #Check existence of Workspace and create it if needed
        if(withGeosapi) checkGeoserverWorkspace(entity)

        #run sequence of global actions
        to_publish <- FALSE
        for(i in seq_along(actions)){
          action <- actions[[i]]$clone(deep = TRUE)
          config$logger$INFO("Executing Action %s: %s - for entity %s", i, action$id, entity$identifiers[["id"]])
          #Zenodo publication is switched off for this pass; the requested publish
          #flag is handed over to the generic uploader pass below
          if(action$id == "zen4R-deposit-record"){
            if(!is.null(action$options$publish)) if(action$options$publish){
              to_publish <- TRUE
              action$options$publish <- FALSE
            }
          }
          action$run(entity = entity, config = config)

          #monitor global action
          step <- step + inc_step
          config$logger$INFO("WORKFLOW PROGRESS : ACTION '%s' of ENTITY '%s' ... %s %%",action$id,entity$identifiers[["id"]],step)
          if(!is.null(monitor)) monitor(step=step,config=config,entity=entity,action=action,queue=queue)
        }

        #search for generic uploader actions (eg. Zenodo, Dataverse)
        generic_uploaders <- actions[vapply(actions, function(x){ x$isGenericUploader() }, logical(1))]
        for(uploader in generic_uploaders){
          generic_uploader <- uploader$clone(deep = TRUE)
          config$logger$INFO("Last trigger for action '%s' (generic upload behavior)", generic_uploader$id)
          #For Zenodo:
          #if Zenodo is the only action then let's sleep to avoid latence issues when listing depositions
          #if no sleep is specified, getDepositions doesn't list yet the newly deposited recorded with
          #isIdenticalTo relationship
          if(generic_uploader$id == "zen4R-deposit-record" && length(actions)==1){
            config$logger$INFO("Zenodo: sleeping 5 seconds...")
            Sys.sleep(5)
          }

          #behavior for generic uploaders, we set depositWithFiles = TRUE and proceed with all resource uploads
          generic_uploader_options <- generic_uploader$options
          generic_uploader_options$depositWithFiles <- TRUE
          if(generic_uploader$id == "zen4R-deposit-record"){
            generic_uploader_options$publish <- to_publish
          }
          generic_uploader$options <- generic_uploader_options
          generic_uploader$run(entity, config)
        }

        #release features; guarded because '$<-' on a NULL 'data' would turn it into list()
        if(!is.null(entity$data)) entity$data$features <- NULL
        setwd(jobdir)
      }

      #special business logics in case of PID generators (eg. DOIs)
      pid_generators <- actions[vapply(actions, function(x){ x$isPIDGenerator() }, logical(1))]
      for(generator in pid_generators){
        pid_generator <- generator$clone(deep = TRUE)
        pid_generator$exportPIDs(config, entities)
      }

      #writes each source table of 'sources' as 'config_copyof_<type>_<i>.csv' in the working directory
      writeSourceCopies <- function(sources, type){
        for(i in seq_along(sources)){
          readr::write_csv(sources[[i]], file.path(getwd(), sprintf("config_copyof_%s_%s.csv", type, i)))
        }
      }
      #save source tabular entities,contacts and dictionaries used
      writeSourceCopies(config$src_entities, "entities")
      writeSourceCopies(config$src_contacts, "contacts")
      writeSourceCopies(config$src_dictionary, "dictionary")

      #save entities
      entities <- config$getEntities()
      if(length(entities)>0){
        entities_df <- do.call("rbind", lapply(entities, function(x){ x$asDataFrame() }))
        readr::write_csv(entities_df, file.path(getwd(), "config_geoflow_entities.csv"))
      }
    }
  }else if(config$profile$mode == "raw"){
    #execute raw actions (--> not based on metadata entities)
    for(i in seq_along(actions)){
      action <- actions[[i]]
      config$logger$INFO("Executing Action %s: %s", i, action$id)
      #NOTE(review): parses the file referenced by action$script and evaluates it in
      #this function's environment (scripts can see 'config', 'jobdir', ...);
      #the script must come from trusted configuration
      eval(expr = parse(action$script))
    }
  }

  #Actions onend
  config$logger$separator("-")
  config$logger$INFO("Executing software actions 'onend' ...")
  runSoftwareActions(config, "input", "onend")
  runSoftwareActions(config, "output", "onend")
  config$logger$INFO("Workflow successfully executed ...")
}

Try the geoflow package in your browser

Any scripts or data that you put into this service are public.

geoflow documentation built on Dec. 12, 2025, 5:08 p.m.