# Copyright 2021 Penn Computing Inference Learning (PennCIL) lab
# https://penncil.med.upenn.edu/team/
# This file is part of pda
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# https://style.tidyverse.org/functions.html#naming
# https://ohdsi.github.io/Hades/codeStyle.html#OHDSI_code_style_for_R
#' @useDynLib pda
#' @title Function to upload object to cloud as json
#'
#' @usage pdaPut(obj,name,config,upload_without_confirm)
#' @param obj R object to encode as json and uploaded to cloud
#' @param name of file
#' @param config a list of variables for cloud configuration
#' @param upload_without_confirm logical. TRUE if want silent upload, no interactive confirm
#' @importFrom utils menu
#' @return NONE
#' @seealso \code{pda}
#' @export
pdaPut <- function(obj,name,config,upload_without_confirm,silent_message=F){
mymessage <- function(mes, silent=silent_message) if(silent==F) message(mes)
obj_Json <- jsonlite::toJSON(obj, digits = 8)
file_name <- paste0(name, '.json')
if(!is.null(config$uri)){
mymessage(paste("Put",file_name,"on public cloud:"))
}else{
mymessage(paste("Put",file_name,"on local directory", config$dir, ':'))
}
mymessage(obj_Json)
# if(interactive()) {
if(upload_without_confirm==F) {
authorize = menu(c("Yes", "No"), title="Allow transfer?")
} else {
authorize = "1"
}
if (authorize != 1) {
warning("file not transferred. You can specify file transfer setting in pda().")
return(FALSE)
}
# the file to upload
if (is.character(config$dir)) {
file_path <- paste0(config$dir,'/', file_name)
} else {
file_path <- paste0(tempdir(),'/', file_name)
}
write(obj_Json, file_path)
if (is.character(config$uri)) {
# create the url target of the file
url <- file.path(config$uri, file_name)
# webdav PUT request to send a file to cloud
r<-httr::PUT(url, body = httr::upload_file(file_path), httr::authenticate(config$site_id, config$secret, 'digest'))
mymessage(paste("putting:",url))
}
}
#' @useDynLib pda
#' @title Function to list available objects
#' @usage pdaList(config)
#' @param config a list of variables for cloud configuration
#' @importFrom rvest html_nodes
#' @import httr
#' @return A list of (json) files on the cloud
#' @seealso \code{pda}
#' @export
pdaList <- function(config){
if (is.character(config$uri)) {
res<-httr::GET(config$uri, httr::authenticate(config$site_id, config$secret, 'digest'))
files<-rvest::html_nodes(httr::content(res), xpath = "//table//td/a")
files<-files[lapply(files,length)>0]
files<-regmatches(files, gregexpr("(?<=\")(.*?)(json)(?=\")", files, perl = TRUE))
} else if (is.character(config$dir)) {
files<-list.files(config$dir,pattern = "\\.json$")
} else {
files<-list.files(tempdir(),pattern = "\\.json$")
}
files<-substr(files,1,nchar(files)-5)
return(files)
}
#' @useDynLib pda
#' @title Function to download json and return as object
#' @usage pdaGet(name,config)
#' @param name of file
#' @param config cloud configuration
#' @return A list of data objects from the json file on the cloud
#' @seealso \code{pda}
#' @export
pdaGet <- function(name,config){
file_name <- paste0(name, '.json')
#print(paste("Get",file_name,"from public cloud:"))
# the file to upload
if (is.character(config$dir)) {
file_path <- paste0(config$dir,'/', file_name)
} else {
file_path <- paste0(tempdir(),'/', file_name)
}
if (is.character(config$uri)) {
url <- file.path(config$uri, file_name)
#write the file from GET request to file_path
res<-httr::GET(url, httr::write_disk(file_path, overwrite = TRUE), httr::authenticate(config$site_id, config$secret, 'digest'))
#print(paste("getting:",url))
}
obj<-jsonlite::fromJSON(file_path)
return(obj)
}
#' @useDynLib pda
#' @title gather cloud settings into a list
#' @usage getCloudConfig(site_id,dir,uri,secret)
#' @param site_id site identifier
#' @param dir shared directory path if flat files
#' @param uri web uri if web service
#' @param secret web token if web service
#' @return A list of cloud parameters: site_id, secret and uri
#' @seealso \code{pda}
#' @export
getCloudConfig <- function(site_id,dir=NULL,uri=NULL,secret=NULL,silent_message=T){
mymessage <- function(mes, silent=silent_message) if(silent==F) message(mes)
config<-list()
pda_user<-Sys.getenv('PDA_USER')
pda_secret<-Sys.getenv('PDA_SECRET')
pda_uri<-Sys.getenv('PDA_URI')
pda_dir<-Sys.getenv('PDA_DIR')
config$site_id=site_id
if(!is.null(secret)) {
config$secret = secret
} else if (pda_secret!='') {
config$secret = pda_secret
}
if(!is.null(uri)) {
config$uri = uri
} else if (pda_uri!='') {
config$uri = pda_uri
} else{
mymessage('no cloud uri found! ')
}
if(!is.null(dir)) {
config$dir = dir
} else if (pda_dir!='') {
config$dir = pda_dir
}else{
mymessage('no public or local directory supplied, use local temporary:', tempdir())
config$dir = tempdir()
}
config;
}
#' @useDynLib pda
#' @title PDA: Privacy-preserving Distributed Algorithm
#'
#' @description Fit Privacy-preserving Distributed Algorithms for linear, logistic,
#' Poisson and Cox PH regression with possible heterogeneous data across sites.
#' @usage pda(ipdata,site_id,control,dir,uri,secret)
#' @param ipdata Local IPD data in data frame, should include at least one column for the outcome and one column for the covariates
#' @param site_id Character site name
#' @param control pda control data
#' @param dir directory for shared flat file cloud
#' @param uri Universal Resource Identifier for this run
#' @param secret password to authenticate as site_id on uri
#' @param upload_without_confirm logical. TRUE if want silent upload, no interactive confirm
#' @param hosdata (for dGEM) hospital-level data, should include the same name as defined in the control file
#' @return control
#' @seealso \code{pdaPut}, \code{pdaList}, \code{pdaGet}, \code{getCloudConfig} and \code{pdaSync}.
#' @import stats survival rvest jsonlite data.table httr Rcpp metafor
#'
#' @references
#' Michael I. Jordan, Jason D. Lee & Yun Yang (2019) Communication-Efficient Distributed Statistical Inference, \cr
#' \emph{Journal of the American Statistical Association}, 114:526, 668-681 \cr
#' \doi{10.1080/01621459.2018.1429274}.\cr
#' (DLM) Yixin Chen, et al. (2006) Regression cubes with lossless compression and aggregation.
#' IEEE Transactions on Knowledge and Data Engineering, 18(12), pp.1585-1599. \cr
#' (DLMM) Chongliang Luo, et al. (2020) Lossless Distributed Linear Mixed Model with Application to Integration of Heterogeneous Healthcare Data.
#' medRxiv, \doi{10.1101/2020.11.16.20230730}. \cr
#' (DPQL) Chongliang Luo, et al. (2021) dPQL: a lossless distributed algorithm for generalized linear mixed model with application to privacy-preserving hospital profiling. \cr
#' medRxiv, \doi{10.1101/2021.05.03.21256561}. \cr
#' (ODAL) Rui Duan, et al. (2020) Learning from electronic health records across multiple sites: \cr
#' A communication-efficient and privacy-preserving distributed algorithm. \cr
#' \emph{Journal of the American Medical Informatics Association}, 27.3:376–385,
#' \cr \doi{10.1093/jamia/ocz199}.\cr
#' (ODAC) Rui Duan, et al. (2020) Learning from local to global: An efficient distributed algorithm for modeling time-to-event data. \cr
#' \emph{Journal of the American Medical Informatics Association}, 27.7:1028–1036, \cr
#' \doi{10.1093/jamia/ocaa044}. \cr
#' (ODACH) Chongliang Luo, et al. (2021) ODACH: A One-shot Distributed Algorithm for Cox model with Heterogeneous Multi-center Data. \cr
#' \emph{medRxiv}, \doi{10.1101/2021.04.18.21255694}. \cr
#' (ODAH) Mackenzie J. Edmondson, et al. (2021) An Efficient and Accurate Distributed Learning Algorithm for Modeling Multi-Site Zero-Inflated Count Outcomes.
#' medRxiv, pp.2020-12. \cr
#' \doi{10.1101/2020.12.17.20248194}. \cr
#' (ADAP) Xiaokang Liu, et al. (2021) ADAP: multisite learning with high-dimensional heterogeneous data via A Distributed Algorithm for Penalized regression. \cr
#' (dGEM) Jiayi Tong, et al. (2022) dGEM: Decentralized Generalized Linear Mixed Effects Model \cr
#' @examples
#' require(survival)
#' require(data.table)
#' require(pda)
#' data(lung)
#'
#' ## In the toy example below we aim to analyze the association of lung status with
#' ## age and sex using logistic regression, data(lung) from 'survival', we randomly
#' ## assign to 3 sites: 'site1', 'site2', 'site3'. we demonstrate using PDA ODAL can
#' ## obtain a surrogate estimator that is close to the pooled estimate. We run the
#' ## example in local directory. In actual collaboration, account/password for pda server
#' ## will be assigned to the sites at the server https://pda.one.
#' ## Each site can access via web browser to check the communication of the summary stats.
#'
#' ## for more examples, see demo(ODAC) and demo(ODAP)
#'
#' # Create 3 sites, split the lung data amongst them
#' sites = c('site1', 'site2', 'site3')
#' set.seed(42)
#' lung2 <- lung[,c('status', 'age', 'sex')]
#' lung2$sex <- lung2$sex - 1
#' lung2$status <- ifelse(lung2$status == 2, 1, 0)
#' lung_split <- split(lung2, sample(1:length(sites), nrow(lung), replace=TRUE))
#' ## fit logistic reg using pooled data
#' fit.pool <- glm(status ~ age + sex, family = 'binomial', data = lung2)
#'
#'
#' # ############################ STEP 1: initialize ###############################
#' control <- list(project_name = 'Lung cancer study',
#' step = 'initialize',
#' sites = sites,
#' heterogeneity = FALSE,
#' model = 'ODAL',
#' family = 'binomial',
#' outcome = "status",
#' variables = c('age', 'sex'),
#' optim_maxit = 100,
#' lead_site = 'site1',
#' upload_date = as.character(Sys.time()) )
#'
#'
#' ## run the example in local directory:
#' ## specify your working directory, default is the tempdir
#' mydir <- tempdir()
#' ## assume lead site1: enter "1" to allow transferring the control file
#' pda(site_id = 'site1', control = control, dir = mydir)
#' ## in actual collaboration, account/password for pda server will be assigned, thus:
#' \dontrun{pda(site_id = 'site1', control = control, uri = 'https://pda.one', secret='abc123')}
#' ## you can also set your environment variables, and no need to specify them in pda:
#' \dontrun{Sys.setenv(PDA_USER = 'site1', PDA_SECRET = 'abc123', PDA_URI = 'https://pda.one')}
#' \dontrun{pda(site_id = 'site1', control = control)}
#'
#' ##' assume remote site3: enter "1" to allow tranferring your local estimate
#' pda(site_id = 'site3', ipdata = lung_split[[3]], dir=mydir)
#'
#' ##' assume remote site2: enter "1" to allow tranferring your local estimate
#' pda(site_id = 'site2', ipdata = lung_split[[2]], dir=mydir)
#'
#' ##' assume lead site1: enter "1" to allow tranferring your local estimate
#' ##' control.json is also automatically updated
#' pda(site_id = 'site1', ipdata = lung_split[[1]], dir=mydir)
#'
#' ##' if lead site1 initialized before other sites,
#' ##' lead site1: uncomment to sync the control before STEP 2
#' \dontrun{pda(site_id = 'site1', control = control)}
#' \dontrun{config <- getCloudConfig(site_id = 'site1')}
#' \dontrun{pdaSync(config)}
#'
#' #' ############################' STEP 2: derivative ############################
#' ##' assume remote site3: enter "1" to allow tranferring your derivatives
#' pda(site_id = 'site3', ipdata = lung_split[[3]], dir=mydir)
#'
#' ##' assume remote site2: enter "1" to allow tranferring your derivatives
#' pda(site_id = 'site2', ipdata = lung_split[[2]], dir=mydir)
#'
#' ##' assume lead site1: enter "1" to allow tranferring your derivatives
#' pda(site_id = 'site1', ipdata = lung_split[[1]], dir=mydir)
#'
#'
#' #' ############################' STEP 3: estimate ############################
#' ##' assume lead site1: enter "1" to allow tranferring the surrogate estimate
#' pda(site_id = 'site1', ipdata = lung_split[[1]], dir=mydir)
#'
#' ##' the PDA ODAL is now completed!
#' ##' All the sites can still run their own surrogate estimates and broadcast them.
#'
#' ##' compare the surrogate estimate with the pooled estimate
#' config <- getCloudConfig(site_id = 'site1', dir=mydir)
#' fit.odal <- pdaGet(name = 'site1_estimate', config = config)
#' cbind(b.pool=fit.pool$coef,
#' b.odal=fit.odal$btilde,
#' sd.pool=summary(fit.pool)$coef[,2],
#' sd.odal=sqrt(diag(solve(fit.odal$Htilde)/nrow(lung2))))
#'
#' ## see demo(ODAL) for more optional steps
#'
#' @return control
#' @export
pda <- function(ipdata=NULL,site_id,control=NULL,dir=NULL,uri=NULL,secret=NULL,
upload_without_confirm=F, silent_message=F,
hosdata=NULL # for dGEM
){
config <- getCloudConfig(site_id,dir,uri,secret,silent_message)
mymessage <- function(mes, silent=silent_message) if(silent==F) message(mes)
#add a control if one was provided
if(!(is.null(control)) && config$site_id==control$lead_site) { # control$sites[1]
pdaPut(obj=control,name='control',config=config,upload_without_confirm,silent_message)
return(control) # ?
}
control = pdaGet('control',config)
mymessage('You are performing Privacy-preserving Distributed Algorithm (PDA, https://github.com/Penncil/pda): ')
mymessage('your site = ', config$site_id)
## specify pda steps and family based on model
if(control$model=='ODAL'){
ODAL.steps<-c('initialize','derive','estimate','synthesize')
ODAL.family<-'binomial'
}else if(control$model=='ADAP'){
ADAP.steps<-c('initialize','derive','estimate')
ADAP.family<-'lasso'
}else if(control$model=='ODAP'){
ODAP.steps<-c('initialize','derive','estimate','synthesize')
# for ODAP need to specify family (poisson, ztpoisson, quasipoisson, ztquasipoisson) in control
ODAP.family<-control$family
}else if(control$model=='ODAPB'){
ODAPB.steps<-c('initialize','derive','estimate','synthesize')
# for ODAPB need to specify family (poisson) in control
ODAPB.family<-control$family
}else if(control$model=='ODAH'){
ODAH.steps<-c('initialize','derive','estimate','synthesize')
# family = 'hurdle' in control
ODAH.family<-control$family
if(ODAH.family!='hurdle'){
warning("currently only support family='hurdle'" )
ODAH.family<-'hurdle'
}
}else if(control$model=='ODAC'){
if(control$heterogeneity==F){ # ODAC
ODAC.steps<-c('initialize','deriveUWZ','derive','estimate','synthesize')
}else{ # ODACH with heterogeneous baseline hazards across sites
ODAC.steps<-c('initialize','derive', 'estimate','synthesize')
}
ODAC.family<-'cox'
}else if(control$model=='ODACAT'){ # multi-category
ODACAT.steps <- c('initialize','derive','estimate','synthesize')
ODACAT.family <- 'multicategory'
}else if(control$model=='ODACATH'){ # added by Jessie & Ken on Feb 24, 2023
ODACATH.steps <- c('initialize','derive','estimate','synthesize')
ODACATH.family <- 'multicategory'
if(control$heterogeneity==T){
mymessage("You specified control$heterogeneity = T, so you are using the hetero-version of ODACAT.")
}
}else if(control$model=='DLM'){
DLM.steps<-c('initialize', 'estimate')
DLM.family<-'gaussian'
if(control$heterogeneity==T){
if(is.null(control$heterogeneity_effect)){
stop('You specified control$heterogeneity = T, please also specify control$heterogeneity_effect as either "fixed" or "random"! ')
} else if(control$heterogeneity_effect!='fixed' & control$heterogeneity_effect!='random'){
stop('You specified control$heterogeneity = T, please also specify control$heterogeneity_effect as either "fixed" or "random"! ')
}
if(length(setdiff(control$variables_heterogeneity, c(control$variables, "Intercept")))!=0)
stop('You specified control$heterogeneity = T, please also specify control$variables_heterogeneity as a SUBSET of "Intercept" and control$variables!')
if(is.null(control$variables_heterogeneity)){
mymessage('You specified control$heterogeneity = T, but no control$variables_heterogeneity, use "Intercept" as default!')
control$variables_heterogeneity <- 'Intercept'
}
}
}else if(control$model=='DPQL'){
# control$maxround: prespecified number of rounds
# "derive_1" "estimate_1" "derive_2" "estimate_2" "derive_3" "estimate_3"
DPQL.steps<-paste0(rep(c('derive', 'estimate'), control$maxround), '_', rep(1:control$maxround, each=2))
DPQL.family<-control$family # can be any glm family...
# if(control$heterogeneity==T){
# if(is.null(control$heterogeneity_effect)){ # glmm with fixed site-specific effects?
# stop('You specified control$heterogeneity = T, please also specify control$heterogeneity_effect as either "fixed" or "random"! ')
# } else if(control$heterogeneity_effect!='fixed' & control$heterogeneity_effect!='random'){
# stop('You specified control$heterogeneity = T, please also specify control$heterogeneity_effect as either "fixed" or "random"! ')
# }
if(length(setdiff(control$variables_heterogeneity, c(control$variables, "Intercept")))!=0)
stop('Please specify control$variables_heterogeneity as a SUBSET of "Intercept" and control$variables!')
# stop('You specified control$heterogeneity = T, please also specify control$variables_heterogeneity as a SUBSET of "Intercept" and control$variables!')
if(is.null(control$variables_heterogeneity)){
mymessage('No control$variables_heterogeneity, use "Intercept" as default!')
# message('You specified control$heterogeneity = T, but no control$variables_heterogeneity, use "Intercept" as default!')
control$variables_heterogeneity <- 'Intercept'
}
# }
}else if(control$model == 'dGEM'){
dGEM.steps<-c('initialize','derive','estimate','synthesize')
dGEM.family<-'binomial'
variables_site_level <- control$variables_site_level
}else if(control$model == 'OLGLM'){
OLGLM.steps<-c('initialize')
OLGLM.family<-'binomial'
}else if(control$model == 'OLGLMM'){
OLGLMM.steps<-c('initialize')
OLGLMM.family<-'binomial'
}else if(control$model == 'ODACH_CC'){
ODACH_CC.steps<-c('initialize','derive', 'estimate','synthesize')
ODACH_CC.family<-'cox'
}
family = get(paste0(control$model,'.family'))
## prepare the ipdata: make factor of all categorical variables to make dummy design matrix,
# in case some X's are degenerate at some site, see model.matrix(contrasts=...)
# xlev.contrasts <- control$xlev
# for(ii in names(control$xlev)){
# ipdata[,ii] = factor(ipdata[,ii], levels = control$xlev[[ii]])
# xlev.contrasts[[ii]] = 'contr.treatment' # options("contrasts") default
# }
if (!is.null(ipdata)){
n = nrow(ipdata)
if(family=='hurdle'){ # count and zero parts for hurdle, Xcount first
variables <- control$variables_hurdle_count
}else{
variables <- control$variables
}
formula <- as.formula(paste(control$outcome, paste(variables, collapse = "+"), sep = '~'))
mf <- model.frame(formula, ipdata, xlev=control$variables_lev)
}
# Data sanity check: columns with no variation, or missed categorical levels...
# if detected, may need to revise the data at this site, or
# exclude problematic variables from the protocol, or
# exclude the site
svd_d = svd(model.matrix(formula, mf))$d
if(sum(svd_d < 1e-10)>=1) warning(site_id, ': data degeneration detected!!! Please discuss with your collaborators!')
## this is used in model.matrix(contrasts=...)
# if(options()$contrasts['unordered']=="contr.treatment") options(contrasts = c("contr.treatment", "contr.poly"))
# create ipdata via model.matrix to make dummy variables for categorical covariates...
# the resulted ipdata format will be used in later functions, i.e. ODAX etc
if(control$model=='ODAC'){
ipdata = data.table::data.table(time=as.numeric(model.response(mf))[1:n],
status=as.numeric(model.response(mf))[-c(1:n)],
model.matrix(formula, mf)[,-1])
ipdata = data.table(data.frame(ipdata))
control$risk_factor = colnames(ipdata)[-c(1:2)]
}else if(control$model=='ODAL'){
ipdata = data.table::data.table(status=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
}else if(control$model=='ADAP'){
ipdata = data.table::data.table(status=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
}else if(control$model=='ODAH'){ # count and zero parts for hurdle
X_count = data.table::data.table(model.matrix(formula, mf))
# also make design X_zero
formula <- as.formula(paste(control$outcome, paste(control$variables_hurdle_zero, collapse = "+"), sep = '~'))
mf <- model.frame(formula, ipdata)
X_zero = data.table::data.table(model.matrix(formula, mf))
if(is.character(control$offset)){
offset <- ipdata[,control$offset]
}else{
offset = rep(0, n)
}
ipdata <- list(ipdata=ipdata, X_count=X_count, X_zero=X_zero, offset=offset)
control$risk_factor = c('Intercept', control$variables_hurdle_count, 'Intercept', control$variables_hurdle_zero)
}else if(control$model=='ODAP'){
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)),
offset=ifelse(is.character(control$offset), ipdata[,control$offset], 0),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-c(1:2)]
}else if(control$model=='ODAPB'){
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)),
offset=ifelse(is.character(control$offset), ipdata[,control$offset], 0),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-c(1:2)]
}else if(control$model=='ODACAT'){
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)), ## multi-category y is 1:q
model.matrix(formula, mf))[,-2] # remove the intercept column. ODACAT does not need that column.
control$risk_factor = colnames(ipdata)[-1]
}else if(control$model=='ODACATH'){ # added by Jessie & Ken on Feb 24, 2023
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)), ## multi-category y is 1:q
model.matrix(formula, mf))[,-2] # remove the intercept column. ODACATH does not need that column.
control$risk_factor = colnames(ipdata)[-1]
}else if(control$model=='DLM'){
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
control$risk_factor_heterogeneity = control$risk_factor[grepl(paste0(control$variables_heterogeneity, collapse='|'), control$risk_factor)]
}else if(control$model=='DPQL'){
ipdata = data.table::data.table(outcome=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1] # may induce more cols for dummy vars
control$risk_factor_heterogeneity = control$risk_factor[grepl(paste0(control$variables_heterogeneity, collapse='|'), control$risk_factor)]
}else if(control$model=='dGEM'){
if (!is.null(ipdata)){
ipdata = data.table::data.table(status=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
}
}else if(control$model=='OLGLM'){
if(control$step == "initialize"){
ipdata = data.table::data.table(status=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
}else{
control$risk_factor = colnames(ipdata)[-1]
}
} else if(control$model=='OLGLMM'){
if (!is.null(ipdata)){
if(control$step == "initialize"){
ipdata = data.table::data.table(status=as.numeric(model.response(mf)),
model.matrix(formula, mf))
control$risk_factor = colnames(ipdata)[-1]
}else{
control$risk_factor = colnames(ipdata)[-1]
}
}
} else if(control$model=='ODACH_CC'){
if (!is.null(ipdata)){
ipdata = data.table::data.table(time=as.numeric(model.response(mf))[1:n],
status=as.numeric(model.response(mf))[-c(1:n)],
subcohort = ipdata$subcohort,
# sampling_weight = ipdata$sampling_weight,
model.matrix(formula, mf)[,-1])
# convert irregular risk factor names, e.g. `Group (A,B,C) B` to Group..A.B.C..B
# this should (and will) apply to all other models...
ipdata = data.table(data.frame(ipdata))
control$risk_factor = colnames(ipdata)[-c(1:3)]
}
}
## execute the current step function
if(is.character(control$step)){
step_function <- paste0(control$model,'.', gsub('[^[:alpha:]]', '',control$step)) # "derive_1" for dPQL
if(control$model == 'dGEM'){
if(!is.null(ipdata)){
if(control$step == 'derive'){
step_obj <- get(step_function)(ipdata, control, config, hosdata)
}else if (control$step == 'synthesize'){
if(config$site_id != control$lead_site){
stop("Only lead site or coordinating center can perform the last step (i.e., synthesize)")
}else{
step_obj <- get(step_function)(control, config)
}
} else {
step_obj <- get(step_function)(ipdata, control, config)
}
}else {
if (control$step == 'synthesize'){
step_obj <- get(step_function)(control, config)
}
}
}else if(control$model == "OLGLMM"){
if(is.null(ipdata) & (config$site_id == control$lead_site)){
print("As the leading site, you are going to produce the final results")
}else{
step_obj <- get(step_function)(ipdata, control, config)
}
}
else{
step_obj <- get(step_function)(ipdata, control, config)
}
## pda completion message
if(control$step=='estimate'){
if(control$model=='DLM'){
mymessage("Congratulations, the PDA is completed! The result is guaranteed to be identical to the pooled analysis")
}else{
if(control$model=='dGEM'){
mymessage("Congratulations, this the final step: you are transfering the counterfactural event rate. The lead site or coordinating center will broadcast the final results")
}else{
mymessage("Congratulations, the PDA is completed! You can continue broadcasting your surrogate estimate to further synthesize them.")
}
}
} else if(control$step==paste0('estimate_', control$maxround)){ # dPQL
mymessage("Congratulations, the PDA is completed! The result is guaranteed to be identical to the pooled analysis")
}
## write output to .json file
if(!is.null(ipdata)){
pdaPut(step_obj,paste0(config$site_id,'_',control$step),config,upload_without_confirm,silent_message)
}else{
if(control$model == "dGEM"){
if(control$step == "synthesize"){
pdaPut(step_obj,paste0(config$site_id,'_',control$step),config,upload_without_confirm,silent_message)
}
}
}
## synchronize control file (at lead site)
if(config$site_id==control$lead_site) {
control<-pdaSync(config,upload_without_confirm,silent_message)
}
}
invisible(control)
}
#' @useDynLib pda
#' @title pda control synchronize
#'
#' @description update pda control if ready (run by lead)
#' @usage pdaSync(config)
#' @param config cloud configuration
#' @return control
#' @seealso \code{pda}
#' @export
pdaSync <- function(config,upload_without_confirm,silent_message=F){
control = pdaGet('control',config)
mymessage <- function(mes, silent=silent_message) if(silent==F) message(mes)
if(control$model=='ODAL'){
ODAL.steps<-c('initialize','derive','estimate','synthesize')
ODAL.family<-'binomial'
}else if(control$model=='ADAP'){
ADAP.steps<-c('initialize','derive','estimate')
ADAP.family<-'lasso'
}else if(control$model=='ODAP'){
ODAP.steps<-c('initialize','derive','estimate','synthesize')
# for ODAP need to specify family (poisson, ztpoisson, quasipoisson, ztquasipoisson) in control
ODAP.family<-control$family
}else if(control$model=='ODAPB'){
ODAPB.steps<-c('initialize','derive','estimate','synthesize')
# for ODAP need to specify family (poisson, ztpoisson, quasipoisson, ztquasipoisson) in control
ODAPB.family<-control$family
}else if(control$model=='ODAH'){
ODAH.steps<-c('initialize','derive','estimate','synthesize')
# for ODAH family = 'hurdle' in control
ODAH.family<-'hurdle' # control$family
}else if(control$model=='ODAC'){
if(control$heterogeneity==F){ # ODAC
ODAC.steps<-c('initialize','deriveUWZ','derive','estimate','synthesize')
}else{ # ODACH with heterogeneous baseline hazards across sites
ODAC.steps<-c('initialize','derive', 'estimate','synthesize')
}
ODAC.family<-'cox'
} else if(control$model=='ODACAT'){
ODACAT.steps<-c('initialize','derive','estimate','synthesize')
ODACAT.family<-'multicategory'
} else if(control$model=='ODACATH'){ # added by Jessie & Ken on Feb 24, 2023
ODACATH.steps<-c('initialize','derive','estimate','synthesize')
ODACATH.family<-'multicategory'
} else if(control$model=='DLM'){
DLM.steps<-c('initialize','estimate')
DLM.family<-'gaussian'
} else if(control$model=='DPQL'){
DPQL.steps<-paste0(rep(c('derive', 'estimate'), control$maxround), '_', rep(1:control$maxround, each=2))
DPQL.family<-control$family
} else if(control$model=='dGEM'){
dGEM.steps<-c('initialize','derive','estimate','synthesize')
dGEM.family<-'binomial'
}else if(control$model == 'OLGLM'){
OLGLM.steps<-c('initialize')
OLGLM.family<-'binomial'
}else if(control$model == 'OLGLMM'){
OLGLMM.steps<-c('initialize')
OLGLMM.family<-'binomial'
}else if(control$model=='ODACH_CC'){ # ODACH with case-cohort design
ODACH_CC.steps<-c('initialize','derive', 'estimate','synthesize')
ODACH_CC.family<-'cox'
}
files<-pdaList(config)
if(all(paste0(control$sites,"_",control$step) %in% files)){ # all init are ready
if(control$step=="initialize"){
if(control$lead_site %in% control$sites){
init_i <- pdaGet(paste0(control$lead_site,'_initialize'),config)
}
if(control$model=='DLM'){
# DLM does not need derivative, thus estimate after initialize...
}else if(control$model=='ODAH'){
bhat_zero <-init_i$bhat_zero_i
vbhat_zero <- init_i$Vhat_zero_i
bhat_count <-init_i$bhat_count_i
vbhat_count <- init_i$Vhat_count_i
for(site_i in control$sites){
if(site_i!=control$lead_site){
init_i <- pdaGet(paste0(site_i,'_initialize'),config)
bhat_zero = rbind(bhat_zero, init_i$bhat_zero_i)
vbhat_zero = rbind(vbhat_zero, init_i$Vhat_zero_i)
bhat_count = rbind(bhat_count, init_i$bhat_count_i)
vbhat_count = rbind(vbhat_count, init_i$Vhat_count_i)
}
}
#estimate from meta-analysis
bmeta_zero = apply(bhat_zero/vbhat_zero,2,function(x){sum(x, na.rm = TRUE)})/apply(1/vbhat_zero,2,function(x){sum(x, na.rm = TRUE)})
vmeta_zero = 1/apply(1/vbhat_zero,2,function(x){sum(x, na.rm = TRUE)})
bmeta_count = apply(bhat_count/vbhat_count,2,function(x){sum(x, na.rm = TRUE)})/apply(1/vbhat_count,2,function(x){sum(x, na.rm = TRUE)})
vmeta_count = 1/apply(1/vbhat_count,2,function(x){sum(x, na.rm = TRUE)})
res = list(bmeta_zero = bmeta_zero, vmeta_zero = vmeta_zero,
bmeta_count = bmeta_count, vmeta_count = vmeta_count)
mymessage('meta analysis (inverse variance weighted average) result:')
#print(res)
control$beta_zero_init = bmeta_zero
control$beta_count_init = bmeta_count
}else if(control$model=='ADAP'){
bhat <- init_i$bhat_i
# vhbat = rep(nrow(ipdata), ncol(ipdata)-1) stores each site's sample size
vbhat <- init_i$Vhat_i
for(site_i in control$sites){
if(site_i!=control$lead_site){
init_i <- pdaGet(paste0(site_i,'_initialize'),config)
bhat = rbind(bhat, init_i$bhat_i)
vbhat = rbind(vbhat, init_i$Vhat_i)
}
}
#estimate with weighted average
bmeta = apply(diag(vbhat[,1])%*%bhat,2,function(x){sum(x, na.rm = TRUE)})/sum(vbhat[,1], na.rm = TRUE)
vmeta = NA
res = list(bmeta = bmeta, vmeta = vmeta)
mymessage('sample size weighted average result:')
#print(res)
control$beta_init = bmeta
}else if(control$model == "OLGLM"){
K <- length(control$sites)
if(control$heterogeneity == FALSE){
read_AD <- pdaGet(paste0(control$sites[1],'_initialize'),config)
# print(read_AD$SXY)
SXY <- read_AD$SXY
Xtable <- as.data.frame(matrix(unlist(read_AD$Xtable), ncol = (length(control$variables) + 2)))
colnames(Xtable) <- c("intercept", control$variables, "n")
Xcat <- Xtable[,colnames(Xtable)!='n']
Xcat <- as.matrix(Xcat)
counts <- Xtable$n
for(site_i in control$sites[-1]){
KSiteAD <- pdaGet(paste0(site_i,'_initialize'),config)
SXY <- SXY+KSiteAD$SXY
Xtable <- as.data.frame(matrix(unlist(KSiteAD$Xtable), ncol = (length(control$variables) + 2)))
colnames(Xtable) <- c("intercept", control$variables, "n")
counts <- counts + Xtable$n
}
}else{
read_AD <- pdaGet(paste0(control$sites[1],'_initialize'),config)
SXY.intercept <- read_AD$SXY[1]
SXY.cov <- read_AD$SXY[-1]
Xtable <-as.data.frame(matrix(unlist(read_AD$Xtable), ncol = (length(control$variables) + 2)))
colnames(Xtable) <- c("intercept", control$variables, "n")
Xcat0 <- Xtable[,colnames(Xtable)!='n']
Xcat <- Xcat0[,-1]
counts <- Xtable$n
for(site_i in control$sites[-1]){
KSiteAD <- pdaGet(paste0(control$sites[1],'_initialize'),config)
SXY.intercept <- c(SXY.intercept,KSiteAD$SXY[1])
SXY.cov <- SXY.cov+KSiteAD$SXY[-1]
Xtable <- as.data.frame(matrix(unlist(KSiteAD$Xtable), ncol = (length(control$variables) + 2)))
colnames(Xtable) <- c("intercept", control$variables, "n")
Xcat0 <- Xtable[,colnames(Xtable)!='n']
Xcat_tmp <- Xcat0[,-1]
Xcat <- rbind(Xcat,Xcat_tmp)
counts <- c(counts, Xtable$n)
}
SXY <- c(SXY.intercept,SXY.cov)
SiteID <- rep(1:K, each = nrow(Xcat0))
new.siteID <- sapply(1:K,function(i) ifelse(SiteID==i,1,0))
colnames(new.siteID) <- paste0("Site", 1:K)
Xcat <- cbind(new.siteID,Xcat)
Xcat <- as.matrix(Xcat)
}
logLik_AD <- function(beta){
-(sum(SXY*beta)-sum(log(1+exp(Xcat%*%c(beta)))*counts))/sum(counts)
}
fit.AD <- optim(par = rep(0, ncol(Xcat)), logLik_AD, method = "BFGS")
se <- sqrt(diag(solve(hessian(func = function(x) logLik_AD(x)*sum(counts), x = fit.AD$par))))
res <- data.frame(est = fit.AD$par, se = se)
rownames(res) <- colnames(Xcat)
control$final_output = res
}else if(control$model == "OLGLMM"){
for(site_i in control$site[1]){
AD = pdaGet(paste0(site_i,'_initialize'),config)
Xtable <- AD$Xtable
SY <- Xtable$SY
count <- Xtable$n
new_Xtable <- Xtable[,1:(1+length(control$variables))]
new_X <- new_Xtable[rep(seq_len(nrow(new_Xtable)), times = count), ]
generate_Y <- function(Y_count, overall_count){
sub_Y <- rep(c(1,0), times = c(Y_count, overall_count- Y_count))
}
new_Y <- unlist(mapply(generate_Y,SY,overall_count = count))
output_0 = cbind(new_Y, new_X)
colnames(output_0) = c(control$outcome,"intercept",control$variables)
output_0$site <- site_i
}
for(site_i in control$sites[-1]){
AD = pdaGet(paste0(site_i,'_initialize'),config)
Xtable <- AD$Xtable
SY <- Xtable$SY
count <- Xtable$n
new_Xtable <- Xtable[,1:(1+length(control$variables))]
new_X <- new_Xtable[rep(seq_len(nrow(new_Xtable)), times = count), ]
generate_Y <- function(Y_count, overall_count){
sub_Y <- rep(c(1,0), times = c(Y_count, overall_count- Y_count))
}
new_Y <- unlist(mapply(generate_Y,SY,overall_count = count))
output = cbind(new_Y, new_X)
colnames(output) = c(control$outcome,"intercept",control$variables)
output$site <- site_i
output_0 <- rbind(output_0, output)
}
fit.pool.recons <- MASS::glmmPQL(as.formula(paste(control$outcome, paste(control$variables, collapse = "+"), sep = '~')),
~1|site,
data = output_0,
family='binomial')
control$est <- fit.pool.recons$coefficients
control$varFix <- fit.pool.recons$varFix
control$rand_se <- fit.pool.recons$sigma
}else{
if(control$lead_site %in% control$sites){
bhat <-init_i$bhat_i
vbhat <- init_i$Vhat_i
if(control$model == "ODACATH"){
bhat_eta = init_i$bhat_eta_i
}
for(site_i in control$sites){
if(site_i!=control$lead_site){
init_i <- pdaGet(paste0(site_i,'_initialize'),config)
bhat = rbind(bhat, init_i$bhat_i)
vbhat = rbind(vbhat, init_i$Vhat_i)
if (control$model == "ODACATH"){
bhat_eta = rbind(bhat_eta, init_i$bhat_eta_i)
}
}
}
}else{ # all other ODAX..
init_i = pdaGet(paste0(control$sites[1],'_initialize'),config)
bhat <-init_i$bhat_i
vbhat <- init_i$Vhat_i
for(site_i in control$sites[-1]){
init_i <- pdaGet(paste0(site_i,'_initialize'),config)
bhat = rbind(bhat, init_i$bhat_i)
vbhat = rbind(vbhat, init_i$Vhat_i)
}
}
site_size = c()
for(site_i in control$sites){
init_i <- pdaGet(paste0(site_i,'_initialize'),config)
site_size <- c(site_size, init_i$site_size)
}
# print(site_size)
## estimate for pda init: meta, or median, or lead est?...
if(control$init_method == 'meta'){
binit = apply(bhat/vbhat,2,function(x){sum(x, na.rm = TRUE)})/apply(1/vbhat,2,function(x){sum(x, na.rm = TRUE)})
# vinit = 1/apply(1/vbhat,2,function(x){sum(x, na.rm = TRUE)})
mymessage('meta (inv var weighted avg) as initial est:')
} else if(control$init_method == 'median'){
binit = apply(bhat, 2, median, na.rm=T)
mymessage('median as initial est:')
} else if(control$init_method == 'weighted.median'){
binit = apply(bhat, 2, function(x) weighted.median(x, site_size))
mymessage('median (site size weighted) as initial est:')
} else if(control$init_method == 'lead'){
binit = bhat[control$sites==control$lead_site,]
mymessage('lead site est as initial est:')
} #print(res)
control$beta_init = binit
if(any(is.na(control$beta_init))){
control$beta_init[is.na(control$beta_init)] = 0
warning('some coefs are all NAs and use 0 as init est, use caution and check your X variables!!!')
}
if (control$model == "ODACATH"){
control$bhat_eta = bhat_eta
}
}
mes <- 'beta_init added, step=2 (derivatives)! \n'
}
if(control$step=='derive'){
if(control$model == "dGEM"){
# get b_meta as initial bbar
ghat <- c()
vghat <- c()
hosdata <- c()
for(site_i in control$sites){
i = 1
init_i <- pdaGet(paste0(site_i,'_derive'),config)
ghat = rbind(ghat, init_i$gammahat_i)
vghat = rbind(vghat, init_i$Vgammahat_i)
hosdata = rbind(hosdata, init_i$hosdata)
i = i + 1
}
# meta-regression
colnames(hosdata) = control$variables_site_level
formula <- as.formula(paste("", paste(control$variables_site_level, collapse = "+"), sep = '~'))
gamma_meta_reg_new = rma.uni(ghat, vghat, mods = formula, data = hosdata)
gamma_BLUP <- blup(gamma_meta_reg_new)$pred
control$estimated_hospital_effect = gamma_BLUP
}else if(control$model == "OLGLM"){
mymessage("You are done!")
}
## robust var est for ODACH_CC est (might also for other ODAX later?)
if (control$model == "ODACH_CC"){
px = length(control$beta_init)
K = length(control$sites)
S_i_sum = array(NA,c(px,px, K))
for(i in 1:K){
site_i = control$sites[i]
init_i <- pdaGet(paste0(site_i,'_derive'),config)
S_i_sum[,,i] <- init_i$S_i
}
control$S_i_sum = apply(S_i_sum,c(1,2),sum,na.rm=T)
# print(control$S_i_sum)
}
}
}
if(control$step=='synthesize'){
if(control$model == "dGEM"){
control$final_event_rate = pdaGet(paste0(control$lead_site,'_synthesize'),config)$final_event_rate
}
}
# for DPQL, attach intermediate estimate to control after each round
if(control$model=='DPQL' & control$step==paste0('estimate_', control$round)){
est <- pdaGet(paste0(config$site_id,'_estimate_',control$round),config)
control$bhat <- est$bhat
control$uhat <- est$uhat
if(control$round<control$maxround) control$round <- control$round + 1
}
## update control with next step
steps = get(paste0(control$model,'.steps'))
current_index <- which(steps==control$step)
if(current_index < length(steps)) {
next_index <- current_index + 1
next_step <- steps[next_index]
mes <- paste0('next step=',next_index,' (',next_step,')! \n')
control$step = next_step
} else {
mes <- paste0('finished! \n')
control$step = NULL
}
mymessage(mes)
pdaPut(control,'control',config,upload_without_confirm,silent_message)
control
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.