#' Job
#'
#' This class represents an openEO job. Which is submitted by an user, containing an executable process graph. It is strongly bound
#' to the backend, since the OpenEOServer class needs to be accessible, when it comes to loading related process for the process
#' graph.
#'
#' @field job_id The unique identifier of the job
#' @field status The current status in the job lifecycle
#' @field process_graph graph of nested processes that is executable (ExecutableProcess)
#' @field submitted Timestamp when the job was submitted to the server
#' @field user_id The user who owns the job
#' @field consumed_credits For accounting and billing the amount of credits consumed by this job
#' @field last_update Timestamp when the job was last updated
#' @field results Contains the result of the process_graph after execution (Collection)
#' @field persistent Whether or not the job is stored in database
#' @field output list containing the output configuration like format (or additional GDAL commands)
#'
#' @include Process-class.R
#' @include utils.R
#' @importFrom R6 R6Class
#' @importFrom jsonlite fromJSON
#' @importFrom lubridate as_datetime
#' @export
Job <- R6Class(
"Job",
inherit = DatabaseEntity,
# public ====
public = list(
# attributes ----
job_id = NULL,
status=NA,
process_graph = NULL,
submitted=NA,
last_update=NA,
user_id=NULL,
consumed_credits=NA,
output=NA,
budget=NA,
title= NA,
description = NA,
plan = NA,
results = NULL, # contains the results of the process_graph after execution
persistent = FALSE, # whether or not the job is stored in data base
# functions ----
initialize = function(job_id=NULL,process_graph=NULL,user_id = NULL) {
self$job_id = job_id
if (!is.null(user_id)) {
self$user_id = user_id
}
if (is.na(self$submitted)) {
self$submitted = now()
self$last_update = self$submitted
}
if (!is.na(self$status)) {
self$status = "submitted"
}
self$consumed_credits = 0
if (!is.null(process_graph)) {
if (!is.ProcessGraph(process_graph)) {
if (is.graph_id(process_graph)) {
#load graph id and overwrite user and grpah_id
private$pg = ProcessGraph$new(graph_id=process_graph)
private$pg$user_id = user_id
private$pg$graph_id = NULL # will be created on store
} else {
private$pg = ProcessGraph$new(process_graph = process_graph, user_id = user_id)
}
} else {
private$pg$process_graph = process_graph
}
user = User$new()$load(user_id=user_id)
self$process_graph = private$pg$buildExecutableProcessGraph(user=user,job=self)
}
return(self)
},
store = function() {
if (is.null(self$job_id)) {
self$job_id = private$newJobId()
}
if (is.na(private$pg$graph_id) || is.null(private$pg$graph_id)) {
private$pg$store()
}
encodedOutput = NA
if (!is.null(self$output)) {
encodedOutput = encodeChar2Hex(toJSON(x=self$output,auto_unbox = TRUE,pretty = TRUE))
}
if (!exists.Job(self$job_id)) {
insertIntoQuery = "insert into job (
job_id,
user_id,
status,
process_graph,
submitted,
last_update,
consumed_credits,
output,
budget,
title,
description,
plan
) values (
:job_id,
:user_id,
:status,
:process_graph,
:submitted,
:last_update,
:consumed_credits,
:output,
:budget,
:title,
:description,
:plan
);"
con = openeo.server$getConnection()
dbExecute(con, insertIntoQuery, param = list(
job_id = self$job_id,
user_id = self$user_id,
status = self$status,
process_graph = private$pg$graph_id, # it is the graph_id at this point
submitted=as.character(self$submitted),
last_update = as.character(self$last_update),
consumed_credits = self$consumed_credits,
output = encodedOutput,
budget = self$budget,
title = self$title,
description = self$description,
plan = self$plan
))
dbDisconnect(con)
} else {
updateQuery = "update job
set
status = :status,
submitted = :submitted,
last_update = :last_update,
consumed_credits = :consumed_credits,
output = :output,
budget = :budget,
title = :title,
description = :description,
plan = :plan
where
job_id = :job_id;"
con = openeo.server$getConnection()
dbExecute(con, updateQuery, param = list(
status = self$status,
submitted = as.character(self$submitted),
last_update = as.character(self$last_update),
consumed_credits = self$consumed_credits,
job_id = self$job_id,
output = encodedOutput,
budget = self$budget,
title = self$title,
description = self$description,
plan = self$plan
))
dbDisconnect(con)
}
self$persistent = TRUE
invisible(self)
},
load = function() {
if (is.null(self$job_id)) {
stop("Cannot load job without an ID")
}
if (!exists.Job(self$job_id)) {
stop("Cannot find job")
}
con = openeo.server$getConnection()
job_info = dbGetQuery(con, "select * from job where job_id = :id"
,param = list(id=self$job_id))
dbDisconnect(con)
self$user_id = job_info$user_id
self$status = job_info$status
self$submitted = job_info$submitted
self$last_update = job_info$last_update
self$consumed_credits = job_info$consumed_credits
if (is.null(job_info$output) || is.na(job_info$output) || length(job_info$output) < 1) {
self$output = NULL
} else {
self$output = fromJSON(txt=decodeHex2Char(job_info$output),simplifyDataFrame = FALSE)
}
self$budget = job_info$budget
self$title = job_info$title
self$description = job_info$description
self$plan = job_info$plan
# when stored in a db then all the time the graph is loaded from db, regardless if it is published or not
private$pg = ProcessGraph$new(graph_id = job_info$process_graph)
self$process_graph = private$pg$buildExecutableProcessGraph(user = User$new()$load(user_id=self$user_id), job=self) #from db
self$persistent = TRUE
invisible(self)
},
shortInfo = function() {
info = list(
job_id = self$job_id,
title = self$title,
description = self$description,
status = self$status,
submitted = iso_datetime(self$submitted),
updated = iso_datetime(self$last_update),
plan = self$plan,
costs = self$consumed_credits,
budget = self$budget
)
return(info)
},
remove = function() {
if (is.null(self$job_id) || !exists.Job(self$job_id)) {
return(FALSE)
}
con = openeo.server$getConnection()
success1 = dbExecute(con,"delete from process_graph
where graph_id = (
select process_graph
from job
where job_id = :id)",param=list(id = self$job_id)) == 1
success2 = dbExecute(con,"delete from job where job_id = :id",param=list(id = self$job_id)) == 1
dbDisconnect(con)
self$clearLog()
if (dir.exists(self$output.folder)) {
unlink(self$output.folder,recursive = TRUE)
}
return(success1 && success2)
},
detailedInfo = function() {
if (is.null(private$pg)) {
stop("process_graph not loaded from db")
}
info = list(
job_id = self$job_id,
title = self$title,
description = self$description,
process_graph = private$pg$process_graph,
output = self$output,
status = self$status,
submitted = iso_datetime(self$submitted),
updated =iso_datetime(self$last_update),
plan = self$plan,
costs = self$consumed_credits,
budget = self$budget
)
return(info)
},
getProcessGraph = function() {
return(private$pg)
},
modifyProcessGraph = function(graph) {
old_pg_id = private$pg$graph_id
# graph is the list represenation of the process graph as parsed from the job input object
private$pg = ProcessGraph$new(process_graph = graph, user_id = self$user_id)
private$pg$graph_id = old_pg_id
private$pg$update()
user = User$new()$load(user_id=self$user_id)
self$process_graph = private$pg$buildExecutableProcessGraph(user=user,job=self)
},
clearLog = function() {
con = openeo.server$getConnection()
tryCatch(
{
return(con %>% dbExecute("delete from log where job_id = :jid", param=list(jid=self$job_id)))
},
finally = {
dbDisconnect(con)
}
)
},
getLog = function() {
con = openeo.server$getConnection()
tryCatch({
results = con %>% dbGetQuery("select * from log where job_id = :jid", param=list(jid=self$job_id))
return(results)
},
finally = {
dbDisconnect(con)
})
},
run = function(logger) {
tryCatch({
logger$info("Start job processing...")
self$status = "running"
if (self$persistent) {
con = openeo.server$getConnection()
updateJobQuery = "update job set last_update = :time, status = :status where job_id = :job_id"
dbExecute(con, updateJobQuery ,param=list(time=as.character(Sys.time()),
status="running",
job_id=self$job_id))
dbDisconnect(con)
}
self$results = self$process_graph$execute()
self$status = "finished"
if (self$persistent) {
con = openeo.server$getConnection()
updateJobQuery = "update job set last_update = :time, status = :status where job_id = :job_id"
dbExecute(con, updateJobQuery ,param=list(time=as.character(Sys.time()),
status="finished",
job_id=self$job_id))
dbDisconnect(con)
}
logger$info("Job done")
}, error=function (e) {
self$status = "error"
self$results = NULL
if (self$persistent) {
con = openeo.server$getConnection()
updateJobQuery = "update job set last_update = :time, status = :status where job_id = :job_id"
dbExecute(con, updateJobQuery ,param=list(time=as.character(Sys.time()),
status="error",
job_id=self$job_id))
dbDisconnect(con)
}
logger$error("Error. Aborting execution.")
}, finally={
return(self)
})
}
),
# active ----
active = list(
output.folder = function() {
jobs.folder = "jobs"
return(paste(openeo.server$configuration$workspaces.path, jobs.folder, self$job_id,sep="/"))
}
),
# private ----
private = list(
# attributes ====
pg = NULL,
# functions ====
newJobId = function() {
randomString = createAlphaNumericId(n=1,length=15)
if (exists.Job(randomString)) {
# if id exists get a new one (recursive)
return(private$newJobId())
} else {
return(randomString)
}
}
)
)
# statics ====
is.Job = function(obj) {
return("Job" %in% class(obj))
}
exists.Job = function(job_id) {
if (nchar(job_id) == 15) {
con = openeo.server$getConnection()
result = dbGetQuery(con, "select count(*) from job where job_id = :id"
,param = list(id=job_id)) == 1
dbDisconnect(con)
return(result)
} else {
return(FALSE)
}
}
syncJobId = function() {
randomString = paste("SYNC",createAlphaNumericId(n=1,length=11),sep="")
if (exists.Job(randomString)) {
# if id exists get a new one (recursive)
return(syncJobId())
} else {
return(randomString)
}
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.