R/snowWrappers.R

Defines functions sfClusterSetupRNG sfClusterSetupRNGstream sfClusterSetupSPRNG sfMM sfCapply sfRapply sfApply sfSapply sfLapply sfClusterApplyLB sfClusterApply sfClusterMap sfClusterEvalQ sfClusterEval sfClusterCall sfClusterSplit

Documented in sfApply sfCapply sfClusterApply sfClusterApplyLB sfClusterCall sfClusterEval sfClusterEvalQ sfClusterMap sfClusterSetupRNG sfClusterSetupRNGstream sfClusterSetupSPRNG sfClusterSplit sfLapply sfMM sfRapply sfSapply

## Wrappers for Snow functions.
##
## The wrappers do the following: decide whether we run in parallel or
## sequential mode.
## In parallel mode the corresponding Snow functions are used.
## In sequential mode, if it makes sense, the sequential counterparts
## of the Snow functions are used.

##****************************************************************************
## Wrapper for: clusterSplit
##****************************************************************************
sfClusterSplit <- function( seq ) {
  ## Split "seq" for distribution over the cluster nodes.
  ##
  ## PARAMETERS: seq - the object to split
  ## RETURN:     Parallel mode: result of snow's clusterSplit on the
  ##             current cluster.
  ##             Sequential mode: a list with everything in element 1
  ##             (means: everything is run on one node).
  sfCheck()

  if( !sfParallel() )
    return( list( seq ) )

  clusterSplit( sfGetCluster(), seq )
}

##****************************************************************************
## Wrapper for: clusterCall
##
## Catches errors: returns them or stops immediately.
##****************************************************************************
sfClusterCall <- function( fun, ..., stopOnError=TRUE ) {
  ## Call "fun" with the arguments in "..." on all nodes (parallel mode,
  ## via snow's clusterCall) or once locally (sequential mode).
  ##
  ## PARAMETERS: fun         - function to call
  ##             ...         - arguments handed over to fun
  ##             stopOnError - TRUE: stop on any detected problem,
  ##                           FALSE: only report it and return what is there
  ## RETURN:     Parallel mode: the result of clusterCall (also returned
  ##             after a reported problem if stopOnError is FALSE).
  ##             Sequential mode: the value of fun( ... ).
  ##             NULL if checkFunction rejects fun and stopOnError is FALSE.
  sfCheck()

  if( !checkFunction( fun, stopOnError=FALSE ) ) {
    if( stopOnError )
      stop( "No function or not defined object in sfClusterCall" )
    else {
      warning( "No function or not defined object in sfClusterCall" )
      return( NULL )
    }
  }

  if( sfParallel() ) {
    ## Exec via Snow.
    result <- clusterCall( sfGetCluster(), fun, ... )

    ## Describe the calling function as ONE string. as.character() on a
    ## call yields one element per call component, which made paste()
    ## repeat the whole message once per component.
    ## sys.call( -1 ) is NULL if there is no calling function.
    caller <- sys.call( -1 )
    callerTxt <- if( is.null( caller ) )
      ""
    else
      paste( deparse( caller ), collapse=" " )

    ## Not enough results?
    ## @TODO Check if this test is needed
    if( length( result ) != sfCpus() ) {
      msg <- paste( "Error in sfClusterCall (not all slaves responded).\n",
                    "Call from: ", callerTxt )

      if( stopOnError )
        stop( msg )

      message( msg )
      return( result )
    }

    ## Check if snow threw an exception on any of the slaves.
    if( !all( checkTryErrorAny( result ) ) ) {
      ## inherits() must be tested per list element: applied to the result
      ## list itself it is always FALSE and no error text is collected.
      isError <- vapply( result, inherits, logical( 1 ), what="try-error" )
      errorsTxt <- vapply( result[isError],
                           function( err ) paste( as.character( err ), collapse="\n" ),
                           character( 1 ) )

      message( "EXCEPTION INFOS:" )
      message( paste( errorsTxt, collapse="\n" ) )

      msg <- paste( "Error in sfClusterCall (catched TRY-ERROR).\n",
                    "Call from: ", callerTxt )

      if( stopOnError )
        stop( msg )

      message( msg )
      return( result )
    }

    return( result )
  }
  ## Sequential mode.
  else
    return( do.call( fun, list( ... ) ) )
}

