R/partition_data.R

#' Partition Data Into Shards
#'
#' @description
#' A function to partition data into s shards for use in distributed estimation.
#' 
#' @usage partition_data(Data, s)
#' 
#' 
#' @param Data A list containing either 'regdata' or 'lgtdata', and optionally 'Z'. If 'Data' contains 'lgtdata', it must also contain 'p', the number of choice alternatives.
#' @param s The number of shards to partition the data into.
#' 
#' @return
#' A list of 's' shards where each shard contains:
#'
#' \item{p}{(integer) - Number of choice alternatives (only if 'Data' contains 'lgtdata')}
#' \item{lgtdata or regdata}{(list, length: n) - A list of n elements, one per observational unit, where each element contains 'X', 'y', 'beta', and 'tau'}
#' \item{Z}{(matrix) - An n x nz matrix of unit characteristics. NULL if 'Data' does not contain 'Z' [Optional]}
#' 
#' @author Federico Bumbaca, Leeds School of Business, University of Colorado Boulder, \email{federico.bumbaca@colorado.edu}
#' @references Bumbaca, F. (Rico), Misra, S., & Rossi, P. E. (2020). Scalable Target Marketing: Distributed Markov Chain Monte Carlo for Bayesian Hierarchical Models. Journal of Marketing Research, 57(6), 999-1018.
#' 
#' @examples
#' 
#' # Generate hierarchical linear data
#' R=1000 #number of draws
#' nreg=2000 #number of observational units
#' nobs=5 #number of observations per unit
#' nvar=3 #number of X variables (columns of X)
#' nz=2 #number of unit characteristics in Z
#' 
#' Z=matrix(runif(nreg*nz),ncol=nz) 
#' Z=t(t(Z)-apply(Z,2,mean))
#' Delta=matrix(c(1,-1,2,0,1,0), ncol = nz) 
#' tau0=.1
#' iota=c(rep(1,nobs)) 
#' 
#' ## create arguments for rmixture
#' tcomps=NULL
#' a = diag(1, nrow=3)
#' tcomps[[1]] = list(mu=c(-5,0,0),rooti=a) 
#' tcomps[[2]] = list(mu=c(5, -5, 2),rooti=a)
#' tcomps[[3]] = list(mu=c(5,5,-2),rooti=a)
#' tpvec = c(.33,.33,.34)                               
#' ncomp=length(tcomps)
#' regdata=NULL
#' betas=matrix(double(nreg*nvar),ncol=nvar) 
#' tind=double(nreg) 
#' for (reg in 1:nreg) { 
#'   tempout=bayesm::rmixture(1,tpvec,tcomps)
#'   if (is.null(Z)){
#'     betas[reg,]= as.vector(tempout$x)  
#'   }else{
#'     betas[reg,]=Delta%*%Z[reg,]+as.vector(tempout$x)} 
#'   tind[reg]=tempout$z
#'   X=cbind(iota,matrix(runif(nobs*(nvar-1)),ncol=(nvar-1))) 
#'   tau=tau0*runif(1,min=0.5,max=1) 
#'   y=X%*%betas[reg,]+sqrt(tau)*rnorm(nobs)
#'   regdata[[reg]]=list(y=y,X=X,beta=betas[reg,],tau=tau) 
#' }
#' 
#' Prior1=list(ncomp=ncomp) 
#' keep=1
#' Mcmc1=list(R=R,keep=keep)
#' Data1=list(list(regdata=regdata,Z=Z))
#' 
#' length(Data1)
#' 
#' Data2 = partition_data(Data1, s = 3)
#' length(Data2)
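#'
#' # Inspect one shard (uses the objects created above)
#' names(Data2[[1]])
#' length(Data2[[1]]$regdata) # roughly nreg/s units per shard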
#'
#' @rdname partition_data
#' @export

partition_data = function(Data, s){
  s_in = length(Data)
  
  if (!is.null(Data[[1]]$regdata)){ #if regdata is not null, run regdata version
    if (s_in == s) {
      #data is already partitioned into the desired number of shards, so return it unchanged
      #message("Data already partitioned into ", s, " shard(s). Returning Data.")
      return(Data)
    }
    Z = NULL #initialize Z
    if (length(Data) != 1) #if data is already partitioned, combine into 1 shard 
    {
      regdata = NULL
      for (i in 1:s_in)
      {
        regdata = c(regdata, Data[[i]]$regdata) #combine regdata over shards
        Z = rbind(Z,Data[[i]]$Z) #combine Z over shards
      }
      if (is.null(Z))
      {
        Data = list(regdata=regdata) 
      } else{
        Data = list(regdata=regdata, Z=Z)
      } #combine the data
    }
    
    
    if (s == 1){
      return(list(Data))
    }
    if (s_in == 1) {
      Data = Data[[1]] #unwrap the single input shard; if shards were combined above, Data is already a single shard
    }
    Z = Data$Z
    regdata = Data$regdata
    n = length(regdata) 
    data_s = NULL
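    #split the n units as evenly as possible: each shard gets floor(n/s) units,
    #and the first (n mod s) shards each get one extra unit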
    size = rep(floor(n/s), s) 
    if (n != sum(size))
    {
      for (i in 1:(n-sum(size)))
      {
        size[i] = size[i] + 1
      }
    }
    index1 = 1
    for (i in 1:s)
    {
      index2 = index1 + size[i] - 1
      if(is.null(Z))
      {
        data_s[[i]] = list(regdata=regdata[index1:index2])
      }else{  
        data_s[[i]] = list(regdata=regdata[index1:index2], Z=Z[index1:index2, , drop=FALSE]) #keep Z as a matrix even when nz = 1
      }
      index1 = index2 + 1 
    }
    return(data_s)
  }
  if (!is.null(Data[[1]]$lgtdata)){ #if lgtdata is not null, run lgtdata version
    if (s_in == s) {
      #data is already partitioned into the desired number of shards, so return it unchanged
      #message("Data already partitioned into ", s, " shard(s). Returning Data.")
      return(Data)
    }
    Z = NULL
    
    if (length(Data) != 1) #if data is already partitioned, combine into 1 shard 
    {
      lgtdata = NULL
      for (i in 1:s_in)
      {
        lgtdata = c(lgtdata, Data[[i]]$lgtdata) 
        Z = rbind(Z,Data[[i]]$Z)
      }
      p = Data[[1]]$p
      if (is.null(Z))
      {
        Data = list(p = p, lgtdata=lgtdata) 
      } else{
        Data = list(p=p,lgtdata=lgtdata, Z=Z)
      } #combine the data
    }
    if (s == 1){
      return(list(Data))
    }
    if (s_in == 1) {
      Data = Data[[1]] #unwrap the single input shard; if shards were combined above, Data is already a single shard
    }
    p = Data$p
    Z = Data$Z
    lgtdata = Data$lgtdata
    n = length(lgtdata) 
    data_s = NULL
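    #same even split as in the regdata branch: floor(n/s) units per shard, with the remainder spread over the first shards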
    size = rep(floor(n/s), s) 
    if (n != sum(size))
    {
      for (i in 1:(n-sum(size)))
      {
        size[i] = size[i] + 1
      }
    }
    index1 = 1
    for (i in 1:s)
    {
      index2 = index1 + size[i] - 1
      if(is.null(Z))
      {
        data_s[[i]] = list(p=p, lgtdata=lgtdata[index1:index2])
      }else{  
        data_s[[i]] = list(p=p, lgtdata=lgtdata[index1:index2], Z=Z[index1:index2, , drop=FALSE]) #keep Z as a matrix even when nz = 1
      }
      index1 = index2 + 1 
    }
    return(data_s)
  }
}
