This notebook is adapted from John Snow Labs workshop Jupyter/Python tutorial "5.1_Text_classification_examples_in_SparkML_SparkNLP" (https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/5.1_Text_classification_examples_in_SparkML_SparkNLP.ipynb)

library(yardstick, warn.conflicts = FALSE)
library(purrr, warn.conflicts = FALSE)
library(sparklyr, warn.conflicts = FALSE)
library(sparknlp, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Spark version to connect to; SPARK_VERSION overrides the 2.4.5 fallback.
version <- Sys.getenv("SPARK_VERSION", unset = "2.4.5")

# Start from sparklyr's default configuration and set the driver-memory option.
config <- sparklyr::spark_config()
config$`sparklyr.shell.driver-memory` <- "8g"

# Switch on sparklyr's verbose options in a single call.
options(
  sparklyr.sanitize.column.names.verbose = TRUE,
  sparklyr.verbose = TRUE,
  sparklyr.na.omit.verbose = TRUE,
  sparklyr.na.action.verbose = TRUE
)

# Local Spark connection reused by every later step in the notebook.
sc <- sparklyr::spark_connect(
  master = "local",
  version = version,
  config = config
)

cat("Apache Spark version: ", sc$home_version, "\n")
cat("Spark NLP version: ", nlp_version())
# Pin the remote training and test CSV files; the results are handed to
# spark_read_csv() below and in the final evaluation section.
training_data_file <- pins::pin(
  "https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_train.csv"
)
test_data_file <- pins::pin(
  "https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_test.csv"
)

# Read the training CSV into Spark.
newsDF <- spark_read_csv(sc, training_data_file)

# First 20 rows, with each description cut to 50 characters for display.
head(mutate(newsDF, description = substr(description, 1, 50)), 20)

# Number of rows per category (sort = TRUE).
count(newsDF, category, sort = TRUE)

Building Classification Pipeline

LogReg with CountVectorizer

Tokenizer: Tokenization

stopwordsRemover: Remove Stop Words

countVectors: Count vectors ("document-term vectors")

# Text-preparation stages. Each stage reads the previous stage's output column:
# description -> document -> token -> normalized -> cleanTokens -> stem.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)
tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token")
normalizer <- nlp_normalizer(sc, input_cols = "token", output_col = "normalized")
stopwords_cleaner <- nlp_stop_words_cleaner(
  sc,
  input_cols = "normalized",
  output_col = "cleanTokens",
  case_sensitive = FALSE
)
stemmer <- nlp_stemmer(sc, input_cols = "cleanTokens", output_col = "stem")

# Turn the "stem" annotations into an array column named "token_features".
finisher <- nlp_finisher(
  sc,
  input_cols = "stem",
  output_col = "token_features",
  output_as_array = TRUE,
  clean_annotations = FALSE
)

# Count-vector features over "token_features" (vocab_size 10000, min_df 5).
countVectors <- ft_count_vectorizer(
  sc,
  input_col = "token_features",
  output_col = "features",
  vocab_size = 10000,
  min_df = 5
)

# Index the string "category" column into a "label" column.
label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label")

# Assemble the stages in order and fit them on the training table.
nlp_pipeline <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  stemmer, finisher, countVectors, label_stringIdx
)

nlp_model <- ml_fit(nlp_pipeline, newsDF)

# Run the fitted pipeline over the training table.
processed <- ml_transform(nlp_model, newsDF)

sdf_nrow(processed)

# Description next to its JSON-encoded token list, both cut to 50 characters.
processed %>%
  mutate(
    description = substr(description, 1, 50),
    token_features = substr(to_json(token_features), 1, 50)
  ) %>%
  select(description, token_features) %>%
  head(20)

# Two full token lists, JSON-encoded.
head(select(mutate(processed, token_features = to_json(token_features)), token_features), 2)

# Two of the feature vectors.
head(select(processed, features), 2)

# Shortened description together with its features and label.
processed %>%
  mutate(description = substr(description, 1, 50)) %>%
  select(description, features, label) %>%
  head(20)
