R/bdm_mtsne.R

Defines functions iter.info info.head thread.freeMem thread.rmYbm thread.gainReset thread.itrRun thread.repFget thread.affMtx thread.init bd_.rtsne bd_.mtsne bdm.mtsne

Documented in bdm.mtsne

#' Multi-core t-SNE (mtSNE)
#'
#' Starts the multi-core t-SNE (mtSNE) algorithm.
#'
#' @param data A \var{data.frame} or \var{matrix} with raw input-data. The dataset must not have duplicated rows.
#'
#' @param is.distance Default value is \var{is.distance = FALSE}. TRUE indicates that raw data is a distance matrix.
#'
#' @param is.sparse Default value is \var{is.sparse = FALSE}. TRUE indicates that the raw data is a sparse matrix.
#'
#' @param ppx The value of perplexity to compute similarities.
#'
#' @param iters Number of iters/epoch. Default value is \var{iters = 250}.
#'
#' @param theta Accuracy/speed trade-off factor, a value between 0.33 and 0.8. Default value is \var{theta = 0.5}. If \var{theta < 0.33} the algorithm uses the exact computation of the gradient. The closer it is this value to 1 the faster the computation and the coarser the approximation of the gradient.
#'
#' @param mpi.cl An MPI (inter-node parallelization) cluster as generated by \var{bdm.mpi.start()}. By default \var{mpi.cl = NULL}, i.e. a 'SOCK' (intra-node parallelization) cluster is generated.
#'
#' @param threads Number of parallel threads (according to data size and hardware resources, i.e. number of cores and available memory). Default value is \var{threads = 4}.
#'
#' @param infoRate Number of epochs to show output information. Default value is \var{infoRate = 25}.
#'
#' @return A \var{bdm} data mapping instance.
#'
#' @examples
#'
#' # --- load example dataset
#' bdm.example()
#' \dontrun{
#' # --- perform mtSNE
#' m <- bdm.mtsne(ex$data, ex$map, ppx = 250, iters = 250, threads = 4)
#' # --- plot the Cost function
#' bdm.cost(m)
#' # --- plot mtSNE output (use bdm.ptsne.plot() function)
#' bdm.ptsne.plot(m)
#' }

bdm.mtsne <- function(data, is.distance = F, is.sparse = F, ppx = 100, theta = .5, iters = 250, mpi.cl = NULL, threads = 4, infoRate = 25)
{
	m <- bd_.mtsne(data, is.distance = is.distance, is.sparse = is.sparse, ppx = ppx, theta = theta, iters = iters, mpi.cl = mpi.cl, threads = threads, infoRate = infoRate)
	return(m)
}

# --------------------------------------------------------------------------------
# +++ mtSNE and refinement t-SNE with naive parallelization
# --------------------------------------------------------------------------------

bd_.mtsne <- function(data, is.distance = F, is.sparse = F, ppx = 100, theta = .5, iters = 250, mpi.cl = NULL, threads = 4, infoRate = 25)
{
	# ++++++++++++ !!! see settings in R/bdm_glbl.R for: normalize, xppx ++++++++++++++++++++

	m <- bdm.init(data, is.distance = is.distance, is.sparse = is.sparse, ppx = ppx, mpi.cl = mpi.cl, threads = threads)
	m$ppx <- m$ppx[[1]]

	m.list <- bd_.rtsne(data, m, ppx = ppx, theta = theta, iters = iters, mpi.cl = mpi.cl, threads = threads, infoRate = infoRate)
	return(m.list[[2]])
}

