##' Function for running a kalman filter to a subset of potential
##' drift dives.
##'
##' This functions is a wrap to a kalman filter implemented in jags
##' (Just another Gibs sampler). It is a bit slow as it requires
##' require long chains. Current set up is a burn in of 400.000
##' iterations. The chains can be run in parallel. If parallel is set
##' to TRUE, it will run three chains, each one in a different core
##' @title Function for applying a Kalman filter to the drift dives
##' @param Data An object of class 'data.frame' that needs to include
##' at least date (named \code{Date}) and drift rate (named \{NDE})
##' @param update Burn.in length.
##' @param n.iter posterior drawn length.
##' @param n.chains number of chains to be drawn.
##' @param n.adapt adaption length (for jags performance)
##' @param parallel logical. if TRUE, it will run 3 chains in three
##' different cores of the local machine.
##' @return an object of class 'list' that includes: 1) the original
##' dataset (plus some remaping of variables), the model (a jags
##' object), the output etc...
##' @export
kalman <- function(Data, update=400000, n.iter=10000, n.chains=3,
n.adapt=1000, parallel = FALSE){
if (parallel)
return(parallelKalman(Data))
Data <- Data[order(Data$Date),]
Data$time <- (as.numeric(Data$Date)-as.numeric(min(Data$Date)))/3600
requireNamespace('rjags')
bugs.model <- "
model {
## Random walk for mass increments
delta[1] <- 0
for(i in 2:N) {
taudt[i] <- taud/(time[i]-time[i-1])
delta[i] ~ dnorm(delta[i-1],taudt[i])
}
for(i in 1:N) {
rho[i] <- (m0+delta[i])/(v0+V*delta[i])
s[i] <- step(rho[i]-1)-step(1-rho[i])
mu[i] <- a*s[i]*sqrt(abs(rho[i]-1))
z[i] ~ dbern(p)
tau[i] <- tau0 + tau1*z[i]
rate[i] ~ dnorm(mu[i],tau[i])
}
## Halfnormal prior for a
#a ~ dnorm(-1,100)T(,0)
a <- -1.2
taud ~ dgamma(0.001,0.001)
p ~ dbeta(90000,10000)
}
"
Data$time <- (as.numeric(Data$Date)-as.numeric(min(Data$Date)))/3600
model <- rjags::jags.model(textConnection(bugs.model),
data = list(
"rate" = Data$NDE,
"time" = Data$time,
"m0" = 100,
"v0" = 90,
"V" = 1.1,
"tau0" = 1,
"tau1" = 10000,
"taud" = 1,
"N" = nrow(Data)),
n.chains = n.chains,
n.adapt = n.adapt)
before <- Sys.time()
update(model,update)
after <- Sys.time()
duration=after-before
s <- rjags::coda.samples(model,
var=c("mu","z"),
n.iter=n.iter,thin=10)
mns <- sapply(s,colMeans)
res <- list(Data = Data, model = model, kalman = s,
mns = mns, duration = duration, burn.in = update )
## cat('\n\n\n','time duration : ', duration, '\n\n\n')
attr(res, 'update.type') <- 'kalman.single'
class(res) <- append(class(res), 'kalman')
return(res)
}
##' function for update objects generated by the \code{kalman}
##' function
##'
##' Requires an object generated by the function \code{kalman}. If
##' the object is loaded from a previously saved session, the model
##' will need to be recompiled. this function is not parallelized.
##' @title Update the kalman fit
##' @param data an object generated by the Kalman function
##' @param update number of iterations to be used for update the
##' burn.in.
##' @param n.iter length of the drawn after the last burn.in.
##' @param recompile \code{logical}. if TRUE, the model will be recompiled
##' @return same object type as the one returned by \code{kalman}
updateKalman <- function(data,update=100000,n.iter=1000, recompile = FALSE){
requireNamespace('rjags')
if (recompile) data$model$recompile()
before <- Sys.time()
update(data$model,update)
after <- Sys.time()
data$duration=data$duration + (after-before)
data$burn.in <- data$burn.in + update
data$kalman <- rjags::coda.samples(data$model,
var=c("mu","z"),
n.iter=n.iter,thin=10)
data$mns <- sapply(data$kalman,colMeans)
return(data)
}
##' Function for post-processing kalman objects
##'
##' This functions extrac the \code{mus} (recalculated drift rate values) and
##' \code{zetas} (the probability for a given dive to be inside the drifting
##' trajectory) from the \code{kalman} output and place them into the
##' \code{data.frame} used by the \code{kalman} function
##' @title Kalman post-processing
##' @param Data an object of class 'kalman' generated by the local
##' function Kalman (and or updateKalman)
##' @return an object of class \code{data.frame}
postKalman <- function(Data){
requireNamespace('rjags')
stopifnot('kalman' %in% class(Data))
if (attr(Data, 'update.type') == 'kalman.parallel') {
Data <- list(Data = Data[[1]][[1]], model = Data[[1]][[2]],
kalman = list(Data[[1]][[3]],Data[[2]][[3]], Data[[3]][[3]]),
mns = Data[[1]][[4]], duration = Data[[1]][[5]], brun.in = Data[[1]][[6]])
}
output <- Data[[1]]
values <- rowMeans(Data[[4]])
params <- length(values)
output$mus <- values[1:(params/2)]
output$zetas <- values[(params/2 + 1):params ]
return(output)
}
## de momento parece que hay que cargar la libreria jags
##' Function to parallelise the Kalman filter
##'
##' in progress. This function needs still a proper way to process the
##' output. It whould at some point be mixed as there are three kalman
##' instances. By using three cores, the coptutation time is reduced to half
##' @title parallelised version of the Kalman filter.
##' @param Data a data set to be kalmaned
##' @return work in progress
parallelKalman <- function(Data=Data){
DKALP <<- Data
DKALP <- Data
cl <- makeCluster(3)
clusterExport(cl, c('kalman','data','jags.model','coda.samples'))
cores <- seq_along(cl)
r <- clusterApply(cl[cores], cores, function(core) {
if (core == 1) {
o1 <- kalman(DKALP,update=400000, n.iter=10000, n.adapt=1000,n.chains=1)
} else if (core == 2) {
o2 <- kalman(DKALP,update=400000, n.iter=10000, n.adapt=1000, n.chains=1)
}
else if (core == 3) {
o3 <- kalman(DKALP,update=400000, n.iter=10000, n.adapt=1000, n.chains=1)
}
})
stopCluster(cl)
rm(cl)
rm(DKALP, envir=.GlobalEnv)
attr(r, 'update.type') <- 'kalman.parallel'
class(r) <- append(class(r), 'kalman')
return(r)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.