knitr::opts_chunk$set(echo = TRUE, message = FALSE , warning = FALSE)
library(dplyr)
library(parallelMap)
library(mlr)
devtools::load_all()

Objectif

A partir d'un extrait en hourly de la database historique (2 ans), réaliser un benchrmark de plusieurs explorative constructions pour TSA, tant en daily qu'en hourly. Les données daily devront être calculées à partir des hourly.

Afin de réduire drastiquement les temps de calcul et de réaliser des visualisations graphiques interactives légères, il conviendra de ne pas réaliser les benchmarks d'EC sur l'ensemble des 2 ans de données mais sur plusieurs (5 par exemple) sous-échantillons tirés aléatoirements (suggestion faite par P. Bogaerts lors du CA d'avril 2019).

Initialisation du projet

Il convient de créer un folder dans lequel nous sauvegarderons les fichiers nécessaires à la conduction d'un benchmark. Ce folder servira de working directory. A noter qu'en travaillant depuis RStudio, le working directory se place automatiquement à la racine du folder considéré comme folder du projet. Dans le contexte de cet exemple nous le créerons au chemin suivant : ~/Rprojects/project_bmr_from_bash.

Un serveur RStudio dédié a été configuré et est disponible à l'adresse http://agromet.cra.wallonie.be:8787

Création des jeux de donnnées.

Charger l'extract hourly de la database

Ici nous récupérons un extract de la db au format json que nous chargeons dans un object R que nous appelerons dataset. Cet extract au format .json a été créer par JP et mis à disposition via FTP.

Le script suivant permet de récupérer les data via le FTP depuis R :

# get data from FTP and save it in the current WD - should work if the port is open
threadr::download_ftp_file(
  file_remote = "ftp://178.32.44.217/spCleandataSensorstsaForallFm2016-01-01To2017-12-31.json",
  file_output = "./data-raw/spCleandataSensorstsaForallFm2016-01-01To2017-12-31.json",
  credentials = paste0("agromet:",
    Sys.getenv("FTP_PASSWORD")),
  curl = FALSE, verbose = FALSE, progress = "none")

Ici supposons que le fichier .json a déjà été récupéré. Injectons-le maintenant dans une variable dataset_hourly grâce à notre fonction agrometeoR::makeDataset().

dataset_hourly = makeDataset(json = "../data-raw/extdata/AGROMET/spCleandataSensorstsaForallFm2016-01-01To2017-12-31.json", sensors = "tsa" )
dataset_hourly = dataset_hourly$output$value

A noter qu'il est également possible de réaliser un appel API spcleandata pour récupérer ces data. Cependant vu la taille du fichier à récupérer, cela résulterait en un timeout.

Visualisons un extrait d'un jeu de donnée d'une heure :

DT::datatable(dataset_hourly[[2]])

Création du jeu de données daily

Celui-ci doit évidemment être créé avant de réaliser l'échantillonage des subsets (sinon nous ne disposons pas de série de 24h continues nécessaires au calcul des informations journalières). Le script ci-dessous montre comment calculer les données daily sur base des hourly.

## create the daily datasets and tasks for Pameseb and Pameseb + IRm
summarize_by_day = function(l){

  split_tibble = function(tibble, column = 'col') {
  tibble %>% split(., .[,column]) %>% lapply(., function(x) x[,setdiff(names(x),column)])
  }

  dataset = l %>% purrr::map_df(.,c, .id = "datetime")
  a = dataset %>%
    dplyr::mutate_at("datetime", function(x){
      x = substr(x, start = 1, stop = 8)
    })

  b = a %>% group_by(datetime, sid) %>%
    summarise(
      tsa_min = min(tsa, na.rm = TRUE),
      tsa_max = max(tsa, na.rm = TRUE),
      tsa_mean = mean(tsa, na.rm = TRUE))

  b = b %>%
    dplyr::rename("mtime" = "datetime")

  b = b %>%
    dplyr::left_join(dplyr::select(stations.df, one_of(c("x", "y", "elevation", "sid"))), by = "sid")

  c = b %>% split_tibble("mtime")

  d = lapply(seq_along(c), function(x){
    mtime = names(c)[x]
    mtime = rep.int(mtime, nrow(c[[x]]))
    df = c[[x]] %>%
      dplyr::mutate(mtime = mtime)
    return(df)
  })

  names(d) = names(c)
  d = d %>% purrr::map(as.data.frame)
  return(d)
}

dataset_daily =  dataset_hourly %>% summarize_by_day()

Visualisons un extrait des data aggrégées en daily :

DT::datatable(dataset_daily[[2]])

Création des sous-échantillons aléatoires.

Réaliser un benchmark sur deux ans de données horaires est très couteux en terme de puissance et temps de calcul. On choisit donc de réaliser plusieurs sous-benchmarks sur 10 % des data tirées aléatoirement. Pour réduire les temps de calcul dans le contexte de cet exemple, nous ne prendrons que 0.1 % des données dans chacun de nos 5 sous-échantillons. Pour assurer la reproductibilité des benchmarks, il est nécessaire de réaliser les tirages aléatoires en se basant sur une "seed". Nous nous limiterons à l'exemple pour les data en hourly.

# definition of the function to create the subsamples
create_subset = function(seed, dataset, percentage){
  set.seed(seed)
dataset_sample = sample(
  x = dataset,
  size = as.integer(percentage*length(dataset)/100))
}

# creating the vector of the seeds that will be used to generate 5 subsets
seeds = c(2019, 2018, 2017, 2016, 2015)

# creating the 5 subsets of 0.1 % of the whole dataset (we should use 10 to respect 10 % but for the example we only take 0.1 %)
subsets_hourly = lapply(seeds, create_subset, dataset_hourly, 0.1)

Visualisons le premier jeu de données horaires du premier subset

DT::datatable(subsets_hourly[[1]][[1]])
# in your console, you can simply use :
subsets_hourly[[1]][[1]]

Création de doublons ne contenant que les data Pameseb

Comme nous souhaitons également évaluer l'apport de l'intégration des data IRM, nous allons dédoubler chacun de nos 5 sous échantillons en passant un filtre excluant les données des stations IRM pour chacun des groupes d'enregistrements horaires.

subsets_hourly_without_IRM = lapply(subsets_hourly, function(x){ # here each x correspond to one of our 5 subsets
  x_without_irm = x %>% purrr::map(., ~dplyr::filter(.,sid < 1000))
})

Visualisons le premier jeu de données horaires du premier subset filtré IRM

DT::datatable(subsets_hourly_without_IRM[[1]][[1]])
# in your console, you can use :
# subsets_hourly_without_IRM[[1]][[1]]

le filtre a bel et bien fonctionné, nous n'avons plus de stations avec des sid's au-delà de 1000.

Conclusion concernant la création de jeux de données

Nous disposons maintenant de deux jeux de 5 subsets tirés aléatoirement et contenant chacun 0.1 % des data originales (un Pameseb only et un Pameseb + IRM). Nous pouvons maintenant convertir chacun des subsets de ces deux grands ensembles en tâches de machine learning

Tâches de machine learning

Création des tâches

Pour chacun des 5 subsets de nos deux groupes Pameseb et Pameseb + IRM, nous pouvons créer nos tâches de machine learning.

# creation of the tasks for each of the subsets from Pameseb + IRM
tasks_hourly = lapply(subsets_hourly, function(x){ # each x correspond to one of our 5 subsets
  task = x %>% purrr::map(makeTask, target = "tsa")
})

# creatin of the tasks for each of the subsets from Pameseb
tasks_hourly_without_IRM = lapply(subsets_hourly_without_IRM, function(x){ # each x correspond to one of our 5 subsets
  task = x %>% purrr::map(makeTask, target = "tsa")
})

# extract the tasks from the outputs
tasks_hourly_data = lapply(tasks_hourly, function(x){ # each x correspond to one of our 5 subsets
  x = x %>% purrr::modify_depth(1, ~.$output$value$task)
}) 

# extract the tasks from the outputs
tasks_hourly_without_IRM_data = lapply(tasks_hourly_without_IRM, function(x){ # each x correspond to one of our 5 subsets
  x = x %>% purrr::modify_depth(1, ~.$output$value$task)
}) 

Visualisons la première tâche du premier de nos subsets :

tasks_hourly_data[[1]][[1]]

Et la même tâche excluant les data IRM :

tasks_hourly_without_IRM_data[[1]][[1]]

Nous voyons qu'en ayant passer un filter supprimant les data des stations IRM, nous passons de 43 à 27 observations.

Récupération des metadata concernant la création de tâches

La fonction agrometeor::makeTask() supprime automatiquement les observations pour lesquelles il y a une donnée manquante. A noter que c'est le backend de JP qui détermine les données manquantes (ex : TSa state qui n'est pas égal à 1).

