Introduction

We demonstrate the possibility of fitting statistical models, stratified by sites, in a manner that brings the computation to data that may be distributed across sites or, more generally, partitioned in some manner. (For simplicity, we will call these partitions sites.) The infrastructure consists of a single master process that issues queries to worker processes running at each of the sites. A query is merely a function call, more specifically a request to each site to evaluate a pre-defined function $f(\beta)$ on the data at that site, for a given value of the parameters $\beta$. The master process uses these queries to aggregate the results and execute an optimization algorithm, resulting in a model fit that should be indistinguishable from the one that would be obtained if all the data were in a single place. Of course, this assumes a lossless serialization format, such as Google Protocol Buffers, but we make do with JSON for now. Also, in the comparisons below, we do not use exactly the same iterations as the survival package, and so minor differences will be seen.
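To make the pattern concrete, here is a minimal sketch of the query-and-aggregate idea; evaluateAtSite and aggregatedF are hypothetical names used only for this illustration and are not part of the package API.

## A minimal sketch of the master's role: query each site for f(beta) and
## aggregate. evaluateAtSite() is a hypothetical stand-in for the HTTP call
## to a site's opencpu server; here each "site" is just a local function.
evaluateAtSite <- function(site, beta) {
    ## in the real setting, this would ask the site to evaluate the
    ## pre-defined function f(beta) on its own data and return the value
    site$f(beta)
}
aggregatedF <- function(beta, sites) {
    ## the master only ever sees these per-site summaries, never the raw data
    Reduce(`+`, lapply(sites, evaluateAtSite, beta = beta))
}

## toy usage: two "sites" whose f is a simple quadratic in beta
toySites <- list(list(f = function(beta) (beta - 1)^2),
                 list(f = function(beta) (beta + 2)^2))
optimize(function(beta) aggregatedF(beta, toySites), interval = c(-10, 10))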

The advantages are many, chief among them the fact that no raw data needs to be shared between sites. The modeling entity can, however, make an unlimited number of queries of the sites, where each query is a request to compute a model-specific function for a specified value of the parameters. This may pose a security concern that we ignore for now; it also leads to some further interesting questions regarding what may be learned from such computations.

Setup

It must be noted that for users to be able to knit this document, or to run these examples in an R session, an opencpu server must be running with appropriate settings. On MacOS and Unix, this is done by designating an empty directory as the workspace and adding the following lines to ${HOME}/.Rprofile.

library(distcomp)
distcompSetup(workspace = "full_path_to_workspace_directory",
              ssl_verifyhost = 0L, ssl_verifypeer = 0L)

On Windows, the same should be done in the RHOME\etc\Rprofile.site file.

Note: On Yosemite (MacOS 10.10.4 and below), we have found that references to localhost in the opencpu URL sometimes fail; the explicit IP address 127.0.0.1 is needed instead.

In what follows, we assume that such an initialization profile is in place. Furthermore, we assume that the opencpu server has been started in the same session as the one in which this markdown document is being knitted, via library(opencpu). This is merely a convenience that allows us to refer to the opencpu server URL programmatically via opencpu$url(); otherwise, alternative means would have to be found to refer to the opencpu URL in a permanent manner. For example, if opencpu is started in another terminal thus:

library(opencpu)
ocpu_start_server()

one could note down the URL it displays after starting and use it in a function definition such as the one below, as we have done:

opencpu <- list(url = function() "http://localhost:5656/ocpu")

This ensures that the expression opencpu$url() returns the appropriate URL.

To summarize: assuming that an opencpu server has been started via library(opencpu) with a proper R profile and the expression opencpu$url() has been set up appropriately, one can proceed to knit this document in that R session.

A simple example

We take a simple example from the survival package, the ovarian dataset.

library(knitr)
library(survival)
data(ovarian)
str(ovarian)
kable(ovarian)

A Cox fit on the aggregated data

A simple Cox model fit estimates the effect of age on survival, stratified by drug.

cp <- coxph(Surv(futime, fustat) ~ age + strata(rx), data = ovarian, ties = "breslow")
print(cp)

The above shows the initial and final log likelihood values (at $\beta = 0$ and at the estimate, respectively) and the actual estimated coefficient in the last line.

For our setting, we can pretend that this data set actually comes from two sites, one containing the control or placebo group (rx = 1) and the other containing the drug group (rx = 2).

The model definition

We first need to define the computation. The available computations can be listed:

print(availableComputations())

So, we can define the ovarian data computation as follows.

ovarianDef <- data.frame(compType = names(availableComputations())[2],
                         formula = "Surv(futime, fustat) ~ age",
                         he = FALSE,
                         id = "Ovarian", stringsAsFactors=FALSE)

The data for each site

We split the ovarian data into two sites as indicated earlier.

siteData <- with(ovarian, split(x = ovarian, f = rx))

Setting up the sites

We can now set up each site with its own data. A site is merely a list of two items, a (unique) name and an opencpu URL.

nSites <- length(siteData)
sites <- lapply(seq.int(nSites), function(i) list(name = paste0("site", i),
                                                  url = opencpu$url()))

By default, on each site, the data for a computation is stored under the name data.rds and the definition itself is stored under the name defn.rds. If the sites are physically separate, everything proceeds smoothly. Here, however, we are using the same opencpu server to simulate both sites, so for this experiment the files have to be saved under different names, say site1.rds and site2.rds. This is all taken care of by the code, which checks whether the opencpu URLs refer to local hosts or not. (In fact, as this code is executing, one can examine the contents of the workspace to see what is happening.) We now Map the upload function to each site so that the computation becomes well-defined.

ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) ovarianDef),
          siteData)

stopifnot(all(as.logical(ok)))
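As an aside, a minimal sketch of the kind of local-host check described above might look like the following; this is an illustration of the idea only, not the package's actual implementation.

## Illustrative only: does a site URL point at a local host? When it does,
## per-site file names need to be disambiguated, as described above.
isLocalhost <- function(url) {
    grepl("^https?://(localhost|127\\.0\\.0\\.1)(:|/|$)", url)
}
isLocalhost(opencpu$url())  ## TRUE in this simulated setup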

Reproducing the original aggregated analysis in a distributed fashion

We are now ready to reproduce the original aggregated analysis. We first create a master object using the same definition.

master <- CoxMaster$new(defn = ovarianDef)

We then add the worker sites, specifying a name and a URL for each.

for (i in seq.int(nSites)) {
    master$addSite(name = sites[[i]]$name, url = sites[[i]]$url)
}

We now maximize the partial likelihood by calling the run method of the master.

result <- master$run()

We then print the summary.

master$summary()

As we can see, the results we get from the distributed analysis are the same as those we got from the original aggregated analysis; they can also be placed side by side for a direct comparison.
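The snippet below does that. It is a convenience added for this exposition and assumes that the first column of master$summary() holds the coefficient estimates, which is an assumption about the summary layout rather than documented behaviour.

distSummary <- master$summary()
kable(data.frame(aggregated = coef(cp),
                 distributed = distSummary[, 1], ## assumes coefficients in the first column
                 row.names = names(coef(cp))))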

A larger example

We turn to a larger example from Therneau and Grambsch using the pbc data, where the stratifying variable is ascites.

The aggregated fit

data(pbc)
pbcCox <- coxph(Surv(time, status==2) ~ age + edema + log(bili) +
                  log(protime) + log(albumin) + strata(ascites), data = pbc,
                ties = "breslow")
print(pbcCox)

The distributed fit

We split the data using ascites and proceed in the usual way, as shown above.

pbcDef <- data.frame(compType = names(availableComputations())[2],
                     he = FALSE,
                     formula = paste("Surv(time, status==2) ~ age + edema +",
                       "log(bili) + log(protime) + log(albumin)"),
                     id = "pbc", stringsAsFactors = FALSE)
siteData <- with(pbc, split(x = pbc, f = ascites))
nSites <- length(siteData)
sites <- lapply(seq.int(nSites), function(i) list(name = paste0("site", i),
                                                  url = opencpu$url()))
ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) pbcDef),
          siteData)
stopifnot(all(as.logical(ok)))
master <- CoxMaster$new(defn = pbcDef)
for (site in sites) {
    master$addSite(site$name, site$url)
}
result <- master$run()

We then print the summary.

kable(master$summary())

The results should be comparable to the aggregated fit above.

Bone Marrow Transplant Example

This uses the bmt data from Klein and Moeschberger. Some variable renaming, first.

if (!require("KMsurv")) {
  stop("Please install the KMsurv package before proceeding")
}
##
## BMT data
##

library(KMsurv)
data(bmt)
bmt$tnodis <- bmt$t2 ## time to disease relapse/death
bmt$inodis <- bmt$d3 ## disease relapse/death indicator
bmt$tplate <- bmt$tp ## time to platelet recovery
bmt$iplate <- bmt$dp ## platelet recovery
bmt$agep <- bmt$z1 ## age of patient in years
bmt$aged <- bmt$z2 ## age of donor in years
bmt$fab <- bmt$z8 ## fab grade 4 or 5 + AML
bmt$imtx <- bmt$z10 ## MTX used
bmt <- bmt[order(bmt$tnodis), ] ## order by time to disease relapse/death
bmt <- cbind(1:nrow(bmt), bmt)
names(bmt)[1] <- "id"

##
#####
##
bmt$agep.c <- bmt$agep - 28
bmt$aged.c <- bmt$aged - 28

bmt$imtx <- factor(bmt$imtx)

The aggregated fit

bmt.cph <- coxph(formula = Surv(tnodis, inodis) ~ fab + agep.c * aged.c +
                 factor(group) + strata(imtx), data = bmt, ties = "breslow")

print(bmt.cph)

The distributed fit

We'll use imtx for splitting data into sites.

bmtDef <- data.frame(compType = names(availableComputations())[2],
                     he = FALSE,
                     formula = paste("Surv(tnodis, inodis) ~ fab +",
                       "agep.c * aged.c + factor(group)"),
                     id = "bmt", stringsAsFactors = FALSE)
siteData <- with(bmt, split(x = bmt, f = imtx))
nSites <- length(siteData)
sites <- lapply(seq.int(nSites), function(i) list(name = paste0("site", i),
                                                  url = opencpu$url()))
ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) bmtDef),
          siteData)

stopifnot(all(as.logical(ok)))
master <- CoxMaster$new(defn = bmtDef)
for (site in sites) {
    master$addSite(site$name, site$url)
}
result <- master$run()

We then print the summary.

kable(master$summary())

Byar and Greene Prostate Cancer Data Example

This example is the largest of them all and also has four strata rather than two.

prostate <- readRDS("prostate.RDS")

The aggregated fit

pcph <- coxph(Surv(dtime, status) ~ stage + strata(rx) + age + wt + pf + hx +
                  sbp + dbp + ekg + hg + sz + sg + ap + bm, data = prostate)
print(pcph)

The distributed fit

The distributed fit for this particular example does not work in the current implementation. This is because the $X$ matrix for each site is singular. The math still holds, of course, but the current implementation is based on re-using as much of the survival package as possible, and we have to work harder to implement the distributed computation when $X$ is singular at one or more sites. This affects the computation of the variance (or, equivalently, the information matrix). Some work needs to be done to work around this and figure out how best to reuse what is already in the survival package.

However, in order to demonstrate that the distributed fit really works, we show below an alternative implementation that yields the same result as the aggregated one.

An object representing the sites

Here's a reference object for each site. It has several fields: a data field containing the data, and a formula field (as used in the well-known survival R package) describing the model being fit. These two are the only ones needed to initialize a site. Other fields, generated from these two, are modelDataFrame, containing the actual model data used for fitting the model, and modelMatrix, the corresponding model matrix.

site <- setRefClass("siteObject",
                    fields = list(
                        data = "data.frame",
                        formula = "formula",
                        modelDataFrame = "data.frame",
                        coxControl = "list",
                        modelMatrix = "matrix"),
                    methods = list(
                        initialize = function(formula, data) {
                            'Initialize the object with a formula and dataset'
                            formula <<- formula
                            data <<- data
                            temp <- coxph.control()
                            temp$iter.max <- 0
                            coxControl <<- temp
                            stopifnot(kosher())
                        },
                        kosher = function() {
                            'Check that the class data passes sanity checks'
                            modelDataFrame <<- model.frame(formula, data = data)
                            lhs <- modelDataFrame[, 1]
                            ordering <- order(lhs[, 1])
                            modelDataFrame <<- modelDataFrame[ordering, ]
                            data <<- data[ordering, ]
                            modelMatrix <<- model.matrix(formula, data = modelDataFrame)
                            TRUE
                        },
                        dimP = function() {
                            'Return the number of covariates'
                            ncol(modelMatrix) - 1
                        })
                    )
site$accessors(c("data", "formula", "modelDataFrame", "modelMatrix"))

The initialize method above mostly sets the fields, generates values for the other fields, and does a mild sanity check; it will not proceed further if the function kosher returns FALSE. For now, the kosher function merely orders the data frame by follow-up time, but in a production system a number of other checks might be performed, such as ensuring that all named variables are available at the site.
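A minimal sketch of one such additional check, illustrative only and not part of the package, might verify that every variable named in the model formula is present in the site's data:

## Illustrative only: check that all variables appearing in the model
## formula are available in the site's data frame
hasAllVariables <- function(formula, data) {
    all(all.vars(formula) %in% names(data))
}
hasAllVariables(Surv(dtime, status) ~ stage + age, prostate)  ## TRUE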

The method dimP merely returns the number of covariates, that is, the number of columns of the model matrix excluding the intercept.

The (partial) log likelihood function for each site

For our example, the (partial) log likelihood (named localLogLik) is simple and can be computed directly. Assuming failure times $t_i$ and event indicators $\delta_i$, it is precisely: $$ l(\beta) = \sum_{i=1}^n\delta_i\biggl[z_i\beta -\log\bigl(\sum_{j\in R(t_i)} \exp(z_j\beta)\bigr)\biggr] $$ where $\beta$ is the vector of parameters, $z_i$ is row $i$ of the model matrix (covariates for subject $i$) and $R(t_i)$ is the risk set at time $t_i$.

The first derivative with respect to $\beta$ is: $$ l'(\beta) = Z^T\delta - \sum_{i=1}^n\delta_i\frac{\sum_{j\in R(t_i)} \exp(z_j\beta)z_j^T}{\sum_{j\in R(t_i)} \exp(z_j\beta)}. $$

localLogLik <- function(beta) {
    beta <- c(0, beta) ## model matrix has an intercept column; give it a zero coefficient
    z <- modelMatrix
    delta <- modelDataFrame[, 1][, 2] ## event indicators
    zBeta <- z %*% beta
    ## the risk-set sums below rely on the data having been ordered by follow-up time
    sum.exp.zBeta <- rev(cumsum(rev(exp(zBeta))))
    ld <- delta %*% (z - apply(diag(as.numeric(exp(zBeta))) %*% z, 2,
                               function(x) rev(cumsum(rev(x)))) / sum.exp.zBeta)
    result <- sum(delta * (zBeta - log(sum.exp.zBeta))) ## Breslow approximation for ties
    attr(result, "gradient") <- ld
    result
}
site$methods(logLik = localLogLik)

The alternative distributed fit

We are now ready to do the alternative distributed fit. We split the prostate data into sites by rx, as indicated earlier.

siteData <- with(prostate, split(x = prostate, f = rx))
sites <- lapply(siteData,
                function(x) {
                    site$new(data = x,
                             formula = Surv(dtime, status) ~ stage + age + wt + pf + hx + sbp + dbp + ekg + hg + sz + sg + ap + bm)
                })
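Before doing that, we can illustrate the singularity mentioned earlier with a quick check, added here for exposition, that compares the rank of each site's model matrix with its number of columns; a rank smaller than the number of columns means $X$ is singular at that site.

## Illustrative check of the singularity noted above: a site's model matrix
## is singular when its rank falls short of its number of columns
sapply(sites, function(s) c(columns = ncol(s$modelMatrix),
                            rank = qr(s$modelMatrix)$rank))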

OK, now we can reproduce the original aggregated analysis by writing a full log likelihood routine that simply sums the per-site log likelihoods.

logLik <- function(beta, sites) {
    sum(sapply(sites, function(x) x$logLik(beta)))
}

All that remains is to maximize this log likelihood.

mleResults <- nlm(f=function(x) -logLik(x, sites),
                  p = rep(0, sites[[1]]$dimP()),
                  gradtol = 1e-10, iterlim = 1000)

We print the coefficient estimates side-by-side for comparison.

d <- data.frame(distCoef = mleResults$estimate, aggCoef = pcph$coefficients)
rownames(d) <- names(pcph$coefficients)
kable(d)

Session Information

sessionInfo()

