stmv = function( p, runmode=NULL, DATA=NULL, nlogs=100, niter=1,
debug_plot_variable_index=1, debug_data_source="saved.state", debug_plot_log=FALSE, robustify_quantiles=c(0.0005, 0.9995), ... ) {
if (0) {
runmode = NULL
nlogs = 100
niter = 1
DATA=NULL
debug_plot_variable_index=1
robustify_quantiles=c(0.0005, 0.9995)
global_sppoly=NULL
# runmode=c("interpolate", "globalmodel")
# runmode=c("interpolate")
}
#\\ localized modelling of space and time data to predict/interpolate upon a grid
#\\ speed ratings: bigmemory.ram (1), ff (2), bigmemory.filebacked (3)
# -----------------------------------------------------
if (!exists("time_start", p) ) p$time_start = Sys.time()
# if runmode is passed, it overrides everything ..
if (is.null(runmode)) {
s_runmode = names(p$stmv_runmode)
o = rep(TRUE, length(p$stmv_runmode))
TF = c( sapply(p$stmv_runmode, is.logical ))
o[TF] = p$stmv_runmode[TF]
s_runmode = s_runmode[ unlist(o) ]
runmode = intersect(s_runmode, c( "globalmodel", "scale", "scale_interpolate", "interpolate_fast_predictions", "interpolate", "interpolate_correlation_basis", "interpolate_distance_basis", "interpolate_predictions", "save_completed_data", "restart_load", "carstm" ) )
}
message( "Specified runmodes are: ", paste(runmode, ", ", sep="") )
p$nlogs = nlogs
p_add = list(...)
if (length(p_add) > 0 ) p = c(p, p_add )
p = stmv_parameters(p=p ) # fill in parameters with defaults where required
p = stmv_db( p=p, DS="filenames" )
p$ptr = list() # location for data pointers
# set up the data and problem using data objects
tmpfiles = unlist( p$cache)
for (tf in tmpfiles) if (file.exists( tf)) file.remove(tf)
message( "||| Initializing data files ... " )
message( "||| In case something should go wrong, intermediary outputs will be placed at:" )
message( "||| ", p$stmvSaveDir )
message( "||| Monitor status by looking at the output of the following file:")
message( "||| in linux, you can issue the following command: \n" )
message( "||| watch -n 60 cat ", p$stmv_current_status )
# permit passing a function rather than data directly .. less RAM usage in parent call
if (is.null(DATA) ) {
if (exists("DATA", p)) {
if (class(p$DATA)=="character") {
assign("DATA", eval(parse(text=p$DATA) ) )
} else {
DATA = p$DATA
p$DATA = NULL # to reduce RAM requirements
}
}
} else {
if (class(DATA)=="character") {
assign("DATA", eval(parse(text=DATA) ) )
} else {
# nothing to do as we assume DATA is a list
}
}
if (is.null(DATA)) stop( "||| something went wrong with DATA inputs ... ")
if (class(DATA) != "list") stop( "||| DATA should a list ... ")
testvars = c(p$stmv_variables$Y, p$stmv_variables$COV, p$stmv_variables$TIME, p$stmv_variables$LOCS)
if (length(testvars)==1 ) {
withdata = which( is.finite( DATA$input[, testvars] ) )
} else {
withdata = which( is.finite( rowSums(DATA$input[, testvars] ) ) )
}
if (length(withdata) < 1) stop( "Missing data or insufficient data")
DATA$input = DATA$input[withdata, ]
withdata=NULL
# number of time slices
if (!exists("nt", p)) {
p$nt = 1 # default to 1 == no time
if (exists( "ny", p)) p$nt = p$nt * p$ny # annual time slices
if (exists( "nw", p)) p$nt = p$nt * p$nw # sub-annual time slices
}
if ( !exists("stmv_tmin", p)) p$stmv_tmin = max(1, trunc( p$nt / 5) ) # min no of time slices in twostep modelling
# prediction times for space.annual methods, treat time as independent timeslices
if ( !exists("prediction_ts", p)) p$prediction_ts = 1
global_model_do = FALSE
if (p$stmv_global_modelengine !="none" ) global_model_do = TRUE
message( "\n")
message( "||| Initializing temporary storage of data and output files... ")
message( "||| These can be large files and take a minute ... ")
message( "||| Try to turn off swap/paging such that only RAM is used. ")
stmv_db( p=p, DS="cleanup" )
global_sppoly=NULL
if (exists("sppoly", DATA)) global_sppoly = sppoly
Ydata = as.matrix( DATA$input[, p$stmv_variables$Y ] )
Ydata_datarange = range( Ydata, na.rm=TRUE )
if (!is.null(robustify_quantiles)) Ydata_datarange = quantile( Ydata, probs=robustify_quantiles, na.rm=TRUE )
Yq_link = p$stmv_global_family$linkfun( Ydata_datarange ) # convert raw data range to link range
if (exists("stmv_Y_transform", p)) Ydata = p$stmv_Y_transform$transf( Ydata )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Y = big.matrix( nrow=nrow(Ydata), ncol=1, type="double" )
tmp_Y[] = Ydata[]
p$ptr$Y = bigmemory::describe( tmp_Y )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Y = p$cache$Y
bigmemory::as.big.matrix( Ydata, type="double", backingfile=basename(p$bm$Y), descriptorfile=basename(p$cache$Y), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Y = ff( Ydata, dim=dim(Ydata), file=p$cache$Y, overwrite=TRUE )
}
# if there is a global model update (overwrite) Ydata with residuals
if ( any(grepl("globalmodel", runmode) ) ) {
if ( global_model_do ) {
stmv_global_model( p=p, DS="global_model.redo", B=DATA$input )
if ( any(grepl("globalmodel.only", runmode))) return( stmv_global_model( p=p, DS="global_model" ) )
# model complete .. now predict to get residuals
if ( p$stmv_global_modelengine != "userdefined" ) Ydata=NULL # only needed with userdefined models
Y = stmv_attach( p$storage_backend, p$ptr$Y )
Y[] = stmv_global_model(p=p, DS="residuals", Yq_link=Yq_link, Ydata=Ydata)[]
gc()
}
}
Ydata = NULL
# data coordinates
Yloc = as.matrix( DATA$input[, p$stmv_variables$LOCS ])
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Yloc = big.matrix( nrow=nrow(Yloc), ncol=ncol(Yloc), type="double" )
tmp_Yloc[] = Yloc[]
p$ptr$Yloc = bigmemory::describe( tmp_Yloc )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Yloc = p$cache$Yloc
bigmemory::as.big.matrix( Yloc, type="double", backingfile=basename(p$bm$Yloc), descriptorfile=basename(p$cache$Yloc), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Yloc = ff( Yloc, dim=dim(Yloc), file=p$cache$Yloc, overwrite=TRUE )
}
Yloc = NULL
# independent stmv_variables/ covariate
if (exists("COV", p$stmv_variables)) {
if (length(p$stmv_variables$COV) > 0) {
Ycov = as.matrix( DATA$input[ , p$stmv_variables$COV ] )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Ycov = big.matrix( nrow=nrow(Ycov), ncol=ncol(Ycov), type="double")
tmp_Ycov[] = Ycov[]
p$ptr$Ycov = bigmemory::describe( tmp_Ycov )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Ycov = p$cache$Ycov
bigmemory::as.big.matrix( Ycov, type="double", backingfile=basename(p$bm$Ycov), descriptorfile=basename(p$cache$Ycov), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Ycov = ff( Ycov, dim=dim(Ycov), file=p$cache$Ycov, overwrite=TRUE )
}
Ycov= NULL
}
}
# data times
if ( exists("TIME", p$stmv_variables) ) {
Ytime = as.matrix( DATA$input[, p$stmv_variables$TIME ] )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Ytime = big.matrix( nrow=nrow(Ytime), ncol=ncol(Ytime), type="double" )
tmp_Ytime[] = Ytime[]
p$ptr$Ytime = bigmemory::describe( tmp_Ytime )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Ytime = p$cache$Ytime
bigmemory::as.big.matrix( Ytime, type="double", backingfile=basename(p$bm$Ytime), descriptorfile=basename(p$cache$Ytime), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Ytime = ff( Ytime, dim=dim(Ytime), file=p$cache$Ytime, overwrite=TRUE )
}
Ytime =NULL;
}
Y = stmv_attach( p$storage_backend, p$ptr$Y )
Yloc = stmv_attach( p$storage_backend, p$ptr$Yloc )
Yi = 1:length(Y) # index with useable data
bad = which( !is.finite( Y[]))
if (length(bad)> 0 ) Yi[bad] = NA
# data locations
bad = which( !is.finite( rowSums(Yloc[])))
if (length(bad)> 0 ) Yi[bad] = NA
# data locations
if (exists("COV", p$stmv_variables)) {
if (length(p$stmv_variables$COV) > 0) {
Ycov = stmv_attach( p$storage_backend, p$ptr$Ycov )
if (length(p$stmv_variables$COV)==1) {
bad = which( !is.finite( Ycov[] ))
} else {
bad = which( !is.finite( rowSums(Ycov[])))
}
if (length(bad)> 0 ) Yi[bad] = NA
Yi = na.omit(Yi)
}
}
# data locations
if (exists("TIME", p$stmv_variables)) {
Ytime = stmv_attach( p$storage_backend, p$ptr$Ytime )
bad = which( !is.finite( Ytime[] ))
if (length(bad)> 0 ) Yi[bad] = NA
Yi = na.omit(Yi)
}
bad = NULL
Yi = as.matrix(Yi)
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Yi = big.matrix( nrow=nrow(Yi), ncol=ncol(Yi), type="double" )
tmp_Yi[] = Yi[]
p$ptr$Yi = bigmemory::describe( tmp_Yi )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Yi = p$cache$Yi
bigmemory::as.big.matrix( Yi, type="double", backingfile=basename(p$bm$Yi), descriptorfile=basename(p$cache$Yi), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Yi = ff( Yi, dim=dim(Yi), file=p$cache$Yi, overwrite=TRUE )
}
Yi=NULL
#############
nPloc = nrow(DATA$output$LOCS)
if (exists("COV", p$stmv_variables)) {
if (length(p$stmv_variables$COV) > 0) {
# this needs to be done as Prediction covars need to be structured as lists
p$ptr$Pcov = list()
tmp_Pcov = list()
nc_cov =NULL
for ( covname in p$stmv_variables$COV ) {
Pcovdata = as.matrix( DATA$output$COV[[covname]] )
nc_cov = c( nc_cov, ncol(Pcovdata) ) # test no. cols
nPcovloc = nrow(Pcovdata)
if (nPcovloc != nPloc) {
message( "||| Inconsistency between number of prediction locations and prediction covariates: input data needs to be checked:")
message( "||| Usually this due to bathymetry and temperature being out of sync")
print(str(DATA))
stop()
}
attr( Pcovdata, "dimnames" ) = NULL
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Pcov[[covname]] = big.matrix( nrow=nPcovloc, ncol=ncol(Pcovdata), type="double" )
tmp_Pcov[[covname]][] = Pcovdata[]
p$ptr$Pcov[[covname]] = bigmemory::describe( tmp_Pcov[[covname]] )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Pcov[[covname]] = p$cache$Pcov[[covname]]
bigmemory::as.big.matrix( Pcovdata, type="double", backingfile=basename(p$bm$Pcov[[covname]]), descriptorfile=basename(p$cache$Pcov[[covname]]), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Pcov[[covname]] = ff( Pcovdata, dim=dim(Pcovdata), file=p$cache$Pcov[[covname]], overwrite=TRUE )
}
Pcovdata = NULL
}
p$all.covars.static = ifelse( any(nc_cov > 1), FALSE, TRUE )
nc_cov = NULL
} else {
p$all.covars.static = TRUE # degenerate case where the model is an intercept-only model (to remove mean effects)
}
}
# predictions and associated stats
sP = matrix( NaN, nrow=nPloc, ncol=p$nt )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_P = big.matrix( nrow=nrow(sP), ncol=ncol(sP), type="double" )
tmp_P[] = sP[]
p$ptr$P = bigmemory::describe( tmp_P )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$P = p$cache$P
bigmemory::as.big.matrix( sP, type="double", backingfile=basename(p$bm$P), descriptorfile=basename(p$cache$P), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$P = ff( sP, dim=dim(sP), file=p$cache$P, overwrite=TRUE )
}
sP = NULL
# count of prediction estimates
sPn = matrix( NaN, nrow=nPloc, ncol=p$nt )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Pn = big.matrix( nrow=nrow(sPn), ncol=ncol(sPn), type="double" )
tmp_Pn[] = sPn[]
p$ptr$Pn = bigmemory::describe( tmp_Pn )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Pn = p$cache$Pn
bigmemory::as.big.matrix( sPn, type="double", backingfile=basename(p$bm$Pn), descriptorfile=basename(p$cache$Pn), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Pn = ff( sPn, dim=dim(sPn), file=p$cache$Pn, overwrite=TRUE )
}
sPn = NULL
# sd of prediction estimates
sPsd = matrix( NaN, nrow=nPloc, ncol=p$nt )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Psd = big.matrix( nrow=nrow(sPsd), ncol=ncol(sPsd), type="double" )
tmp_Psd[] = sPsd[]
p$ptr$Psd =bigmemory::describe( tmp_Psd )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Psd = p$cache$Psd
bigmemory::as.big.matrix( sPsd, type="double", backingfile=basename(p$bm$Psd), descriptorfile=basename(p$cache$Psd), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Psd = ff( sPsd, dim=dim(sPsd), file=p$cache$Psd, overwrite=TRUE )
}
sPsd = NULL
# prediction coordinates
Ploc = as.matrix( DATA$output$LOCS )
attr( Ploc, "dimnames" ) = NULL
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Ploc = big.matrix( nrow=nrow(Ploc), ncol=ncol(Ploc), type="double" )
tmp_Ploc[] = Ploc[]
p$ptr$Ploc = bigmemory::describe( tmp_Ploc )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Ploc = p$cache$Ploc
bigmemory::as.big.matrix( Ploc, type="double", backingfile=basename(p$bm$Ploc), descriptorfile=basename(p$cache$Ploc), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Ploc = ff( Ploc, dim=dim(Ploc), file=p$cache$Ploc, overwrite=TRUE )
}
Ploc = NULL;
################
if (global_model_do ) {
sP0 = matrix( NaN, nrow=nPloc, ncol=p$nt )
if (!is.null(sP0)) {
if (p$storage_backend == "bigmemory.ram" ) {
tmp_P0= big.matrix( nrow=nrow(sP0), ncol=ncol(sP0) , type="double" )
tmp_P0[] = sP0[]
p$ptr$P0 = bigmemory::describe(tmp_P0 )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$P0 = p$cache$P0
bigmemory::as.big.matrix( sP0, type="double", backingfile=basename(p$bm$P0), descriptorfile=basename(p$cache$P0), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$P0 = ff( sP0, dim=dim(sP0), file=p$cache$P0, overwrite=TRUE )
}
sP0 = NULL
}
sP0sd = matrix( NaN, nrow=nPloc, ncol=p$nt )
if (!is.null(sP0sd)) {
if (p$storage_backend == "bigmemory.ram" ) {
tmp_P0sd= big.matrix( nrow=nrow(sP0sd), ncol=ncol(sP0sd) , type="double" )
tmp_P0sd[] = sP0sd[]
p$ptr$P0sd = bigmemory::describe(tmp_P0sd )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$P0sd = p$cache$P0sd
bigmemory::as.big.matrix( sP0sd, type="double", backingfile=basename(p$bm$P0sd), descriptorfile=basename(p$cache$P0sd), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$P0sd = ff( sP0sd, dim=dim(sP0sd), file=p$cache$P0sd, overwrite=TRUE )
}
sP0sd=NULL;
}
}
# --------------------------------
# completed data structures .. save params and clear up RAM
DATA = NULL;
if (!is.character(p$DATA)) p$DATA = NULL # where data was sent, remove it to free RAM
file.create( p$stmv_current_status, showWarnings=FALSE )
p <<- p # force copy to parent (calling) environment (to remove "p$DATA" )
stmv_db( p=p, DS="save.parameters" ) # save in case a restart is required .. mostly for the pointers to data
gc()
# --------------------------------
# analysis begins
# check if resetimation is required
niterations = 1 # default .. no global model nor no covariates
if ( exists("COV", p$stmv_variables )) {
if (length(p$stmv_variables$COV) > 0) {
niterations = niter
}
}
##############
sbox = Sloc = NULL
if ("carstm" %in% runmode ) {
if (!is.null(global_sppoly)) {
# Slocs == Plocs == centroids of global_sppoly
nSloc = nPloc
Sloc = as.matrix( coordinates( global_sppoly) ) # centroids , nominal
nSloc = nrow(Sloc)
if (nSloc != nPloc) stop( "Global polygons provided seem to have mismatched stats locations.")
}
}
if (is.null(Sloc)) {
# default: statistics storage matrix ( aggregation window, coords ) .. no inputs required
sbox = list(
plats = seq( p$corners$plat[1], p$corners$plat[2], by=p$stmv_distance_statsgrid ),
plons = seq( p$corners$plon[1], p$corners$plon[2], by=p$stmv_distance_statsgrid ) )
# statistics coordinates
Sloc = as.matrix( expand_grid_fast( sbox$plons, sbox$plats ))
nSloc = nrow(Sloc)
}
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Sloc = big.matrix(nrow=nSloc, ncol=ncol(Sloc), type="double" )
tmp_Sloc[] = Sloc[]
p$ptr$Sloc = bigmemory::describe( tmp_Sloc )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Sloc = p$cache$Sloc
bigmemory::as.big.matrix( Sloc, type="double", backingfile=basename(p$bm$Sloc), descriptorfile=basename(p$cache$Sloc), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Sloc = ff( Sloc, dim=dim(Sloc), file=p$cache$Sloc, overwrite=TRUE )
}
Sloc = NULL
sbox = NULL
sSflag = matrix( stmv_error_codes()[["todo"]], nrow=nSloc, ncol=1 )
if (p$storage_backend == "bigmemory.ram" ) {
tmp_Sflag = big.matrix(nrow=length(sSflag), ncol=1, type="double" )
tmp_Sflag[] = sSflag[]
p$ptr$Sflag = bigmemory::describe( tmp_Sflag )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$Sflag = p$cache$Sflag
bigmemory::as.big.matrix( sSflag, type="double", backingfile=basename(p$bm$Sflag), descriptorfile=basename(p$cache$Sflag), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$Sflag = ff( sSflag, dim=dim(sSflag), file=p$cache$Sflag, overwrite=TRUE )
}
sSflag = NULL
# determine stats to retain / expect
res = NULL
message("Running a test interpolation to obtain dimensionality of results")
if ( any(grepl("carstm", runmode )) ) {
res = stmv_interpolate_polygons( p=p, global_sppoly=global_sppoly, stmv_au_buffer_links=p$stmv_au_buffer_links, stmv_au_distance_reference=p$stmv_au_distance_reference, just_testing_variablelist=TRUE )
} else {
res = stmv_interpolate_lattice( p=p, just_testing_variablelist=TRUE )
}
# str( res )
# print( str(res$stmv_stats) )
# require knowledge of size of stats output which varies with a given type of analysis
p$statsvars = c("sdTotal", "rsquared", "ndata")
if (!is.null(res)) p$statsvars = names(res$stmv_stats )
if (! "carstm" %in% runmode ) p$statsvars = c(p$statsvars, "sdSpatial", "sdObs", "phi", "nu", "localrange" )
if (exists("TIME", p$stmv_variables) ) p$statsvars = c( p$statsvars, "ar_timerange", "ar_1" )
p$statsvars = unique( p$statsvars )
message( "Statistics include: ", paste( p$statsvars, ", ", sep="") )
res = NULL
sS = matrix( NaN, nrow=nSloc, ncol=length( p$statsvars ) ) # NA forces into logical
if (p$storage_backend == "bigmemory.ram" ) {
tmp_S = big.matrix(nrow=nSloc, ncol=length( p$statsvars ), type="double" )
tmp_S[] = sS[]
p$ptr$S = bigmemory::describe( tmp_S )
}
if (p$storage_backend == "bigmemory.filebacked" ) {
p$ptr$S = p$cache$S
bigmemory::as.big.matrix( sS, type="double", backingfile=basename(p$bm$S), descriptorfile=basename(p$cache$S), backingpath=p$stmvSaveDir )
}
if (p$storage_backend == "ff" ) {
p$ptr$S = ff( sS, dim=dim(sS), file=p$cache$S, overwrite=TRUE )
}
sS = NULL
# debug(stmv_statistics_status)
stmv_statistics_status( p=p, reset="features", verbose=FALSE ) # flags/filter stats locations base dupon prediction covariates. .. speed up and reduce storage
devold = Inf
eps = 1e-9
# NOTE:: must not sink the following memory allocation steps into a deeper function as
# NOTE:: bigmemory RAM seems to lose the pointers if they are not made simultaneously ?
# data to be worked upon .. either the raw data or covariate-residuals
nn = 1
for ( nn in 1:niterations ) {
#
message( "Iteration ", nn, " of ", niterations, "\n" )
if ( global_model_do ) {
# create prediction suface .. additive offsets
# test to see if all covars are static as this can speed up the initial predictions
message ( "\n", "||| Entering Predicting global effect of covariates at each prediction location: ", format(Sys.time()) , "\n" )
message( "||| This can take a while (usually a few minutes but hours if complex) ... ")
p$time_covariates_0 = Sys.time()
global_model = stmv_global_model( p=p, DS="global_model")
if (is.null(global_model)) stop("Global model not found.")
dev = global_model$deviance
message("Model Deviance: ", dev)
if (nn > 1) {
if (abs(dev - devold)/(0.1 + abs(dev)) < eps ) break() # globalmodel_converged
iYP = stmv_index_predictions_to_observations(p) #### this only works for lattice mode .. areal units will require an alternate ... TO DO
inputdata = P[ iYP ] # P are the spatial/spatio-temporal random effects (on link scale)
iYP = NULL
iYP_nomatch = which(!is.finite(iYP))
inputdata[ iYP_nomatch ] = 0 # E[RaneFF] = 0
iYP_nomatch = NULL
if (exists("linkinv", p$stmv_global_family )) {
# return to user scale (that of Y)
inputdata = p$stmv_global_family$linkinv( inputdata )
}
global_model$model[, p$stmv_variables$Y ] = global_model$model[, p$stmv_variables$Y ] - inputdata # update to current estimate of fixed effects
inputdata = global_model$model
global_model = NULL
gc()
global_model = stmv_global_model( p=p, DS="global_model.redo", B=inputdata, savedata=FALSE ) # do not overwrite initial model
inputdata = NULL
}
devold = dev
gc()
message("Creating fixed effects predictons")
parallel_run( FUNC=stmv_predict_globalmodel, p=p, runindex=list( pnt=1:p$nt ), Yq_link=Yq_link, global_model=global_model )
invisible( stmv_db(p=p, DS="save_current_state", runmode="meanprocess", datasubset=c("P0", "P0sd") ) )
Ydata = residuals(global_model, type="working") # ie. internal (link) scale
# Yq_link:: could operate upon quantiles of residuals but in poor models this can hyper inflate errors and slow down the whole estimation process
# truncating using data range as a crude approximation of overall residual and prediction scale
lb = which( Ydata < Yq_link[1])
ub = which( Ydata > Yq_link[2])
if (length(lb) > 0) Ydata[lb] = Yq_link[1]
if (length(ub) > 0) Ydata[ub] = Yq_link[2]
Y = stmv_attach( p$storage_backend, p$ptr$Y )
Y[] = Ydata[] # update "Ydata" ... ( residuals)
Ydata = NULL
global_model = NULL
p$time_covariates = round(difftime( Sys.time(), p$time_covariates_0 , units="hours"), 3)
message( paste( "||| Time taken to predict covariate surface (hours):", p$time_covariates ) )
}
# -----------------------------------------------------
if ("carstm" %in% runmode ) {
p$current_runmode = "carstm"
# this models, and predicts in same step (via inla)
message ( "\n", "||| Entering carstm areal unit modelling: ", format(Sys.time()) )
if ( "restart_load" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode=p$restart_load, datasubset=c("P", "Psd", "Pn", "statistics" ) ) )
if ( global_model_do ) invisible( stmv_db(p=p, DS="load_saved_state", runmode="meanprocess", datasubset=c("P0", "P0sd") ) )
stmv_statistics_status( p=p, reset=c( "all", "complete", "incomplete", "features" ), verbose=FALSE ) # required to start as scale determination uses Sflags too
}
p$time_start_current_runmode = Sys.time()
for ( j in 1:length(p$stmv_distance_interpolation) ) {
current_runmode_iter = paste( p$current_runmode, p$stmv_distance_scale[j] , sep="_")
message( "\n||| Entering carstm iteration < ", current_runmode_iter, " > : ", format(Sys.time()) )
runmode_cpus = p$stmv_runmode[[ p$current_runmode ]] #reset
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
currentstatus = stmv_statistics_status( p=p, reset="flags", reset_flags=c("insufficient_data", "unknown" ) )
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < (2*length(runmode_clusters)) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
parallel_run( stmv_interpolate_polygons,
p=p,
runmode=p$current_runmode,
localrange_interpolation=p$stmv_distance_interpolation[j],
global_sppoly=global_sppoly,
stmv_au_buffer_links=p$stmv_au_buffer_links,
stmv_au_distance_reference=p$stmv_au_distance_reference,
runindex=list( locs=sample( currentstatus$todo )
) )
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c( "statistics", "P", "Psd", "Pn" ) ) ) # temp save to disk
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Carstm phase", p$current_runmode, "completed ...") ) # final update before continuing
}
message( "||| Time used for carstm estimation: ", format(difftime( Sys.time(), p$time_start_current_runmode )), "\n" )
}
# -----------------------------------------------------
if ( "scale" %in% runmode ) {
p$current_runmode = "scale"
message ( "\n", "||| Entering spatial scale (variogram) determination: ", format(Sys.time()) )
if ( "restart_load" %in% runmode ) invisible( stmv_db(p=p, DS="load_saved_state", runmode="scale", datasubset="statistics" ) )
p$time_start_current_runmode = Sys.time()
for ( j in 1:length( p$stmv_distance_scale ) ) {
current_runmode_iter = paste( p$current_runmode, p$stmv_distance_scale[j] , sep="_")
message( "\n||| Entering scale iteration < ", current_runmode_iter, " > : ", format(Sys.time()) )
runmode_cpus = p$stmv_runmode[[ p$current_runmode ]] #reset
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
currentstatus = stmv_statistics_status( p=p, reset="flags", reset_flags=c("insufficient_data", "variogram_failure", "variogram_range_limit", "unknown" ) )
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < (2*length(runmode_clusters)) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
parallel_run( stmv_scale, p=p, stmv_localrange=p$stmv_distance_scale[j], runindex=list( locs=sample( currentstatus$todo )) )
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset="statistics") )
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Iteration completed:", current_runmode_iter ) ) # final update before continuing
}
slog = stmv_logfile(p=p, flag= "Scaling phase completed" ) # final update before continuing
message( "||| Time used for scale estimation: ", format(difftime( Sys.time(), p$time_start_current_runmode )), "\n" )
}
# -----------------------------------------------------
if ("interpolate" %in% runmode | "interpolate_correlation_basis" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode="scale", datasubset="statistics" ))
if ( "restart_load" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode=p$restart_load, datasubset=c("P", "Psd", "Pn" ) ) )
if ( global_model_do ) invisible( stmv_db(p=p, DS="load_saved_state", runmode="meanprocess", datasubset=c("P0", "P0sd") ) )
stmv_statistics_status( p=p, reset=c( "all", "complete", "incomplete", "features" ), verbose=FALSE ) # required to start as scale determination uses Sflags too
}
if ("interpolate" %in% runmode) p$current_runmode = "interpolate"
if ("interpolate_correlation_basis" %in% runmode) p$current_runmode = "interpolate_correlation_basis"
p$time_start_current_runmode = Sys.time()
runmode_cpus = p$stmv_runmode[[ p$current_runmode ]] #reset
for ( j in 1:length(p$stmv_autocorrelation_interpolation) ) {
current_runmode_iter = paste( p$current_runmode, p$stmv_autocorrelation_interpolation[j] , sep="_")
if (exists("stmv_fft_filter", p)) {
if (grepl("fast_and_exhaustive_predictions", p$stmv_fft_filter)) {
# do a quick cycle first
p$stmv_fft_filter = paste( p$stmv_fft_filter, "fast_predictions" )
message( "\n||| Entering < Fast ", current_runmode_iter, " > : ", format(Sys.time()) )
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
currentstatus = stmv_statistics_status( p=p, reset=c( "incomplete" ) ) # flags/filter stats locations base dupon prediction covariates. .. speed up and reduce storage
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < (2*length(runmode_clusters)) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
invisible( parallel_run( stmv_interpolate_lattice,
p=p,
localrange_interpolation_correlation = p$stmv_autocorrelation_interpolation[j] ,
runindex=list( locs=sample( currentstatus$todo ))
) )
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd", "statistics") ) )
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Fast Interpolation correlation basis phase", current_runmode_iter, "completed ...") ) # final update before continuing
p$stmv_fft_filter = gsub( "fast_predictions", "", p$stmv_fft_filter ) # remove fast ... now moving to slow/exhaustive
}
}
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
currentstatus = stmv_statistics_status( p=p, reset=c( "incomplete" ) ) # flags/filter stats locations base dupon prediction covariates. .. speed up and reduce storage
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < length(runmode_clusters) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
message( "\n||| Entering < Exhaustive ", current_runmode_iter, " > : ", format(Sys.time()) )
invisible( parallel_run( stmv_interpolate_lattice,
p=p,
localrange_interpolation_correlation = p$stmv_autocorrelation_interpolation[j],
runindex=list( locs=sample( currentstatus$todo ))
) )
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd", "statistics") ) )
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Exhaustive Interpolation correlation basis phase", current_runmode_iter, "completed ...") ) # final update before continuing
}
slog = stmv_logfile(p=p, flag= "interpolate_correlation_basis phase completed" ) # final update before continuing
message( paste( "Time used for <interpolations", ">: ", format(difftime( Sys.time(), p$time_start_current_runmode )), "\n" ) )
}
if(0) {
stmv_db(p=p, DS="load_saved_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd") )
stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd"))
P = stmv_attach( p$storage_backend, p$ptr$P )
Ploc = stmv_attach( p$storage_backend, p$ptr$Ploc )
if (length(dim(P)) > 1 ) {
for (i in 1:p$nt) print(lattice::levelplot( P[,i] ~ Ploc[,1] + Ploc[,2], col.regions=heat.colors(100), scale=list(draw=FALSE), aspect="iso" ))
} else {
lattice::levelplot( P[] ~ Ploc[,1] + Ploc[,2], col.regions=heat.colors(100), scale=list(draw=FALSE), aspect="iso" )
}
}
# -----------------------------------------------------
if ("interpolate_distance_basis" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode="scale", datasubset="statistics" ))
if ( "restart_load" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode=p$restart_load, datasubset=c("P", "Psd", "Pn" ) ) )
if ( global_model_do ) invisible( stmv_db(p=p, DS="load_saved_state", runmode="meanprocess", datasubset=c("P0", "P0sd") ) )
stmv_statistics_status( p=p, reset=c( "all", "complete", "incomplete", "features" ), verbose=FALSE ) # required to start as scale determination uses Sflags too
}
p$current_runmode = "interpolate_distance_basis"
p$time_start_current_runmode = Sys.time()
for ( j in 1:length(p$stmv_distance_interpolation) ) {
current_runmode_iter = paste( p$current_runmode, p$stmv_distance_interpolation[j], sep="_")
message( "\n||| Entering <", current_runmode_iter, " > : ", format(Sys.time()) )
runmode_cpus = p$stmv_runmode[[ p$current_runmode ]] #reset
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
currentstatus = stmv_statistics_status( p=p, reset=c( "incomplete" ) ) # flags/filter stats locations base dupon prediction covariates. .. speed up and reduce storage
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < length(runmode_clusters) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
invisible( parallel_run( stmv_interpolate_lattice,
p=p,
localrange_interpolation = p$stmv_distance_interpolation[j],
runindex=list( locs=sample( currentstatus$todo ) )
) )
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd", "statistics") ) )
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Interpolation distance basis phase", current_runmode_iter, "completed ...") ) # final update before continuing
}
slog = stmv_logfile(p=p, flag= "interpolate_distance_basis phase completed" ) # final update before continuing
message( paste( "Time used for <interpolations", ">: ", format(difftime( Sys.time(), p$time_start_current_runmode )), "\n" ) )
}
if(0) {
stmv_db(p=p, DS="load_saved_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd") )
stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd") )
P = stmv_attach( p$storage_backend, p$ptr$P )
Ploc = stmv_attach( p$storage_backend, p$ptr$Ploc )
if (length(dim(P)) > 1 ) {
for (i in 1:p$nt) print(lattice::levelplot( P[,i] ~ Ploc[,1] + Ploc[,2], col.regions=heat.colors(100), scale=list(draw=FALSE), aspect="iso" ))
} else {
lattice::levelplot( P[] ~ Ploc[,1] + Ploc[,2], col.regions=heat.colors(100), scale=list(draw=FALSE), aspect="iso" )
}
}
} #end loop
# --------------------
if ("interpolate_predictions" %in% runmode) {
message( "\n||| Entering <interpolate force complete> stage: ", format(Sys.time()), "\n" )
invisible( stmv_db(p=p, DS="load_saved_state", runmode="scale", datasubset="statistics" ))
if ( "restart_load" %in% runmode ) {
invisible( stmv_db(p=p, DS="load_saved_state", runmode=p$restart_load, datasubset=c("P", "Psd", "Pn" ) ) )
if ( global_model_do ) invisible( stmv_db(p=p, DS="load_saved_state", runmode="meanprocess", datasubset=c("P0", "P0sd") ) )
stmv_statistics_status( p=p, reset=c( "all", "complete", "incomplete", "features" ), verbose=FALSE ) # required to start as scale determination uses Sflags too
}
p$current_runmode = "interpolate_predictions"
p$time_start_current_runmode = Sys.time()
for ( j in 1:length(p$stmv_distance_interpolate_predictions) ) {
current_runmode_iter = paste( p$current_runmode, p$stmv_distance_interpolate_predictions[j], sep="_")
runmode_cpus = p$stmv_runmode[[ p$current_runmode ]] #reset
ni = length( runmode_cpus )
jcpu = ifelse( ni > 1, ifelse( j > ni, ni, j ), 1 ) # in case index j > runmode_clusters provided
runmode_clusters = runmode_cpus [[ jcpu ]]
stmv_statistics_status( p=p, reset=c( "complete" ), verbose=FALSE )
currentstatus = stmv_statistics_status( p=p, reset=c( "incomplete" ) ) # flags/filter stats locations base dupon prediction covariates. .. speed up and reduce storage
if ( currentstatus$n.todo == 0 ) break()
if ( currentstatus$n.todo < (2*length(runmode_clusters)) ) runmode_clusters = runmode_clusters[1] # drop to serial mode
p$clusters = runmode_clusters
invisible( parallel_run( stmv_interpolate_predictions,
p=p,
localrange_interpolation = p$stmv_distance_interpolate_predictions[j],
runindex=list( locs=sample( currentstatus$todo ))
) )
stmv_statistics_status( p=p, verbose=FALSE ) # quick update before logging
slog = stmv_logfile(p=p, flag= paste("Exhaustive Interpolation correlation basis phase", current_runmode_iter, "completed ...") ) # final update before continuing
invisible( stmv_db(p=p, DS="save_current_state", runmode=p$current_runmode, datasubset=c("P", "Pn", "Psd", "statistics") ) )
}
message( paste( "Time used for <interpolations", ">: ", format(difftime( Sys.time(), p$time_start_current_runmode )), "\n" ) )
}
# -----------------------------------------------------
if ("save_completed_data" %in% runmode) {
message( "\n||| Saving final results: ", format(Sys.time()), "\n" )
stmv_db( p=p, DS="stmv.results" ) # save to disk for use outside stmv*, returning to user scale
}
message( "||| Temporary files exist in case you need to examine them or restart a process. ")
message( "||| You can delete them by running: stmv_db( p=p, DS='cleanup.all' ), once you are done. ")
message( "||| Or by adding 'cleanup.all' to the runmodes" )
if ("cleanup.all" %in% runmode) {
if ( p$storage_backend !="bigmemory.ram" ) {
resp = readline( "||| Delete temporary files? Type to confirm <YES>: ")
if (resp=="YES") stmv_db( p=p, DS="cleanup.all" )
}
}
message( "||| NOTE:: parameter 'p' has been updated in case you need to re-run something \n" )
p$time_total = round( difftime( Sys.time(), p$time_start, units="hours" ), 3)
message( "||| Total time taken (hours):", p$time_total )
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.