Voici la liste des stations retenues pour la première tâche du premier subset pour le jeu de données Pameseb + IRM :

tasks_hourly[[1]][[1]]$output$stations$used

Et la même information pour le jeu de données excluant les stations IRM:

tasks_hourly_without_IRM[[1]][[1]]$output$stations$used

Implémentation des learners

Plusieurs learners viennent précompilés avec le package agrometeoR. Ceux-ci sont compilés dans le fichier ./data-raw.makeLearner.R du package.

Voici les learners implémentés :

data(agrometeorLearners)
learners = agrometeorLearners

Exécution du benchmark

Démarches préalables

Une fois que les tâches ont été créées, nous les sauvegardons sur disque, afin de pouvoir les ré-utiliser autant de fois que désiré sans avoir à les recompiler. Dans le contexte de notre exempe, nous nous limiterons seulement au benchmark des tâches contenant les data Pameseb + IRM stockées dans l'objet tasks_hourly_data. Voici le code à utiliser pour sauvegarder l'objet de tâches sur disque au niveau du folder ./outputs/tasks/ dans le fichier tasks_hourly_data.rds:

saveRDS(
  object = tasks_hourly_data,
  file = "./outputs/tasks/tasks_hourly_data.rds")

Pour réaliser le benchmark, il est préférable de travailler depuis un terminal.

