# R/jobj.R

# Defines functions: clearJobjs, cleanup.jobj, getClassName.jobj,
# print.jobj, jobj, getJobj, isValidJobj

# Documented in: print.jobj

# References to objects that exist on the JVM backend
# are maintained using the jobj.
# NOTE(review): the bare NULL below only evaluates to NULL when the file is
# loaded; presumably it is a placeholder so this header comment has an
# expression to attach to.
NULL

# Maintain a reference count of Java object references
# This allows us to GC the java object when it is safe
# Keys are backend object ids (character); values are the number of live R
# jobj handles created for that id through getJobj(). The parent is
# emptyenv(), so lookups such as exists() never fall through to any other
# environment.
.validJobjs <- new.env(parent = emptyenv())

# List of object ids to be removed
# cleanup.jobj() adds an id here once its reference count drops to zero; the
# stored value is always 1, so only the key matters. Per the note in
# cleanup.jobj(), these ids are meant to be released on the backend when the
# next RPC is made (that draining code is not in this file).
.toRemoveJobjs <- new.env(parent = emptyenv())

# Check whether `jobj` was created under the currently running SparkContext.
# Returns FALSE when SparkR's environment holds no `.scStartTime`; otherwise
# returns the result of comparing the jobj's recorded appId with that start
# time.
isValidJobj <- function(jobj) {
  if (!exists(".scStartTime", envir = SparkR:::.sparkREnv)) {
    return(FALSE)
  }
  currentStart <- get(".scStartTime", envir = SparkR:::.sparkREnv)
  jobj$appId == currentStart
}

# Create a new jobj handle for `objId` and bump the reference count kept in
# .validJobjs. The count starts at 1 the first time an id is seen. The
# handle is built before the count changes, so a failed construction does
# not leave a stale count behind.
getJobj <- function(objId) {
  handle <- jobj(objId)
  previous <- if (exists(objId, .validJobjs)) .validJobjs[[objId]] else 0
  .validJobjs[[objId]] <- previous + 1
  handle
}

# Handler for a java object that exists on the backend.
#
# Builds an environment-backed S3 object of class "jobj" that holds the
# backend object id (`id`) and the SparkContext start time it was created
# under (`appId`). It also registers cleanup.jobj() as the finalizer.
jobj <- function(objId) {
  if (!is.character(objId)) stop("object id must be a character")

  # An environment (not a list) is required: finalizers can only be
  # registered on environments or external pointers.
  handle <- new.env(parent = emptyenv())
  class(handle) <- "jobj"
  handle$id <- objId
  handle$appId <- get(".scStartTime", envir = SparkR:::.sparkREnv)

  # When R garbage-collects this handle, cleanup.jobj() decrements the
  # reference count for its id.
  reg.finalizer(handle, cleanup.jobj)
  handle
}

#' Print a JVM object reference.
#'
#' This function prints the type and id for an object stored
#' in the SparkR JVM backend.
#'
#' @param x The JVM object reference
#' @param ... further arguments passed to or from other methods; accepted
#'   for compatibility with the \code{print} generic and ignored.
#' @return \code{x}, invisibly, as is conventional for print methods.
#' @note print.jobj since 1.4.0
print.jobj <- function(x, ...) {
  # The class name is looked up on the backend via call_method().
  name <- getClassName.jobj(x)
  cat("Java ref type", name, "id", x$id, "\n", sep = " ")
  # print methods must return their argument invisibly, so that
  # `y <- print(obj)` and print-in-a-pipe keep the object.
  invisible(x)
}

# Return the Java class name of the object referenced by `x`. This is the
# backend equivalent of x.getClass().getName(), issued via call_method().
getClassName.jobj <- function(x) {
  call_method(call_method(x, "getClass"), "getName")
}

# Finalizer for jobj handles (registered by jobj()).
#
# For a handle from the current SparkContext whose id is tracked in
# .validJobjs, decrement that id's reference count. When the count reaches
# zero, stop tracking the id and queue it in .toRemoveJobjs. Handles from
# another SparkContext, and ids that are not tracked, are ignored.
cleanup.jobj <- function(jobj) {
  if (!isValidJobj(jobj)) {
    return(invisible(NULL))
  }
  objId <- jobj$id
  # If we don't know anything about this jobj, ignore it
  if (!exists(objId, envir = .validJobjs)) {
    return(invisible(NULL))
  }
  remaining <- .validJobjs[[objId]] - 1
  .validJobjs[[objId]] <- remaining
  if (remaining != 0) {
    return(invisible(NULL))
  }
  rm(list = objId, envir = .validJobjs)
  # removeJObject cannot be called here: a finalizer may run in the middle
  # of another RPC. Queue the id instead; the queued removals are meant to
  # be issued when the next RPC is made.
  .toRemoveJobjs[[objId]] <- 1
}

# Forget every tracked jobj reference count and discard all queued
# removals by emptying .validJobjs and then .toRemoveJobjs in place.
# NOTE(review): ls() omits names beginning with "." by default, so such ids
# (if they ever occur) would survive this call.
clearJobjs <- function() {
  for (registry in list(.validJobjs, .toRemoveJobjs)) {
    rm(list = ls(registry), envir = registry)
  }
}
# danzafar/tidyspark documentation built on Sept. 30, 2020, 12:19 p.m.