##****************************************************************************
## Wrapper for: clusterEvalQ - renamed as indeed "eval" is executed and not
## "evalq".
##****************************************************************************
sfClusterEval <- function( expr, stopOnError=TRUE ) {
  ## Evaluate an expression on all nodes (parallel) or locally (sequential).
  ##
  ## PARAMETERS: expr        - expression to evaluate
  ##             stopOnError - handed over to sfClusterCall (parallel mode)
  ## RETURN:     Parallel mode: result of sfClusterCall.
  ##             Sequential mode: result of the local eval.
  sfCheck()

  if( !sfParallel() ) {
    ## Sequential mode: expr is handed to eval() as is, with the global
    ## environment as envir and the caller's frame as enclos.
    ## Known open issue: local variables (higher environments) can be
    ## visible here, although they are not visible in parallel runs.
    return( eval( expr, envir=globalenv(), enclos=parent.frame() ) )
  }

  ## Parallel mode: the unevaluated expression is shipped to the nodes and
  ## evaluated there through eval.
  sfClusterCall( eval, substitute( expr ), env=globalenv(),
                 stopOnError=stopOnError )
}

## Snows clusterEvalQ uses "eval" and not "evalq", so this wrapper is an alias.
sfClusterEvalQ <- function( expr ) {
  ## Alias for sfClusterEval.
  ##
  ## PARAMETERS: expr - expression to evaluate
  ## RETURN:     Result of sfClusterEval
  ##
  ## The expression is captured here and spliced into the call of
  ## sfClusterEval. Forwarding the argument as plain "expr" would make
  ## substitute( expr ) inside sfClusterEval return the bare symbol "expr"
  ## of this wrapper instead of the expression written by the user.
  ## The call is built in the caller's frame, so that any evaluation of the
  ## argument happens where it would for a direct sfClusterEval call.
  callerEnv <- parent.frame()

  return( do.call( sfClusterEval, list( substitute( expr ) ), envir=callerEnv ) )
}

##****************************************************************************
## Wrapper for: clusterMap.
## Currently not used.
##****************************************************************************
sfClusterMap <- function( fun, ..., MoreArgs=NULL, RECYCLE=TRUE ) {
  ## Placeholder with a clusterMap-like signature: there is no wrapper yet,
  ## so every call stops with an error.
  stop( "Currently no wrapper for clusterMap" )
}

##****************************************************************************
## Wrapper for: clusterApply (snow parallel) - lapply (sequential)
## Adds additional warnings before the execution (esp. in sequential mode,
## where execution works fine but can cause problems when running in parallel).
##
## PARAMETERS: Parameters like clusterApply
## RETURN:     Result
##****************************************************************************
sfClusterApply <- function( x, fun, ... ) {
  ## Apply "fun" to the elements of "x": snow's clusterApply in parallel
  ## mode, lapply in sequential mode.
  ##
  ## PARAMETERS: x   - list/vector of elements
  ##             fun - function to apply
  ##             ... - further arguments handed over to fun
  ## RETURN:     Result of clusterApply resp. lapply
  sfCheck()
  checkFunction( fun )

  ## The list size is limited to the number of cluster nodes here.
  ## Exceeding it is fatal in parallel mode; in sequential mode only a
  ## warning is given, as the run itself works.
  tooManyEntries <- length( x ) > sfCpus()

  if( tooManyEntries && sfParallel() )
    stop( "More list entries as nodes => use sfClusterApplyLB instead. See Snow/Snowfall documentation." )

  if( tooManyEntries )
    warning( "More list entries as nodes => causes error in parallel mode. use sfClusterApplyLB instead." )

  if( !sfParallel() )
    return( lapply( x, fun, ... ) )

  clusterApply( sfGetCluster(), x, fun, ... )
}

