# R/td-job.R

#' Create a Treasure Data job handle
#'
#' Bundles the API key of \code{client} with a job id and a database name.
#' The other job functions in this file (\code{status()}, \code{show()},
#' \code{kill()}, \code{result()}) read \code{key} and \code{id} from it.
#'
#' @param client Client object; only its \code{key} element is used.
#' @param job_id Identifier of the job (stored as \code{id}).
#' @param database Database name (stored as \code{db}).
#' @return A list with elements \code{key}, \code{db} and \code{id}, of class
#'   \code{"tdjob"}.
#' @export
td_job <- function(client, job_id, database)
{
  structure(
    list(key = client$key, db = database, id = job_id),
    class = "tdjob"
  )
}

#' Test whether an object is a Treasure Data job handle
#'
#' @param x Object to test.
#' @return \code{TRUE} if \code{x} inherits from \code{"tdjob"} (the class
#'   assigned by \code{td_job()}), otherwise \code{FALSE}.
#' @export
is.job <- function(x) {
  # td_job() tags its result with class "tdjob", so that is what we test for.
  inherits(x, "tdjob")
}


#' Shows the status of a specific job.
#'
#' Queries the \code{job/status} endpoint. It is faster and more robust than
#' \code{show()}.
#'
#' @param job tdjob object generated by td_job()
#' @return A data.frame built from the fields of the API response.
#'   \code{cpu_time} is set to 0 when the response has none, and any of the
#'   \code{created_at}, \code{updated_at}, \code{start_at} and \code{end_at}
#'   columns that are present are converted with \code{to_posixct()}.
#' @export
status <- function(job)
{
  url <- paste0(url_api, "job/status/", job$id)
  response <- get(job$key, url)
  result <- jsonlite::fromJSON(content(response, "text"))
  # Default a missing cpu_time so the column is always present.
  if (is.null(result$cpu_time)) {
    result$cpu_time <- 0
  }
  df <- as.data.frame(result, stringsAsFactors = FALSE)
  # Convert timestamps with base R rather than the deprecated
  # dplyr::mutate_each()/funs(). Only convert the columns that exist:
  # fields that come back NULL are dropped by as.data.frame().
  time_cols <- intersect(
    c("created_at", "updated_at", "start_at", "end_at"),
    names(df)
  )
  df[time_cols] <- lapply(df[time_cols], to_posixct)
  df
}

setOldClass("tdjob")
#' Shows the status and logs of a specific job.
#'
#' Queries the \code{job/show} endpoint and flattens the response into one
#' data.frame. The fields of the nested \code{debug} element are appended as
#' extra columns. The method returns the data.frame rather than printing it;
#' \code{result()} reads \code{hive_result_schema} from that return value.
#'
#' @param object tdjob object generated by td_job()
#' @return A data.frame of the job fields plus the \code{debug} fields, with
#'   any present timestamp columns converted by \code{to_posixct()}.
#' @export
setMethod(show, "tdjob", function(object){
  url <- paste0(url_api, "job/show/", object$id)
  response <- get(object$key, url)
  info <- jsonlite::fromJSON(content(response, "text"))
  # Default optional fields so the corresponding columns always exist.
  if (is.null(info$cpu_time)) {
    info$cpu_time <- 0
  }
  if (is.null(info$organization)) {
    info$organization <- ""
  }
  # NOTE(review): stderr defaults to the number 0 (kept for compatibility),
  # while a present stderr is presumably text; confirm the intended default.
  if (is.null(info$debug$stderr)) {
    info$debug$stderr <- 0
  }
  df <- cbind(
    as.data.frame(info[names(info) != "debug"], stringsAsFactors = FALSE),
    as.data.frame(info$debug, stringsAsFactors = FALSE)
  )
  # Base-R replacement for the deprecated dplyr::mutate_each()/funs();
  # timestamp fields absent from the response are skipped, not an error.
  time_cols <- intersect(
    c("created_at", "updated_at", "start_at", "end_at"),
    names(df)
  )
  df[time_cols] <- lapply(df[time_cols], to_posixct)
  df
})

#' Kills a specific job.
#'
#' The kill operation is performed asynchronously.
#' @param job tdjob object generated by td_job()
#' @return A data.frame built from the API response.
#' @export
kill <- function(job)
{
  # The job id and API key both live on the tdjob handle.
  url <- paste0(url_api, "job/kill/", job$id)
  response <- post(job$key, url, NULL)
  as.data.frame(
    jsonlite::fromJSON(content(response, "text")),
    stringsAsFactors = FALSE
  )
}

#' Returns the result of a specific job.
#'
#' Before issuing this command, please confirm that the job has been completed
#' successfully via show() or status().
#' The result can be large, so fetching it may take a long time.
#' @param job tdjob object generated by td_job()
#' @return A data.frame of the result rows. Column names are taken from the
#'   job's \code{hive_result_schema} (as reported by \code{show()}).
#' @export
result <- function(job)
{
  url <- paste0(url_api, sprintf("job/result/%s?format=csv", job$id))
  response <- get(job$key, url)
  info <- show(job)
  col_names <- schema(info$hive_result_schema)$name
  csv <- content(response, "text")
  # An empty result has nothing to parse, and assigning the schema names to
  # an empty frame would fail; return a zero-row frame with those names.
  if (!nzchar(trimws(csv))) {
    empty <- lapply(col_names, function(name) character(0))
    names(empty) <- col_names
    return(as.data.frame(empty, stringsAsFactors = FALSE))
  }
  # Column names come from the schema, so parse every line as data. With
  # fread's default header detection, a first row made only of strings would
  # be taken as a header and silently lost.
  # NOTE(review): assumes the csv result format has no header row - verify.
  res <- as.data.frame(fread(csv, header = FALSE))
  colnames(res) <- col_names
  res
}

#' Wait for a job to finish
#'
#' Polls \code{status()} every \code{interval} seconds while the job is in a
#' non-terminal state ("queued", "booting" or "running") and returns once it
#' reports any other state.
#'
#' @param job tdjob object generated by td_job()
#' @param interval Seconds to sleep between polls.
#' @return The last observed status string, invisibly.
#' @noRd
wait_finish <- function(job, interval = 3)
{
  pending <- c("queued", "booting", "running")
  s <- status(job)$status
  # isTRUE() also stops the loop on a missing/empty status instead of
  # erroring inside while().
  while (isTRUE(s %in% pending)) {
    Sys.sleep(interval)
    s <- status(job)$status
  }
  invisible(s)
}
# teramonagi/tdclientr documentation built on May 31, 2019, 8:38 a.m.