knitr::opts_chunk$set(collapse = TRUE)

Create Test Datasets

To test the functionality, we split the iris dataset into three pieces and train on these subsets in a distributed fashion, without sharing data between them. This is done by averaging the gradient descent updates of the single subsets.

set.seed(314159)

df_train = iris[sample(nrow(iris)), ]
idx_test = sample(x = seq_len(nrow(df_train)), size = 0.1 * nrow(df_train))

df_test  = df_train[idx_test, ]
df_train = df_train[-idx_test, ]

splits = 3L
breaks = round(seq(0, nrow(df_train), length.out = splits + 1))

# Save single train sets (the directory is created if it does not exist yet):
dir.create("data", showWarnings = FALSE)

files = character(splits)
for (i in seq_len(splits)) {
    files[i] = paste0("data/iris", i, ".csv")

    # Rows (breaks[i] + 1):breaks[i + 1] give non-overlapping subsets:
    temp = df_train[(breaks[i] + 1):breaks[i + 1], ]
    write.csv(x = temp, file = files[i], row.names = FALSE)
}

# Save test set:
write.csv(x = df_test, file = "data/iris_test.csv", row.names = FALSE)

The files object contains the paths to the datasets that are required for the fitting process:

files

Run Ordinary Gradient Descent

First of all, load the package using devtools:

devtools::load_all()

To show what happens at a single location, we create a Model object that specifies the model we want to fit:

# "Load" iris dataset and create data matrix. In the final algorithm, this is done by a formula:
X = cbind(Intercept = 1, Petal.Length = df_train[["Petal.Length"]], Sepal.Width = df_train[["Sepal.Width"]])

# Define target variable:
y = df_train[["Sepal.Length"]]

# Now we define a linear model as model for fitting:
lin_mod = LinearModel$new(X, y)

# With that model we can, e.g., calculate the MSE for a given parameter vector:
lin_mod$calculateMSE(rnorm(3))
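
For a parameter vector $\beta$, the MSE computed here should correspond to the usual definition for a linear model with design matrix rows $x_i$ and targets $y_i$ (an assumption about the implementation):

$$\mathrm{MSE}(\beta) = \frac{1}{n} \sum_{i=1}^{n} \left( y_i - x_i^\top \beta \right)^2$$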

To have a baseline for comparing our distributed gradient descent algorithm, we fit a linear model on the whole dataset:

myformula = formula(Sepal.Length ~ Petal.Length + Sepal.Width)

mod_lm = lm(myformula, data = df_train)
summary(mod_lm)

Finally, we run the optGradientDescent() optimizer for 3000 epochs with a learning rate of 0.01, starting at $\vec{0}$:

param_start = rep(0, ncol(X))
grad_desc_lm = optGradientDescent(lin_mod, param_start, 0.01, 3000, FALSE, FALSE)
str(grad_desc_lm)

knitr::kable(data.frame(lm = coef(mod_lm), grad.descent = grad_desc_lm$param_new, 
  diff = coef(mod_lm) - grad_desc_lm$param_new))
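
For reference, each of the 3000 iterations presumably applies the standard batch gradient descent step on the MSE with learning rate $\alpha = 0.01$ (an assumption based on the optimizer's name and arguments):

$$\beta^{[t+1]} = \beta^{[t]} - \alpha \nabla_{\beta} \mathrm{MSE}\left(\beta^{[t]}\right) = \beta^{[t]} + \frac{2\alpha}{n} X^\top \left(y - X \beta^{[t]}\right)$$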

A nice property of the implementation is that we can pass the last parameter vector and continue training for, let's say, 2000 iterations with a smaller learning rate:

param_start_next = grad_desc_lm$param_new
grad_desc_lm_next = optGradientDescent(lin_mod, param_start_next, 0.001, 2000, FALSE, FALSE)
str(grad_desc_lm_next)

knitr::kable(data.frame(lm = coef(mod_lm), grad.descent = grad_desc_lm_next$param_new, 
  diff = coef(mod_lm) - grad_desc_lm_next$param_new))

Run Distributed Gradient Descent

The technique used is to create a model for each subset and conduct one or more gradient descent steps on it. The idea is to save the current state of the model on disk and reload that state every time a new update is available (a minimal sketch of one full round is given after the following list):

  1. On server/location $i$:

    1. Read the $i$-th dataset.
    2. Load the settings, or initialize them if they are not available yet:
      1. Define the model, the target variable, and the formula.
      2. Load the latest gradient descent update.
    3. Conduct as many gradient descent steps as defined in advance.
  2. Move on to server/location $i+1$ or, if $i$ was the last location, average the updates and start the next round at server/location 1.
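
The following is a minimal sketch of one full round of this scheme written in plain R; the helper gradientStep() and the single-step updates are made up for illustration and are not the package API:

# Illustrative sketch of one distributed round (not the package API):
gradientStep = function(beta, X, y, lr) {
  # One batch gradient descent step on the MSE of a linear model:
  as.vector(beta + 2 * lr / nrow(X) * t(X) %*% (y - X %*% beta))
}

beta = rep(0, 3)
updates = list()

for (i in seq_along(files)) {
  # 1. Read the i-th dataset on server/location i:
  dat = read.csv(files[i])

  # 2. Build the design matrix and target from the shared formula:
  Xi = model.matrix(myformula, data = dat)
  yi = dat[["Sepal.Length"]]

  # 3. Conduct one (or more) gradient descent steps:
  updates[[i]] = gradientStep(beta, Xi, yi, lr = 0.01)
}

# Average the updates before the next round starts at location 1 again:
beta = Reduce(`+`, updates) / length(updates)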

Initialize Model

For demonstration we use the three files created above, on which we want to learn (in this case) a linear model. First of all, it is necessary to define the paths of all data locations:

files = c("data/iris1.csv", "data/iris2.csv", "data/iris3.csv")

Now we initialize the model by specifying the formula that is applied to each of the specified datasets, the model we want to train and the optimizer (both given as character strings), the output directory for the single steps (each step can be stored if desired), and settings such as the number of epochs and the learning rate:

myformula = formula(Sepal.Length ~ Petal.Length + Sepal.Width)

registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", 
  out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, 
  file_reader = read.csv, overwrite = TRUE)

We can have a look at the created registry and the model object:

load("train_files/registry.rds")
registry

load("train_files/model.rds")
model

These files are permanent and hold all the data necessary to coordinate the fitting process.

Now we train the model by loading the files sequentially. The program checks which files are available on the machine, grabs them, and conducts one or more gradient steps. Each step is stored as an .rds file in the output directory (if overwrite = TRUE); otherwise it is deleted as soon as it is no longer needed.
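
Conceptually, a single update could look like the following purely hypothetical sketch; the helper distributedUpdate() and the registry field "files" are invented here for illustration and are not the actual internals of trainDistributedModel():

# Hypothetical sketch, not the actual package internals:
distributedUpdate = function(registry_dir) {
  # Reload the coordination state that was stored on disk:
  load(file.path(registry_dir, "registry.rds"))
  load(file.path(registry_dir, "model.rds"))

  # Keep only the files that are actually available on this machine
  # (the field name "files" is assumed here for illustration):
  local_files = Filter(file.exists, registry[["files"]])

  for (f in local_files) {
    dat = read.csv(f)
    # Here, the configured number of gradient descent steps would be
    # conducted on dat and the result stored in the model object.
  }

  # Write the updated state back so the next location can continue:
  save(model, file = file.path(registry_dir, "model.rds"))
  save(registry, file = file.path(registry_dir, "registry.rds"))
}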

Finally, we can do an update by calling trainDistributedModel():

trainDistributedModel(regis = registry_dir)
trainDistributedModel(regis = registry_dir)

load("train_files/registry.rds")
registry

load("train_files/model.rds")
model

To reduce communication costs, it is also possible to specify how many iterations should be conducted at once on each server:

trainDistributedModel(regis = registry_dir, epochs_at_once = 10L)
trainDistributedModel(regis = registry_dir, epochs_at_once = 10L)

load("train_files/registry.rds")
registry

load("train_files/model.rds")
model

In addition, the model can also be trained until a stopping criterion is hit. This is either the maximal number of epochs or the relative improvement of the MSE falling below mse_eps (here r registry[["mse_eps"]]). Whether training is done can be checked by looking at model[["done"]], which returns TRUE if it is done and FALSE if not:

while(! model[["done"]]) {
    trainDistributedModel(regis = registry_dir, silent = TRUE)
    load("train_files/model.rds")
}
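
Assuming the relative MSE improvement is defined in the usual way, the loop above stops as soon as either the maximal number of epochs is reached or

$$\left| \frac{\mathrm{MSE}^{[t-1]} - \mathrm{MSE}^{[t]}}{\mathrm{MSE}^{[t-1]}} \right| < \texttt{mse\_eps}$$

holds (this exact definition is an assumption, not taken from the package documentation).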

We can have a final look at the estimated parameters by loading the model:

load("train_files/registry.rds")
registry[["actual_iteration"]]

load("train_files/model.rds")
model

knitr::kable(data.frame(lm = coef(mod_lm), actual.step = model[["beta"]], diff = coef(mod_lm) - model[["beta"]]))
unlink(registry_dir, recursive = TRUE)

Comparison of Different Numbers of Epochs at Once

registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", 
  out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, 
  file_reader = read.csv, overwrite = TRUE)

load("train_files/model.rds")

time = proc.time()
while(! model[["done"]]) {
  trainDistributedModel(regis = registry_dir, silent = TRUE)
  load("train_files/model.rds")
}
time_epochs_1 = proc.time() - time
beta_epochs_1 = model[["beta"]]

unlink(registry_dir, recursive = TRUE)
registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", 
  out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, 
  file_reader = read.csv, overwrite = TRUE)

load("train_files/model.rds")

time = proc.time()
while(! model[["done"]]) {
  trainDistributedModel(regis = registry_dir, silent = TRUE, epochs_at_once = 5L)
  load("train_files/model.rds")
}
time_epochs_5 = proc.time() - time
beta_epochs_5 = model[["beta"]]

unlink(registry_dir, recursive = TRUE)
registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", 
  out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, 
  file_reader = read.csv, overwrite = TRUE)

load("train_files/model.rds")

time = proc.time()
while(! model[["done"]]) {
  trainDistributedModel(regis = registry_dir, silent = TRUE, epochs_at_once = 10L)
  load("train_files/model.rds")
}
time_epochs_10 = proc.time() - time
beta_epochs_10 = model[["beta"]]

unlink(registry_dir, recursive = TRUE)

Estimated Parameters

knitr::kable(data.frame(lm = coef(mod_lm), beta_epochs_1, beta_epochs_5, beta_epochs_10))

Fitting Time

| Epochs at Once | Elapsed Time |
| -------------- | ------------ |
| 1  | r time_epochs_1[3]  |
| 5  | r time_epochs_5[3]  |
| 10 | r time_epochs_10[3] |