##****************************************************************************
## Wrapper for: clusterApplyLB (snow parallel) - lapply (sequential)
##
## PARAMETERS: Parameters like clusterApply
## RETURN:     Result
##****************************************************************************
sfClusterApplyLB <- function( x, fun, ... ) {
  ## Apply "fun" to the elements of "x": snow's clusterApplyLB in parallel
  ## mode, lapply in sequential mode.
  ##
  ## PARAMETERS: x   - list/vector of elements
  ##             fun - function to apply
  ##             ... - further arguments handed over to fun
  ## RETURN:     Result of clusterApplyLB resp. lapply
  sfCheck()
  checkFunction( fun )

  if( !sfParallel() ) {
    ## array... to be corrected.
    return( lapply( x, fun, ... ) )
  }

  clusterApplyLB( sfGetCluster(), x, fun, ... )
}

##****************************************************************************
## The handling of the snow cluster handle is also hidden from the user.
##
## Wrapper for: parLapply (snow parallel) - lapply (sequential)
##
## As the lapply parameter names were inconsistent, they were corrected to
## "x"/"fun".
##
## PARAMETERS: Parameters like lapply
## RETURN:     Result
##****************************************************************************
sfLapply <- function( x, fun, ... ) {
  ## Apply "fun" to the elements of "x": snow's parLapply in parallel mode,
  ## lapply in sequential mode.
  ##
  ## PARAMETERS: x   - list/vector of elements
  ##             fun - function to apply
  ##             ... - further arguments handed over to fun
  ## RETURN:     Result of parLapply resp. lapply
  sfCheck()
  checkFunction( fun )

  if( !sfParallel() )
    return( lapply( x, fun, ... ) )

  parLapply( sfGetCluster(), x, fun, ... )
}

##****************************************************************************
## Wrapper for: parSapply (snow parallel) - sapply (sequential)
##
## PARAMETERS: Parameters like sapply
## RETURN:     Result
##****************************************************************************
sfSapply <- function( x, fun, ..., simplify=TRUE, USE.NAMES=TRUE ) {
  ## Apply "fun" to the elements of "x": snow's parSapply in parallel mode,
  ## sapply in sequential mode.
  ##
  ## PARAMETERS: x         - list/vector of elements
  ##             fun       - function to apply
  ##             ...       - further arguments handed over to fun
  ##             simplify  - handed over unchanged to parSapply/sapply
  ##             USE.NAMES - handed over unchanged to parSapply/sapply
  ## RETURN:     Result of parSapply resp. sapply
  sfCheck()
  checkFunction( fun )

  if( !sfParallel() )
    return( sapply( x, fun, ..., simplify=simplify, USE.NAMES=USE.NAMES ) )

  parSapply( sfGetCluster(), x, fun, ...,
             simplify=simplify, USE.NAMES=USE.NAMES )
}

##****************************************************************************
## Wrapper for: parApply (snow parallel) - apply (sequential)
##
## PARAMETERS: Parameters like apply
## RETURN:     Result
##****************************************************************************
sfApply <- function( x, margin, fun, ... ) {
  ## Apply "fun" over the margins of "x": snow's parApply in parallel mode,
  ## apply in sequential mode.
  ##
  ## PARAMETERS: x      - object handed over to parApply/apply
  ##             margin - margin argument handed over to parApply/apply
  ##             fun    - function to apply
  ##             ...    - further arguments handed over to fun
  ## RETURN:     Result of parApply resp. apply
  sfCheck()
  checkFunction( fun )

  if( !sfParallel() )
    return( apply( x, margin, fun, ... ) )

  parApply( sfGetCluster(), x, margin, fun, ... )
}

sfRapply <- function( x, fun, ... ) {
  ## Placeholder: there is no wrapper for parRapply yet, so every call
  ## stops with an error. Nothing is ever returned (the former return
  ## statement after stop() could not be reached and was removed).
  stop( "sfRapply does not exists yet. Use Snow's parRapply instead." )
}

