This notebook is adapted from the John Snow Labs workshop Jupyter/Python tutorial "5.Text_Classification_with_ClassifierDL" (https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/5.Text_Classification_with_ClassifierDL.ipynb).

library(yardstick, warn.conflicts = FALSE)
library(purrr, warn.conflicts = FALSE)
library(sparklyr, warn.conflicts = FALSE)
library(sparknlp, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

version <- Sys.getenv("SPARK_VERSION", unset = "2.4.5")

config <- sparklyr::spark_config()
config$`sparklyr.shell.driver-memory` <- "8g"

options(sparklyr.sanitize.column.names.verbose = TRUE)
options(sparklyr.verbose = TRUE)
options(sparklyr.na.omit.verbose = TRUE)
options(sparklyr.na.action.verbose = TRUE)

sc <- sparklyr::spark_connect(master = "local", version = version, config = config)

cat("Apache Spark version: ", sc$home_version, "\n")
cat("Spark NLP version: ", nlp_version())

Load Dataset

train_data_file <- pins::pin("https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_train.csv")
test_data_file <- pins::pin("https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_test.csv")
trainDataset <- spark_read_csv(sc, train_data_file)

head(trainDataset, 20)
sdf_nrow(trainDataset)
trainDataset %>% 
  count(category)
testDataset <- spark_read_csv(sc, test_data_file)

testDataset %>% 
  count(category)
# If a separate test file were not available, we could split the training data instead:

#splitData <- sdf_random_split(trainDataset, training = 0.7, test = 0.3)
#trainingData <- splitData$training
#testData <- splitData$test
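
# Because the split is random, passing a seed makes it reproducible across
# sessions; a minimal sketch (seed is a standard sdf_random_split() argument,
# and the 70/30 weights are illustrative):

#splitData <- sdf_random_split(trainDataset, training = 0.7, test = 0.3, seed = 1234)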

ClassifierDL with Word Embeddings and Text Preprocessing

# DocumentAssembler: wrap the raw description text in Spark NLP document annotations
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document")

# Split each document into tokens
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

# Strip punctuation and other non-word characters from the tokens
normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized")

# Remove English stop words
stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE)

# Reduce the remaining tokens to their lemmas with a pretrained lemmatizer
lemma <- nlp_lemmatizer_pretrained(sc, name = "lemma_antbnc", input_cols = c("cleanTokens"), output_col = "lemma")

# Look up pretrained GloVe word embeddings for the lemmas
glove_embeddings <- nlp_word_embeddings_pretrained(sc, input_cols = c("document", "lemma"), output_col = "embeddings", case_sensitive = FALSE)

# Average the word embeddings into one sentence embedding per document
embeddingsSentence <- nlp_sentence_embeddings(sc, input_cols = c("document", "embeddings"), output_col = "sentence_embeddings",
                                              pooling_strategy = "AVERAGE")

# Train a deep-learning classifier on the sentence embeddings, using category as the label
classifierdl <- nlp_classifier_dl(sc, input_cols = c("sentence_embeddings"), output_col = "class",
                                  label_col = "category", max_epochs = 3, enable_output_logs = TRUE)

clf_pipeline <- ml_pipeline(
  document_assembler,
  tokenizer,
  normalizer,
  stopwords_cleaner,
  lemma,
  glove_embeddings,
  embeddingsSentence,
  classifierdl
)
# Train
system.time(
  clf_pipelineModel <- ml_fit(clf_pipeline, trainDataset)
)
system2("ls", c("-lt ~/annotator_logs/classifier_dl*"))

log_files <- Filter(function(s) grepl("classifier_dl", s), list.files("~/annotator_logs", full.names = TRUE))
log_file_dates <- file.info(log_files)$mtime
latest_log_file <- log_files[which.max(log_file_dates)]
system2("cat", latest_log_file)
# Get predictions on the test set

preds <- ml_transform(clf_pipelineModel, testDataset)

preds %>% 
  mutate(class_result = to_json(class.result),
         description = substr(description, 1, 50)) %>% 
  select(category, description, class_result)
preds_summary <- preds %>% 
  mutate(class_result = explode(class.result)) %>% 
  select(category, class_result) %>% 
  collect()

cm <- conf_mat(preds_summary %>% 
                 mutate(category = factor(category),
                        class_result = factor(class_result)), 
               category, class_result)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")
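
summary(cm) reports macro-averaged metrics; per-class precision and recall can be computed directly from the confusion matrix table. A minimal sketch, assuming yardstick's convention of predictions in rows and truth in columns:

tab <- cm$table
data.frame(
  class     = colnames(tab),
  precision = diag(tab) / rowSums(tab),
  recall    = diag(tab) / colSums(tab)
)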

ClassifierDL with Universal Sentence Embeddings

document <- nlp_document_assembler(sc, input_col = "description", output_col = "document")

# The pretrained Universal Sentence Encoder produces sentence embeddings directly,
# so no tokenization, normalization, or pooling stages are needed
use <- nlp_univ_sent_encoder_pretrained(sc, input_cols = c("document"), output_col = "sentence_embeddings")

classifierdl <- nlp_classifier_dl(sc, input_cols = c("sentence_embeddings"), output_col = "class",
                                  label_col = "category", max_epochs = 5, enable_output_logs = TRUE)

use_clf_pipeline <- ml_pipeline(document, use, classifierdl)
use_pipelineModel <- ml_fit(use_clf_pipeline, trainDataset)
system2("ls", c("-lt ~/annotator_logs/classifier_dl*"))

log_files <- Filter(function(s) grepl("classifier_dl", s), list.files("~/annotator_logs", full.names = TRUE))
log_file_dates <- file.info(log_files)$mtime
latest_log_file <- log_files[which.max(log_file_dates)]
system2("cat", latest_log_file)

Getting Predictions from the Trained Model

light_model <- nlp_light_pipeline(use_pipelineModel)
testDataset %>% select(description) %>% head(2)
text <- "Fearing the fate of Italy, the centre-right government has threatened to be merciless with those who flout tough restrictions.\n 
As of Wednesday it will also include all shops being closed across Greece, with the exception of supermarkets. Banks, pharmacies, pet-stores, mobile phone stores, opticians, bakers, mini-markets, couriers and food delivery outlets are among the few that will also be allowed to remain open."

result <- nlp_annotate(light_model, text)

unlist(result$class)
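
purrr (attached above but unused so far) makes it easy to annotate several texts in one pass; a minimal sketch, where the second string is just an arbitrary example sentence:

texts <- c(text,
           "Stocks rallied on Wall Street after strong quarterly earnings from major technology companies.")
results <- purrr::map(texts, ~ nlp_annotate(light_model, .x))
purrr::map(results, ~ unlist(.x$class))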

Saving the Trained Model

ml_stages(use_pipelineModel)
# The third stage is the trained ClassifierDL model; save just that stage for reuse
ml_save(ml_stages(use_pipelineModel)[[3]], "ClassifierDL_USE_20200407_e5", overwrite = TRUE)
classifierdlmodel <- ml_load(sc, "ClassifierDL_USE_20200407_e5")
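
The reloaded stage is an ordinary transformer, so it can be dropped into a fresh pipeline alongside the document assembler and USE encoder defined above; a minimal sketch (ml_fit() is effectively a no-op here because every stage is already fitted):

inference_pipeline <- ml_pipeline(document, use, classifierdlmodel)
inference_model <- ml_fit(inference_pipeline, testDataset)
inference_preds <- ml_transform(inference_model, testDataset)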