bd_.rtsne <- function(data, m, ppx, theta = .5, iters = 100, mpi.cl = NULL, threads = 4, infoRate = 25, gain = gain, momentum = momentum, qDecay = qDecay, logSize = T, reset = F, movie = F)
{

	# ++++++++++++ !!! see settings in R/bdm_glbl.R for: gain, momentum, qDecay ++++++++++++++++++++

	m.list <- list(m)
	# +++ start cluster
	cl <- cluster.start(threads, mpi.cl)
	if (is.null(cl)) return(bdm)
	# +++ export input data (if using mpi.cl it might have been already exported)
	cat('+++ exporting data \n')
	if (!is.null(data)) {
		Xdata.exp(cl, data, m$is.distance, m$is.sparse, m$normalize)
	}
	# +++ compute/export betas
	if (m$ppx$ppx != ppx || m$ppx$xppx != xppx) {
		m$ppx <- beta.get(cl, ppx, xppx)
	}
	cat('+++ exporting betas \n')
	Xbeta.exp(cl, m$ppx$B)
	# +++ define thread chunks
	# Att!! nnSize refers here to the whole dataset, thus max is (nX -1)
	nX <- m$nX
	nnSize <- min((m$ppx$ppx *m$ppx$xppx), (nX -1))
	# +++ initial embedding
	if (is.null(m$ptsne$Y)){
		Y <- ptsne.init(m$nX, 1)
	} else {
		Y <- m$ptsne$Y[, 1:2]
	}
	mY <- 2
	clusterExport(cl, c('nX', 'mY', 'nnSize'), envir = environment())
	# +++ thread initialization
	clusterCall(cl, thread.init)
	# +++ compute thread affinity matrices
	m$t$affMtx <- system.time({
		cat('+++ computing thread affMtx \n')
		sumP <- sum(unlist(clusterCall(cl, thread.affMtx)))
		cat('... sumP', sumP, '/', nX, formatC(sumP /nX, format = 'e', digits = 4, width = 12), '\n')
		clusterExport(cl, c('sumP'), envir = environment())
	})
	print(m$t$affMtx)
	# +++ cost/size function
	itCost <- rep(0, iters)
	itSize <- rep(0, iters)
	# +++ BHt-SNE parameter
	clusterExport(cl, c('theta'), envir = environment())
	# learning rate factor
	logSize <- ifelse(logSize, log(nX *nnSize), 1.0)
	clusterExport(cl, c('gain'), envir = environment())
	# cost pseudo-norm factor
	lognX <- log(nX * (nX -1))
	# +++ gradient error control
	gradErrors <- 0
	gradCumError <- 0
	# +++ set movie on/off
	if (movie == T) m$progress <- list()
	# +++ start m-tsne
	m$ptsne <- list(threads = 1, layers = 1, mtsne.threads = threads, iters = iters, theta = theta, gain = gain, momentum = momentum)
	t0 <- Sys.time()
	info.head()
	m$t$mtsne <- system.time({
		# +++ iterate
		for (it in seq(iters)) {
			t1 <- Sys.time()
			# +++ export current embedding
			# (no need to transpose as each thread will use a local copy!!!)
			eSize <- sqrt(sum(apply(apply(Y, 2, range), 2, diff)**2))
			lRate <- 2.0 *(eSize +1.0 /eSize) *logSize
			if (qDecay) {
				alpha <- momentum *(1 -it /iters)**2
			}
			else {
				alpha <- momentum *(1 -it /iters)
			}
			clusterExport(cl, c('lRate', 'alpha'), envir = environment())
			itSize[it] <- eSize
			Ydata.exp(cl, Y)
			# +++ repulsive forces
			sumQ <- sum(unlist(clusterCall(cl, thread.repFget)))
			clusterExport(cl, c('sumQ'), envir = environment())
			# +++ update embedding
			zRet.list <- clusterCall(cl, thread.itrRun)
			# +++ check gradient
			eCost <- sum(sapply(zRet.list, function(zRet) zRet$zCost))
			itCost[it] <- (log(sumQ) -eCost /sumP) /lognX
			if (it > 1 && itCost[it] > itCost[(it -1)]) {
				if (reset) nulL <- clusterCall(cl, thread.gainReset)
				gradErrors <- gradErrors +1
				gradError <- itCost[it] - itCost[(it -1)]
				cat('... Gradient Error !!! ', gradErrors, formatC(gradError, format = 'e', digits = 4), '\r')
				if (gradError > 2e-4) break
			}
			# +++ update embedding
			Y <- matrix(unlist(lapply(zRet.list, function(zRet) zRet$newY)), ncol = 2, byrow = T)
			# cat('!!! chk3', length(which(is.na(Y))), '\n')
			nulL <- clusterCall(cl, thread.rmYbm)
			if (movie == T) m$progress[[it]] <- list(epoch = it, Y = Y)
			if (it %% infoRate == 0) {
				iter.info(it, iters, t0, t1, itCost[it], itSize[it], lRate)
			}
		}
	})
	if (it %%infoRate != 0) {
		cat('... \n')
		iter.info(it, iters, t0, t1, itCost[it], itSize[it], lRate)
	}
	nulL <- clusterCall(cl, thread.freeMem)
	print(m$t$mtsne)
	# +++ cluster stop
	cluster.stop(cl)
	m$ptsne$Y <- Y
	m$ptsne$cost <- matrix(itCost, ncol = 1)
	m$ptsne$size <- itSize
	m.list[[(length(m.list) +1)]] <- m
	return(m.list)
}

