This notebook is adapted from John Snow Labs workshop Jupyter/Python tutorial "5- How to use Spark NLP and Spark ML Pipelines.ipynb" (https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/jupyter/5-%20How%20to%20use%20Spark%20NLP%20and%20Spark%20ML%20Pipelines.ipynb)

Spark NLP and Spark ML Pipelines

Simple Topic Modeling

Spark NLP

Spark ML

library(dplyr)
library(sparklyr)
library(sparknlp)

Let's create a Spark Session for our app

version <- Sys.getenv("SPARK_VERSION", unset = "2.4.0")

config <- sparklyr::spark_config()

options(sparklyr.sanitize.column.names.verbose = TRUE)
options(sparklyr.verbose = TRUE)
options(sparklyr.na.omit.verbose = TRUE)
options(sparklyr.na.action.verbose = TRUE)
sc <- sparklyr::spark_connect(master = "local", version = version, config = config)

Let's download some scientific example from PubMed dataset:

pubmed_file <- paste0(tempdir(), "/pubmed-sample.csv")
download.file(url = "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv", 
              destfile = pubmed_file)
pubmed_df <- spark_read_csv(sc, pubmed_file, name = "pubmedsample") %>%
  filter(!is.null(AB)) %>%
  rename(text = AB) %>%
  select(-TI)
sdf_schema(pubmed_df)
head(pubmed_df)
sdf_nrow(pubmed_df)

Let's create Spark-NLP Pipeline

# Spark NLP Pipeline
document_assembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")

sentence_detector <- nlp_sentence_detector(sc, input_cols = c("document"), output_col = "sentence")

tokenizer <- nlp_tokenizer(sc, input_cols = c("sentence"), output_col = "token")

pos_tagger <- nlp_perceptron_pretrained(sc, input_cols = c("sentence", "token"), output_col = "pos")

chunker <- nlp_chunker(sc, input_cols = c("sentence", "pos"), 
                       output_col = "chunk", 
                       regex_parsers = c("<NNP>+", "<DT>?<JJ>*<NN>"))

finisher <- nlp_finisher(sc, input_cols = c("chunk"), include_metadata = FALSE)

nlp_pipeline <- ml_pipeline(document_assembler, 
                            sentence_detector, 
                            tokenizer, 
                            pos_tagger,
                            chunker, 
                            finisher)
nlp_pipeline_df <- ml_fit_and_transform(nlp_pipeline, pubmed_df)
head(nlp_pipeline_df)

Let's create Spark ML Pipeline

cv <- ft_count_vectorizer(sc, input_col = "finished_chunk", output_col = "features", vocab_size = 1000, min_df = 10.0, min_tf = 10.0)
idf = ft_idf(sc, input_col = "features", output_col = "idf")
lda = ml_lda(sc, k = 10, max_iter = 5)

spark_ml_pipeline <- ml_pipeline(cv, idf, lda)

We are going to train Spark ML Pipeline by using Spark-NLP Pipeline

ml_model <- ml_fit(spark_ml_pipeline, nlp_pipeline_df)
ml_pipeline_df <- ml_transform(ml_model, nlp_pipeline_df)
head(ml_pipeline_df)
lda_model <- ml_stage(ml_model, lda$uid)
ll <- ml_log_likelihood(lda_model, ml_pipeline_df)
lp <- ml_log_perplexity(lda_model, ml_pipeline_df)
print(paste("The lower bound on the log likelihood of the entire corpus: ", ll))
print(paste("The upper bound on perplexity: " , lp))
# Describe topics
head(ml_describe_topics(lda_model, max_terms_per_topic = 3))

Let's look at our topics

NOTE: More cleaning, filtering, playing around with CountVectorizer, and more iterations in LDA will result in better Topic Modelling results.

# Output topics. Each is a distribution over words (matching word count vectors)
print(paste("Learned topics (as distributions over vocab of", lda_model$vocab_size,"words):"))

topics <- lda_model$describe_topics(50)

vocab <- ml_stage(ml_model, 1)$vocabulary

topics_words <- topics %>% 
  collect() %>%
  mutate(topic_words = purrr::map(termIndices, function(i) {purrr::map(i, function(x) {vocab[[x + 1]]})}))

words <- NULL
cnames <- NULL
for (i in 1:nrow(topics_words)) {
  cnames <- c(cnames, paste0("Topic ", i))
  df <- data.frame(as.character(topics_words[i,]$topic_words[[1]]))
  words <- bind_cols(words, df)
}
colnames(words) <- cnames

words


r-spark/sparknlp documentation built on Oct. 15, 2022, 10:50 a.m.