# 70/30 train/test split of the CountVectorizer features, with a fixed seed.
splits <- sdf_random_split(processed, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test

print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
sdf_schema(trainingData)

# Fit logistic regression on the training split and score the test split.
lr <- ml_logistic_regression(sc, max_iter = 10, reg_param = 0.3, elastic_net_param = 0)

lrModel <- ml_fit(lr, trainingData)

predictions <- ml_transform(lrModel, testData)

# Sample of the rows predicted as class 0.
predictions %>% 
  filter(prediction == 0) %>% 
  mutate(description = substr(description, 1, 30)) %>% 
  select(description, category, probability, label, prediction) %>% 
  head(10)

evaluator <- ml_multiclass_classification_evaluator(sc, prediction_col = "prediction")
ml_evaluate(evaluator, predictions)

# Collect label and prediction together in a single Spark job. Two separate
# collect() calls would run the pipeline twice and rely on both jobs returning
# rows in the same order.
eval_cv <- predictions %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_cv, label)
y_pred <- select(eval_cv, prediction)

y_pred %>% 
  count(prediction) 

# Give both factors the same level set so conf_mat() still works when some
# class is never predicted.
class_levels <- sort(unique(c(eval_cv$label, eval_cv$prediction)))
cnf_matrix <- yardstick::conf_mat(
  eval_cv %>% 
    mutate(label = factor(label, levels = class_levels),
           prediction = factor(prediction, levels = class_levels)),
  label, prediction
)
cnf_matrix
summary(cnf_matrix)

LogReg with TFIDF

# Hash the finished tokens into a term-frequency vector with 10000 features.
hashingTF <- ft_hashing_tf(
  sc,
  input_col = "token_features",
  output_col = "rawFeatures",
  num_features = 10000
)

# IDF stage over the raw term frequencies; min_doc_freq removes sparse terms.
idf <- ft_idf(
  sc,
  input_col = "rawFeatures",
  output_col = "features",
  min_doc_freq = 5
)

# Same text-preparation stages as before, with hashing TF + IDF as the
# feature stages.
nlp_pipeline <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  stemmer, finisher, hashingTF, idf, label_stringIdx
)

nlp_model_tf <- ml_fit(nlp_pipeline, newsDF)
processed_tf <- ml_transform(nlp_model_tf, newsDF)

sdf_nrow(processed_tf)

# Preview the TF-IDF table built in this section (processed_tf), not the
# CountVectorizer table from the previous one.
processed_tf %>% 
  mutate(description = substr(description, 1, 20)) %>% 
  select(description, features, label) %>% 
  head(20)