La stratégie est la suivante. Il s'agit de créer un petit script R exécutable depuis le terminal (+ d'info par ici) qui va charger les fichiers .rds contenant les tâches et s'en servir pour créer le benchmark.

Ce script sera appelé depuis une session bash ouverte sur le serveur de calcul et lancée depuis une session tmux (afin d'éviter toute interruption du calcul en cas de coupure réseau par exemple).

Ci-dessous nous est présenté le script R exécutable depuis bash et permettant de réaliser un benchmark. Il nous suffit de le copier et de l'enregistrer dans notre working directory. Dans cet exemple nous l'appellerons execute_bmr_from_bash.R

Le script prend 4 paramètres. Dans l'ordre : Le chemin relatif aui wd du fichier contenant les tâches à soumettre : ./outputs/tasks/tasks_hourly_data.rds le nom que l'on souhaite donner au fichier d'output du benchmark : bmr_hourly_from_terminal (le point rds est ajouté automatiquement par le script ci-dessous) le chemin du dossier dans lequel l'on souhaite sauvegarder l'output final du benchmark. ./outputs/bmr/ (le dossier doit exister !) le chemin du dossier dans lequel l'on souhaite sauvegarder les fichiers temporaires. ./outputs/tempfiles/ (le dossier doit exister !)

A noter que la fonction agrometeoR::makeBmrsBatch() exige pour le moment que le folder dans lequel l'on souhaite enregistrer les fichiers temporaires de benchmark existe. i lfaut donc préalablement le créer.

#!/usr/bin/env Rscript

# Rscript ./script_execute_bmr.R ./data-created/tasksdataPamesebIrmDailyForBmrs.rds daily_tsa_PamesebIrm

# parsing the 4 args that must be passed to bash CLI. These args are then passed to our benchmarking function. 
args = commandArgs(trailingOnly = TRUE)
message(paste0("The current working directory is ", getwd()))
message("The demanded task file is ", args[1])
message("Your output file will be prefixed by : ", args[2])
message("Your output file will be stored into : ", args[3])
message("Your temporary files will be stored into : ", args[4])

# storing the 4 bash command arguments in variables
tasks.file = args[1]
output.name = args[2]
output.folder = args[3]
output.tempdir = args[4]

# test if the task file exist else return an error
stopifnot(file.exists(paste0(tasks.file)))

# load the required libraries to perform the benchmark
message("loading the required libs")
suppressMessages(library(parallelMap))
suppressMessages(library(mlr))
suppressMessages(library(agrometeoR))
suppressMessages(library(dplyr))

# load the data for the bmrs
message(paste0("Reading and loading the tasks file", tasks.file))
tasks = readRDS(tasks.file)

# perform the benchmarks
message("Starting to conduct the benchmarks...")
bmrsResult = makeBmrsBatch(
  tasks = tasks,
  learners = agrometeorLearners,
  measures = list(mlr::rmse, mlr::mae, mlr::mse),
  keep.pred = TRUE,
  models = TRUE, # necessary to check if specific learner has been able to train the model. See https://mlr.mlr-org.com/articles/tutorial/configureMlr.html
  groupSize = 100,
  level = "mlr.benchmark",
  resamplings = "LOO",
  cpus = 8,
  prefix = output.name,
  temp_dir = output.tempdir,
  removeTemp = FALSE
)

# save the bmrs result
message("Saving the result...")
saveRDS(
  object = bmrsResult,
  file = paste0(
   output.folder,# "./data-created/",
    output.name,
    as.character(Sys.Date()),
    "_bmrsResult.rds"))

# purge the memory
# rm(list = ls())

Connexion au serveur via tmux

Pour utiliser ce script depuis le terminal : lançons bash, connectons-nous en ssh au serveur de calcul (il nous faut une clef SSH évidemment) et démarrons une session tmux :

```{bash, eval = FALSE} ssh agromet tmux ls #list

tmux new-session -s bmr

cd Rprojects/currentProject/

tmux attach

chmod +x execute_bmr_from_bash.R # make the script exécutable

./execute_bmr_from_bash.R ./outputs/tasks/tasks_hourly_data.rds bmr_hourly_from_terminal ./outputs/bmr/ ./outputs/tempfiles # execution du bmr en terminal/

type ctrl+b et puis d => hide the plex

tmux attach to see it again

type exit to kill a plex

```



pokyah/agrometeoR documentation built on May 26, 2019, 7 p.m.