knitr::opts_chunk$set(collapse = TRUE)
To test the functionality we split the iris
dataset into 3 pieces and train on these three subsets without sharing data in a distributed fashion. This is done by averaging gradient descent updates.
set.seed(314159) df_train = iris[sample(nrow(iris)), ] idx_test = sample(x = seq_len(nrow(df_train)), size = 0.1 * nrow(df_train)) df_test = df_train[idx_test, ] df_train = df_train[-idx_test, ] splits = 3L breaks = seq(1, nrow(df_train), length.out = splits + 1) # Save single train sets: files = character(splits) for (i in seq_len(splits)) { files[i] = paste0("data/iris", i, ".csv") temp = df_train[breaks[i]:breaks[i + 1], ] write.csv(x = temp, file = files[i], row.names = FALSE) } # Save test set: write.csv(x = df_test, file = "data/iris_test.csv")
The files
object contains the different paths to the datasets which are required for the fitting process:
files
First of all, load the package using devtools
:
devtools::load_all()
To show what happens on one location we create a Model
object to specify the model we want to fit:
# "Load" iris dataset and create data matrix. In the final algorithm, this is done by a formula: X = cbind(Intercept = 1, Petal.Length = df_train[["Petal.Length"]], Sepal.Width = df_train[["Sepal.Width"]]) # Define target variable: y = df_train[["Sepal.Length"]] # Now we define a linear model as model for fitting: lin_mod = LinearModel$new(X, y) # With that model we can, i.e., calculate the MSE for a given parameter vector: lin_mod$calculateMSE(rnorm(3))
In order to be able to compare our distributed Gradient Descent algorithm we fit a linear model on the whole dataset:
myformula = formula(Sepal.Length ~ Petal.Length + Sepal.Width) mod_lm = lm(myformula, data = df_train) summary(mod_lm)
Finally, we run the optGradientDescent()
optimizer for 3000 epochs with a learning rate of 0.01 by starting at $\vec{0}$:
param_start = rep(0, ncol(X)) grad_desc_lm = optGradientDescent(lin_mod, param_start, 0.01, 3000, FALSE, FALSE) str(grad_desc_lm) knitr::kable(data.frame(lm = coef(mod_lm), grad.descent = grad_desc_lm$param_new, diff = coef(mod_lm) - grad_desc_lm$param_new))
The nice thing about the implementation is that we can pass the last parameter and continue training for lets say 2000 iterations with a smaller learning rate:
param_start_next = grad_desc_lm$param_new grad_desc_lm_next = optGradientDescent(lin_mod, param_start_next, 0.001, 2000, FALSE, FALSE) str(grad_desc_lm) knitr::kable(data.frame(lm = coef(mod_lm), grad.descent = grad_desc_lm_next$param_new, diff = coef(mod_lm) - grad_desc_lm_next$param_new))
The technique used is to create a model for each subset and conduct one or more gradient descent steps. The idea is to save the current state of the actual model on the disk and reload that setting every time a new update is available:
On server/location i do:
Go ahead to server/location i+1 or make a final average of updates if $i$ was the last one and therefore run again on server/location 1.
For demonstration we use the three files created above on which we want to learn a (in this case) Linear Model. First of all, it is necessary to define all data paths of the data locations:
files = c("data/iris1.csv", "data/iris2.csv", "data/iris3.csv")
Now we initialize the model by setting the formula which is applied on each of the specified datasets, the model we want to train as character as well as the optimizer, the output directory of the single steps (each step can be stored if desired), and stuff like the epochs and learning rate:
myformula = formula(Sepal.Length ~ Petal.Length + Sepal.Width) registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, file_reader = read.csv, overwrite = TRUE)
We can have a look at the created registry and the model object:
load("train_files/registry.rds") registry load("train_files/model.rds") model
This file is permanent and holds all necessary data to coordinate the fitting process of the training.
Now we train a the model by loading the files sequentially. What happens is that the program looks which files are available on the machine, grabs the files, and conduct one or more gradient steps. This step is stored as .rds
file in the output directory (if overwrite = TRUE
), otherwise it is deleted when it is not used anymore.
Finally, we can do an update by calling trainDistributedModel()
:
trainDistributedModel(regis = registry_dir) trainDistributedModel(regis = registry_dir) load("train_files/registry.rds") registry load("train_files/model.rds") model
To reduce the costs of communication it is also possible to specify how much iterations should be done within once server:
trainDistributedModel(regis = registry_dir, epochs_at_once = 10L) trainDistributedModel(regis = registry_dir, epochs_at_once = 10L) load("train_files/registry.rds") registry load("train_files/model.rds") model
In addition, the training of the model can also be done until the stopping criteria is hit. This is ether the maximal number of epochs or the relative improvement of the MSE (specified as mse_eps
r registry[["mse_eps"]]
). This can be checked by looking at model[["done"]]
. This returns TRUE
if it is done and FALSE
if not:
while(! model[["done"]]) { trainDistributedModel(regis = registry_dir, silent = TRUE) load("train_files/model.rds") }
We can have a final look on the estimated parameter by loading the model:
load("train_files/registry.rds") registry[["actual_iteration"]] load("train_files/model.rds") model knitr::kable(data.frame(lm = coef(mod_lm), actual.step = model[["beta"]], diff = coef(mod_lm) - model[["beta"]]))
unlink(registry_dir, recursive = TRUE)
registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, file_reader = read.csv, overwrite = TRUE) load("train_files/model.rds") time = proc.time() while(! model[["done"]]) { trainDistributedModel(regis = registry_dir, silent = TRUE) load("train_files/model.rds") } time_epochs_1 = proc.time() - time beta_epochs_1 = model[["beta"]] unlink(registry_dir, recursive = TRUE)
registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, file_reader = read.csv, overwrite = TRUE) load("train_files/model.rds") time = proc.time() while(! model[["done"]]) { trainDistributedModel(regis = registry_dir, silent = TRUE, epochs_at_once = 5L) load("train_files/model.rds") } time_epochs_5 = proc.time() - time beta_epochs_5 = model[["beta"]] unlink(registry_dir, recursive = TRUE)
registry_dir = initializeDistributedModel(formula = myformula, model = "LinearModel", optimizer = "optGradientDescent", out_dir = getwd(), files = files, epochs = 30000L, learning_rate = 0.01, mse_eps = 1e-10, file_reader = read.csv, overwrite = TRUE) load("train_files/model.rds") time = proc.time() while(! model[["done"]]) { trainDistributedModel(regis = registry_dir, silent = TRUE, epochs_at_once = 10L) load("train_files/model.rds") } time_epochs_10 = proc.time() - time beta_epochs_10 = model[["beta"]] unlink(registry_dir, recursive = TRUE)
knitr::kable(data.frame(lm = coef(mod_lm), beta_epochs_1, beta_epochs_5, beta_epochs_10))
| Iterations Per Epoch | Elapsed Time |
| -------------------- | --------------------- |
| 1 | r time_epochs_1[3]
|
| 5 | r time_epochs_5[3]
|
| 10 | r time_epochs_10[3]
|
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.