ParallelRandomSVD2 <- function(X, fun.scaling,
ind.train,
block.size,
K, I,
use.Eigen,
backingpath,
ncores) {
# parameters
L <- K + 12
n <- length(ind.train)
m <- ncol(X)
I <- I + 1
stopifnot((n - K) >= (I * L))
TIME <- 0.01
tmp.lock.name <- "mutex"
tmp.lock.names <- paste(tmp.lock.name, Sys.getpid(), 1:4, sep = '-')
ifelse(file.exists(tmp.lock.names), FALSE,
file.create(tmp.lock.names))
# shared big.matrices
G <- big.matrix(n, L, type = "double", shared = TRUE)
G[] <- stats::rnorm(n * L) # G0
H <- big.matrix(m, L * I, type = "double", shared = TRUE)
U <- big.matrix(m, L * I, type = "double", shared = TRUE)
T.t <- big.matrix(n, L * I, type = "double", shared = TRUE, init = 0)
remains <- big.matrix(4, I - 1, type = "integer",
shared = TRUE, init = ncores)
# descriptors
X.desc <- describe(X)
G.desc <- describe(G)
H.desc <- describe(H)
U.desc <- describe(U)
T.t.desc <- describe(T.t)
r.desc <- describe(remains)
intervals <- CutBySize(m, nb = ncores)
if (is.seq <- (ncores == 1)) {
registerDoSEQ()
} else {
cl <- parallel::makeCluster(ncores)
doParallel::registerDoParallel(cl)
}
scaling <- foreach(ic = seq_len(ncores), .combine = 'cbind') %dopar% {
lims <- intervals[ic, ]
# get big.matrices
X.part <- sub.big.matrix(X.desc,
firstCol = lims[1],
lastCol = lims[2],
backingpath = backingpath)
# https://www.r-bloggers.com/too-much-parallelism-is-as-bad/
multi <- (!is.seq) && detect_MRO()
if (multi) nthreads.save <- RevoUtilsMath::setMKLthreads(1)
# scaling
means_sds <- fun.scaling(X.part, ind.train)
means <- means_sds$mean
sds <- means_sds$sd
rm(means_sds)
# parameters
m.part <- ncol(X.part)
intervals <- CutBySize(m.part, block.size)
nb.block <- nrow(intervals)
# computation of G and H
G <- attach.big.matrix(G.desc)
H.part <- sub.big.matrix(H.desc, firstRow = lims[1], lastRow = lims[2])
remains <- attach.big.matrix(r.desc)
for (i in 1:(I - 1)) {
# get G, safely
old.G <- G[,]
file.lock1 <- flock::lock(tmp.lock.names[1])
if (remains[1, i] == 1) G[] <- 0 # init new G with 0s
remains[1, i] <- remains[1, i] - 1L
flock::unlock(file.lock1)
tmp.G <- matrix(0, n, L)
for (j in 1:nb.block) {
ind <- seq2(intervals[j, ])
tmp <- scaling(X.part[ind.train, ind], means[ind], sds[ind])
tmp.H <- cross(tmp, old.G, use.Eigen)
tmp.G <- incrMat(tmp.G, mult(tmp, tmp.H, use.Eigen))
H.part[ind, 1:L + (i - 1) * L] <- tmp.H
}
# wait for others at barrier
while (remains[1, i] > 0) Sys.sleep(TIME)
# increment G, safely
file.lock2 <- flock::lock(tmp.lock.names[2])
incrG(G@address, tmp.G, n, L, 2*m)
remains[2, i] <- remains[2, i] - 1L
flock::unlock(file.lock2)
# wait for others at barrier
while (remains[2, i] > 0) Sys.sleep(TIME)
}
i <- I # no need G_{I + 1}
old.G <- G[,]
for (j in 1:nb.block) {
ind <- seq2(intervals[j, ])
tmp <- scaling(X.part[ind.train, ind], means[ind], sds[ind])
tmp.H <- cross(tmp, old.G, use.Eigen)
H.part[ind, 1:L + (i - 1) * L] <- tmp.H
}
rm(G, H.part)
# compute svd(H) once
file.lock3 <- flock::lock(tmp.lock.names[3])
if (remains[3, 1] == 1) {
H <- attach.big.matrix(H.desc)
U <- attach.big.matrix(U.desc)
if (multi) RevoUtilsMath::setMKLthreads(nthreads.save)
U[] <- svd(H[,], nv = 0)$u
if (multi) nthreads.save <- RevoUtilsMath::setMKLthreads(1)
}
remains[3, 1] <- remains[3, 1] - 1L
flock::unlock(file.lock3)
# wait for others at barrier
while (remains[3, 1] > 0) Sys.sleep(TIME)
# compute transpose(T)
U.part <- sub.big.matrix(U.desc, firstRow = lims[1], lastRow = lims[2])
tmp.T.t <- matrix(0, n, I * L)
for (j in 1:nb.block) {
ind <- seq2(intervals[j, ])
tmp <- scaling(X.part[ind.train, ind], means[ind], sds[ind])
tmp.T.t <- incrMat(tmp.T.t, mult(tmp, U.part[ind, ], use.Eigen))
}
rm(U.part)
# increment T.t, safely
T.t <- attach.big.matrix(T.t.desc)
file.lock4 <- flock::lock(tmp.lock.names[4])
incrG(T.t@address, tmp.T.t, n, I * L, 1)
remains[4, 1] <- remains[4, 1] - 1L
flock::unlock(file.lock4)
# wait for others at barrier
while (remains[4, 1] > 0) Sys.sleep(TIME)
if (multi) RevoUtilsMath::setMKLthreads(nthreads.save)
rbind(means, sds)
}
if (!is.seq) parallel::stopCluster(cl)
# delete temporary lock files
unlink(tmp.lock.names)
T.svd <- svd(T.t[,], nu = K, nv = K)
list(d = T.svd$d[1:K], u = T.svd$u, v = U[,] %*% T.svd$v,
means = scaling[1, ], sds = scaling[2, ])
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.