sfCapply <- function( x, fun, ... ) {
  ## Placeholder: there is no wrapper for parCapply yet, so every call
  ## stops with an error. Nothing is ever returned (the former return
  ## statement after stop() could not be reached and was removed).
  stop( "sfCapply does not exists yet. Use Snow's parCapply instead." )
}

##****************************************************************************
## Wrapper for: parMM (snow parallel) - %*% (sequential)
##
## PARAMETERS: Matrix a, Matrix b
## RETURN:     Result
##****************************************************************************
sfMM <- function( a, b ) {
  ## Matrix multiplication: snow's parMM in parallel mode, %*% in
  ## sequential mode.
  ##
  ## PARAMETERS: a, b - the two matrices
  ## RETURN:     Result of parMM resp. a %*% b
  sfCheck()

  if( !sfParallel() )
    return( a %*% b )

  parMM( sfGetCluster(), a, b )
}

##****************************************************************************
## Wrappers for the two uniform RNGs used in snow.
## Basically, at the moment these are not used in sequential (means: none
## of the two is included here for sequential execution).
## @TODO Sequential use of the RNGs.
##****************************************************************************
sfClusterSetupSPRNG <- function( seed = round( 2^32 * runif(1) ),
                                 prngkind = "default", para = 0, ... ) {
  ## Set up the SPRNG random number generator on the cluster.
  ##
  ## PARAMETERS: seed, prngkind, para, ... - handed over to snow's
  ##             clusterSetupSPRNG in parallel mode. In sequential mode
  ##             only "seed" is used (for set.seed).
  ## RETURN:     Parallel mode: result of clusterSetupSPRNG.
  ##             Sequential mode: result of set.seed (after a warning).
  sfCheck()

  if( sfParallel() )
    clusterSetupSPRNG( sfGetCluster(), seed, prngkind, para, ... )
  else {
    warning( paste( "Uniform random number streams (currently) not available in serial execution.",
                    "Random numbers may differ in serial & parallel execution." ) )

    ## set.seed() only accepts seeds inside the integer range, but the
    ## default seed is drawn from 0..2^32 and is therefore often too large.
    ## Fold such seeds back into the valid range instead of failing;
    ## seeds that are already valid are used unchanged.
    if( is.numeric( seed ) && length( seed ) == 1 && is.finite( seed ) &&
        abs( seed ) > .Machine$integer.max )
      seed <- seed %% .Machine$integer.max

    set.seed( seed )
  }
}

sfClusterSetupRNGstream <- function( seed=rep( 12345, 6 ), ... ) {
  ## Set up the RNGstream random number generator on the cluster.
  ##
  ## PARAMETERS: seed, ... - handed over to snow's clusterSetupRNGstream in
  ##             parallel mode. In sequential mode only the first element
  ##             of "seed" is used (for set.seed).
  ## RETURN:     Parallel mode: result of clusterSetupRNGstream.
  ##             Sequential mode: result of set.seed (after a warning).
  sfCheck()

  if( !sfParallel() ) {
    serialNote <- paste( "Uniform random number streams (currently) not available in serial execution.",
                         "Random numbers may differ in serial & parallel execution." )
    warning( serialNote )
    set.seed( seed[1] )
  }
  else
    clusterSetupRNGstream( sfGetCluster(), seed=seed, ... )
}

sfClusterSetupRNG <- function( type="RNGstream", ... ) {
  ## Set up a uniform random number generator on the cluster.
  ##
  ## PARAMETERS: type, ... - handed over to snow's clusterSetupRNG
  ## RETURN:     Parallel mode: result of clusterSetupRNG.
  ##             Sequential mode: nothing is set up, only a warning is given.
  sfCheck()

  if( !sfParallel() ) {
    serialNote <- paste( "Uniform random number streams (currently) not available in serial execution.",
                         "Random numbers may differ in serial & parallel execution." )
    warning( serialNote )
  }
  else
    clusterSetupRNG( sfGetCluster(), type=type, ... )
}

Try the snowfall package in your browser

Any scripts or data that you put into this service are public.

snowfall documentation built on Nov. 26, 2023, 5:07 p.m.