# Fixed seed so the split is repeatable, matching the other sections.
splits <- sdf_random_split(processed_tf, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test

print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

# Reuse the logistic regression estimator `lr` on the TF-IDF features.
lrModel_tf <- ml_fit(lr, trainingData)

predictions_tf <- ml_predict(lrModel_tf, testData)

# Cache the predictions data frame so that it doesn't get recomputed over and over
predictions_tf <- sdf_register(predictions_tf, "predictions_tf")
tbl_cache(sc, "predictions_tf")

predictions_tf %>% 
  mutate(probability_str = substr(string(probability), 1, 30),
         description = substr(description, 1, 30)) %>% 
  arrange(desc(probability)) %>% 
  select(description, category, probability = probability_str, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job so the two columns stay row-aligned.
eval_tf <- predictions_tf %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_tf, label)
y_pred <- select(eval_tf, prediction)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(eval_tf$label, eval_tf$prediction)))
cm <- conf_mat(eval_tf %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

Random Forest with TFIDF

# Random forest classifier trained on the current trainingData/testData split.
rf <- ml_random_forest_classifier(sc,
                                  label_col = "label", 
                                  features_col = "features", 
                                  num_trees = 100, 
                                  max_depth = 4, 
                                  max_bins = 32)

rfModel <- ml_fit(rf, trainingData)
predictions_rf <- ml_transform(rfModel, testData)

# Cache the random forest predictions so that they don't get recomputed over
# and over. Everything below reports on predictions_rf, the model fitted here.
predictions_rf <- sdf_register(predictions_rf, "predictions_rf")
tbl_cache(sc, "predictions_rf")

predictions_rf %>% 
  mutate(probability_str = substr(string(probability), 1, 30),
         description = substr(description, 1, 30)) %>% 
  arrange(desc(probability)) %>% 
  select(description, category, probability = probability_str, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job so the two columns stay row-aligned.
eval_rf <- predictions_rf %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_rf, label)
y_pred <- select(eval_rf, prediction)

# Shared levels keep conf_mat() working when the forest never predicts one of
# the classes.
class_levels <- sort(unique(c(eval_rf$label, eval_rf$prediction)))
cm <- conf_mat(eval_rf %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

LogReg with Spark NLP Glove Word Embeddings

# Text-preparation stages: description -> document -> token -> normalized ->
# cleanTokens.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)
tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token")
normalizer <- nlp_normalizer(sc, input_cols = "token", output_col = "normalized")
stopwords_cleaner <- nlp_stop_words_cleaner(
  sc,
  input_cols = "normalized",
  output_col = "cleanTokens",
  case_sensitive = FALSE
)

# Pretrained word embeddings over the cleaned tokens.
glove_embeddings <- nlp_word_embeddings_pretrained(
  sc,
  input_cols = c("document", "cleanTokens"),
  output_col = "embeddings",
  case_sensitive = FALSE
)

# Pool the word embeddings into sentence embeddings with the AVERAGE strategy.
embeddingsSentence <- nlp_sentence_embeddings(
  sc,
  input_cols = c("document", "embeddings"),
  output_col = "sentence_embeddings",
  pooling_strategy = "AVERAGE"
)

# Expose the sentence embeddings as vectors in "finished_sentence_embeddings".
embeddings_finisher <- nlp_embeddings_finisher(
  sc,
  input_cols = "sentence_embeddings",
  output_cols = "finished_sentence_embeddings",
  output_as_vector = TRUE,
  clean_annotations = FALSE
)

# Explode the finished embeddings into a "features" column via SQL.
explodeVectors <- ft_sql_transformer(
  sc,
  "SELECT EXPLODE(finished_sentence_embeddings) AS features, * FROM __THIS__"
)

label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label")

nlp_pipeline_w2v <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  glove_embeddings, embeddingsSentence, embeddings_finisher,
  explodeVectors, label_stringIdx
)

nlp_model_w2v <- ml_fit(nlp_pipeline_w2v, newsDF)

# Apply the fitted embeddings pipeline to the training table.
processed_w2v <- ml_transform(nlp_model_w2v, newsDF)

sdf_nrow(processed_w2v)
processed_w2v %>% select(features) %>% head(1)
processed_w2v %>% 
  mutate(description = substr(description, 1, 30)) %>% 
  select(description, features, label) %>% 
  head(20)

# set seed for reproducibility
splits <- sdf_random_split(processed_w2v, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test
print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

# Reuse the logistic regression estimator `lr` on the embedding features.
lrModel_w2v <- ml_fit(lr, trainingData)
predictions_w2v <- ml_transform(lrModel_w2v, testData)

# Preview this section's own predictions (predictions_w2v).
predictions_w2v %>% 
  mutate(probability_str = substr(string(probability), 1, 30),
         description = substr(description, 1, 30)) %>% 
  arrange(desc(probability)) %>% 
  select(description, category, probability = probability_str, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job: the pipeline runs once and the two
# columns stay row-aligned.
eval_w2v <- predictions_w2v %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_w2v, label)
y_pred <- select(eval_w2v, prediction)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(eval_w2v$label, eval_w2v$prediction)))
cm <- conf_mat(eval_w2v %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

# Description next to its cleaned tokens (JSON-encoded), both cut to 50 chars.
processed_w2v %>% 
  mutate(description = substr(description, 1, 50),
         cleanTokens = substr(to_json(cleanTokens.result), 1, 50)) %>% 
  select(description, cleanTokens) %>% 
  head(20)

LogReg with Spark NLP Bert Embeddings

# Text-preparation stages: description -> document -> token -> normalized ->
# cleanTokens.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)
tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token")
normalizer <- nlp_normalizer(sc, input_cols = "token", output_col = "normalized")
stopwords_cleaner <- nlp_stop_words_cleaner(
  sc,
  input_cols = "normalized",
  output_col = "cleanTokens",
  case_sensitive = FALSE
)

# Pretrained "bert_base_cased" embeddings over the cleaned tokens.
bert_embeddings <- nlp_bert_embeddings_pretrained(
  sc,
  name = "bert_base_cased",
  input_cols = c("document", "cleanTokens"),
  output_col = "bert",
  case_sensitive = FALSE,
  pooling_layer = 0
)

# Pool the token embeddings into sentence embeddings with the AVERAGE strategy.
embeddingsSentence <- nlp_sentence_embeddings(
  sc,
  input_cols = c("document", "bert"),
  output_col = "sentence_embeddings",
  pooling_strategy = "AVERAGE"
)

# Expose the sentence embeddings as vectors in "finished_sentence_embeddings".
embeddings_finisher <- nlp_embeddings_finisher(
  sc,
  input_cols = "sentence_embeddings",
  output_cols = "finished_sentence_embeddings",
  output_as_vector = TRUE,
  clean_annotations = FALSE
)

label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label")

nlp_pipeline_bert <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  bert_embeddings, embeddingsSentence, embeddings_finisher, label_stringIdx
)

