This notebook is adapted from John Snow Labs workshop Jupyter/Python tutorial "5.1_Text_classification_examples_in_SparkML_SparkNLP" (https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/5.1_Text_classification_examples_in_SparkML_SparkNLP.ipynb)
library(yardstick, warn.conflicts = FALSE) library(purrr, warn.conflicts = FALSE) library(sparklyr, warn.conflicts = FALSE) library(sparknlp, warn.conflicts = FALSE) library(dplyr, warn.conflicts = FALSE) version <- Sys.getenv("SPARK_VERSION", unset = "2.4.5") config <- sparklyr::spark_config() config$`sparklyr.shell.driver-memory` <- "8g" options(sparklyr.sanitize.column.names.verbose = TRUE) options(sparklyr.verbose = TRUE) options(sparklyr.na.omit.verbose = TRUE) options(sparklyr.na.action.verbose = TRUE) sc <- sparklyr::spark_connect(master = "local", version = version, config = config) cat("Apache Spark version: ", sc$home_version, "\n") cat("Spark NLP version: ", nlp_version())
training_data_file <- pins::pin("https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_train.csv") test_data_file <- pins::pin("https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/data/news_category_test.csv")
newsDF <- spark_read_csv(sc, training_data_file) newsDF %>% mutate(description = substr(description, 1, 50)) %>% head(20)
newsDF %>% count(category, sort = TRUE)
Tokenizer: Tokenization
stopwordsRemover: Remove Stop Words
countVectors: Count vectors ("document-term vectors")
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token") normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized") stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE) stemmer <- nlp_stemmer(sc, input_cols = c("cleanTokens"), output_col = "stem") finisher <- nlp_finisher(sc, input_cols = c("stem"), output_col = "token_features", output_as_array = TRUE, clean_annotations = FALSE) countVectors <- ft_count_vectorizer(sc, input_col = "token_features", output_col = "features", vocab_size = 10000, min_df = 5) label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label") nlp_pipeline = ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher, countVectors, label_stringIdx ) nlp_model <- ml_fit(nlp_pipeline, newsDF) processed <- ml_transform(nlp_model, newsDF) sdf_nrow(processed)
processed %>% mutate(description = substr(description, 1, 50), token_features = substr(to_json(token_features), 1, 50)) %>% select(description, token_features) %>% head(20)
processed %>% mutate(token_features = to_json(token_features)) %>% select(token_features) %>% head(2)
processed %>% select(features) %>% head(2)
processed %>% mutate(description = substr(description, 1, 50)) %>% select(description, features, label) %>% head(20)
splits <- sdf_random_split(processed, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
sdf_schema(trainingData)
lr <- ml_logistic_regression(sc, max_iter = 10, reg_param = 0.3, elastic_net_param = 0) lrModel <- ml_fit(lr, trainingData) predictions <- ml_transform(lrModel, testData) predictions %>% filter(prediction == 0) %>% mutate(description = substr(description, 1, 30)) %>% select(description, category, probability, label, prediction) %>% head(10)
evaluator <- ml_multiclass_classification_evaluator(sc, prediction_col = "prediction") ml_evaluate(evaluator, predictions)
y_true <- select(predictions, label) %>% collect() y_pred <- select(predictions, prediction) %>% collect()
y_pred %>% count(prediction)
cnf_matrix <- yardstick::conf_mat(bind_cols(y_true, y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cnf_matrix summary(cnf_matrix)
hashingTF <- ft_hashing_tf(sc, input_col = "token_features", output_col = "rawFeatures", num_features = 10000) idf <- ft_idf(sc, input_col = "rawFeatures", output_col = "features", min_doc_freq = 5) #min_doc_freq: remove sparse terms nlp_pipeline <- ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher, hashingTF, idf, label_stringIdx ) nlp_model_tf <- ml_fit(nlp_pipeline, newsDF) processed_tf <- ml_transform(nlp_model_tf, newsDF) sdf_nrow(processed_tf)
processed %>% mutate(description = substr(description, 1, 20)) %>% select(description, features, label) %>% head(20)
splits <- sdf_random_split(processed_tf, training = 0.7, test = 0.3) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
lrModel_tf <- ml_fit(lr, trainingData) predictions_tf <- ml_predict(lrModel_tf, testData) # Cache the predictions data frame so that it doesn't get recomputed over and over predictions_tf <- sdf_register(predictions_tf, "predictions_tf") tbl_cache(sc, "predictions_tf") predictions_tf %>% mutate(probability_str = substr(string(probability), 1, 30), description = substr(description, 1, 30)) %>% arrange(desc(probability)) %>% select(description, category, probability = probability_str, label, prediction) %>% head(20)
y_true <- predictions_tf %>% select(label) %>% collect() y_pred <- predictions_tf %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
rf <- ml_random_forest_classifier(sc, label_col = "label", features_col = "features", num_trees = 100, max_depth = 4, max_bins = 32) rfModel <- ml_fit(rf, trainingData) predictions_rf <- ml_transform(rfModel, testData) # Cache the predictions data frame so that it doesn't get recomputed over and over predictions_tf <- sdf_register(predictions_tf, "predictions_tf") tbl_cache(sc, "predictions_tf")
predictions_tf %>% mutate(probability_str = substr(string(probability), 1, 30), description = substr(description, 1, 30)) %>% arrange(desc(probability)) %>% select(description, category, probability = probability_str, label, prediction) %>% head(20)
y_true <- predictions_tf %>% select(label) %>% collect() y_pred <- predictions_tf %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token") normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized") stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE) glove_embeddings <- nlp_word_embeddings_pretrained(sc, input_cols = c("document", "cleanTokens"), output_col = "embeddings", case_sensitive = FALSE) embeddingsSentence <- nlp_sentence_embeddings(sc, input_cols = c("document", "embeddings"), output_col = "sentence_embeddings", pooling_strategy = "AVERAGE") embeddings_finisher <- nlp_embeddings_finisher(sc, input_cols = c("sentence_embeddings"), output_cols = c("finished_sentence_embeddings"), output_as_vector = TRUE, clean_annotations = FALSE) explodeVectors <- ft_sql_transformer(sc, "SELECT EXPLODE(finished_sentence_embeddings) AS features, * FROM __THIS__") label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label") nlp_pipeline_w2v <- ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, glove_embeddings, embeddingsSentence, embeddings_finisher, explodeVectors, label_stringIdx ) nlp_model_w2v <- ml_fit(nlp_pipeline_w2v, newsDF) processed_w2v <- ml_transform(nlp_model_w2v, newsDF) sdf_nrow(processed_w2v)
processed_w2v %>% select(features) %>% head(1)
processed_w2v %>% mutate(description = substr(description, 1, 30)) %>% select(description, features, label) %>% head(20)
# set seed for reproducibility splits <- sdf_random_split(processed_w2v, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
lrModel_w2v <- ml_fit(lr, trainingData)
predictions_w2v <- ml_transform(lrModel_w2v, testData) predictions_tf %>% mutate(probability_str = substr(string(probability), 1, 30), description = substr(description, 1, 30)) %>% arrange(desc(probability)) %>% select(description, category, probability = probability_str, label, prediction) %>% head(20)
y_true <- predictions_w2v %>% select(label) %>% collect() y_pred <- predictions_w2v %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
processed_w2v %>% mutate(description = substr(description, 1, 50), cleanTokens = substr(to_json(cleanTokens.result), 1, 50)) %>% select(description, cleanTokens) %>% head(20)
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token") normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized") stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE) bert_embeddings <- nlp_bert_embeddings_pretrained(sc, name = "bert_base_cased", input_cols = c("document", "cleanTokens"), output_col = "bert", case_sensitive = FALSE, pooling_layer = 0) embeddingsSentence <- nlp_sentence_embeddings(sc, input_cols = c("document", "bert"), output_col = "sentence_embeddings", pooling_strategy = "AVERAGE") embeddings_finisher <- nlp_embeddings_finisher(sc, input_cols = c("sentence_embeddings"), output_cols = c("finished_sentence_embeddings"), output_as_vector = TRUE, clean_annotations = FALSE) label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label") nlp_pipeline_bert <- ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, bert_embeddings, embeddingsSentence, embeddings_finisher, label_stringIdx ) nlp_model_bert <- ml_fit(nlp_pipeline_bert, newsDF) processed_bert <- ml_transform(nlp_model_bert, newsDF) sdf_nrow(processed_w2v)
processed_bert <- processed_bert %>% mutate(features = explode(finished_sentence_embeddings)) processed_bert %>% mutate(description = substr(description, 1, 30)) %>% select(description, features, label) %>% head(20)
# set seed for reproducibility splits <- sdf_random_split(processed_bert, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0) lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData) predictions %>% mutate(probability_str = substr(string(probability), 1, 30), description = substr(description, 1, 30)) %>% arrange(desc(probability)) %>% select(description, category, probability = probability_str, label, prediction) %>% head(20)
y_true <- predictions %>% select(label) %>% collect() y_pred <- predictions %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token") normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized") stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE) elmo_embeddings <- nlp_elmo_embeddings_pretrained(sc, input_cols = c("document", "cleanTokens"), output_col = "elmo", pooling_layer = "word_emb") embeddingsSentence <- nlp_sentence_embeddings(sc, input_cols = c("document", "elmo"), output_col = "sentence_embeddings", pooling_strategy = "AVERAGE") embeddings_finisher <- nlp_embeddings_finisher(sc, input_cols = c("sentence_embeddings"), output_cols = c("finished_sentence_embeddings"), output_as_vector = TRUE, clean_annotations = FALSE) label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label") nlp_pipeline_elmo <- ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, elmo_embeddings, embeddingsSentence, embeddings_finisher, label_stringIdx ) nlp_model_elmo <- ml_fit(nlp_pipeline_elmo, newsDF) processed_elmo <- ml_transform(nlp_model_elmo, newsDF) sdf_nrow(processed_w2v)
processed_elmo <- processed_elmo %>% mutate(features = explode(finished_sentence_embeddings)) processed_elmo %>% mutate(description = substr(description, 1, 30)) %>% select(description, features, label) %>% head(20)
# set seed for reproducibility splits <- sdf_random_split(processed_elmo, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0) lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData) predictions %>% mutate(probability_str = substr(string(probability), 1, 30), description = substr(description, 1, 30)) %>% arrange(desc(probability)) %>% select(description, category, probability = probability_str, label, prediction) %>% head(20)
y_true <- predictions %>% select(label) %>% collect() y_pred <- predictions %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") useEmbeddings <- nlp_univ_sent_encoder_pretrained(sc, input_cols = c("document"), output_col = "use_embeddings") embeddings_finisher <- nlp_embeddings_finisher(sc, input_cols = c("use_embeddings"), output_cols = c("finished_use_embeddings"), output_as_vector = TRUE, clean_annotations = FALSE) label_stringIdx <- ft_string_indexer(sc, input_col = "category", output_col = "label") use_pipeline <- ml_pipeline( document_assembler, useEmbeddings, embeddings_finisher, label_stringIdx )
use_df <- ml_fit_and_transform(use_pipeline, newsDF)
use_df %>% mutate(finished_use_embeddings = substr(to_json(finished_use_embeddings), 1, 30)) %>% select(finished_use_embeddings) %>% head(3)
use_df <- use_df %>% mutate(features = explode(finished_use_embeddings)) use_df %>% head(2)
# set seed for reproducibility splits <- sdf_random_split(use_df, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0) lrModel <- ml_fit(lr, trainingData)
predictions <- ml_transform(lrModel, testData) predictions %>% mutate(description = substr(description, 1, 30)) %>% select(description, category, label, prediction) %>% head(20)
y_true <- predictions %>% select(label) %>% collect() y_pred <- predictions %>% select(prediction) %>% collect() cm <- conf_mat(data.frame(label = y_true, prediction = y_pred) %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
lr <- ml_logistic_regression(sc, max_iter = 20, reg_param = 0.3, elastic_net_param = 0) lrModel <- ml_fit(lr, use_df)
test_df <- spark_read_csv(sc, test_data_file)
test_df <- ml_fit_and_transform(use_pipeline, test_df)
test_df <- test_df %>% mutate(features = explode(finished_use_embeddings))
test_df %>% head(2)
predictions <- ml_transform(lrModel, test_df)
df <- predictions %>% select(description, category, label, prediction) %>% collect()
df <- df %>% mutate(label = case_when(category == "World" ~ 2.0, category == "Sports" ~ 3.0, category == "Business" ~ 0.0, category == "Sci/Tech" ~ 1.0))
df %>% mutate(description = substr(description, 1, 20)) %>% head(10)
cm <- conf_mat(df %>% mutate(label = factor(label), prediction = factor(prediction)), label, prediction) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
document <- nlp_document_assembler(sc, input_col = "description", output_col = "document") use <- ml_load(sc, "/home/davek/cache_pretrained/tfhub_use_en_2.4.4_2.4_1583158595769") %>% nlp_set_input_cols(c("document")) %>% nlp_set_output_col("sentence_embeddings") classifierdl <- nlp_classifier_dl(sc, input_cols = c("sentence_embeddings"), output_col = "class", label_col = "category", max_epochs = 5, enable_output_logs = TRUE) pipeline <- ml_pipeline( document, use, classifierdl )
# set seed for reproducibility splits <- sdf_random_split(newsDF, training = 0.7, test = 0.3, seed = 100) trainingData <- splits$training testData <- splits$test print(paste0("Training Dataset Count: ", sdf_nrow(trainingData))) print(paste0("Test Dataset Count: ", sdf_nrow(testData)))
pipelineModel <- ml_fit(pipeline, trainingData)
df <- ml_transform(pipelineModel, testData) %>% mutate(class_result = class.result) %>% select(category, description, class_result) %>% collect() cm <- conf_mat(df %>% mutate(class_result = factor(map_chr(class_result, function(x) x[[1]])), category = factor(category)), category, class_result) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token") lemma <- nlp_lemmatizer_pretrained(sc, name = "lemma_antbnc", input_cols = c("token"), output_col = "lemma") glove_embeddings <- nlp_word_embeddings_pretrained(sc, input_cols = c("document", "lemma"), output_col = "embeddings", case_sensitive = FALSE) lemma_pipeline <- ml_pipeline( document_assembler, tokenizer, lemma, glove_embeddings )
ml_fit_and_transform(lemma_pipeline, head(trainingData, 1000)) %>% head(30)
document_assembler <- nlp_document_assembler(sc, input_col = "description", output_col = "document") tokenizer <- nlp_tokenizer(sc, input_cols = "document", output_col = "token") normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized") stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE) lemma <- nlp_lemmatizer_pretrained(sc, name = "lemma_antbnc", input_cols = c("cleanTokens"), output_col = "lemma") glove_embeddings <- nlp_word_embeddings_pretrained(sc, input_cols = c("document", "lemma"), output_col = "embeddings", case_sensitive = FALSE) embeddings_sentence <- nlp_sentence_embeddings(sc, input_cols = c("document", "embeddings"), output_col = "sentence_embeddings", pooling_strategy = "AVERAGE") classifierdl <- nlp_classifier_dl(sc, input_cols = c("sentence_embeddings"), output_col = "class", label_col = "category", max_epochs = 10, enable_output_logs = TRUE) clf_pipeline <- ml_pipeline( document_assembler, tokenizer, normalizer, stopwords_cleaner, lemma, glove_embeddings, embeddings_sentence, classifierdl )
clf_pipelineModel <- ml_fit(clf_pipeline, trainingData)
df <- ml_transform(clf_pipelineModel, testData) %>% mutate(class_result = class.result) %>% select(category, description, class_result) %>% collect() cm <- conf_mat(df %>% mutate(class_result = factor(map_chr(class_result, function(x) x[[1]])), category = factor(category)), category, class_result) cm summary(cm) ggplot2::autoplot(cm, type = "heatmap")
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.