R/handler.R


## Handling the Hadoop framework

## .hinit() initializes the Hadoop framework and returns the corresponding environment.
.hinit <- function( hadoop_home ) {
  ## use installation in HADOOP_HOME if it exists, otherwise the Debian default
  if( missing(hadoop_home) ){
    hadoop_home <- if( utils::file_test("-d", Sys.getenv("HADOOP_HOME")) )
      Sys.getenv("HADOOP_HOME") else "/etc/hadoop"
  }
  tmp <- tryCatch( hive_create(hadoop_home), error = identity )
  hive <- if( inherits(tmp, "error") )
    .hive_default_env()
  else
    tmp
  hive
}
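
## Illustrative sketch (not run) of how .hinit() resolves the Hadoop home;
## the directory "/opt/hadoop" below is a hypothetical installation path.
##   Sys.setenv(HADOOP_HOME = "/opt/hadoop")
##   henv <- .hinit()        # uses $HADOOP_HOME if it is a directory,
##                           # otherwise falls back to "/etc/hadoop"
##   hive_is_valid(henv)     # TRUE if a 'hive' environment could be created,
##                           # FALSE if .hinit() fell back to the NA default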

## See also .create_hive_from_installation().
## Returns an object of class 'hive'.
hive_create <- function( hadoop_home ){
  if( missing(hadoop_home) ){
    hadoop_home_exists <- file.exists( .hadoop_home_dirs() )
    if( !any(hadoop_home_exists) )
      stop( "could not find Hadoop home directory!" )
    hadoop_home <- .hadoop_home_dirs()[hadoop_home_exists]
  }
  hive <- .create_hive_from_installation( tools::file_path_as_absolute(hadoop_home) )
  hive_set_nreducer( hive_default_nreducer(hive), hive )
  class( hive ) <- "hive"
  hive
}
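
## Illustrative usage (not run); "/opt/hadoop-2.7.1" is a hypothetical path:
##   h <- hive_create("/opt/hadoop-2.7.1")
##   class(h)               # "hive"
##   hive_get_nreducer(h)   # default reducer count derived from the worker list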

.hadoop_home_dirs <- function(){
  c("/etc/hadoop")
}

.hadoop_bin_absolute <- function(hadoop_home){
    if( utils::file_test("-x", file.path(hadoop_home, "bin", "hadoop")) )
        file.path(hadoop_home, "bin", "hadoop")
    else
        Sys.which("hadoop")
}
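
## Sketch (not run): resolves hadoop_home/bin/hadoop if it is executable,
## otherwise falls back to whatever `hadoop` binary is found on the PATH.
##   .hadoop_bin_absolute("/opt/hadoop-2.7.1")   # hypothetical path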

## Given the path to a Hadoop installation directory, this function
## creates an environment containing all information about the Hadoop
## cluster.  We store the path to the hadoop binary, the Hadoop version,
## the parsed configuration files, and (for newer releases) the required
## jar files in a separate R environment.
.create_hive_from_installation <- function( hadoop_home ){
  if( !file.exists(hadoop_home) )
    stop( sprintf("There is no directory '%s'.", hadoop_home) )
  hive <- new.env()
  hvers <- hadoop_get_version( hadoop_home )
  streaming_home <- c( file.path(hadoop_home, "share/hadoop/tools/lib"),
                       file.path(hadoop_home, "contrib/streaming"),
                       file.path("/usr/lib/hadoop/contrib/streaming") )

  jar <- lapply(lapply(streaming_home, dir), function(x) grep("(hadoop.+streaming.+jar$)", x, value = TRUE))
  ind <- unlist(lapply(jar, function(x) length(x) > 0))
  hadoop_streaming <- file.path(streaming_home[ind][1], unlist(jar[ind])[1])
  ## OLD CONFIG: since version 0.20.0 the config files have been split up and moved to different locations
  if( hvers < "0.20.0" ){
      local( {
          hadoop <- .hadoop_bin_absolute(hadoop_home)
          version <- hvers
          stopifnot(file.exists(hadoop))
          config_files <- list(hadoop_default = get_hadoop_config("hadoop-default.xml", file.path(hadoop_home,"conf")),
                               hadoop_site = get_hadoop_config("hadoop-site.xml", file.path(hadoop_home,"conf")),
                                workers = readLines(file.path(hadoop_home, "conf", "slaves")),
                                masters = readLines(file.path(hadoop_home, "conf", "masters")))
      }, hive )

  } else if( hvers >= "2.6.0" ) {
      ## CURRENT CONFIGS (Hadoop >= 2.6.0)
          hadoop_src       <- file.path( hadoop_home, "share", "doc", "hadoop" )
          hadoop_common    <- file.path( hadoop_home, "share", "hadoop", "common" )
          hadoop_lib       <- file.path( hadoop_common, "lib")
          hadoop_mapreduce <- file.path( hadoop_home, "share", "hadoop", "mapreduce" )
          hadoop_hdfs      <- file.path( hadoop_home, "share", "hadoop","hdfs" )
          hadoop_tools     <- file.path( hadoop_home, "share", "hadoop","tools", "lib" )
          local( {
              hadoop <- .hadoop_bin_absolute(hadoop_home)
              version <- hvers
              stopifnot(file.exists(hadoop))

              config_files <- list(core_default = get_hadoop_config("core-default.xml", file.path(hadoop_src, "hadoop-project-dist", "hadoop-common")),
                                   core_site = get_hadoop_config("core-site.xml", file.path(hadoop_home, "etc", "hadoop")),
                                   hdfs_default = get_hadoop_config("hdfs-default.xml", file.path(hadoop_src, "hadoop-project-dist", "hadoop-hdfs")),
                                   hdfs_site = get_hadoop_config("hdfs-site.xml", file.path(hadoop_home, "etc", "hadoop")),
                                   mapred_default = get_hadoop_config("mapred-default.xml", file.path(hadoop_src, "hadoop-mapreduce-client", "hadoop-mapreduce-client-core")),
                                   mapred_site = get_hadoop_config("mapred-site.xml", file.path(hadoop_home, "etc", "hadoop")),
                                   log4j_properties = suppressWarnings(tryCatch(readLines(file.path(hadoop_home, "etc", "hadoop", "log4j.properties")), error = function(x) NA)),
                                   workers = suppressWarnings(tryCatch(readLines(file.path(hadoop_home, "etc", "hadoop", "workers")), error = function(x) NA)),
                                   masters = suppressWarnings(tryCatch(readLines(file.path(hadoop_home, "etc", "hadoop", "masters")), error = function(x) NA)))

              commonsloggingjar       <- grep( "commons-logging-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              commonsconfigurationjar <- grep( "commons-configuration-[0-9].*[.]jar", dir(file.path(hadoop_lib)), value = TRUE )
              if(!length(commonsconfigurationjar))
                  commonsconfigurationjar <- grep( "commons-configuration2-[0-9].*[.]jar", dir(file.path(hadoop_lib)), value = TRUE )
              commonslangjar <- grep( "commons-lang-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              if(!length(commonslangjar))
                  commonslangjar <- grep( "commons-lang3-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              commonscli <- grep( "commons-cli-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              commonsiojar <- grep( "commons-io-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              guavajar <- grep( "guava-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE ) # in lib since at least version 2.10 and 3.2.1
              commonscollectionsjar <- grep( "commons-collections-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE ) # in lib since at least version 2.10 and 3.2.1
              hadoop_auth <- sprintf( "hadoop-auth-%s.jar", hvers )
              slf4jjar <- grep( "slf4j-api-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              slf4jlogjar <- grep( "slf4j-log4j12-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              servletjar <- grep( "servlet-api-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE ) # in lib since at least version 2.10 and 3.2.1
              log4jjar <- grep( "log4j-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE ) # in lib since at least version 2.10 and 3.2.1
              protobufjavajar <- grep( "protobuf-java-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              htracejar <- grep("htrace-core-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              if(!length(htracejar))
                  htracejar <- grep("htrace-core4-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              woodstoxjar <- grep("woodstox-core-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              staxjar <- grep("stax2-api-[0-9].*[.]jar", dir(hadoop_lib), value = TRUE )
              hadoop_jars <- c( file.path(hadoop_lib, c(commonscli, commonsloggingjar, commonsconfigurationjar, commonslangjar, commonsiojar)),
                                file.path(hadoop_lib, c(guavajar, commonscollectionsjar, hadoop_auth, servletjar, woodstoxjar, staxjar)), # in lib since at least version 2.10 and 3.2.1
                                file.path(hadoop_lib, c(slf4jjar, slf4jlogjar, log4jjar, protobufjavajar, htracejar)),
                                file.path(hadoop_common, sprintf("hadoop-common-%s.jar", hvers)),
                                file.path(hadoop_hdfs, sprintf("hadoop-hdfs-%s.jar", hvers)),
                                file.path(hadoop_hdfs, sprintf("hadoop-hdfs-client-%s.jar", hvers)),
                                file.path(hadoop_mapreduce, sprintf("hadoop-mapreduce-client-common-%s.jar", hvers)) )
              stopifnot( all(file.exists(hadoop_jars)) )

      }, hive )
  } else {
      ## VERSION 1.0 CONFIGS
      if( file.exists( file.path(hadoop_home, "src/core" )) ){
          hadoop_src <- file.path(hadoop_home, "src" )
      } else if( file.exists( file.path("/usr/src/hadoop-0.20", "core" )) ){
          hadoop_src <- "/usr/src/hadoop-0.20"
      } else {
          hadoop_src <- system.file("defaults", package = "hive")
      }
      local( {
          hadoop <- .hadoop_bin_absolute(hadoop_home)
          version <- hvers
          stopifnot(file.exists(hadoop))

          config_files <- list(core_default = get_hadoop_config("core-default.xml", file.path(hadoop_src, "core")),
                                core_site = get_hadoop_config("core-site.xml", file.path(hadoop_home, "conf")),
                                hdfs_default = get_hadoop_config("hdfs-default.xml", file.path(hadoop_src, "hdfs")),
                                hdfs_site = get_hadoop_config("hdfs-site.xml", file.path(hadoop_home, "conf")),
                                mapred_default = get_hadoop_config("mapred-default.xml", file.path(hadoop_src, "mapred")),
                                mapred_site = get_hadoop_config("mapred-site.xml", file.path(hadoop_home, "conf")),
                                workers = suppressWarnings(tryCatch(readLines(file.path(hadoop_home, "conf", "slaves")), error = function(x) NA)),
                                masters = suppressWarnings(tryCatch(readLines(file.path(hadoop_home, "conf", "masters")), error = function(x) NA)))


          ## FIXME: Debian packages not available anymore, thus using CDH paths
          ## hadoop_jars <- file.path("/usr/share/java/", c("hadoop-core.jar", "commons-logging.jar"))
          commonsloggingjar <- grep("commons-logging-[0-9].*[.]jar", dir(file.path("/usr/lib/hadoop", "lib")), value = TRUE)
          hadoop_jars <- file.path("/usr/lib/hadoop", c("hadoop-core.jar", file.path("lib",commonsloggingjar)))

          if( !all(file.exists(hadoop_jars)) ){
              commonsloggingjar <- grep("commons-logging-[0-9].*[.]jar", dir(file.path(hadoop_home, "lib")), value = TRUE)
              commonsconfigurationjar <- grep("commons-configuration-[0-9].*[.]jar", dir(file.path(hadoop_home, "lib")), value = TRUE)
              commonslangjar <- grep("commons-lang-[0-9].*[.]jar", dir(file.path(hadoop_home, "lib")), value = TRUE)
              hadoop_jars <- if(version >= "0.20.203")
                  file.path(hadoop_home, c(sprintf("hadoop-core-%s.jar", version), file.path("lib", commonsconfigurationjar), file.path("lib", commonslangjar), file.path("lib", commonsloggingjar)))
              else
                  file.path(hadoop_home, c(sprintf("hadoop-%s-core.jar", version),  file.path("lib", commonsloggingjar)))
        }
      }, hive )
  }
  hive
}
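
## Sketch (not run): the returned environment can be inspected with the usual
## environment tools; which objects exist depends on the branch taken above
## (e.g. 'hadoop_jars' is only populated for the non-legacy branches).
##   h <- .create_hive_from_installation("/opt/hadoop-2.7.1")  # hypothetical path
##   ls(h)               # e.g. "config_files", "hadoop", "hadoop_jars", "version", ...
##   get("version", h)   # the detected Hadoop version string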

## Default environment: NA
.hive_default_env <- function(){
  NA
}

## Checks if henv inherits from class 'hive'
hive_is_valid <- function( henv ){
  inherits(henv, "hive")
}

## Provides information about the "hive"
print.hive <- function( x, ... ){
  writeLines( "HIVE: Hadoop Cluster" )
  writeLines( sprintf("- Avail. datanodes: %d", length(hive_get_workers(x))) )
  max_map_tasks <- if( hadoop_version(x) >= "3.2.1" )
      hive_get_parameter( "mapreduce.job.maps", x )
  else if( hadoop_version(x) >= "2.6.0" )
      hive_get_parameter( "mapreduce.tasktracker.map.tasks.maximum", x )
  else
      hive_get_parameter( "mapred.tasktracker.map.tasks.maximum", x )
  writeLines( sprintf("'- Max. number Map tasks per datanode: %s", max_map_tasks) )
  writeLines( sprintf("'- Configured Reducer tasks: %d", hive_get_nreducer(x)) )
}

summary.hive <- function( object, ... ){
    print(object)
    writeLines( "---" )
    writeLines( sprintf("- Hadoop version: %s", hadoop_version(hive(object))) )
    writeLines( sprintf("- Hadoop home/conf directory: %s", hadoop_home(object)) )
    writeLines( sprintf("- Namenode: %s", hive_get_masters(object)) )
    writeLines( "- Datanodes:")
    writeLines( sprintf("'- %s\n", hive_get_workers(object)) )
}
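
## Illustrative sketch (not run), assuming a cluster has been registered:
##   h <- hive()    # the currently registered cluster
##   h              # print.hive(): datanodes, map task limit, reducer count
##   summary(h)     # additionally: Hadoop version, home directory, namenode, datanodes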

## Start and stop a Hadoop cluster.
## NOTE: Java DFS support is only available for the current cluster.
##       Thus, add/remove DFS support in each call to hive_start/stop
hive_start <- function( henv = hive() ){
    ## does nothing if hadoop daemons are already running
    if( DFS_is_registered(henv) )
        return( invisible(TRUE) )
    ## otherwise start the Hadoop framework ...
    ## FIXME: on Debian systems Hadoop daemons are controlled via
    ##        /etc/init.d scripts (started automatically)
    start_dfs_sh <- ifelse( hadoop_version(henv) >= "2.6.0",
                            file.path(hadoop_home(henv), "sbin", "start-dfs.sh"),
                            file.path(hadoop_home(henv), "bin", sprintf("%s-all.sh", "start")) )
    if( file.exists(start_dfs_sh) )
        hadoop_framework_control( "start", henv )
    ## ... and add Java DFS support
    ## NOTE: really important that Java DFS support is added AFTER
    ## framework has been started
    status <- add_java_DFS_support( henv = henv )
    ## if there are problems starting hive, close it
    if( !status ){
        # writeLines(msg)
        suppressWarnings( hive_stop(henv) )
        return( invisible(FALSE) )
    }
    invisible( TRUE )
}

hive_stop <- function( henv = hive() ){
    if( DFS_is_registered(henv) ){
        remove_java_DFS_support( henv )
        ## FIXME: on Debian systems Hadoop daemons are controlled via
        ##        /etc/init.d scripts (stopped automatically)
        stop_dfs_sh <- ifelse( hadoop_version(henv) >= "2.6.0",
                              file.path(hadoop_home(henv), "sbin", "stop-dfs.sh"),
                              file.path(hadoop_home(henv), "bin", sprintf("%s-all.sh", "stop")) )
        if( file.exists(stop_dfs_sh) )
            hadoop_framework_control( "stop", henv )
    }
    else
        warning( "No Hadoop cluster running. Nothing to stop." )
    invisible( TRUE )
}
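
## Illustrative lifecycle (not run), assuming a properly configured cluster:
##   hive_start()          # starts the DFS daemons and adds Java DFS support
##   hive_is_available()   # TRUE once the DFS answers
##   hive_stop()           # removes Java DFS support and stops the daemons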

## FIXME: Very simple query of hadoop status. Probably better to use pid?
hive_is_available <- function( henv = hive() ){
    stopifnot( hive_is_valid(henv) )
    suppressWarnings( DFS_is_available(henv) )
}


hive_get_nreducer <- function( henv = hive() ){
  get( "nreducer", henv )
}

hive_set_nreducer <- function( n, henv = hive() ) {
  assign( "nreducer", as.integer(n), henv )
}

hive_default_nreducer <- function( henv = hive() ){
    as.integer(round( length(hive_get_workers(henv)) / 1.5 ))
}
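
## Illustrative sketch (not run), assuming a registered cluster: the default is
## roughly two thirds of the number of configured worker nodes.
##   hive_set_nreducer( 4L )     # request 4 reduce tasks for subsequent jobs
##   hive_get_nreducer()         # 4
##   hive_default_nreducer()     # e.g. 2 for a 3-worker cluster (round(3/1.5))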

## Internal extractor functions
hadoop <- function( henv )
  get( "hadoop", henv )

hadoop_home <- function( henv )
  get( "hadoop_home", henv )

hadoop_streaming <- function( henv )
  get( "hadoop_streaming", henv )

hadoop_version <- function( henv )
  get( "version", henv )

hadoop_get_jars <- function( henv )
  get( "hadoop_jars", henv )

hadoop_get_version <- function( hadoop_home ){
  version_file <- file.path(hadoop_home, "contrib", "hod", "bin", "VERSION")
  if( file.exists( version_file ) )
    readLines( version_file )
  else
    gsub("Hadoop ", "", system(sprintf("%s version", .hadoop_bin_absolute(hadoop_home)), intern = TRUE, ignore.stderr = TRUE)[1])
}
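
## Sketch (not run): for recent installations the version is parsed from the
## first line of `hadoop version`, e.g. "Hadoop 2.7.1" yields "2.7.1".
##   hadoop_get_version("/opt/hadoop-2.7.1")   # hypothetical path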

## Controlling the Hadoop framework: uses the start/stop-dfs.sh scripts in
## $HADOOP_HOME/sbin for Hadoop >= 2.6.0 and the start/stop-all.sh scripts
## in $HADOOP_HOME/bin for older versions
## FIXME: This may not be platform independent
hadoop_framework_control <- function( action = c("start", "stop"), henv ){
    action <- match.arg(action)
    dfs_action_sh <- ifelse( hadoop_version(henv) >= "2.6.0",
                             file.path(hadoop_home(henv), "sbin", sprintf("%s-dfs.sh", action)),
                             file.path(hadoop_home(henv), "bin", sprintf("%s-all.sh", action)) )
    system( dfs_action_sh, intern = TRUE, ignore.stderr = TRUE )
}
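
## Sketch (not run): dispatches to sbin/start-dfs.sh or sbin/stop-dfs.sh for
## Hadoop >= 2.6.0 and to bin/start-all.sh or bin/stop-all.sh for older versions.
##   hadoop_framework_control("start", hive())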