nlp_model_bert <- ml_fit(nlp_pipeline_bert, newsDF)

# Apply the fitted BERT pipeline to the training table.
processed_bert <- ml_transform(nlp_model_bert, newsDF)

# Row count of this section's table (processed_bert).
sdf_nrow(processed_bert)

# Explode the finished sentence embeddings into the "features" column.
processed_bert <- processed_bert %>% 
  mutate(features = explode(finished_sentence_embeddings))

processed_bert %>% 
  mutate(description = substr(description, 1, 30)) %>% 
  select(description, features, label) %>% 
  head(20)

# set seed for reproducibility
splits <- sdf_random_split(processed_bert, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test
print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0)

lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData)

predictions %>% 
  mutate(probability_str = substr(string(probability), 1, 30),
         description = substr(description, 1, 30)) %>% 
  arrange(desc(probability)) %>% 
  select(description, category, probability = probability_str, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job: the embeddings pipeline runs once
# and the two columns stay row-aligned.
eval_bert <- predictions %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_bert, label)
y_pred <- select(eval_bert, prediction)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(eval_bert$label, eval_bert$prediction)))
cm <- conf_mat(eval_bert %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

LogReg with ELMO Embeddings

# Text-preparation stages: description -> document -> token -> normalized ->
# cleanTokens.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)
tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token")
normalizer <- nlp_normalizer(sc, input_cols = "token", output_col = "normalized")
stopwords_cleaner <- nlp_stop_words_cleaner(
  sc,
  input_cols = "normalized",
  output_col = "cleanTokens",
  case_sensitive = FALSE
)

# Pretrained ELMO embeddings over the cleaned tokens ("word_emb" pooling layer).
elmo_embeddings <- nlp_elmo_embeddings_pretrained(
  sc,
  input_cols = c("document", "cleanTokens"),
  output_col = "elmo",
  pooling_layer = "word_emb"
)

# Pool the token embeddings into sentence embeddings with the AVERAGE strategy.
embeddingsSentence <- nlp_sentence_embeddings(
  sc,
  input_cols = c("document", "elmo"),
  output_col = "sentence_embeddings",
  pooling_strategy = "AVERAGE"
)

# Expose the sentence embeddings as vectors in "finished_sentence_embeddings".
embeddings_finisher <- nlp_embeddings_finisher(
  sc,
  input_cols = "sentence_embeddings",
  output_cols = "finished_sentence_embeddings",
  output_as_vector = TRUE,
  clean_annotations = FALSE
)

label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label")

nlp_pipeline_elmo <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  elmo_embeddings, embeddingsSentence, embeddings_finisher, label_stringIdx
)

nlp_model_elmo <- ml_fit(nlp_pipeline_elmo, newsDF)

# Apply the fitted ELMO pipeline to the training table.
processed_elmo <- ml_transform(nlp_model_elmo, newsDF)

# Row count of this section's table (processed_elmo).
sdf_nrow(processed_elmo)

# Explode the finished sentence embeddings into the "features" column.
processed_elmo <- processed_elmo %>% 
  mutate(features = explode(finished_sentence_embeddings))

processed_elmo %>% 
  mutate(description = substr(description, 1, 30)) %>% 
  select(description, features, label) %>% 
  head(20)

# set seed for reproducibility
splits <- sdf_random_split(processed_elmo, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test
print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0)

lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData)

