#' Fit Multivariable Regime Model
#'
#' @param time_series Matrix time series to fit.
#' @param number_of_states Integer number of states.
#' @param verbose Logical for verbose output.
#' @param initial_probabilities Numeric vector (of size number_of_states) of initial probabilities of each state.
#' @param random.start Logical on random start.
#'
#' @return List with regime switching model parameters
#' @export
fit_multivariable <- function(time_series,
init_p = c(0.8, 0.2),
init_trn_mtx,
init_response = list(
mean = matrix(rep(0, ncol(time_series)), ncol = ncol(time_series)),
cov_mtx = diag(ncol(time_series))
),
n_states = 2,
verbose = FALSE,
random.start = TRUE,
use_rk = TRUE) {
if (any(is.na(time_series))) {
stop("NA in time series")
}
if (n_states > 2) {
stop("This function has not been tested for multivariable states > 2")
}
time_series.matrix <- as.matrix(time_series)
init_trn_model_data <- data.frame(1)
init_trn_model_formula <- ~ 1
if (use_rk) {
a_fitted_model <- baum_welch(
data = time_series.matrix,
init_p = init_p,
init_trn_mtx = init_trn_mtx,
init_response = init_response,
init_trn_model_data = init_trn_model_data,
init_trn_model_formula = init_trn_model_formula,
verbose = verbose,
random.start = FALSE,
maxit = 100
)
base <- 1
# trn_model_dist_type <- depmixS4::mlogit()
trn_model_dist_type <- list()
trn_model_dist_type$linkinv <- depmixS4::mlogit()$linkinv
trn_model_dist_type$linkfun <- function(x, base = 1) {
log(x / sum(x))
}
trn_model_x <- model.matrix(init_trn_model_formula, init_trn_model_data)
new_pars <- list()
new_pars$coefficients <- matrix(0, ncol = n_states, nrow = ncol(trn_model_x))
new_pars$coefficients[1, ] <- trn_model_dist_type$linkfun(
init_p,
base = base
)
mask <- matrix(1,nrow = nrow(new_pars$coefficients), ncol = ncol(new_pars$coefficients))
mask[, base] <- 0 # fix base category coefficients to 0
mask <- rbind(0,mask) # fix "bias" nodes to 0
Wts <- mask
Wts[-1,] <- new_pars$coefficients # set starting weights
Wts[Wts == Inf] <- .Machine$double.max.exp # Fix this!!!!
Wts[Wts == -Inf] <- .Machine$double.min.exp # Fix this!!!!!
fit <- nnet::nnet.default(
x = trn_model_x,
y = a_fitted_model$y,
size = 0,
entropy = TRUE,
skip = TRUE,
mask = mask,
Wts = Wts,
trace = FALSE
)
new_pars$coefficients <- matrix(
fit$wts,
ncol = ncol(new_pars$coefficients),
nrow = nrow(new_pars$coefficients) + 1
)
new_pars$coefficients <- t(new_pars$coefficients[-1, ])
trn_model_dens <- trn_model_dist_type$linkinv(
trn_model_x %*% new_pars$coefficients,
base = base
)
factor_density_array <- array(NA, c(nrow(time_series.matrix), 1, ncol(init_trn_mtx)))
for (i in 1:ncol(init_trn_mtx)) {
factor_density_array[ , 1, i] <- mvtnorm::dmvnorm(
time_series.matrix,
a_fitted_model$response[[i]]$mean,
a_fitted_model$response[[i]]$cov_mtx
)
}
trDens <- array(dim = c(1, 2, 2))
trDens[1, , ] <- a_fitted_model$trn_mtx
states <- viterbi(
states = n_states,
A = trDens,
prior = trn_model_dens,
factor_density_array = factor_density_array
)
a_fitted_model$states_ts <- states
transition_matrix <- t(a_fitted_model$trn_mtx)
initial_probabilities <- trn_model_dens
means <- purrr::map(a_fitted_model$response, "mean")
cov_matrices <- purrr::map(a_fitted_model$response, "cov_mtx")
time_series_fit <- a_fitted_model$states_ts
} else {
response_models <- plyr::alply(
.data = 1:n_states,
.margins = 1,
.fun = function(x) {
list(depmixS4::MVNresponse(time_series.matrix ~ 1))
}
)
initial_transition_matrix <- list(
depmixS4::transInit( ~ 1, nstates = n_states, data = data.frame(1), pstart = init_trn_mtx[1, ]),
depmixS4::transInit( ~ 1, nstates = n_states, data = data.frame(1), pstart = init_trn_mtx[2, ])
)
initial_model <- depmixS4::transInit(
formula = init_trn_model_formula,
nstates = n_states,
pstart = init_p,
data = init_trn_model_data
)
a_model <- depmixS4::makeDepmix(
response = response_models,
transition = initial_transition_matrix,
prior = initial_model
)
a_fitted_model <- depmixS4:::em.depmix(
object = a_model,
emc = depmixS4::em.control(
random.start = random.start,
crit = "absolute"
),
verbose = verbose,
random.start = FALSE,
maxit = 100
)
initial_probabilities <- a_fitted_model@init[, 1:2]
transition_matrix <- t(a_fitted_model@trDens[1, , ])
means <- plyr::llply(a_fitted_model@response, function(l) {
l[[1]]@parameters$coefficients
})
cov_matrices <- plyr::llply(a_fitted_model@response, function(l) {
m1 <- matrix(NA, ncol(time_series.matrix), ncol(time_series.matrix))
c1 <- l[[1]]@parameters$Sigma
x <- 1
for (i in 1:nrow(m1)) {
for (j in 1:ncol(m1)) {
if (is.na(m1[i, j])) {
m1[i, j] <- c1[x]
m1[j, i] <- c1[x]
x <- x + 1
}
}
}
m1
})
time_series_fit <- depmixS4::posterior(a_fitted_model)
}
# reorder states according to vol
w <- rep(1 / ncol(time_series.matrix), ncol(time_series.matrix))
states <- plyr::ldply(cov_matrices, function(cm) {
data.frame(sd = sqrt(t(w) %*% cm %*% w))
})
states$state <- 1:nrow(states)
time_series_fit_all <- cbind(time_series_fit, as.matrix(time_series))
states$old_state <- states$state
states <- dplyr::arrange(states, sd)
states$state <- 1:nrow(states)
time_series_fit_all <- dplyr::rename(time_series_fit_all, old_state = state)
time_series_fit_all$order <- 1:nrow(time_series_fit_all)
time_series_fit_all_ordered <- merge(
time_series_fit_all,
states,
by.x = "old_state",
by.y = "old_state"
)
time_series_fit_all_ordered <- dplyr::arrange(time_series_fit_all_ordered, order)
# reorder the S* probabilities
df <- data.frame(current_state = 1:n_states)
df <- merge(df, states, by.x = "current_state", by.y = "old_state")
df <- dplyr::arrange(df, current_state)
idx <- names(time_series_fit_all_ordered) %in% paste("S", df$current_state, sep = "")
names(time_series_fit_all_ordered)[idx] <- paste("S", df$state, sep = "")
time_series_fit_all_ordered$order <- NULL
time_series_fit_all_ordered$old_state <- NULL
means <- means[states$old_state]
cov_matrices <- cov_matrices[states$old_state]
if (all(states$state != states$old_state)) {
transition_matrix <- t(transition_matrix)
diag(transition_matrix) <- rev(diag(transition_matrix))
initial_probabilities <- rev(initial_probabilities)
}
rownames(transition_matrix) <- c("Low Vol", "High Vol")
colnames(transition_matrix) <- c("Low Vol", "High Vol")
state_sds <- plyr::ddply(
.data = time_series_fit_all_ordered,
.variables = "state",
.fun = function(x) {
if (is.null(colnames(time_series))) {
colnames(time_series) <- 1:ncol(time_series)
}
z <- x[, colnames(x) %in% colnames(time_series)]
data.frame(
actual_sd = t(w) %*% cov(z) %*% w,
expected_sd = x$sd[1]
)
}
)
if (!all(order(state_sds$actual_sd) == order(state_sds$expected_sd))) {
warning("state vols don't match actual vol", state_sds)
}
list(
fitted_model = a_fitted_model,
initial_probabilities = initial_probabilities,
means = means,
cov_matrices = cov_matrices,
transition_matrix = transition_matrix,
states = state_sds,
time_series = time_series_fit_all_ordered
)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.