We demonstrate the possibility of fitting statistical models stratified by sites in a manner that brings the computation to the data, which may be distributed across sites or, more generally, partitioned in some manner. (For simplicity, we will call these partitions sites.) The infrastructure consists of a single master process that issues queries to worker processes running at each of the sites. A query is merely a function call: a request to each site to evaluate a pre-defined function $f(\beta)$ on the data at that site, for a given value of the parameters $\beta$. The master process aggregates the results of these queries and drives an optimization algorithm, producing a model fit that should be indistinguishable from the one that would be obtained if all the data were in a single place. Of course, this assumes a lossless serialization format, such as Google Protocol Buffers, but we make do with JSON for now. Also, in the comparisons below, we do not use exactly the same iteration controls as the `survival` package, so minor differences will be seen.
The advantages are many, chief among them the fact that no raw data needs to be shared between sites. The modeling entity, however, can make an unlimited number of queries of the sites, where each query is a request to compute a model-specific function for a specified value of the parameters. This may pose a security concern that we ignore for now; it also leads to some further interesting questions about what may be learned from such computations.
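To make the query idea concrete, here is a toy sketch of the protocol. All names here are hypothetical and the objective is a stand-in; real sites would be opencpu endpoints and $f$ would be a partial likelihood:

```r
## Toy sketch of the master/worker query protocol. Each site exposes a
## function f(beta) evaluated only on its local data; the master never
## sees the raw data, only the values returned by queries.
make_site <- function(x) {
    list(f = function(beta) sum(dnorm(x, mean = beta, log = TRUE)))
}
toySites <- lapply(split(rnorm(100), rep(1:2, each = 50)), make_site)

## A query is one function evaluation; a local call stands in for HTTP here.
aggregate_query <- function(sites, beta) {
    sum(vapply(sites, function(s) s$f(beta), numeric(1)))
}

## The master drives an optimizer using only aggregated query results.
opt <- optimize(function(b) aggregate_query(toySites, b),
                interval = c(-5, 5), maximum = TRUE)
```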
It must be noted that for users to be able to knit this document, or to run these examples in an R session, an opencpu server must be running with appropriate settings. On MacOS and Unix, this is done by designating an empty directory as the workspace and adding the following lines to `${HOME}/.Rprofile`:

```r
library(distcomp)
distcompSetup(workspace = "full_path_to_workspace_directory",
              ssl_verifyhost = 0L, ssl_verifypeer = 0L)
```

On Windows, the same should be done in the `RHOME\etc\Rprofile.site` file.
Note: on Yosemite (MacOS 10.10.4 and below), we have found that references to `localhost` sometimes fail in the opencpu URL; the explicit IP address `127.0.0.1` is needed instead.
In what follows, we assume that such an initialization profile is in place. Furthermore, we assume that the opencpu server has been started in the same session as the one where this markdown document is being knitted, via `library(opencpu)`. This is merely a convenience that allows us to refer to the opencpu server URL programmatically via `opencpu$url()`; otherwise, alternative means would have to be found to refer to the opencpu URL in a permanent manner. For example, if opencpu is started in another terminal thus:

```r
library(opencpu)
ocpu_start_server()
```

one could note down the URL that it displays after starting, and use it in the following function definition, as we have done:

```r
opencpu <- list(url = function() "http://localhost:5656/ocpu")
```

to ensure that the expression `opencpu$url()` returns the appropriate URL.
To summarize: assuming that an opencpu server has been started via `library(opencpu)` with a proper R profile, and that the expression `opencpu$url()` has been set up appropriately, one can proceed to knit this document in that R session.
We take a simple example from the `survival` package, the `ovarian` dataset.

```r
library(knitr)
library(survival)
data(ovarian)
str(ovarian)
kable(ovarian)
```
A simple Cox model fit estimates the effect of age on survival, stratified by drug.

```r
cp <- coxph(Surv(futime, fustat) ~ age + strata(rx),
            data = ovarian, ties = "breslow")
print(cp)
```
The above shows the initial and final log likelihood values, at $\beta = 0$ and at the estimated coefficient respectively, as well as the estimated coefficient itself in the last line.
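The same quantities are available programmatically from the fit object:

```r
cp$loglik  ## log likelihood at beta = 0 and at the estimate
coef(cp)   ## the estimated coefficient for age
```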
For our setting, we can pretend that this data set actually comes from two sites, one containing the control or placebo group (`rx = 1`) and the other containing the drug group (`rx = 2`).
We first need to define the computation. The available computations can be listed:

```r
print(availableComputations())
```
So, we can define the ovarian data computation as follows.

```r
ovarianDef <- data.frame(compType = names(availableComputations())[2],
                         formula = "Surv(futime, fustat) ~ age",
                         he = FALSE,
                         id = "Ovarian",
                         stringsAsFactors = FALSE)
```
We split the `ovarian` data into two sites as indicated earlier.

```r
siteData <- with(ovarian, split(x = ovarian, f = rx))
```
We can now set up each site with its own data. A site is merely a list of two items, a (unique) `name` and an opencpu `url`.

```r
nSites <- length(siteData)
sites <- lapply(seq.int(nSites),
                function(i) list(name = paste0("site", i),
                                 url = opencpu$url()))
```
By default, on each site, data for a computation is stored under the name `data.rds` and the definition itself is stored under the name `defn.rds`. If the sites are physically separate, then everything proceeds smoothly. Here, however, we are using the same opencpu server to simulate both sites, so just for this experiment we have to save the files under different names, say `site1.rds` and `site2.rds`. This is all taken care of by the code, which checks whether the opencpu URLs refer to local hosts or not. (In fact, as this code is executing, one can examine the contents of the workspace to see what is happening.)
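A sketch of such a check follows; the helper `is_local_url` is hypothetical, and the package's actual logic may differ:

```r
## Hypothetical sketch: does an opencpu URL refer to the local machine?
is_local_url <- function(url) {
    host <- sub("^https?://([^:/]+).*$", "\\1", url)
    host %in% c("localhost", "127.0.0.1")
}
is_local_url("http://localhost:5656/ocpu")  ## TRUE
```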
We now `Map` the upload function to each site so that the computation becomes well-defined.

```r
ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) ovarianDef),
          siteData)
stopifnot(all(as.logical(ok)))
```
We are now ready to reproduce the original aggregated analysis. We first create a master object using the same definition.

```r
master <- CoxMaster$new(defn = ovarianDef)
```
We then add the worker sites, specifying a name and a URL for each.

```r
for (i in seq.int(nSites)) {
    master$addSite(name = sites[[i]]$name, url = sites[[i]]$url)
}
```
And we now maximize the partial likelihood by calling the `run` method of the master.

```r
result <- master$run()
```
We then print the summary.

```r
master$summary()
```
As we can see, the results of the distributed analysis match those of the original aggregated analysis. We print them separately here for comparison.
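One could also check the agreement programmatically; the sketch below assumes `master$summary()` returns a data frame with a `coef` column, as in the printed output:

```r
## Loose tolerance: the two fits use different iteration controls,
## as noted in the introduction.
all.equal(unname(coef(cp)), master$summary()$coef, tolerance = 1e-4)
```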
We turn to a larger example from Therneau and Grambsch using the `pbc` data, where the stratifying variable is `ascites`.

```r
data(pbc)
pbcCox <- coxph(Surv(time, status == 2) ~ age + edema + log(bili) +
                    log(protime) + log(albumin) + strata(ascites),
                data = pbc, ties = "breslow")
print(pbcCox)
```
We split the data using `ascites` and proceed in the usual way, as shown above.

```r
pbcDef <- data.frame(compType = names(availableComputations())[2],
                     he = FALSE,
                     formula = paste("Surv(time, status==2) ~ age + edema +",
                                     "log(bili) + log(protime) + log(albumin)"),
                     id = "pbc",
                     stringsAsFactors = FALSE)
siteData <- with(pbc, split(x = pbc, f = ascites))
nSites <- length(siteData)
sites <- lapply(seq.int(nSites),
                function(i) list(name = paste0("site", i),
                                 url = opencpu$url()))
ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) pbcDef),
          siteData)
stopifnot(all(as.logical(ok)))
master <- CoxMaster$new(defn = pbcDef)
for (site in sites) {
    master$addSite(site$name, site$url)
}
```
```r
result <- master$run()
```

We then print the summary.

```r
kable(master$summary())
```
The results should be comparable to the aggregated fit above.
This uses the `bmt` data from Klein and Moeschberger. Some variable renaming, first.
if (!require("KMsurv")) { stop("Please install the KMsurv package before proceeding") }
## ## BMT data ## library(KMsurv) data(bmt) bmt$tnodis <- bmt$t2 ## time to disease relapse/death bmt$inodis <- bmt$d3 ## disease relapse/death indicator bmt$tplate <- bmt$tp ## time to platelet recovery bmt$iplate <- bmt$dp ## platelet recovery bmt$agep <- bmt$z1 ## age of patient in years bmt$aged <- bmt$z2 ## age of donor in years bmt$fab <- bmt$z8 ## fab grade 4 or 5 + AML bmt$imtx <- bmt$z10 ## MTX used bmt <- bmt[order(bmt$tnodis), ] ## order by time to disease relapse/death bmt <- cbind(1:nrow(bmt)[1], bmt) names(bmt)[1] <- "id" ## ##### ## bmt$agep.c <- bmt$agep - 28 bmt$aged.c <- bmt$aged - 28 bmt$imtx <- factor(bmt$imtx)
```r
bmt.cph <- coxph(formula = Surv(tnodis, inodis) ~ fab + agep.c * aged.c +
                     factor(group) + strata(imtx),
                 data = bmt, ties = "breslow")
print(bmt.cph)
```
We'll use `imtx` for splitting the data into sites.

```r
bmtDef <- data.frame(compType = names(availableComputations())[2],
                     he = FALSE,
                     formula = paste("Surv(tnodis, inodis) ~ fab +",
                                     "agep.c * aged.c + factor(group)"),
                     id = "bmt",
                     stringsAsFactors = FALSE)
siteData <- with(bmt, split(x = bmt, f = imtx))
nSites <- length(siteData)
sites <- lapply(seq.int(nSites),
                function(i) list(name = paste0("site", i),
                                 url = opencpu$url()))
ok <- Map(uploadNewComputation, sites,
          lapply(seq.int(nSites), function(i) bmtDef),
          siteData)
stopifnot(all(as.logical(ok)))
master <- CoxMaster$new(defn = bmtDef)
for (site in sites) {
    master$addSite(site$name, site$url)
}
```
```r
result <- master$run()
```

We then print the summary.

```r
kable(master$summary())
```
This example is the largest of them all and also has four strata rather than two.
```r
prostate <- readRDS("prostate.RDS")
```

```r
pcph <- coxph(Surv(dtime, status) ~ stage + strata(rx) + age + wt + pf +
                  hx + sbp + dbp + ekg + hg + sz + sg + ap + bm,
              data = prostate)
print(pcph)
```
The distributed fit for this particular example doesn't work in the current implementation. This is because the $X$ matrix for each site is singular. The math still holds, of course, but the current implementation is based on re-using as much of the `survival` package as possible. We have to work harder to implement the distributed computation in the situation where $X$ is singular at one or more sites. This affects the computation of the variance (or, equivalently, the information matrix). Some work needs to be done to work around this and figure out how best to reuse what's already in the `survival` package.
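To see the issue concretely, one can check each site's model matrix for rank deficiency; a sketch, with `rank_deficient` a hypothetical helper. (If a site lacks some factor level entirely, the corresponding column of the local $X$ is all zeros and the matrix is singular.)

```r
## Hypothetical helper: TRUE if a site's local model matrix is rank deficient.
rank_deficient <- function(df, formula) {
    X <- model.matrix(formula, data = df)
    qr(X)$rank < ncol(X)
}
sapply(split(prostate, prostate$rx),
       rank_deficient,
       formula = ~ stage + age + wt + pf + hx + sbp + dbp + ekg +
           hg + sz + sg + ap + bm)
```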
However, in order to demonstrate that the distributed fit really works, we show below an alternative implementation that yields the same result as the aggregated one.
Here's a reference class object for each site. It has several fields: a `data` field containing the data, and a `formula` field (as used in the well-known `survival` R package) describing the model being fit. These two are the only ones needed for initializing a site. Other fields, generated from these two, are `modelDataFrame`, containing the actual model data used for fitting the model, and `modelMatrix`.
```r
site <- setRefClass("siteObject",
                    fields = list(
                        data = "data.frame",
                        formula = "formula",
                        modelDataFrame = "data.frame",
                        coxControl = "list",
                        modelMatrix = "matrix"),
                    methods = list(
                        initialize = function(formula, data) {
                            'Initialize the object with a formula and dataset'
                            formula <<- formula
                            data <<- data
                            temp <- coxph.control()
                            temp$iter.max <- 0
                            coxControl <<- temp
                            stopifnot(kosher())
                        },
                        kosher = function() {
                            'Check that the class data passes sanity checks'
                            modelDataFrame <<- model.frame(formula, data = data)
                            lhs <- modelDataFrame[, 1]
                            ordering <- order(lhs[, 1])
                            modelDataFrame <<- modelDataFrame[ordering, ]
                            data <<- data[ordering, ]
                            modelMatrix <<- model.matrix(formula, data = modelDataFrame)
                            TRUE
                        },
                        dimP = function() {
                            'Return the number of covariates'
                            ncol(modelMatrix) - 1
                        })
                    )
site$accessors(c("data", "formula", "modelDataFrame", "modelMatrix"))
```
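As a quick usage sketch (re-using the `ovarian` data loaded earlier; the actual prostate sites are constructed further below):

```r
## Instantiate a site and query the number of covariates.
s <- site$new(formula = Surv(futime, fustat) ~ age, data = ovarian)
s$dimP()  ## 1, since age is the only covariate
```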
The `initialize` method above mostly sets the fields, generates values for the other fields, and does a mild sanity check. It will not proceed further if the function `kosher` returns false. For now, the `kosher` function merely orders the data frame by follow-up time, but in a production system a number of other checks might be performed, such as ensuring that all named variables are available at the site.
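For instance, a check that all model variables are available might look like the following sketch (hypothetical, not part of the class above):

```r
## Every variable named in the formula must be present in the site's data.
vars_present <- function(formula, data) {
    all(all.vars(formula) %in% names(data))
}
vars_present(Surv(futime, fustat) ~ age, ovarian)  ## TRUE
```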
The method `dimP` merely returns the number of covariates, i.e., the number of columns of the model matrix excluding the intercept.
For our example, the (partial) log likelihood (named `localLogLik`) is simple and can be computed directly. Assuming failure times $t_i$ and event indicators $\delta_i$, it is precisely:

$$
l(\beta) = \sum_{i=1}^n\delta_i\biggl[z_i\beta -\log\bigl(\sum_{j\in R(t_i)} \exp(z_j\beta)\bigr)\biggr]
$$
where $\beta$ is the vector of parameters, $z_i$ is row $i$ of the
model matrix (covariates for subject $i$) and $R(t_i)$ is the risk set
at time $t_i$.
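Since each site holds exactly one stratum, the stratified partial log likelihood for the full data is simply the sum of these site-local terms, which is what makes the distributed fit exact:

$$
l(\beta) = \sum_{s=1}^{S} l_s(\beta),
$$

where $l_s(\beta)$ denotes the expression above evaluated on the data at site $s$.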
The first derivative with respect to $\beta$ is:

$$
l'(\beta) = Z^T\delta - \sum_{i=1}^n\delta_i\frac{\sum_{j\in R(t_i)} \exp(z_j\beta)z_j^T}{\sum_{j\in R(t_i)} \exp(z_j\beta)}.
$$
```r
localLogLik <- function(beta) {
    beta <- c(0, beta)  ## model matrix has intercept in model
    z <- modelMatrix
    delta <- modelDataFrame[, 1][, 2]  ## event indicators
    zBeta <- z %*% beta
    ## Rows are ordered by time, so risk-set sums are reverse cumulative sums
    sum.exp.zBeta <- rev(cumsum(rev(exp(zBeta))))
    ld <- delta %*% (z - apply(diag(as.numeric(exp(zBeta))) %*% z, 2,
                               function(x) rev(cumsum(rev(x)))) / sum.exp.zBeta)
    result <- sum(delta * (zBeta - log(sum.exp.zBeta)))  ## assuming Breslow
    attr(result, "gradient") <- ld
    result
}
site$methods(logLik = localLogLik)
```
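A note on the implementation: because `kosher` sorted the rows by increasing follow-up time, the risk set at $t_i$ consists of rows $i, \dots, n$, so each risk-set sum is a reverse cumulative sum. A small illustration:

```r
x <- c(1, 2, 3)
rev(cumsum(rev(x)))  ## 6 5 3, i.e. the sums over rows 1:3, 2:3 and 3:3
```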
We are now ready to do the alternative distributed fit. We split the `prostate` data into sites by `rx` as indicated earlier.

```r
siteData <- with(prostate, split(x = prostate, f = rx))
sites <- lapply(siteData, function(x) {
    site$new(data = x,
             formula = Surv(dtime, status) ~ stage + age + wt + pf + hx +
                 sbp + dbp + ekg + hg + sz + sg + ap + bm)
})
```
OK, now we can reproduce the original aggregated analysis by writing a full likelihood routine.

```r
logLik <- function(beta, sites) {
    sum(sapply(sites, function(x) x$logLik(beta)))
}
```
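Each site's `logLik` also returns the local gradient as a `gradient` attribute. Should the optimizer want it, the aggregated gradient is just the sum of the per-site gradients; a sketch under the same assumptions:

```r
## Valid because the stratified log likelihood is a sum of site-local terms.
gradLogLik <- function(beta, sites) {
    Reduce(`+`, lapply(sites, function(x) attr(x$logLik(beta), "gradient")))
}
```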
All that remains is to maximize this log likelihood.

```r
mleResults <- nlm(f = function(x) -logLik(x, sites),
                  p = rep(0, sites[[1]]$dimP()),
                  gradtol = 1e-10, iterlim = 1000)
```
We print the coefficient estimates side-by-side for comparison.

```r
d <- data.frame(distCoef = mleResults$estimate,
                aggCoef = pcph$coefficients)
rownames(d) <- names(pcph$coefficients)
kable(d)
```
```r
sessionInfo()
```