predictions %>% 
  mutate(probability_str = substr(string(probability), 1, 30),
         description = substr(description, 1, 30)) %>% 
  arrange(desc(probability)) %>% 
  select(description, category, probability = probability_str, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job: the embeddings pipeline runs once
# and the two columns stay row-aligned.
eval_elmo <- predictions %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_elmo, label)
y_pred <- select(eval_elmo, prediction)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(eval_elmo$label, eval_elmo$prediction)))
cm <- conf_mat(eval_elmo %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

LogReg with Universal Sentence Encoder

# Universal Sentence Encoder pipeline: document -> use_embeddings ->
# finished_use_embeddings, plus the category -> label indexer.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)

useEmbeddings <- nlp_univ_sent_encoder_pretrained(
  sc,
  input_cols = "document",
  output_col = "use_embeddings"
)

embeddings_finisher <- nlp_embeddings_finisher(
  sc,
  input_cols = "use_embeddings",
  output_cols = "finished_use_embeddings",
  output_as_vector = TRUE,
  clean_annotations = FALSE
)

label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label")

use_pipeline <- ml_pipeline(
  document_assembler, useEmbeddings, embeddings_finisher, label_stringIdx
)

# Fit the pipeline on the training table and transform it in one step.
use_df <- ml_fit_and_transform(use_pipeline, newsDF)

# First 30 characters of three JSON-encoded embeddings.
head(
  select(
    mutate(use_df, finished_use_embeddings = substr(to_json(finished_use_embeddings), 1, 30)),
    finished_use_embeddings
  ),
  3
)

# Explode the finished embeddings into the "features" column.
use_df <- mutate(use_df, features = explode(finished_use_embeddings))

head(use_df, 2)
# set seed for reproducibility
splits <- sdf_random_split(use_df, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test
print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

# Logistic regression on the sentence-encoder features.
lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0)

lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData)

predictions %>% 
  mutate(description = substr(description, 1, 30)) %>% 
  select(description, category, label, prediction) %>% 
  head(20)

# Collect label and prediction in one job: the encoder pipeline runs once and
# the two columns stay row-aligned.
eval_use <- predictions %>% 
  select(label, prediction) %>% 
  collect()
y_true <- select(eval_use, label)
y_pred <- select(eval_use, prediction)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(eval_use$label, eval_use$prediction)))
cm <- conf_mat(eval_use %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

Train on the entire training dataset and evaluate on the test file

lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0)

# Fit the feature pipeline ONCE on the full training table and reuse that
# fitted pipeline for both the training and the test data. Re-fitting the
# pipeline on the test file would re-fit the string indexer as well, so the
# test "label" indices need not match the ones the classifier was trained on.
use_model <- ml_fit(use_pipeline, newsDF)

train_df <- ml_transform(use_model, newsDF) %>% 
  mutate(features = explode(finished_use_embeddings))

lrModel <- ml_fit(lr, train_df)

# Read the held-out test file and push it through the same fitted pipeline.
test_df <- spark_read_csv(sc, test_data_file)
test_df <- ml_transform(use_model, test_df)
test_df <- test_df %>% 
  mutate(features = explode(finished_use_embeddings))
test_df %>% head(2)

predictions <- ml_transform(lrModel, test_df)

# "label" already uses the training-time category indices, so no manual
# category -> index remapping is needed after collecting.
df <- predictions %>% 
  select(description, category, label, prediction) %>% 
  collect()

df %>%
  mutate(description = substr(description, 1, 20)) %>% 
  head(10)

# Shared levels keep conf_mat() working when a class is never predicted.
class_levels <- sort(unique(c(df$label, df$prediction)))
cm <- conf_mat(df %>% 
                 mutate(label = factor(label, levels = class_levels),
                        prediction = factor(prediction, levels = class_levels)), 
               label, prediction)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

ClassifierDL

document <- nlp_document_assembler(sc, input_col = "description", output_col = "document")

# Obtain the pretrained Universal Sentence Encoder through the package helper
# (as in the earlier USE section) instead of loading it from a cache directory
# that only exists on one machine.
# NOTE(review): the old path pointed at a "tfhub_use_en" model directory; this
# assumes the helper's default pretrained model is that same model - confirm.
use <- nlp_univ_sent_encoder_pretrained(sc, input_cols = c("document"),
                                        output_col = "sentence_embeddings")

# Classifier trained on the sentence embeddings, with "category" as the label.
classifierdl <- nlp_classifier_dl(sc, input_cols = c("sentence_embeddings"), output_col = "class",
                                  label_col = "category", max_epochs = 5, enable_output_logs = TRUE)

pipeline <- ml_pipeline(
  document,
  use,
  classifierdl
)

# set seed for reproducibility
splits <- sdf_random_split(newsDF, training = 0.7, test = 0.3, seed = 100)
trainingData <- splits$training
testData <- splits$test
print(paste0("Training Dataset Count: ", sdf_nrow(trainingData)))
print(paste0("Test Dataset Count: ", sdf_nrow(testData)))

pipelineModel <- ml_fit(pipeline, trainingData)