thread.init <- function()
{
	if (thread.rank != 0) {
		breaks <- c(sapply(1:threads, function(rank) (rank -1) *(nX +1.0) %/%threads), nX)
		# C++ indexes
		z.ini <<- breaks[thread.rank];
		z.end <<- breaks[thread.rank +1];
		# local matrices
		Pbm <<- as.big.matrix(matrix(rep(0, (z.end -z.ini) *nnSize), nrow = nnSize), type = 'double')
		Wbm <<- as.big.matrix(matrix(rep(0, (z.end -z.ini) *nnSize), nrow = nnSize), type = 'integer')
		Rbm <<- as.big.matrix(matrix(rep(0, (z.end -z.ini) *mY), nrow = mY), type = 'double')
		Ubm <<- as.big.matrix(matrix(rep(0, (z.end -z.ini) *mY), nrow = mY), type = 'double')
		Gbm <<- as.big.matrix(matrix(rep(1, (z.end -z.ini) *mY), nrow = mY), type = 'double')
	}
}

thread.affMtx <- function()
{
	if (thread.rank != 0) {
		thread_affMtx(z.ini, z.end, Xbm@address, is.distance, is.sparse, Bbm@address, Pbm@address, Wbm@address)
	}
}

thread.repFget <- function()
{
	if (thread.rank != 0) {
		thread_repF(z.ini, z.end, Ybm@address, theta, Rbm@address)
	}
}

thread.itrRun <- function()
{
	if (thread.rank != 0) {
		thread_mIter(z.ini, z.end, Pbm@address, Wbm@address, Ybm@address, sumP, sumQ, Rbm@address, Ubm@address, Gbm@address, lRate, alpha, gain)
	}
	else {
		list(zCost = 0, newY = NULL)
	}
}

thread.gainReset <- function()
{
	if (thread.rank != 0) Gbm[, ] <<- rep(1, (z.end -z.ini))
}

thread.rmYbm <- function()
{
	if (thread.rank != 0) rm(Ybm)
}

thread.freeMem <- function()
{
	if (thread.rank != 0) rm(list = c('Pbm', 'Wbm', 'Rbm', 'Ubm', 'Gbm'))
}

info.head <- function()
{
	cat('--- iter       ')
	cat('   <Qlty>   ')
	cat('   <Size>   ')
	cat(' <itSecs> ')
	cat(' time2End ')
	cat(' eTime ')
	cat('\n')
}

iter.info <- function(it, iters, t0, t1, eCost, eSize, lRate)
{
	cat('+++ ', formatC(it, width=4, flag='0'), '/', formatC(iters, width=4, flag='0'), sep='')
	cat(formatC(eCost, format='e', digits=4, width=12))
	cat(formatC(eSize, format='e', digits=4, width=12))
	itSecs <- as.numeric(difftime(Sys.time(), t1, units='secs'))
	cat('  ', formatC(itSecs, format='f', digits=4, width=8), sep='')
	runTime <- as.numeric(difftime(Sys.time(), t0, units='secs'))
	tm2End <- runTime /it * (iters -it)
	cat('   ', time.format(tm2End), sep='')
	cat('  ', format(Sys.time()+tm2End, '%H:%M'), sep='')
	cat('  ', round(lRate, 2))
	cat('\n')
}
jgarriga65/bigMap documentation built on June 10, 2024, 7:05 a.m.