## R/client_func.R


#' @title Bigmemory description of a matrix
#' @description Decode an encoded Arrow IPC stream of a matrix, copy it into a
#' file-backed \code{big.matrix} and return its bigmemory description.
#' @param value Encoded Arrow IPC stream of a matrix (see
#' \code{arrow::write_to_raw}).
#' @returns Bigmemory description of the given matrix.
#' @importFrom arrow read_ipc_stream
#' @importFrom bigmemory as.big.matrix describe
#' @export
matrix2DscFD <- function(value) {
    tcp <- as.matrix(read_ipc_stream(.decode.arg(value)))
    dscbigmatrix <- describe(as.big.matrix(tcp, backingfile = ""))
    rm(list=c("tcp"))
    return (dscbigmatrix)
}
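
## Round-trip sketch (commented out; toy data, not part of the original code).
## It assumes .encode.arg()/.decode.arg() are this package's serialization
## helpers, mirroring how encoded Arrow chunks are produced further below.
# m <- matrix(rnorm(6), nrow = 2)
# enc <- .encode.arg(
#     arrow::write_to_raw(arrow::arrow_table(as.data.frame(m))))
# dsc <- matrix2DscFD(enc)
# bigmemory::attach.big.matrix(dsc)[, , drop = FALSE]  # recovers m's values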


#' @title Matrix partition
#' @description Partition a matrix into blocks.
#' @param x A matrix.
#' @param seprow A numeric vector giving the sizes of the row blocks.
#' @param sepcol A numeric vector giving the sizes of the column blocks.
#' Default, \code{seprow}.
#' @returns List of lists of blocks (Arrow Tables), one inner list per block
#' row. For a symmetric matrix, only the diagonal and upper-triangular blocks
#' are kept.
#' @importFrom arrow arrow_table
#' @keywords internal
.partitionMatrix <- function(x, seprow, sepcol = seprow) {
    stopifnot(sum(seprow)==nrow(x) && sum(sepcol)==ncol(x))
    csseprow <- cumsum(seprow)
    indrow <- lapply(1:length(seprow), function(i) {
        return (c(ifelse(i==1, 0, csseprow[i-1])+1, csseprow[i]))
    })
    cssepcol <- cumsum(sepcol)
    indcol <- lapply(1:length(sepcol), function(i) {
        return (c(ifelse(i==1, 0, cssepcol[i-1])+1, cssepcol[i]))
    })
    parMat <- lapply(1:length(indrow), function(i) {
        lapply(ifelse(isSymmetric(x), i, 1):length(indcol), function(j) {
            xij <- x[indrow[[i]][1]:indrow[[i]][2],
                     indcol[[j]][1]:indcol[[j]][2], drop=F]
            return (arrow_table(as.data.frame(xij)))
        })
    })
    
    return (parMat)
}
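
## Usage sketch (commented out; toy data). Each block is returned as an Arrow
## Table; for a symmetric input, only the diagonal and upper-triangular blocks
## are produced.
# x <- matrix(1:20, nrow = 4,
#             dimnames = list(paste0("r", 1:4), paste0("c", 1:5)))
# blocks <- .partitionMatrix(x, seprow = c(2, 2), sepcol = c(3, 2))
# length(blocks)        # 2 row-blocks
# lengths(blocks)       # 2 column-blocks each (x is not symmetric)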


#' @title Matrix reconstruction
#' @description Rebuild a matrix from its partition.
#' @param matblocks List of lists of matrix blocks (plain matrices), laid out
#' as produced by \code{.partitionMatrix}.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @returns The reconstructed matrix.
#' @importFrom parallel mclapply
#' @keywords internal
.rebuildMatrix <- function(matblocks, mc.cores = 1) {
    uptcp <- lapply(matblocks, function(bl) do.call(cbind, bl))
    ## combine the blocks into one matrix
    if (length(uptcp) > 1) {
        if (length(unique(sapply(uptcp, ncol)))==1) {
            tcp <- do.call(rbind, uptcp)
        } else {
            ## without the first layer of blocks
            no1tcp <- mclapply(
                2:length(uptcp),
                mc.cores=mc.cores,
                function(i) {
                    cbind(do.call(cbind, lapply(1:(i-1), function(j) {
                        t(matblocks[[j]][[i-j+1]])
                    })), uptcp[[i]])
                })
            ## with the first layer of blocks
            tcp <- rbind(uptcp[[1]], do.call(rbind, no1tcp))
            rm(list=c("no1tcp"))
            rownames(tcp) <- colnames(tcp)
            stopifnot(isSymmetric(tcp))
        }
    } else {
        ## only one row of blocks: asymmetric matrix or column-wise partition
        tcp <- uptcp[[1]]
    }
    rm(list=c("uptcp"))
    
    return (tcp)
}
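
## Round-trip sketch (commented out; toy data): partition a non-symmetric
## matrix, convert the Arrow Tables back to plain matrices (as
## .rebuildMatrixDsc does from bigmemory blocks), and rebuild.
# x <- matrix(rnorm(20), nrow = 4)
# blocks <- .partitionMatrix(x, seprow = c(2, 2), sepcol = c(3, 2))
# matblocks <- lapply(blocks, function(row)
#     lapply(row, function(b) as.matrix(as.data.frame(b))))
# all.equal(unname(.rebuildMatrix(matblocks)), unname(x))  # TRUE expected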


#' @title Matrix reconstruction from bigmemory descriptors
#' @description Rebuild a matrix from the bigmemory descriptors of its
#' partition blocks.
#' @param dscblocks List of lists of bigmemory descriptors pointing to matrix
#' blocks.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @returns The complete matrix.
#' @importFrom parallel mclapply
#' @importFrom bigmemory attach.big.matrix
#' @keywords internal
.rebuildMatrixDsc <- function(dscblocks, mc.cores = 1) {
    ## access to matrix blocks 
    matblocks <- mclapply(dscblocks, mc.cores=mc.cores, function(y) {
        lapply(y, function(x) {
            bmx <- (attach.big.matrix(x))[,,drop=F]
            rm(x)
            gc()
            return (bmx)
        })
    })
    tcp <- .rebuildMatrix(matblocks, mc.cores=mc.cores)
    return (tcp)
}
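
## Sketch (commented out; toy data, single local process): wrap each block in
## a file-backed big.matrix, keep only the descriptors, and rebuild. The
## backingfile = "" call mirrors matrix2DscFD() above.
# x <- matrix(rnorm(16), 4, 4); x <- x + t(x)     # exactly symmetric
# dimnames(x) <- list(paste0("s", 1:4), paste0("s", 1:4))
# blocks <- .partitionMatrix(x, seprow = c(2, 2))
# dscblocks <- lapply(blocks, function(row)
#     lapply(row, function(b) bigmemory::describe(
#         bigmemory::as.big.matrix(as.matrix(as.data.frame(b)),
#                                  backingfile = ""))))
# max(abs(.rebuildMatrixDsc(dscblocks) - x))      # ~ 0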


#' @title Euclidean distance
#' @description Transform an SSCP matrix XX' into the matrix of Euclidean
#' distances between the samples (rows) of X.
#' @param XXt An SSCP matrix XX'.
#' @returns Matrix of Euclidean distances between samples.
#' @keywords internal
.toEuclidean <- function(XXt) {
    if (!isSymmetric(XXt) || any(rownames(XXt) != colnames(XXt)))
        stop('Input XXt (an SSCP matrix) should be symmetric, with matching ',
             'row and column names.')
    .printTime("toEuclidean XXt started")
    lowerTri <- cbind(do.call(cbind, lapply(1:(ncol(XXt)-1), function(i) {
        res <- sapply((i+1):ncol(XXt), function(j) {
            ## d(Xi, Xj)^2 = Xi'Xi - 2Xi'Xj + Xj'Xj
            ## = XXt[i,i] - 2XXt[i,j] + XXt[j,j]
            t(c(1,-1)) %*% XXt[c(i,j), c(i,j)] %*% c(1, -1)
        })
        return (c(rep(0, i), res))
    })), rep(0, ncol(XXt)))
    distmat <- sqrt(lowerTri + t(lowerTri))
    rownames(distmat) <- rownames(XXt)
    colnames(distmat) <- colnames(XXt)
    
    return (distmat)
}
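
## Sanity-check sketch (commented out; toy data): distances recovered from
## XX' should match stats::dist() on the rows of X.
# X <- matrix(rnorm(20), nrow = 5,
#             dimnames = list(paste0("s", 1:5), paste0("v", 1:4)))
# max(abs(.toEuclidean(tcrossprod(X)) - as.matrix(dist(X))))  # ~ 1e-15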


#' @title Find X from XX' and X'X
#' @description Find X from XX' and X'X.
#' @param XXt XX'
#' @param XtX X'X
#' @param r A non-zero vector of length \code{ncol(X'X)}.
#' @param Xr A vector of length \code{nrow(XX')}, equal to the product Xr.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @returns X
#' @importFrom parallel mclapply
#' @importFrom Matrix rankMatrix
#' @importFrom stats quantile
#' @keywords internal
.solveSSCP <- function(XXt, XtX, r, Xr, TOL = .Machine$double.eps) {
    if (length(r) != ncol(XtX)) {
        stop("r length shoud match ncol(XtX).")
    }
    if (length(Xr) != nrow(XXt)) {
        stop("Xr length shoud match nrow(XXt).")
    }
    if (max(abs(r)) < TOL) {
        stop("Cannot solve with r = 0.")
    }
    ## scale by a power of 10 to deal with large values in the input
    absval <- setdiff(c(abs(XXt), abs(XtX)), 0)
    scaling <- 10^ceiling(log10(min(absval))/4+1)
    Xrb <- Xr/(scaling^4)
    rb <- r/(scaling^2)
    B1 <- XXt/(scaling^4)
    B2 <- XtX/(scaling^4)
    N1 <- nrow(B1)
    N2 <- nrow(B2)
    Nmin <- min(N1, N2)
    
    eB1 <- eigen(B1, symmetric=T)
    eB2 <- eigen(B2, symmetric=T)
    vecB1 <- eB1$vectors                    # not unique
    vecB2 <- eB2$vectors                    # not unique
    valB1 <- eB1$values
    valB2 <- eB2$values                     # valB2 == [valB1, 0] or the reverse
    vecs <- list("XXt"=vecB1, "XtX"=vecB2)
    vals <- list("XXt"=valB1, "XtX"=valB2)
    
    ## NB: a numerically imprecise alternative: poseignum =
    ## min(Matrix::rankMatrix(B1), Matrix::rankMatrix(B2))
    ## The number of strictly positive eigenvalues is upper-bounded by the
    ## number of variables, which .solveSSCP would otherwise need as an extra
    ## argument. The following formula is empirical, relying on the fact that
    ## the errors around zero eigenvalues are random.
    poseignum <- min(
        Nmin+1,
        which(
            vals$XXt[1:Nmin]/(vals$XtX[1:Nmin]+TOL) < 0.99 |
                vals$XtX[1:Nmin]/(vals$XXt[1:Nmin]+TOL) < 0.99)
        ) - 1
    
    vals <- mclapply(vals, mc.cores=length(vals), function(x) {
        if (poseignum < length(x))
            x[(poseignum+1):length(x)] <- 0
        return (x)
    })

    # if (N2 > N1) {
    #     tol <- max(abs(valB2[(N1+1):N2]))*10
    # } else if (N1 > N2) {
    #     tol <- max(abs(valB1[(N2+1):N1]))*10
    # } else {
    #     tol <- TOL
    # }
    # vals <- mclapply(vals, mc.cores=length(vals), function(x) {
    #     x[abs(x) < tol] <- 0
    #     return (x)
    # })
    # eignum <- length(vals[[1]])
    # poseignum <- unique(sapply(vals, function(x) {
    #     max(which(x > 0))
    # }))
    # cat("Number of strictly positive eigenvalues:", poseignum,
    #     "with tolerance of", tol, "\n")
    # stopifnot(length(poseignum)==1)
    
    ## verify deduced info
    invisible(lapply(1:length(vecs), function(j) {
        vec <- vecs[[j]]
        cat("------", names(vecs)[j], "------\n")
        cat("Determinant:",
            det(vec), "\n")
        cat("Precision on v' = 1/v:",
            max(abs(t(vec) - solve(vec, tol=1e-150))), "\n")
        cat("Precision on Norm_col = 1:",
            max(abs(apply(vec, 2, function(x) norm(as.matrix(x), "2")) - 1)),
            "\n")
        cat("Precision on Norm_row = 1:",
            max(abs(apply(vec, 1, function(x) norm(as.matrix(x), "2")) - 1)),
            "\n")
        cat("Precision on Orthogonal:",
            max(sapply(1:(ncol(vec)-1), function(i) {
                max(sum(vec[i,] * vec[i+1,]), sum(vec[, i] * vec[, i+1]))
            })), "\n")
    }))
    
    ## solution S: X * r = vecB1 * E * S * vecB2' * r = Xr
    ## E * S * vecB2' * r = vecB1' * Xr = tmprhs1
    tmprhs1 <- crossprod(vecs[[1]], Xrb)
    if (poseignum < N1)
        cat("Precision on tmprhs1's zero:",
            max(abs(tmprhs1[(poseignum+1):N1, 1])), "\n")
    ## S * vecB2' * rb = S * lhs1 = 1/E * tmprhs1 = rhs1
    E <- diag(sqrt(vals[[1]][1:poseignum]), ncol=poseignum, nrow=poseignum)
    invE <- diag(1/diag(E), ncol=poseignum, nrow=poseignum)
    rhs1 <- crossprod(t(invE), tmprhs1[1:poseignum, , drop=F])
    lhs1 <- crossprod(vecs[[2]], rb)
    signs1 <- rhs1[1:poseignum,]/lhs1[1:poseignum,]
    ## S = [signs1 0]
    S <- cbind(diag(signs1, ncol=poseignum, nrow=poseignum),
               matrix(0, nrow=poseignum, ncol=N2-poseignum))
    ## D = E %*% S
    D <- rbind(crossprod(t(E), S), matrix(0, nrow=N1-poseignum, ncol=N2))
    ## a = vecs[["A*A'"]] %*% D %*% t(vecs[["A'*A"]])
    a1 <- tcrossprod(tcrossprod(vecs[[1]], t(D)), vecs[[2]])
    
    cat("----------------------\n")
    cat("Precision on XXt = a1*a1':", max(abs(B1 - tcrossprod(a1))),
        " / (", quantile(abs(B1)), ")\n")
    cat("Precision on XtX = a1'*a1:", max(abs(B2 - crossprod(a1))),
        " / (", quantile(abs(B2)), ")\n")
    
    return (a1*(scaling^2))
    
    # ## solution S: A' * r = vecB2 * S' * E' * vecB1' * r = Xr
    # ## S' * E' * vecB1' * r = S' * lhs2 = vecB2' * Xr = rhs2
    # rhs2 <- crossprod(vecs[[2]], Xrb)
    # if (poseignum < N2) cat("Precision on rhs2's zero:", max(abs(rhs2[(poseignum+1):N2, 1])), "\n")
    # E <- diag(sqrt(vals[[1]][1:poseignum]))
    # 
    # lhs2 <- crossprod(E, crossprod(vecs[[1]][,1:poseignum], rb))
    # signs2 <- rhs2[1:poseignum,]/lhs2[1:poseignum,]
    # 
    # ## check signs: signs2 should be identical to signs1
    # cat("Precision on signs double-check:", max(abs(signs1-signs2)), "\n")
    # 
    # S <- cbind(diag(signs2), matrix(0, nrow=poseignum, ncol=N2-poseignum)) # S = [signs1 0]
    # D <- rbind(crossprod(t(E), S), matrix(0, nrow=N1-poseignum, ncol=N2))  # D = E %*% S
    # a2 <- tcrossprod(tcrossprod(vecs[[1]], t(D)), vecs[[2]]) # a = vecs[["A*A'"]] %*% D %*% t(vecs[["A'*A"]])
    # 
    # cat("----------------------\n")
    # cat("Precision on XXt = a2*a2':", max(abs(B1 - tcrossprod(a2))), "\n")
    # cat("Precision on XtX = a2'*a2:", max(abs(B2 - crossprod(a2))), "\n")
    
    # return (a2*(scaling^2))
}
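
## Recovery sketch (commented out; toy data): for a full-row-rank X with
## distinct singular values and a generic r, X is recovered from XX', X'X and
## one matching pair (r, Xr), up to rounding.
# set.seed(1)
# X <- matrix(rnorm(24), nrow = 4, ncol = 6)
# r <- rnorm(6)
# Xhat <- .solveSSCP(XXt = tcrossprod(X), XtX = crossprod(X),
#                    r = r, Xr = X %*% r)
# max(abs(Xhat - X))   # ~ 1e-12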


#' @title Federated SSCP
#' @description Function for computing the federated SSCP matrix.
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param funcPreProc Definition of a function for preparation of raw data
#' matrices. Two arguments are required: conns (list of DSConnection-classes), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param querytables Vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{funcPreProc} are ignored.
#' @param byColumn A logical value indicating whether the input data are
#' centered by column or by row. Default, TRUE: centering by column, and
#' variables that are constant across samples are removed. If FALSE, data are
#' centered and scaled by row, and samples that are constant across variables
#' are removed.
#' @param scale A logical value indicating whether the variables should be
#' scaled to have unit variance. Default, FALSE.
#' @param chunk Size of the chunks into which the resulting matrix is
#' partitioned. Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @param connRes A logical value indicating whether the connections to
#' \code{logins} are returned. Default, FALSE: connections are closed.
#' @returns A list with elements \code{sscp} (the federated SSCP matrix per
#' table), \code{samples}, \code{variables} and \code{conns}.
#' @importFrom parallel mclapply detectCores
#' @importFrom stats setNames
#' @importFrom DSI datashield.aggregate datashield.assign datashield.logout
#' datashield.errors datashield.symbols
#' @keywords internal
.federateSSCP <- function(loginFD, logins, funcPreProc, querytables,
                          byColumn = TRUE, scale = FALSE,
                          chunk = 500L, mc.cores = 1,
                          TOL = .Machine$double.eps,
                          width.cutoff = 500L,
                          connRes = FALSE) {
    loginFDdata <- .decode.arg(loginFD)
    logindata   <- .decode.arg(logins)
    opals <- .login(logins=logindata)
    nnode <- length(opals)
    ntab <- length(querytables)
    mc.cores <- min(mc.cores, detectCores())
    mc.nodes <- min(mc.cores, nnode)
    .printTime(".federateSSCP Login-ed")
    
    tryCatch({
        ## take a snapshot of the current session
        safe.objs <- .ls.all()
        ## leave alone .Random.seed for sample()
        safe.objs[['.GlobalEnv']] <- setdiff(safe.objs[['.GlobalEnv']],
                                             '.Random.seed')
        ## lock everything so no objects can be changed
        .lock.unlock(safe.objs, lockBinding)
        
        ## apply funcPreProc for preparation of querytables on opals
        ## TODO: control hacking!
        ## TODO: control identical colnames!
        funcPreProc(conns=opals, symbol=querytables)
        
        ## unlock back everything
        .lock.unlock(safe.objs, unlockBinding)
        ## get rid of any sneaky objects that might have been created in the
        ## filters as side effects
        .cleanup(safe.objs)
    }, error=function(e) {
        .printTime(paste0("DATA MAKING PROCESS: ", e))
        return (paste0("DATA MAKING PROCESS: ", e,
                       ' --- ', datashield.symbols(opals),
                       ' --- ', datashield.errors(),
                       ' --- ', datashield.logout(opals)))
    })
    .printTime(".federateSSCP Input data processed")
    
    ## compute XX' on opals
    tryCatch({
        ## center data
        datashield.assign(opals,
                          "centeredData",
                          as.call(c(as.symbol("center"),
                                    x=as.call(c(as.symbol("list"),
                                                setNames(
                                                    lapply(querytables,
                                                           as.symbol),
                                                    querytables))),
                                    subset=NULL,
                                    byColumn=byColumn,
                                    scale=scale
                          )),
                          async=T)
        
        ## samples
        samplesTabs <- datashield.aggregate(
            opals,
            as.symbol("rowNames(centeredData)"),
            async=T)
        samples <- mclapply(
            names(opals),
            mc.cores=mc.nodes,
            function(opn) samplesTabs[[opn]][[1]])
        
        ## variables
        variables <- datashield.aggregate(
            opals[1],
            as.symbol('colNames(centeredData)'),
            async=T)[[1]]
        
        ## compute XX'
        datashield.assign(opals, "tcrossProdSelf", 
                          as.call(list(as.symbol("tcrossProd"),
                                       x=as.symbol("centeredData"),
                                       y=NULL,
                                       chunk=chunk)),
                          async=T)
            
        ## connection from non-FD servers to FD-assigned server:
        ## user and password for login between servers are required
        loginFDdata$user     <- loginFDdata$userserver
        loginFDdata$password <- loginFDdata$passwordserver
        datashield.assign(opals, 'FD',
                          as.symbol(paste0("crossLogin('",
                                           .encode.arg(loginFDdata), "')")),
                          async=T)
    }, error=function(e) {
        .printTime(paste0("SSCP MAKING PROCESS: ", e))
        return (paste0("SSCP MAKING PROCESS: ", e,
                       ' --- ', datashield.symbols(opals),
                       ' --- ', datashield.errors(),
                       ' --- ', datashield.logout(opals)))
    })
    .printTime(".federateSSCP XX' data computed")
    
    ## send XX' from opals to FD
    tryCatch({
        ## send decomposed XX' to FD memory
        command <- list(as.symbol("pushToDscFD"),
                        as.symbol("FD"),
                        as.symbol("tcrossProdSelf"),
                        async=T)
        .printTime("Command: pushToDscFD(FD, tcrossProdSelf)")
        tcrossProdSelfDSC <- datashield.aggregate(opals,
                                                  as.call(command),
                                                  async=T)
        
        ## rebuild XX'
        tcrossProdSelf <- mclapply(
            names(opals),
            mc.cores=mc.nodes,
            function(opn) {
                mc.tabs <- max(1, min(ntab, floor(mc.cores/mc.nodes)))
                tcps <- mclapply(
                    querytables,
                    mc.cores=mc.tabs,
                    function(tab) {
                        mc.chunks <- max(1, floor(mc.cores/(mc.nodes*mc.tabs)))
                        return (.rebuildMatrixDsc(
                            tcrossProdSelfDSC[[opn]][[tab]],
                            mc.cores=mc.chunks))
                    })
                names(tcps) <- querytables
                return (tcps)
            })
        names(tcrossProdSelf) <- names(opals)
    }, error = function(e) {
        .printTime(paste0("SSCP PUSH PROCESS: ", e))
        datashield.assign(opals, 'crossEnd',
                          as.symbol("crossLogout(FD)"), async=T)
        return (paste0("SSCP PUSH PROCESS: ", e,
                       ' --- ', datashield.symbols(opals),
                       ' --- ', datashield.errors(),
                       ' --- ', datashield.logout(opals)))
    })
    .printTime(".federateSSCP Single XX' communicated to FD")
    
    if (nnode==1) {
        XXt <- tcrossProdSelf[[1]]
        for (tab in querytables) {
            rownames(XXt[[tab]]) <- colnames(XXt[[tab]]) <- samples[[1]]
        }
    } else {
        tryCatch({
            ## compute X'X
            datashield.assign(opals, "crossProdSelf", 
                              as.call(list(as.symbol("crossProd"),
                                           x=as.symbol("centeredData"),
                                           pair=F,
                                           chunk=chunk)),
                              async=T)
        
            ## login from each to other nodes
            opn.logined <- c()
            for (opn in names(opals)) {
                ind.opn <- which(logindata$server == opn)
                logindata.opn <- logindata[-ind.opn, , drop=F]
                logindata.opn$user <- logindata.opn$userserver
                logindata.opn$password <- logindata.opn$passwordserver
                ## from each node opn, log in other nodes (mates)
                opals.loc <- paste0("crossLogin('",
                                    .encode.arg(logindata.opn),
                                    "')")
                tryCatch({
                    datashield.assign(opals[opn], 'mates',
                                      as.symbol(opals.loc), async=T)
                    opn.logined <- c(opn.logined, opn)
                }, error = function(e) {
                    .printTime(paste0("CROSS LOGIN: ", e))
                    datashield.assign(opals[opn.logined], 'crossEnd',
                                      as.symbol("crossLogout(mates)"),
                                      async=T)
                    return (paste0("CROSS LOGIN: ", e,
                                   ' --- ', datashield.symbols(opals[opn]),
                                   ' --- ', datashield.errors()))
                })
            }

            ##- received by each from other nodes ----
            prodDataCross <- mclapply(
                names(opals),
                mc.cores=mc.nodes,
                function(opn) {
                    tryCatch({
                        ## prepare raw data matrices on mates of opn
                        command.opn <- list(as.symbol("crossAssignFunc"),
                                            as.symbol("mates"),
                                            .encode.arg(funcPreProc),
                                            .encode.arg(querytables))
                        .printTime(
                            "Command: crossAssignFunc(mates, funcPreProc, ...")
                        invisible(datashield.aggregate(opals[opn],
                                                       as.call(command.opn),
                                                       async=T))
                        
                        ## center raw data on mates of opn
                        command.opn <- list(
                            as.symbol("crossAssign"),
                            as.symbol("mates"), 
                            symbol="centeredDataMate",
                            value=.encode.arg(deparse1(
                                as.call(c(as.symbol("center"),
                                          x=as.call(
                                              c(as.symbol("list"),
                                                setNames(
                                                    lapply(querytables,
                                                           as.symbol),
                                                    querytables))),
                                          byColumn=byColumn,
                                          scale=scale)),
                                width.cutoff=width.cutoff)),
                            value.call=T,
                            async=T
                        )
                        .printTime(
                            "Command: crossAssign(mates, centeredDataMate)")
                        invisible(datashield.aggregate(opals[opn],
                                                       as.call(command.opn),
                                                       async=T))
                        
                        ## create singularProd from mates of opn
                        command.opn <- paste0(
                            "crossAggregatePrimal(mates, '",
                            .encode.arg('singularProd(centeredDataMate)'),
                            "', async=T)")
                        .printTime("Command: crossAggregatePrimal(mates, 
                        singularProd(centeredDataMate), ...")
                        datashield.assign(opals[opn],
                                          "singularProdMate",
                                          as.symbol(command.opn), async=T)
                        
                        ## send X'X from opn to mates of opn
                        command.opn <- list(as.symbol("pushToDscMate"),
                                            as.symbol("mates"),
                                            as.symbol("crossProdSelf"),
                                            opn,
                                            async=T)
                        .printTime(
                            "Command: pushToDscMate(mates, crossProdSelf...")
                        invisible(datashield.aggregate(opals[opn],
                                                       as.call(command.opn),
                                                       async=T))
                        .printTime(paste0(
                            ".federateSSCP Pairwise X'X communicated: ",
                            opn))
                        
                        ## compute (X_i) * (X_j)' * (X_j) * (X_i)' on mates
                        command.opn <- list(
                            as.symbol("crossAssign"),
                            as.symbol("mates"),
                            symbol=paste0("prodDataCross_",opn),
                            value=.encode.arg(deparse1(
                                as.call(c(as.symbol("tripleProdChunk"),
                                          x=as.symbol("centeredDataMate"),
                                          mate=opn,
                                          chunk=chunk,
                                          mc.cores=mc.cores)),
                                width.cutoff=width.cutoff)),
                            value.call=T,
                            async=T
                        )
                        .printTime("Command: crossAssign(mates, prodDataCross,
                        tripleProdChunk(...")
                        invisible(datashield.aggregate(opals[opn],
                                                       as.call(command.opn),
                                                       async=T))
                        
                        ## login to FD from mates
                        command.opn <- paste0(
                            "crossAssign(mates, symbol='FDmate', value='",
                            .encode.arg(paste0("crossLogin('", loginFD, "')")),
                            "', value.call=T, async=F)")
                        .printTime("Command: crossAssign(mates, FDmate,
                        crossLogin(...")
                        invisible(datashield.aggregate(opals[opn],
                                                       as.symbol(command.opn),
                                                       async=T))
                        
                        ## push X_i * X_j * X_j' * X_i' to FD from mates
                        tryCatch({
                            command.opn <- paste0(
                                "crossAggregateDual(mates, '", 
                                .encode.arg(paste0(
                                    "pushToDscFD(FDmate, prodDataCross_",
                                    opn, ")")), 
                                "', async=T)")
                            .printTime("Command: crossAggregateDual(mates,
                            pushToDscFD(FDmate, prodDataCross_...")
                            prodDataCrossDSC <- datashield.aggregate(
                                opals[opn],
                                as.symbol(command.opn),
                                async=T)
                            
                            prodDataCross.opn <- lapply(
                                prodDataCrossDSC[[opn]],
                                function(dscblocks) {
                                    mc.tabs <- max(
                                        1,
                                        min(ntab, floor(mc.cores/mc.nodes)))
                                    pdcs <- mclapply(
                                        querytables,
                                        mc.cores=mc.tabs,
                                        function(tab) {
                                            mc.chunks <- max(
                                                1,
                                                floor(mc.cores/
                                                          (mc.nodes*mc.tabs)))
                                            return (.rebuildMatrixDsc(
                                                dscblocks[[tab]],
                                                mc.cores=mc.chunks))
                                        })
                                    names(pdcs) <- querytables
                                    return (pdcs)
                                })
                            .printTime(paste0(".federateSSCP XY'YX' tripleProd
                                          communicated to FD: ", opn))
                        }, error = function(e) {
                            .printTime(paste0("CROSS SSCP PUSH PROCESS: ", e))
                            return (paste0("CROSS SSCP PUSH PROCESS: ", e,
                                           ' --- ',
                                           datashield.symbols(opals[opn]),
                                           ' --- ',
                                           datashield.errors()))
                        }, finally = {
                            datashield.aggregate(
                                opals[opn], 
                                as.symbol(paste0(
                                    "crossAssign(mates, symbol='crossFDEnd', ",
                                    "value='",
                                    .encode.arg("crossLogout(FDmate)"),
                                    "', value.call=T, async=F)")),
                                async=T)
                        })
                        return (prodDataCross.opn)
                    }, error = function(e) {
                        .printTime(paste0("CROSS PROCESS: ", e))
                        return (paste0("CROSS PROCESS: ", e,
                                       ' --- ', datashield.symbols(opals[opn]),
                                       ' --- ', datashield.errors()))
                    }, finally = {
                        datashield.assign(opals[opn], 'crossEnd',
                                          as.symbol("crossLogout(mates)"),
                                          async=T)
                    })
                })
            names(prodDataCross) <- names(opals)
            ##-----
            
            tryCatch({
                ## send the single-column matrix from opals to FD
                ## (X_i) * (X_j)' * ((X_j) * (X_j)')[,1]
                datashield.assign(opals, "singularProdCross",
                                  as.symbol('tcrossProd(centeredData,
                                            singularProdMate)'),
                                  async=T)
                command <- list(as.symbol("pushToDscFD"),
                                as.symbol("FD"),
                                as.symbol("singularProdCross"),
                                async=T)
                .printTime("Command: pushToDscFD(FD, singularProdCross)")
                singularProdCrossDSC <- datashield.aggregate(opals,
                                                             as.call(command),
                                                             async=T)
                mc.tabs <- max(1, min(ntab, floor(mc.cores/mc.nodes)))
                mc.chunks <- max(1, floor(mc.cores/(mc.nodes*mc.tabs)))
                singularProdCross <- mclapply(
                    names(opals),
                    mc.cores=mc.nodes,
                    function(opn) {
                        lapply(singularProdCrossDSC[[opn]],
                               function(dscblocks) {
                                   spcs <- mclapply(
                                       querytables,
                                       mc.cores=mc.tabs,
                                       function(tab) {
                                           return (.rebuildMatrixDsc(
                                               dscblocks[[tab]],
                                               mc.cores=mc.chunks))
                                       })
                                   names(spcs) <- querytables
                                   return (spcs)
                               })
                    })
                names(singularProdCross) <- names(opals)
                .printTime(".federateSSCP Ar communicated to FD")
            }, error = function(e) {
                .printTime(paste0("FD PROCESS MULTIPLE: ", e))
                datashield.assign(opals, 'crossEnd',
                                  as.symbol("crossLogout(FD)"), async=T)
                return (paste0("FD PROCESS MULTIPLE: ", e,
                               ' --- ', datashield.symbols(opals),
                               ' --- ', datashield.errors(),
                               ' --- ', datashield.logout(opals)))
            })
        }, error = function(e) {
            .printTime(paste0("XX' PROCESS MULTIPLE: ", e))
            return (paste0("XX' PROCESS MULTIPLE: ", e,
                           ' --- ', datashield.symbols(opals),
                           ' --- ', datashield.errors(),
                           ' --- ', datashield.logout(opals)))
        })
        
        ## deduced from received info by federation: (X_i) * (X_j)'
        mc.tabs <- min(ntab, mc.cores)
        mc.nodes1 <- max(1, min(nnode-1, floor(mc.cores/mc.tabs)))
        crossProductPair <- mclapply(
            querytables,
            mc.cores=mc.tabs,
            function(tab) {
                cptab <- mclapply(
                    1:(nnode-1),
                    mc.cores=mc.nodes1,
                    function(opi) {
                        mc.nodes2 <- max(1, min(nnode-opi,
                                                floor(mc.cores/mc.nodes1)))
                        crossi <- mclapply(
                            (opi+1):(nnode),
                            mc.cores=mc.nodes2,
                            function(opj) {
                                opni <- names(opals)[opi]
                                opnj <- names(opals)[opj]
                                a1 <- .solveSSCP(
                                    XXt=prodDataCross[[opnj]][[opni]][[
                                        tab]],
                                    XtX=prodDataCross[[opni]][[opnj]][[
                                        tab]],
                                    r=tcrossProdSelf[[opnj]][[
                                        tab]][,1,drop=F],
                                    Xr=singularProdCross[[opni]][[opnj]][[
                                        tab]],
                                    TOL=TOL)
                                a2 <- .solveSSCP(
                                    XXt=prodDataCross[[opni]][[opnj]][[
                                        tab]],
                                    XtX=prodDataCross[[opnj]][[opni]][[
                                        tab]],
                                    r=tcrossProdSelf[[opni]][[
                                        tab]][,1,drop=F],
                                    Xr=singularProdCross[[opnj]][[opni]][[
                                        tab]],
                                    TOL=TOL)
                                cat("Precision on a1 = t(a2):",
                                    max(abs(a1 - t(a2))),
                                    " / (", quantile(abs(a1)), ")\n")
                                return (a1)
                            })
                        names(crossi) <- names(opals)[(opi+1):(nnode)]
                        return (crossi)
                    })
                names(cptab) <- names(opals)[1:(nnode-1)]
                return (cptab)
            })
        names(crossProductPair) <- querytables
        .printTime(".federateSSCP XY' computed")
        
        ## SSCP whole matrix
        mc.tabs <- max(1, min(ntab, floor(mc.cores/mc.nodes)))
        XXt <- mclapply(
            querytables,
            mc.cores=mc.tabs,
            function(tab) {
                XXt.tab <- do.call(rbind, mclapply(
                    1:nnode,
                    mc.cores=mc.nodes,
                    function(opi) {
                        upper.opi <- do.call(cbind, as.list(
                            crossProductPair[[tab]][[names(opals)[opi]]]))
                        lower.opi <- do.call(cbind, lapply(
                            setdiff(1:opi, opi),
                            function(opj) {
                                t(crossProductPair[[tab]][[names(
                                    opals)[opj]]][[names(
                                        opals)[opi]]])
                            }))
                        return (cbind(lower.opi,
                                      tcrossProdSelf[[opi]][[tab]],
                                      upper.opi))
                    }))
                rownames(XXt.tab) <- colnames(XXt.tab) <- 
                    unlist(samples, use.names=F)
                
                return (XXt.tab)
            })
        names(XXt) <- querytables
        .printTime(".federateSSCP Whole XX' computed")
    }
    
    if (!connRes) {
        datashield.assign(opals, 'crossEnd',
                          as.symbol("crossLogout(FD)"), async=T)
        datashield.logout(opals)
    }
    return (list(sscp=XXt,
                 samples=samples,
                 variables=variables,
                 conns=opals))
}
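
## Input sketch (commented out; hypothetical servers and credentials). On top
## of the usual DSI login columns, the code above reads 'userserver' and
## 'passwordserver' columns for the server-to-server cross-logins; the whole
## data frame is passed through .encode.arg() by the caller.
# logindata <- data.frame(
#     server         = c("cohort1", "cohort2"),
#     url            = c("https://opal1.example.org",
#                        "https://opal2.example.org"),
#     user           = "analyst",
#     password       = "analyst_pwd",
#     userserver     = "srv_user",
#     passwordserver = "srv_pwd",
#     driver         = "OpalDriver")
# logins <- .encode.arg(logindata)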


#' @title Federated ComDim
#' @description Function for ComDim federated analysis on the virtual cohort
#' combining multiple cohorts: finding common dimensions in multiblock data.
#' @usage federateComDim(loginFD,
#'                       logins,
#'                       func,
#'                       symbol,
#'                       ncomp = 2,
#'                       scale = FALSE,
#'                       scale.block = TRUE,
#'                       threshold = 1e-8,
#'                       chunk = 500L,
#'                       mc.cores = 1,
#'                       TOL = .Machine$double.eps,
#'                       width.cutoff = 500L
#'                       )
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param func Encoded definition of a function for preparation of raw data
#' matrices. Two arguments are required: conns (list of Opal connections), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param symbol Encoded vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{func} are ignored.
#' @param ncomp Number of common dimensions
#' @param scale A logical value indicating if variables are scaled to unit
#' variance. Default, FALSE. See \code{MBAnalysis::ComDim}.
#' @param scale.block A logical value indicating if each block of variables is
#' divided by the square root of its inertia. Default, TRUE.
#' See \code{MBAnalysis::ComDim}.
#' @param threshold If the relative improvement of the fit between two
#' successive iterations falls below \code{threshold}, the iterative loop
#' stops. Default, 1e-8.
#' @param chunk Size of the chunks into which the resulting matrix is
#' partitioned.
#' Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @returns A \code{ComDim} object. See \code{MBAnalysis::ComDim}.
#' @importFrom DSI datashield.logout datashield.errors datashield.symbols
#' datashield.assign datashield.aggregate
#' @importFrom Matrix rankMatrix
#' @importFrom arrow write_to_raw
#' @importFrom parallel mclapply detectCores
#' @export
federateComDim <- function(loginFD, logins, func, symbol,
                           ncomp = 2,
                           scale = FALSE,
                           scale.block = TRUE,
                           threshold = 1e-8,
                           chunk = 500L, mc.cores = 1,
                           TOL = .Machine$double.eps,
                           width.cutoff = 500L) {
    .printTime("federateComDim started")
    funcPreProc <- .decode.arg(func)
    querytables <- .decode.arg(symbol)
    ntab <- length(querytables)
    mc.cores <- min(mc.cores, detectCores())
    
    if (!is.logical(scale)) stop("scale must be logical.")
    if (scale) stop("Only scale=FALSE is considered.")
    if (!is.logical(scale.block)) stop("scale.block must be logical.")
    
    ## compute SSCP matrix for each centered data table
    XX_query <- .federateSSCP(loginFD=loginFD, logins=logins, 
                              funcPreProc=funcPreProc, querytables=querytables,
                              byColumn=TRUE,
                              chunk=chunk, mc.cores=mc.cores,
                              width.cutoff=width.cutoff, TOL=TOL, 
                              connRes=T)
    XX <- XX_query$sscp
    querysamples <- XX_query$samples
    queryvariables <- XX_query$variables
    opals <- XX_query$conns
    nnode <- length(opals)
    mc.nodes <- min(nnode, mc.cores)
    
    ## function: compute the total variance from a XX' matrix
    inertie <- function(tab) {
        return (sum(diag(tab)))
    }
    
    ## function: normalize a vector to a unit vector
    normv <- function(x) {
        normx <- norm(x, type='2')
        if (normx==0) normx <- 1
        return (x/normx)
    }
    
    ##### ComDim algorithm inherited from MBAnalysis #####
    
    ##- 0. Preliminary tests ----
    if (any(sapply(XX, is.na)))
        stop("No NA values are allowed.")
    if (!all(sapply(XX, isSymmetric, check.attributes=T)))
        stop("XX elements should be symmetric.")
    if (length(unique(sapply(XX, nrow))) != 1)
        stop("XX elements should be the same dimension.")
    ## number of samples
    # nind <- unique(unlist(apply(sapply(XX, dim), 1, unique)))
    # if (length(nind) > 1)
    #     stop("XX elements should be symmetric of the same dimension")
    # samples <- sapply(XX, function(x) intersect(rownames(x), colnames(x)))
    # if (is.list(samples) && max(lengths(samples))==0) {
    #     XX <- lapply(XX, function(x) {
    #         rownames(x) <- colnames(x) <- paste("Ind", 1:nind, sep='.')
    #         return (x)
    #     })
    #     samples <- sapply(XX, function(x) union(rownames(x), colnames(x)))
    # }
    # if (is.list(samples) || is.list(apply(samples, 1, unique)))
    #     stop("XX elements should have the same rownames and colnames")
    # 
    ##-----
    
    ##- 1. Output preparation ----
    nind <- nrow(XX[[1]])
    samples <- unlist(querysamples)
    if (any(samples!=rownames(XX[[1]])))
        stop("Wrong samples order.")
    nvar <- lengths(queryvariables)
    if (ncomp > 2 && min(sapply(XX, rankMatrix)) <= ncomp) {
        print(paste0("Security issue: maximum ",
                     min(sapply(XX, rankMatrix)) - 1,
                     " components could be inquired. ncomp will be set to 2."))
        ncomp <- 2
    }
    if (ncomp < 1) {
        print("ncomp should be at least 1. ncomp will be set to 2.")
        ncomp <- 2
    }
    compnames <- paste("Dim.", 1:ncomp, sep="")
    
    ## optimal value of the criterion
    optimalcrit  <- vector("numeric", ncomp)
    names(optimalcrit) <- compnames
    
    ## Specific weights for each dataset and each dimension
    LAMBDA <- matrix(1, nrow=ntab, ncol=ncomp,
                     dimnames=list(querytables, compnames))
    
    ## Normed global components
    Q <- matrix(0, nrow=nind, ncol=ncomp,
                dimnames=list(samples, compnames))

    ## Block components
    Q.b <- array(0, dim=c(nind, ncomp, ntab),
                 dimnames=list(samples, compnames, querytables))

    ## Percentage of explained inertia of each block and global
    explained.X  <- matrix(0, nrow=ntab+1, ncol=ncomp,
                           dimnames=list(c(querytables, 'Global'), compnames))
    cumexplained <- matrix(0, nrow=ncomp, ncol=2,
                           dimnames=list(compnames, c("%explX", "cum%explX")))
    
    ## Block results
    Block <- NULL
    ## Output
    comdimObj <- NULL
    call  <- NULL
    ##-----
    
    ##- 2. Required parameters and data preparation ----
    ## Pre-processing: block weighting to set each block inertia equal to 1
    if (scale.block) {
        inertia <- sapply(XX, inertie)
        XX <- lapply(querytables, function(tab) {
            XX[[tab]]/inertia[tab]
        })
        names(XX) <- querytables
        inertia0.sqrt <- sqrt(inertia)
    }
    XX0 <- XX
    IT.X <- sapply(XX, inertie) # Inertia of each block of variable
    IT.X <- c(IT.X, sum(IT.X[1:ntab]))
    
    ## Computation of the cross-product matrices among individuals
    ## (or association matrices)
    Trace <- IT.X[1:ntab]
    Itot  <- 0
    ##-----
    
    ##- 3. Computation of Q, LAMBDA and scores for the various dimensions ----
    ## Iterative computation of the various components
    for (comp in 1:ncomp)  {
        previousfit <- 0
        deltafit <- threshold + 1
        
        ## Iteratively compute the comp-th common component
        while (deltafit > threshold) {
            ## Weighted sum of XX'
            P <- Reduce("+", lapply(1:ntab, function(k)
                LAMBDA[k, comp] * XX[[k]]
            ))
            ## NB. svd is sensitive to noise (~1e-15) in P
            #reseig    <- eigen(P)
            #Q[, comp] <- reseig$vectors[, 1]
            ressvd    <- svd(P, nu=1, nv=1)
            Q[, comp] <- ressvd$u
            
            LAMBDA[, comp] <- sapply(1:ntab, function(k) {
                crossprod(Q[, comp, drop=F],
                          crossprod(XX[[k]],
                                    Q[, comp, drop=F]))
            })
            fit <- sum(LAMBDA[, comp]^2)
            
            if (previousfit==0)
                deltafit <- threshold + 1
            else
                deltafit <- (fit - previousfit)/previousfit
            previousfit <- fit
        }
        optimalcrit[comp] <- sum(LAMBDA[, comp]^2)

        ## Percentage of explained inertia of each block and global
        explained.X[1:ntab, comp] <- sapply(1:ntab, function(k) {
            inertie(
                tcrossprod(
                    crossprod(tcrossprod(Q[, comp, drop=F]),
                              XX[[k]]),
                    tcrossprod(Q[, comp, drop=F])))
        })
        explained.X[ntab+1, comp] <- sum(explained.X[1:ntab, comp])
        Q.b[, comp, ] <- sapply(1:ntab, function(k) {
            crossprod(XX[[k]],
                      Q[, comp, drop=F])
        })
        
        ## Deflation of XX
        proj <- diag(1, nind) - tcrossprod(Q[, comp, drop=F])
        XX <- mclapply(querytables,
                       mc.cores=min(ntab, mc.cores),
                       function(tab) {
                           #proj %*% xx %*% t(proj)
                           tcrossprod(crossprod(proj, XX[[tab]]), proj)
        })
        names(XX) <- querytables
    }
    
    ## Explained inertia for X
    explained.X       <- sweep(explained.X, 1, IT.X, "/")
    cumexplained[, 1] <- explained.X[ntab+1, 1:ncomp]
    cumexplained[, 2] <- cumsum(cumexplained[, 1])
    
    ## Global components (scores of individuals)
    Scor.g <- tcrossprod(Q, sqrt(diag(colSums(LAMBDA), ncol = ncol(LAMBDA))))
    colnames(Scor.g) <- compnames
    ##-----
    
    ##- 4. Loadings ----
    size <- c(0, lengths(querysamples))
    Qlist <- setNames(lapply(2:length(size), function(i) {
        Qi <- Q[(cumsum(size)[i-1]+1):cumsum(size)[i], , drop=F]
        rownames(Qi) <- querysamples[[i-1]]
        return (Qi)
    }), names(opals))
    chunkList <- mclapply(names(opals), mc.cores=mc.nodes, function(opn) {
        ql <- Qlist[[opn]]
        nblocksrow <- ceiling(nrow(ql)/chunk)
        sepblocksrow <- rep(ceiling(nrow(ql)/nblocksrow), nblocksrow-1)
        sepblocksrow <- c(sepblocksrow, nrow(ql) - sum(sepblocksrow))
        tcpblocks <- .partitionMatrix(ql, seprow=sepblocksrow, sepcol=ncomp)
        return (mclapply(
            tcpblocks,
            mc.cores=max(1, floor(mc.cores/mc.nodes)),
            function(tcpb) {
                return (lapply(tcpb, function(tcp) {
                    return (.encode.arg(write_to_raw(tcp)))
                }))
            }))
    })
    names(chunkList) <- names(opals)

    tryCatch({
        ## send Qlist from FD to opals
        # TOCHECK: security on pushed data
        invisible(mclapply(names(opals), mc.cores=mc.nodes, function(opn) {
            mc.cl1 <- max(1,
                          min(length(chunkList[opn]),
                              floor(mc.cores/mc.nodes)))
            mclapply(1:length(chunkList[opn]), mc.cores=mc.cl1, function(i) {
                mc.cl2 <- max(1,
                              min(length(chunkList[opn]),
                                  floor(mc.cores/(mc.nodes*mc.cl1))))
                mclapply(
                    1:length(chunkList[opn][[i]]),
                    mc.cores=mc.cl2,
                    function(j) {
                        lapply(1:length(chunkList[opn][[i]][[j]]),
                               function(k) {
                            datashield.assign(
                                opals[opn], paste(c('FD', i, j, k),
                                                  collapse="__"),
                                as.call(list(as.symbol("matrix2DscMate"),
                                             chunkList[opn][[i]][[j]][[k]])),
                                async=T)
                        })
                    })
            })
            datashield.assign(
                opals[opn],
                paste("pushed", 'FD', sep="_"),
                as.call(list(as.symbol("rebuildMatrixVar"),
                             symbol='FD',
                             len1=length(chunkList[opn]),
                             len2=lengths(chunkList[opn]),
                             len3=lapply(chunkList[opn], lengths),
                             querytables="common")),
                async=T)
        }))
        ## compute loadings X'*Qlist
        datashield.assign(
            opals,
            "loadings", 
            as.call(list(as.symbol("crossProd"),
                         x=as.symbol("centeredData"),
                         y=as.symbol("pushed_FD"),
                         chunk=chunk)),
            async=T)
        ## send loadings from opals to FD
        command <- list(as.symbol("pushToDscFD"),
                        as.symbol("FD"),
                        as.symbol("loadings"),
                        async=T)
        cat("Command: pushToDscFD(FD, loadings)", "\n")
        loadingsDSC <- datashield.aggregate(opals, as.call(command), async=T)
        loadingsLoc <- mclapply(
            loadingsDSC,
            mc.cores=mc.nodes,
            function(dscblocks) {
                cps <- lapply(names(dscblocks), function(dscn) {
                    cpsi <- .rebuildMatrixDsc(
                        dscblocks[[dscn]],
                        mc.cores=max(1, floor(mc.cores/mc.nodes)))
                    colnames(cpsi) <- paste0("Comp.", 1:ncomp)
                    rownames(cpsi) <- queryvariables[[gsub("__common" ,
                                                           "",
                                                           dscn)]]
                    return (cpsi)
                })
                names(cps) <- gsub("__common" , "", names(dscblocks))
                return (cps)
            })
        .printTime("federatedComDim Loadings communicated to FD")
    }, error = function(e) {
        .printTime(paste0("LOADING COMPUTATION PROCESS: ", e))
        return (paste0("LOADING COMPUTATION PROCESS: ", e,
                       ' --- ', datashield.symbols(opals),
                       ' --- ', datashield.errors()))
    }, finally = {
        datashield.assign(opals, 'crossEnd',
                          as.symbol("crossLogout(FD)"), async=T)
        datashield.logout(opals)
    })

    ## Weights for the block components
    mc.tabs <- min(ntab, mc.cores)
    W.b <- mclapply(querytables, mc.cores=mc.tabs, function(tab) {
        Reduce('+', lapply(loadingsLoc, function(ll) ll[[tab]]))
    })
    names(W.b) <- querytables
    if (scale.block) {
        W.b <- mclapply(querytables, mc.cores=mc.tabs, function(tab) {
            W.b[[tab]]/inertia0.sqrt[tab]
        })
        names(W.b) <- querytables
    }
    
    ## Loadings for the global components: normed
    Load.g <- tcrossprod(do.call(rbind, W.b),
                         sqrt(diag(colSums(LAMBDA), ncol = ncol(LAMBDA))))
    Load.g <- t(t(Load.g)/colSums(Scor.g^2))
    colnames(Load.g) <- compnames
    
    ## Global weights: normed
    W.g <- do.call(rbind, mclapply(1:ntab, mc.cores=mc.tabs, function(k) {
        tcrossprod(W.b[[k]], diag(LAMBDA[k,], nrow=ncomp, ncol=ncomp))
    }))
    mc.comps <- min(ncomp, mc.cores)
    W.g <- do.call(cbind, mclapply(1:ncomp, mc.cores=mc.comps, function(comp) {
        normv(W.g[, comp])
    }))
    colnames(W.g) <- compnames
    
    ## Global projection
    Proj.g <- W.g %*% solve(crossprod(Load.g, W.g), tol=1e-150)
    colnames(Proj.g) <- compnames
    ##-----
    
    ##- 5. Results ----
    ## Global
    comdimObj$components   <- c(ncomp=ncomp)
    comdimObj$optimalcrit  <- optimalcrit
    comdimObj$saliences    <- LAMBDA
    comdimObj$T.g          <- Q
    comdimObj$Scor.g       <- Scor.g
    comdimObj$W.g          <- W.g
    comdimObj$Load.g       <- Load.g
    comdimObj$Proj.g       <- Proj.g
    comdimObj$explained.X  <- round(100 * explained.X[1:ntab, 1:ncomp,
                                                      drop = FALSE],
                              2)
    comdimObj$cumexplained <- round(100 * cumexplained[1:ncomp, ],
                              2)
    
    ## Blocks
    Block$T.b <- Q.b
    Block$W.b <- W.b
    comdimObj$Block <- Block
    call$size.block <- nvar
    call$name.block <- querytables
    call$ncomp <- ncomp
    call$scale <- scale
    call$scale.block <- scale.block
    comdimObj$call <- call
    class(comdimObj) <- c("ComDim", "list", "federateComDim")
    ##-----

    return (comdimObj)
}
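
## Usage sketch (commented out; hypothetical names and credentials, encoded
## with the package's .encode.arg() helper; loginFDdata is a one-row login
## table for the FD server, structured like logindata in the sketch above).
# prep <- function(conns, symbol) {
#     ## assign the raw data tables on each server; "raw_<symbol>" is a
#     ## hypothetical server-side object name
#     for (s in symbol)
#         DSI::datashield.assign(conns, s, as.symbol(paste0("raw_", s)))
# }
# res <- federateComDim(loginFD = .encode.arg(loginFDdata),
#                       logins  = .encode.arg(logindata),
#                       func    = .encode.arg(prep),
#                       symbol  = .encode.arg(c("rna", "methyl")),
#                       ncomp   = 2, mc.cores = 2)
# res$saliences; res$Scor.g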


#' @title Federated SNF
#' @description Function for SNF federated analysis on the virtual cohort
#' combining multiple cohorts.
#' @usage federateSNF(loginFD,
#'                    logins,
#'                    func,
#'                    symbol,
#'                    metric = 'euclidean',
#'                    K = 20,
#'                    sigma = 0.5,
#'                    t = 20,
#'                    chunk = 500L,
#'                    mc.cores = 1,
#'                    TOL = .Machine$double.eps,
#'                    width.cutoff = 500L)
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param func Encoded definition of a function for preparation of raw data
#' matrices. Two arguments are required: conns (list of DSConnection-classes), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param symbol Encoded vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{func} are ignored.
#' @param metric Either \code{euclidean} or \code{correlation} for the
#' distance metric between samples.
#' For Euclidean distance, the data from each cohort will be centered (not
#' scaled) for each variable.
#' For correlation-based distance, the data from each cohort will be centered
#' and scaled for each sample.
#' @param K Number of neighbors in K-nearest neighbors part of the algorithm,
#' see \code{SNFtool::SNF}.
#' @param sigma Variance for local model, see \code{SNFtool::affinityMatrix}.
#' @param t Number of iterations for the diffusion process,
#' see \code{SNFtool::SNF}.
#' @param chunk Size of the chunks into which the resulting matrix is
#' partitioned. Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @returns W, the overall fused status (similarity) matrix derived by SNF.
#' @importFrom SNFtool SNF affinityMatrix
#' @importFrom parallel mclapply
#' @export
federateSNF <- function(loginFD, logins, func, symbol,
                        metric = "euclidean",
                        K = 20, sigma = 0.5, t = 20,
                        chunk = 500L, mc.cores = 1,
                        TOL = .Machine$double.eps,
                        width.cutoff = 500L) {
    .printTime("federateSNF started")
    funcPreProc <- .decode.arg(func)
    querytables <- .decode.arg(symbol)
    ntab <- length(querytables)
    metric <- match.arg(metric, choices=c("euclidean", "correlation"))
    
    ## compute SSCP matrix for each centered data table
    XX_query <- .federateSSCP(loginFD=loginFD, logins=logins, 
                              funcPreProc=funcPreProc,
                              querytables=querytables,
                              byColumn=(metric=="euclidean"),
                              chunk=chunk, mc.cores=mc.cores,
                              width.cutoff=width.cutoff, TOL=TOL, 
                              connRes=FALSE)
    mc.tabs <- min(ntab, mc.cores)
    XX <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        if (metric == "correlation") {
            ## compute (1 - correlation) distance between samples
            1 - XX_query$sscp[[i]]/(length(XX_query$variables[[i]])-1)
        } else if (metric == "euclidean") {
            ## compute Euclidean distance between samples
            .toEuclidean(XX_query$sscp[[i]])
        }
    })
    names(XX) <- querytables
    
    ## take common samples
    commons <- Reduce(intersect, lapply(XX, rownames))
    XX <- mclapply(querytables, mc.cores=mc.tabs, function(tab) {
        XX[[tab]][commons, commons, drop=F]
    })
    names(XX) <- querytables
    ## similarity graphs
    Ws <- mclapply(querytables, mc.cores=mc.tabs, function(tab) {
        affinityMatrix(XX[[tab]], K, sigma)
    })
    names(Ws) <- querytables
    ## fuse similarity graphs
    W <- SNF(Ws, K, t)
    
    class(W) <- c("federateSNF", "list")
    
    return (W)
}
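
## Illustrative, non-executed sketch of a federateSNF call. The encoded
## login objects, preprocessing function and symbol vector below are
## hypothetical placeholders assumed to have been prepared and encoded by
## the caller; they are not defined in this file.
if (FALSE) {
    W <- federateSNF(loginFD = encodedLoginFD,
                     logins  = encodedLogins,
                     func    = encodedFunc,
                     symbol  = encodedSymbols,
                     metric  = "correlation",
                     K = 20, t = 20, mc.cores = 2)
    ## the fused similarity matrix can then be clustered,
    ## e.g. with SNFtool::spectralClustering(W, K = 3)
}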


#' @title Federated UMAP
#' @description Function for UMAP federated analysis on the virtual cohort
#' combining multiple cohorts.
#' @usage federateUMAP(loginFD,
#'                     logins,
#'                     func,
#'                     symbol,
#'                     metric = 'euclidean',
#'                     chunk = 500L,
#'                     mc.cores = 1,
#'                     TOL = .Machine$double.eps,
#'                     width.cutoff = 500L,
#'                     ...)
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param func Encoded definition of a function for preparation of raw data
#' matrices. Two arguments are required: conns (list of DSConnection-classes), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param symbol Encoded vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{func} are ignored.
#' @param metric Either \code{euclidean} or \code{correlation} for distance
#' metric between samples. 
#' For Euclidean distance, the data from each cohort will be centered (not
#' scaled) for each variable.
#' For correlation-based distance, the data from each cohort will be centered
#' and scaled for each sample.
#' @param chunk Size of chunks into which the resulting matrix is partitioned.
#' Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @param ... Additional arguments passed to \code{uwot::umap}.
#' @returns A named list, one element per table in \code{symbol}, each a
#' matrix of optimized embedding coordinates returned by \code{uwot::umap}.
#' @importFrom uwot umap
#' @importFrom stats as.dist
#' @export
federateUMAP <- function(loginFD, logins, func, symbol,
                         metric = "euclidean",
                         chunk = 500L, mc.cores = 1,
                         TOL = .Machine$double.eps,
                         width.cutoff = 500L, ...) {
    .printTime("federateUMAP started")
    funcPreProc <- .decode.arg(func)
    querytables <- .decode.arg(symbol)
    ntab <- length(querytables)
    metric <- match.arg(metric, choices=c("euclidean", "correlation"))

    ## compute SSCP matrix for each centered data table
    XX_query <- .federateSSCP(loginFD=loginFD, logins=logins, 
                              funcPreProc=funcPreProc,
                              querytables=querytables,
                              byColumn=(metric=="euclidean"),
                              chunk=chunk, mc.cores=mc.cores,
                              width.cutoff=width.cutoff, TOL=TOL, 
                              connRes=FALSE)
    mc.tabs <- min(ntab, mc.cores)
    XX <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        if (metric == "correlation") {
            ## compute (1 - correlation) distance between samples
            return (as.dist(
                1 - XX_query$sscp[[i]]/(length(XX_query$variables[[i]])-1)
                ))
        } else if (metric == "euclidean") {
            ## compute Euclidean distance between samples
            return (as.dist(
                .toEuclidean(XX_query$sscp[[i]])
                ))
        }
    })
    names(XX) <- querytables
    
    ## UMAP embedding of each per-table distance matrix;
    ## extra arguments in `...` are forwarded to uwot::umap
    umapObjs <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        uwot::umap(XX[[i]], ...)
    })
    names(umapObjs) <- querytables
    
    class(umapObjs) <- c("federateUMAP", "list")
    
    return (umapObjs)
}
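
## Illustrative, non-executed sketch of a federateUMAP call, with the same
## hypothetical, already-encoded placeholder arguments as above. Extra
## arguments are forwarded to uwot::umap through `...`.
if (FALSE) {
    embeddings <- federateUMAP(loginFD = encodedLoginFD,
                               logins  = encodedLogins,
                               func    = encodedFunc,
                               symbol  = encodedSymbols,
                               metric  = "euclidean",
                               n_neighbors = 15, min_dist = 0.01)
    ## one embedding coordinate matrix per input table
    plot(embeddings[[1]], pch = 19, cex = 0.5)
}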


#' @title Federated hdbscan
#' @description Function for hdbscan federated analysis on the virtual cohort
#' combining multiple cohorts.
#' @usage federateHdbscan(loginFD,
#'                        logins,
#'                        func,
#'                        symbol,
#'                        metric = 'euclidean',
#'                        minPts = 10,
#'                        chunk = 500L,
#'                        mc.cores = 1,
#'                        TOL = .Machine$double.eps,
#'                        width.cutoff = 500L,
#'                        ...)
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param func Encoded definition of a function for preparation of raw data
#' matrices. Two arguments are required: conns (list of DSConnection-classes), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param symbol Encoded vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{func} are ignored.
#' @param metric Either \code{euclidean} or \code{correlation} for distance
#' metric between samples. 
#' For Euclidean distance, the data from each cohort will be centered (not
#' scaled) for each variable.
#' For correlation-based distance, the data from each cohort will be centered
#' and scaled for each sample.
#' @param minPts Minimum size of clusters, see \code{dbscan::hdbscan}.
#' Default, 10.
#' @param chunk Size of chunks into which the resulting matrix is partitioned.
#' Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @param ... Additional arguments passed to \code{dbscan::hdbscan}.
#' @returns A named list, one element per table in \code{symbol}, each an
#' object of class \code{hdbscan}.
#' @importFrom dbscan hdbscan
#' @export
federateHdbscan <- function(loginFD, logins, func, symbol,
                            metric = "euclidean", minPts = 10, 
                            chunk = 500L, mc.cores = 1,
                            TOL = .Machine$double.eps,
                            width.cutoff = 500L, ...) {
    .printTime("federateHdbscan started")
    funcPreProc <- .decode.arg(func)
    querytables <- .decode.arg(symbol)
    ntab <- length(querytables)
    metric <- match.arg(metric, choices=c("euclidean", "correlation"))
    
    ## compute SSCP matrix for each centered data table
    XX_query <- .federateSSCP(loginFD=loginFD, logins=logins, 
                              funcPreProc=funcPreProc,
                              querytables=querytables,
                              byColumn=(metric=="euclidean"),
                              chunk=chunk, mc.cores=mc.cores,
                              width.cutoff=width.cutoff, TOL=TOL, 
                              connRes=FALSE)
    mc.tabs <- min(ntab, mc.cores)
    XX <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        if (metric == "correlation") {
            ## compute (1 - correlation) distance between samples
            return (as.dist(
                1 - XX_query$sscp[[i]]/(length(XX_query$variables[[i]])-1)
            ))
        } else if (metric == "euclidean") {
            ## compute Euclidean distance between samples
            return (as.dist(
                .toEuclidean(XX_query$sscp[[i]])
            ))
        }
    })
    names(XX) <- querytables
    
    ## hdbscan
    hdbscanObjs <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        dbscan::hdbscan(XX[[i]], minPts = minPts, ...)
    })
    names(hdbscanObjs) <- querytables
    
    class(hdbscanObjs) <- c("federateHdbscan", "list")
    
    return (hdbscanObjs)
}
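
## Illustrative, non-executed sketch of a federateHdbscan call, again with
## hypothetical, already-encoded placeholder arguments. Each element of the
## returned list is a dbscan::hdbscan clustering of one input table.
if (FALSE) {
    clusterings <- federateHdbscan(loginFD = encodedLoginFD,
                                   logins  = encodedLogins,
                                   func    = encodedFunc,
                                   symbol  = encodedSymbols,
                                   metric  = "euclidean",
                                   minPts  = 15)
    ## cluster labels (0 denotes noise points) for the first table
    table(clusterings[[1]]$cluster)
}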


#' @title Test federateSSCP
#' @description Deprecated
#' @param loginFD Login information of the FD server
#' @param logins Login information of data repositories
#' @param func Encoded definition of a function for preparation of raw data
#' matrices. 
#' Two arguments are required: conns (list of DSConnection-classes), 
#' symbol (name of the R symbol) (see datashield.assign).
#' @param symbol Encoded vector of names of the R symbols to assign in the
#' DataSHIELD R session on each server in \code{logins}.
#' The assigned R variables will be used as the input raw data.
#' Other assigned R variables in \code{func} are ignored.
#' @param metric Either \code{euclidean} or \code{correlation} for distance
#' metric between samples. 
#' For Euclidean distance, the data from each cohort will be centered (not
#' scaled) for each variable.
#' For correlation-based distance, the data from each cohort will be centered
#' and scaled for each sample.
#' @param chunk Size of chunks into which the resulting matrix is partitioned.
#' Default, 500L.
#' @param mc.cores Number of cores for parallel computing. Default, 1.
#' @param TOL Tolerance of 0. Default, \code{.Machine$double.eps}.
#' @param width.cutoff Default, 500L. See \code{deparse1}.
#' @param ... Currently unused.
#' @returns A named list of per-table sample distance objects of class
#' \code{dist}.
#' @keywords internal
testSSCP <- function(loginFD, logins, func, symbol,
                     metric = "euclidean",
                     chunk = 500L, mc.cores = 1,
                     TOL = .Machine$double.eps,
                     width.cutoff = 500L, ...) {
    .printTime("testSSCP started")
    funcPreProc <- .decode.arg(func)
    querytables <- .decode.arg(symbol)
    ntab <- length(querytables)
    metric <- match.arg(metric, choices=c("euclidean", "correlation"))
    
    ## compute SSCP matrix for each centered data table
    XX_query <- .federateSSCP(loginFD=loginFD, logins=logins, 
                              funcPreProc=funcPreProc,
                              querytables=querytables,
                              byColumn=(metric=="euclidean"),
                              chunk=chunk, mc.cores=mc.cores,
                              width.cutoff=width.cutoff, TOL=TOL, 
                              connRes=FALSE)
    mc.tabs <- min(ntab, mc.cores)
    XX <- mclapply(1:ntab, mc.cores=mc.tabs, function(i) {
        if (metric == "correlation") {
            ## compute (1 - correlation) distance between samples
            return (as.dist(
                1 - XX_query$sscp[[i]]/(length(XX_query$variables[[i]])-1)
            ))
        } else if (metric == "euclidean") {
            ## compute Euclidean distance between samples
            return (as.dist(
                .toEuclidean(XX_query$sscp[[i]])
            ))
        }
    })
    names(XX) <- querytables
    
    return (XX)
}
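
## Local, non-federated sanity check of the two distance conversions used in
## the functions above. .checkDistanceIdentities is a hypothetical helper
## added here only for illustration: it assumes sscp[[i]] is the
## sample-by-sample cross-product of the data after the per-metric
## preprocessing described in the documentation (per-variable centering for
## "euclidean", per-sample centering and scaling for "correlation"), and that
## .toEuclidean applies the standard identity d_ij^2 = g_ii + g_jj - 2 * g_ij.
## Only that identity is used; the internal helpers are not called.
.checkDistanceIdentities <- function() {
    set.seed(1)
    X <- matrix(stats::rnorm(20), nrow = 4)     ## 4 samples x 5 variables
    ## Euclidean: cross-product of the column-centered data
    Xc <- scale(X, center = TRUE, scale = FALSE)
    G  <- tcrossprod(Xc)
    d2 <- outer(diag(G), diag(G), "+") - 2 * G
    stopifnot(isTRUE(all.equal(sqrt(pmax(d2, 0)), as.matrix(stats::dist(X)),
                               check.attributes = FALSE)))
    ## Correlation: cross-product of the row-standardized data, divided by
    ## (number of variables - 1), equals the sample correlation matrix
    Xs <- t(scale(t(X)))
    Gs <- tcrossprod(Xs)
    stopifnot(isTRUE(all.equal(1 - Gs / (ncol(X) - 1), 1 - stats::cor(t(X)),
                               check.attributes = FALSE)))
    invisible(TRUE)
}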