# Pull the result field of the "class" column out as class_result and collect.
df <- ml_transform(pipelineModel, testData) %>% 
  mutate(class_result = class.result) %>% 
  select(category, description, class_result) %>% 
  collect()

# class_result is collected as a list column; take the first entry per row.
eval_clf <- df %>% 
  mutate(class_result = map_chr(class_result, function(x) x[[1]]))

# Shared levels keep conf_mat() working when a category is never predicted.
class_levels <- sort(unique(c(eval_clf$category, eval_clf$class_result)))
cm <- conf_mat(eval_clf %>% 
                 mutate(class_result = factor(class_result, levels = class_levels),
                        category = factor(category, levels = class_levels)), 
               category, class_result)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")

Classifier DL + GloVe + Basic text processing

# Small demonstration pipeline: document -> token -> lemma -> embeddings.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)

tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

# Pretrained "lemma_antbnc" lemmatizer applied to the raw tokens.
lemma <- nlp_lemmatizer_pretrained(
  sc,
  name = "lemma_antbnc",
  input_cols = "token",
  output_col = "lemma"
)

# Pretrained word embeddings over the lemmas.
glove_embeddings <- nlp_word_embeddings_pretrained(
  sc,
  input_cols = c("document", "lemma"),
  output_col = "embeddings",
  case_sensitive = FALSE
)

lemma_pipeline <- ml_pipeline(document_assembler, tokenizer, lemma, glove_embeddings)

# Fit and apply the pipeline to 1000 training rows, then show 30 output rows.
head(ml_fit_and_transform(lemma_pipeline, head(trainingData, 1000)), 30)
# Full pipeline: description -> document -> token -> normalized -> cleanTokens
# -> lemma -> embeddings -> sentence_embeddings -> class.
document_assembler <- nlp_document_assembler(
  sc,
  input_col = "description",
  output_col = "document"
)

tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

normalizer <- nlp_normalizer(sc, input_cols = "token", output_col = "normalized")

stopwords_cleaner <- nlp_stop_words_cleaner(
  sc,
  input_cols = "normalized",
  output_col = "cleanTokens",
  case_sensitive = FALSE
)

# Pretrained "lemma_antbnc" lemmatizer, here applied to the cleaned tokens.
lemma <- nlp_lemmatizer_pretrained(
  sc,
  name = "lemma_antbnc",
  input_cols = "cleanTokens",
  output_col = "lemma"
)

# Pretrained word embeddings over the lemmas.
glove_embeddings <- nlp_word_embeddings_pretrained(
  sc,
  input_cols = c("document", "lemma"),
  output_col = "embeddings",
  case_sensitive = FALSE
)

# Pool the word embeddings into sentence embeddings with the AVERAGE strategy.
embeddings_sentence <- nlp_sentence_embeddings(
  sc,
  input_cols = c("document", "embeddings"),
  output_col = "sentence_embeddings",
  pooling_strategy = "AVERAGE"
)

# Classifier trained on the sentence embeddings, with "category" as the label.
classifierdl <- nlp_classifier_dl(
  sc,
  input_cols = "sentence_embeddings",
  output_col = "class",
  label_col = "category",
  max_epochs = 10,
  enable_output_logs = TRUE
)

clf_pipeline <- ml_pipeline(
  document_assembler, tokenizer, normalizer, stopwords_cleaner,
  lemma, glove_embeddings, embeddings_sentence, classifierdl
)
# Fit the classifier pipeline on the training split and score the test split.
clf_pipelineModel <- ml_fit(clf_pipeline, trainingData)

# Pull the result field of the "class" column out as class_result and collect.
df <- ml_transform(clf_pipelineModel, testData) %>% 
  mutate(class_result = class.result) %>% 
  select(category, description, class_result) %>% 
  collect()

# class_result is collected as a list column; take the first entry per row.
eval_clf <- df %>% 
  mutate(class_result = map_chr(class_result, function(x) x[[1]]))

# Shared levels keep conf_mat() working when a category is never predicted.
class_levels <- sort(unique(c(eval_clf$category, eval_clf$class_result)))
cm <- conf_mat(eval_clf %>% 
                 mutate(class_result = factor(class_result, levels = class_levels),
                        category = factor(category, levels = class_levels)), 
               category, class_result)

cm
summary(cm)
ggplot2::autoplot(cm, type = "heatmap")


r-spark/sparknlp documentation built on Oct. 15, 2022, 10:50 a.m.