This notebook is adapted from the John Snow Labs workshop Jupyter/Python tutorial "2.Text_Preprocessing_with_SparkNLP_Annotators_Transformers" (https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/2.Text_Preprocessing_with_SparkNLP_Annotators_Transformers.ipynb)

Note: Read this article if you want to understand the basic concepts of Spark NLP: https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59

1. Annotators and Transformer Concepts

In Spark NLP, all annotators are either Estimators or Transformers, as in Spark ML. An Estimator in Spark ML is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model. A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer that transforms a DataFrame with features into a DataFrame with predictions.

In Spark NLP, there are two types of annotators: AnnotatorApproach and AnnotatorModel. AnnotatorApproach extends Estimators from Spark ML, which are meant to be trained through fit(), and AnnotatorModel extends Transformers, which are meant to transform data frames through transform().

Some Spark NLP annotators have a Model suffix and some do not. The Model suffix is explicitly stated when the annotator is the result of a training process. Some annotators, such as Tokenizer, are transformers but do not carry the Model suffix since they are not trained annotators. Model annotators have a pretrained() method on their static object to retrieve the public pre-trained version of a model.

Long story short: if it trains on a DataFrame and produces a model, it's an AnnotatorApproach; if it transforms one DataFrame into another through some model, it's an AnnotatorModel (e.g. WordEmbeddingsModel); and it doesn't take the Model suffix if it doesn't rely on a pre-trained annotator while transforming a DataFrame (e.g. Tokenizer).

# table of Spark NLP annotators and their types (Approach vs. Model)
read.csv("https://gist.githubusercontent.com/vkocaman/e091605f012ffc1efc0fcda170919602/raw/fae33d25bd026375b2aaf1194b68b9da559c4ac4/annotators.csv")

By convention, there are three possible names:

Approach — Trainable annotator

Model — Trained annotator

nothing — either a non-trainable annotator (a pre-processing step) or shorthand for a model

So, for example, Stemmer says neither Approach nor Model, yet it is a Model. On the other hand, Tokenizer says neither Approach nor Model, but it has a TokenizerModel(), because it is not "training" anything; it only does some preprocessing before converting into a Model. When in doubt, please refer to the official documentation and API reference. Even though we will do many hands-on exercises in the following articles, let us give you a glimpse so that you understand the difference between AnnotatorApproach and AnnotatorModel. Tokenizer, for instance, is an AnnotatorApproach, so we need to call fit() and then transform().
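To make the difference concrete, here is a minimal sketch (not part of the original notebook); it assumes the Spark connection `sc` created further below, and that the Stemmer wrapper, being an AnnotatorModel, behaves as a plain Spark ML transformer:

demo_df <- sdf_copy_to(sc, data.frame(text = "Spark NLP annotators are easy to chain"))

# DocumentAssembler is a transformer: transform() alone is enough
demo_doc <- ml_transform(nlp_document_assembler(sc, input_col = "text", output_col = "document"), demo_df)

# Tokenizer is an AnnotatorApproach: fit() produces a TokenizerModel, which then transforms
demo_tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")
demo_tok_model <- ml_fit(demo_tokenizer, demo_doc)
demo_tokens    <- ml_transform(demo_tok_model, demo_doc)

# Stemmer is an AnnotatorModel: no fit() needed, transform() works directly
demo_stems <- ml_transform(nlp_stemmer(sc, input_cols = c("token"), output_col = "stem"), demo_tokens)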

Now let’s see how this can be done in Spark NLP using Annotators and Transformers. Assume that we have the following steps that need to be applied one by one on a data frame.

What’s actually happening under the hood?

When we fit() the pipeline on the Spark data frame (df), its text column is fed into the DocumentAssembler() transformer first, and a new column "document" of Document type (AnnotatorType) is created. As we mentioned before, this transformer is basically the initial entry point to Spark NLP for any Spark data frame. Then the document column is fed into SentenceDetector() (an AnnotatorApproach), the text is split into an array of sentences, and a new column "sentences" of Document type is created. Then the "sentences" column is fed into Tokenizer() (an AnnotatorModel once fitted), each sentence is tokenized, and a new column "token" of Token type is created. And so on.

library(purrr, warn.conflicts = FALSE)
library(sparklyr, warn.conflicts = FALSE)
library(sparknlp, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

version <- Sys.getenv("SPARK_VERSION", unset = "2.4.5")

config <- sparklyr::spark_config()

options(sparklyr.sanitize.column.names.verbose = TRUE)
options(sparklyr.verbose = TRUE)
options(sparklyr.na.omit.verbose = TRUE)
options(sparklyr.na.action.verbose = TRUE)

sc <- sparklyr::spark_connect(master = "local", version = version, config = config)

cat("Apache Spark version: ", sc$home_version, "\n")
cat("Spark NLP version: ", nlp_version())
text <- "Peter Parker is a nice guy and lives in New York"

spark_df <- sdf_copy_to(sc, data.frame(text = text))
spark_df
sample_sentences_file <- pins::pin("https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/jupyter/annotation/english/spark-nlp-basics/sample-sentences-en.txt")

spark_df <- spark_read_text(sc, "sample_sentences", sample_sentences_file) %>% rename(text = line)
spark_df
spark_df %>% select(text)

Transformers

What are we going to do if our DataFrame doesn't have columns of those types? Here come the transformers. In Spark NLP, we have five different transformers that are mainly used for getting data in or transforming data from one AnnotatorType to another. Here is the list of transformers:

nlp_document_assembler: To get through the NLP process, we need to get raw data annotated. This is a special transformer that does this for us; it creates the first annotation of type Document which may be used by annotators down the road.

nlp_token_assembler: This transformer reconstructs a Document type annotation from tokens, usually after these have been normalized, lemmatized, spell checked, etc., so that this document annotation can be used in further annotators.

nlp_doc2chunk: Converts DOCUMENT type annotations into CHUNK type with the contents of a chunkCol.

nlp_chunk2doc : Converts a CHUNK type column back into DOCUMENT. Useful when trying to re-tokenize or do further analysis on a CHUNK result.

nlp_finisher: Once we have our NLP pipeline ready to go, we might want to use our annotation results somewhere else where it is easy to use. The Finisher outputs annotation(s) values into a string.

Each annotator accepts certain column types and outputs new columns of another type (we call this the AnnotatorType).

In Spark NLP, we have the following types:

document, token, chunk, pos, word_embeddings, date, entity, sentiment, named_entity, dependency, labeled_dependency.

That is, the DataFrame you have needs to have a column from one of these types if that column will be fed into an annotator; otherwise, you’d need to use one of the Spark NLP transformers.

2. Document Assembler

In Spark NLP, we have five different transformers that are mainly used for getting data in or transforming data from one AnnotatorType to another.

That is, the DataFrame you have needs to have a column from one of these types if that column will be fed into an annotator; otherwise, you’d need to use one of the Spark NLP transformers. Here is the list of transformers: nlp_document_assembler, nlp_token_assembler, nlp_doc2chunk, nlp_chunk2doc, and the nlp_finisher.

So, let’s start with nlp_document_assembler, an entry point to Spark NLP annotators.

To get through the process in Spark NLP, we first need to get the raw data transformed into the Document type.

nlp_document_assembler is a special transformer that does this for us; it creates the first annotation of type Document which may be used by annotators down the road.

nlp_document_assembler is the R wrapper for Spark NLP's DocumentAssembler; its main parameters are listed below (see the package reference for the full list).

inputCol -> the name of the column that will be converted. We can specify only one column here. It can read either a String column or an Array[String]

outputCol -> optional: the name of the column in Document type that is generated. We can specify only one column here. Default is 'document'

idCol -> optional: String type column with id information

metadataCol -> optional: Map type column with metadata information

cleanupMode -> optional: cleanup options. Possible values:

disabled: source kept as original (this is the default)

inplace: removes new lines and tabs

inplace_full: removes new lines and tabs, including those that were converted to strings (i.e. \n)

shrink: removes new lines and tabs, plus merges multiple spaces and blank lines into a single space

shrink_full: removes new lines and tabs, including stringified values, plus shrinks spaces and blank lines
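For instance, here is a minimal sketch (not part of the original notebook) comparing the default disabled mode with shrink on a string that contains new lines, a tab and repeated spaces:

messy_df <- sdf_copy_to(sc, data.frame(text = "First line\n\nSecond   line\twith a tab"))

# default behaviour: source text kept as-is
da_disabled <- nlp_document_assembler(sc, input_col = "text", output_col = "document", cleanup_mode = "disabled")
# shrink: new lines/tabs removed, repeated spaces and blank lines collapsed
da_shrink <- nlp_document_assembler(sc, input_col = "text", output_col = "document", cleanup_mode = "shrink")

ml_transform(da_disabled, messy_df) %>% mutate(result = to_json(document.result)) %>% select(result)
ml_transform(da_shrink, messy_df) %>% mutate(result = to_json(document.result)) %>% select(result)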

documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document", cleanup_mode = "shrink")
doc_df <- ml_transform(documentAssembler, spark_df)
doc_df %>% mutate(document = to_json(document))
sdf_schema(doc_df)

The new column is in an array of struct type and has the parameters shown above. The annotators and transformers all come with universal metadata that would be filled down the road depending on the annotators being used. Unless you want to append other Spark NLP annotators to DocumentAssembler(), you don’t need to know what all these parameters mean for now. So we will talk about them in the following articles. You can access all these parameters with {column name}.{parameter name}.

Let’s print out the first item’s result.

doc_df %>% mutate(docresult = explode(document.result)) %>% select(docresult) %>%  head(1)

If we would like to flatten the document column, we can do it as follows:

doc_df %>%
  mutate(document = explode(document)) %>% 
  sdf_separate_column("document", into = c("annotatorType", "begin", "end", "result", "metadata")) %>% 
  mutate(metadata = to_json(metadata)) %>% 
  select(-document)

3. Sentence Detector

Finds sentence bounds in raw text.

setCustomBounds(string): Custom sentence separator text

setUseCustomOnly(bool): Use only custom bounds without considering those of Pragmatic Segmenter. Defaults to false. Needs customBounds.

setUseAbbreviations(bool): Whether to consider abbreviation strategies for better accuracy but slower performance. Defaults to true.

setExplodeSentences(bool): Whether to split sentences into different Dataset rows. Useful for higher parallelism in fat rows. Defaults to false.
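The custom-bounds options above are useful for texts that use unusual separators. Below is a minimal sketch (not part of the original notebook); the custom_bounds and use_custom_only argument names are an assumption, taken as the snake_case equivalents of setCustomBounds() and setUseCustomOnly() in this wrapper:

# hypothetical text where the separator is a semicolon rather than a period
semi_df <- sdf_copy_to(sc, data.frame(text = "no fever; no cough; mild headache"))
semi_doc <- ml_transform(nlp_document_assembler(sc, input_col = "text", output_col = "document"), semi_df)

# assumed snake_case arguments mirroring setCustomBounds()/setUseCustomOnly()
custom_detector <- nlp_sentence_detector(sc, input_cols = c("document"), output_col = "sentences",
                                         custom_bounds = c(";"), use_custom_only = TRUE)

ml_transform(custom_detector, semi_doc) %>%
  mutate(sent_result = to_json(sentences.result)) %>%
  select(sent_result)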

# we feed the document column coming from Document Assembler

sentenceDetector <- nlp_sentence_detector(sc, input_cols = c("document"), output_col = "sentences")
sent_df <- ml_transform(sentenceDetector, doc_df)
head(mutate(sent_df, sentences = to_json(sentences)))
head(
  sent_df %>% 
    mutate(sentences = to_json(sentences)) %>% 
    select(sentences), 
  n = 1)
text <- 'The patient was prescribed 1 capsule of Advil for 5 days . He was seen by the endocrinology service and she was discharged on 40 units of insulin glargine at night , 12 units of insulin lispro with meals , and metformin 1000 mg two times a day . It was determined that all SGLT2 inhibitors should be discontinued indefinitely fro 3 months .'
text
spark_df <- sdf_copy_to(sc, data.frame(text = text))
spark_df
# if you want to create a Spark dataframe from a list of strings
sdf_copy_to(sc,
            data.frame(text = unlist(strsplit(text, "\\."))))
doc_df <- ml_transform(documentAssembler, spark_df)

sent_df <- ml_transform(sentenceDetector, doc_df)

sent_df %>% mutate(document = to_json(document), sentences = to_json(sentences))
sent_df %>% mutate(sent_result = to_json(sentences.result)) %>% select(sent_result)
sentenceDetector <- nlp_sentence_detector(sc, input_cols = c("document"), output_col = "sentences", explode_sentences = TRUE)

sent_df <- ml_transform(sentenceDetector, doc_df)

sent_df %>% mutate(sent_result = to_json(sentences.result)) %>% select(sent_result)

Tokenizer

Identifies tokens using open tokenization standards. It is an AnnotatorApproach, so it requires fit().

A few rules will help you customize it if the defaults do not fit your needs.

setExceptions(StringArray): List of tokens to not alter at all. Allows composite tokens like two worded tokens that the user may not want to split.

addException(String): Add a single exception

setExceptionsPath(String): Path to txt file with list of token exceptions

caseSensitiveExceptions(bool): Whether to match exceptions in the text case-sensitively

contextChars(StringArray): List of one-character strings to strip from tokens, such as parentheses or question marks. Ignored if using prefix, infix or suffix patterns.

splitChars(StringArray): List of one-character strings used to split tokens from the inside, such as hyphens. Ignored if using infix, prefix or suffix patterns.

splitPattern (String): Pattern used to split tokens from the inside. Takes priority over splitChars.

setTargetPattern: Basic regex rule to identify a candidate for tokenization. Defaults to \S+, which means anything that is not a space

setSuffixPattern: Regex to identify subtokens at the end of the token. The regex has to end with \z and must contain groups (). Each group will become a separate token within the suffix. Defaults to non-letter characters, e.g. quotes or parentheses

setPrefixPattern: Regex to identify subtokens at the beginning of the token. The regex has to start with \A and must contain groups (). Each group will become a separate token within the prefix. Defaults to non-letter characters, e.g. quotes or parentheses

addInfixPattern: Add an extension pattern regex with groups to the top of the rules (will target first, from more specific to the more general).

minLength: Set the minimum allowed length for each token

maxLength: Set the maximum allowed length for each token
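As an illustration of the exception and split-character rules described above, here is a minimal sketch (not part of the original notebook); the exceptions and split_chars argument names are an assumption, taken as the snake_case equivalents of setExceptions() and setSplitChars() in this wrapper:

ex_df <- sdf_copy_to(sc, data.frame(text = "Peter lives in New York and writes e-mails"))
ex_doc <- ml_transform(nlp_document_assembler(sc, input_col = "text", output_col = "document"), ex_df)

custom_tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token",
                                  exceptions = c("New York"),  # assumed argument for setExceptions(): keep as one token
                                  split_chars = c("-"))        # assumed argument for setSplitChars(): split on hyphens

ml_fit_and_transform(custom_tokenizer, ex_doc) %>%
  mutate(token_result = to_json(token.result)) %>%
  select(token_result)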

tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")
text <- 'Peter Parker (Spiderman) is a nice guy and lives in New York but has no e-mail!'
spark_df <- sdf_copy_to(sc, data.frame(text = text))
doc_df <- ml_transform(documentAssembler, spark_df)

token_df <- ml_fit_and_transform(tokenizer, doc_df)

token_df %>% mutate(document = to_json(document), token = to_json(token))
token_df %>% mutate(token_result = to_json(token.result)) %>% select(token_result)

Stacking Spark NLP Annotators in Spark ML Pipeline

Spark NLP provides an easy API to integrate with Spark ML Pipelines, and all the Spark NLP annotators and transformers can be used within Spark ML Pipelines. So it's best to explain the Pipeline concept through the official Spark ML documentation.

What is a Pipeline anyway? In machine learning, it is common to run a sequence of algorithms to process and learn from data.

Apache Spark ML represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order.

In simple terms, a Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

At training time, a Pipeline is used as follows.

A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. That is, the data are passed through the fitted pipeline in order. Each stage’s transform() method updates the dataset and passes it to the next stage. With the help of Pipelines, we can ensure that training and test data go through identical feature processing steps.

Now let’s see how this can be done in Spark NLP using Annotators and Transformers. Assume that we have the following steps that need to be applied one by one on a data frame.

And here is how we code this pipeline up in Spark NLP.

documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
sentenceDetector <- nlp_sentence_detector(sc, input_cols = "document", output_col = "sentences")
tokenizer <- nlp_tokenizer(sc, input_cols = c("sentences"), output_col = "token")

nlpPipeline <- ml_pipeline(documentAssembler,
                           sentenceDetector,
                           tokenizer)

empty_df <- sdf_copy_to(sc, data.frame(text = c("")))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
spark_df <- spark_read_text(sc, "sample_sentences", sample_sentences_file) %>% rename(text = line)
spark_df
result <- ml_transform(pipelineModel, spark_df)
result %>% mutate(document = substr(to_json(document), 0, 30),
                  sentences = substr(to_json(sentences), 0, 30),
                  token = substr(to_json(token), 0, 30))
sdf_schema(result)
result %>% mutate(sent_result = to_json(sentences.result)) %>% select(sent_result) %>% head(n = 3)
result %>% mutate(token = to_json(token)) %>% select(token) %>% head(n = 3)

Normalizer

Removes all dirty characters from text following a regex pattern and transforms words based on a provided dictionary

setCleanupPatterns(patterns): Regular expressions list for normalization, defaults [^A-Za-z]

setLowercase(value): lowercase tokens, default false

setSlangDictionary(path): txt file with delimited words to be transformed into something else

normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized", lowercase = TRUE, cleanup_patterns = c("[^\\w\\d\\s]"))
# remove punctuations (keep alphanumeric chars)
# if we don't set CleanupPatterns, it will only keep alphabet letters ([^A-Za-z])
documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")
nlpPipeline <- ml_pipeline(documentAssembler, tokenizer, normalizer)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
result <- ml_transform(pipelineModel, spark_df)
result %>% mutate(document = substr(to_json(document), 0, 30),
                  token = substr(to_json(token), 0, 30),
                  normalized = substr(to_json(normalized), 0, 30))
result %>% mutate(norm_result = to_json(normalized.result)) %>% select(norm_result) %>% head(n = 3)

Stopwords Cleaner

This annotator takes a sequence of strings (e.g. the output of a Tokenizer, Normalizer, Lemmatizer, or Stemmer) and drops all the stop words from the input sequences.

Functions:

setStopWords: The words to be filtered out. Array[String]

setCaseSensitive: Whether to do a case sensitive comparison over the stop words.

stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("token"), output_col = "cleanTokens", case_sensitive = FALSE)
documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

nlpPipeline <- ml_pipeline(documentAssembler, tokenizer, stopwords_cleaner)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
spark_df <- spark_read_text(sc, "sample_sentences", sample_sentences_file) %>% rename(text = line)

result <- ml_transform(pipelineModel, spark_df)

result %>% mutate(document = to_json(document), token = to_json(token), cleanTokens = to_json(cleanTokens))
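You can also supply your own stop word list instead of the built-in defaults. In the minimal sketch below (not part of the original notebook), the stop_words argument name is an assumption, taken as the snake_case equivalent of setStopWords():

custom_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("token"), output_col = "cleanTokens",
                                         stop_words = c("is", "a", "the", "and", "in"),  # assumed argument for setStopWords()
                                         case_sensitive = FALSE)

custom_pipeline <- ml_pipeline(documentAssembler, tokenizer, custom_cleaner)

ml_fit_and_transform(custom_pipeline, spark_df) %>%
  mutate(clean_result = to_json(cleanTokens.result)) %>%
  select(clean_result) %>%
  head(3)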

Token Assembler

documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
sentenceDetector <- nlp_sentence_detector(sc, input_cols = c("document"), output_col = "sentences")
tokenizer <- nlp_tokenizer(sc, input_cols = c("sentences"), output_col = "token")
normalizer <- nlp_normalizer(sc, input_cols = c("token"), output_col = "normalized", lowercase = FALSE)
stopwords_cleaner <- nlp_stop_words_cleaner(sc, input_cols = c("normalized"), output_col = "cleanTokens", case_sensitive = FALSE)
tokenAssembler <- nlp_token_assembler(sc, input_cols = c("cleanTokens"), output_col = "clean_text")

nlpPipeline <- ml_pipeline(documentAssembler,
                           sentenceDetector,
                           tokenizer,
                           normalizer,
                           stopwords_cleaner,
                           tokenAssembler)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

# These next two lines could be combined by using ml_fit_and_transform
pipelineModel <- ml_fit(nlpPipeline, empty_df)

result <- ml_transform(pipelineModel, spark_df)

result
result %>% mutate(clean_text_result = to_json(clean_text.result)) %>% select(text, clean_text_result)
result %>% 
  select(clean_text) %>% 
  mutate(clean_text = explode(clean_text)) %>% 
  sdf_separate_column(column = "clean_text", into = c("annotatorType", "begin", "end", "result", "metadata")) %>% 
  mutate(sentence = metadata.sentence) %>% 
  select(-metadata, -clean_text, -annotatorType)
# if we hadn't used the Sentence Detector, this is what we would get (the tokenizer gets the document column instead of the sentences column)
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

nlpPipeline <- ml_pipeline(
  documentAssembler,
  tokenizer,
  normalizer,
  stopwords_cleaner,
  tokenAssembler
)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)

result <- ml_transform(pipelineModel, spark_df)

result %>% mutate(clean_text_result = to_json(clean_text.result)) %>% select(text, clean_text_result)
result %>% 
  select(clean_text) %>% 
  mutate(clean_text = explode(clean_text)) %>% 
  sdf_separate_column(column = "clean_text", into = c("annotatorType", "begin", "end", "result", "metadata")) %>% 
  mutate(sentence = metadata.sentence) %>% 
  select(-metadata, -clean_text, -annotatorType)

Stemmer

Returns hard-stems out of words with the objective of retrieving the meaningful part of the word

stemmer <- nlp_stemmer(sc, input_cols = c("token"), output_col = "stem")
documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")

nlpPipeline <- ml_pipeline(documentAssembler,
                           tokenizer,
                           stemmer)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
result <- ml_transform(pipelineModel, spark_df)
result
result %>% mutate(stem_result = to_json(stem.result)) %>% select(stem_result)
result %>%
  select(tokens = token, stems = stem) %>% 
  mutate(results = explode(arrays_zip(tokens.result, stems.result))) %>% 
  sdf_separate_column(column = "results", into = c("token", "stem")) %>% 
  select(token, stem)

Lemmatizer

Retrieves lemmas out of words with the objective of returning a base dictionary word

lemma_dict <- pins::pin("https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt")
lemmatizer <- nlp_lemmatizer(sc, input_cols = c("token"), output_col = "lemma", 
                             dictionary_path = lemma_dict, dictionary_value_delimiter = "\t", dictionary_key_delimiter = "->")
documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")
stemmer <- nlp_stemmer(sc, input_cols = c("token"), output_col = "stem")

nlpPipeline <- ml_pipeline(documentAssembler,
                           tokenizer,
                           stemmer,
                           lemmatizer)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
result <- ml_transform(pipelineModel, spark_df)
result
result %>% mutate(lemma_result = to_json(lemma.result)) %>% select(lemma_result)
result %>%
  select(tokens = token, stems = stem, lemmas = lemma) %>% 
  mutate(results = explode(arrays_zip(tokens.result, stems.result, lemmas.result))) %>% 
  sdf_separate_column(column = "results", into = c("token", "stem", "lemma")) %>% 
  select(token, stem, lemma)

NGram Generator

NGramGenerator annotator takes as input a sequence of strings (e.g. the output of a Tokenizer, Normalizer, Stemmer, Lemmatizer, and StopWordsCleaner).

The parameter n is used to determine the number of terms in each n-gram. The output will consist of a sequence of n-grams where each n-gram is represented by a space-delimited string of n consecutive words, with annotatorType CHUNK, the same as the Chunker annotator.

Functions:

setN: number of elements per n-gram (>= 1)

setEnableCumulative: whether to calculate just the actual n-grams or all n-grams from 1 through n

setDelimiter: Glue character used to join the tokens

ngrams_cum <- nlp_ngram_generator(sc, input_cols = c("token"), output_col = "ngrams", n = 3, enable_cumulative = TRUE, delimiter = "_")

nlpPipeline <- ml_pipeline(documentAssembler,
                           tokenizer,
                           ngrams_cum)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)

result <- ml_transform(pipelineModel , spark_df)

result %>% mutate(result = to_json(ngrams.result)) %>% select(result)

TextMatcher

Annotator to match entire phrases (by token) provided in a file against a Document

Functions:

setEntities(path, format, options): Provides a file with phrases to match. Default: Looks up path in configuration.

path: a path to a file that contains the entities in the specified format.

readAs: the format of the file, can be one of {ReadAs.LINE_BY_LINE, ReadAs.SPARK_DATASET}. Defaults to LINE_BY_LINE.

options: a map of additional parameters. Defaults to {“format”: “text”}.

pubmed_sample <- pins::pin("https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/pubmed/pubmed-sample.csv")
pubMedDF <- spark_read_csv(sc, "pubmedsample", path = pubmed_sample) %>% 
  filter(!is.null(AB)) %>% 
  rename(text = AB) %>% 
  select(-TI)

head(pubMedDF)
# write the target entities to txt file

entities <- c('KCNJ9', 'GIRK', 'diabetes mellitus', 'nucleotide polymorphisms')

writeLines(entities, "clinical_entities.txt")
entity_extractor <- nlp_text_matcher(sc, input_cols = c("document", "token"), output_col = "matched_entities", path = "clinical_entities.txt")

nlpPipeline <- ml_pipeline(documentAssembler,
                           tokenizer,
                           entity_extractor)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)
result <- ml_transform(pipelineModel, head(pubMedDF, n = 10))
result %>% mutate(matched_entities_result = to_json(matched_entities.result)) %>% select(matched_entities_result) %>% head(1)
result_df <- result %>% 
  mutate(matched_entities = explode(matched_entities)) %>% 
  mutate(matched_entities_result = matched_entities.result, begin = matched_entities.begin, end = matched_entities.end) %>% 
  select(matched_entities = matched_entities_result, begin, end)

head(result_df, 10)

RegexMatcher

rules <- c("renal\\s\\w+, followed by 'renal'", "cardiac\\s\\w+, followed by 'cardiac'")

writeLines(rules, "regex_rules.txt")
documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
regex_matcher <- nlp_regex_matcher(sc, input_cols = c("document"), output_col = "regex_matches", strategy = "MATCH_ALL",
                                   rules_path = "regex_rules.txt", rules_path_delimiter = ",")

nlpPipeline <- ml_pipeline(documentAssembler, regex_matcher)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)

match_df <- ml_transform(pipelineModel, pubMedDF)

match_df %>%
  mutate(result = to_json(regex_matches.result)) %>% 
  select(result) %>% 
  head(3)
match_df %>% 
  mutate(matches = regex_matches.result) %>% 
  filter(size(matches) > 1) %>% 
  mutate(matches = to_json(matches),
         text = substr(text, 0, 30)) %>% 
  select(text, matches) %>% 
  head(20)

Text Cleaning with UDF

text <- '<h1 style="color: #5e9ca0;">Have a great <span  style="color: #2b2301;">birth</span> day!</h1>'

text_df <- sdf_copy_to(sc, data.frame(text = text))

# R function run on each partition by spark_apply(); strips HTML tags from the text column
clean_text <- function(s) data.frame(cleaned = gsub("<[^>]*>", "", s$text))

# bind the cleaned column back onto the original Spark data frame
sdf_bind_cols(text_df, text_df %>% spark_apply(clean_text))

Finisher

Finisher: Once we have our NLP pipeline ready to go, we might want to use our annotation results somewhere else where it is easy to use. The Finisher outputs annotation(s) values into a string.

If we just want the desired output column in the final dataframe, we can use the Finisher to drop the previous stages from the final output and get the result of the process.

This is very handy when you want to use the output from Spark NLP annotator as an input to another Spark ML transformer.

Settable parameters are:

setInputCols()

setOutputCols()

setCleanAnnotations(True) -> Whether to remove intermediate annotations

setValueSplitSymbol("#") -> character used to split values within an annotation

setAnnotationSplitSymbol("@") -> character used to split values between annotations

setIncludeMetadata(False) -> Whether to include metadata keys. Sometimes useful in some annotations.

setOutputAsArray(False) -> Whether to output as Array. Useful as input for other Spark transformers.

finisher <- nlp_finisher(sc, input_cols = c("regex_matches"), include_metadata = FALSE)

nlpPipeline <- ml_pipeline(
  documentAssembler,
  regex_matcher,
  finisher
)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)

match_df <- ml_transform(pipelineModel, pubMedDF)

match_df %>% 
  mutate(text = substr(text, 1, 49),
         finished_regex_matches = to_json(finished_regex_matches)) %>% 
  select(text, finished_regex_matches)
sdf_schema(match_df)
match_df %>% 
  filter(size(finished_regex_matches) > 1) %>% 
  mutate(text = substr(text, 0, 50),
         finished_regex_matches = to_json(finished_regex_matches))

LightPipeline

LightPipelines are Spark NLP-specific pipelines, equivalent to a Spark ML Pipeline but meant to deal with smaller amounts of data. They're useful when working with small datasets, debugging results, or running either training or prediction from an API that serves one-off requests.

Spark NLP LightPipelines are Spark ML pipelines converted into a single-machine, multi-threaded task, becoming more than 10x faster for smaller amounts of data (small is relative, but 50k sentences is roughly a good maximum). To use them, we simply plug in a fitted (trained) pipeline and then annotate plain text. We don't even need to convert the input text to a DataFrame in order to feed it into a pipeline that accepts a DataFrame as input in the first place. This feature is quite useful when it comes to getting a prediction for a few lines of text from a trained ML model.

It is nearly 20x faster than using Spark ML Pipeline

nlp_annotate(nlp_light_pipeline(someTrainedPipeline), someStringOrArray)

documentAssembler <- nlp_document_assembler(sc, input_col = "text", output_col = "document")
tokenizer <- nlp_tokenizer(sc, input_cols = c("document"), output_col = "token")
stemmer <- nlp_stemmer(sc, input_cols = c("token"), output_col = "stem")
lemmatizer <- nlp_lemmatizer(sc, input_cols = c("token"), output_col = "lemma", 
                             dictionary_path = lemma_dict, dictionary_value_delimiter = "\t", dictionary_key_delimiter = "->")

nlpPipeline <- ml_pipeline(documentAssembler,
                           tokenizer,
                           stemmer,
                           lemmatizer)

empty_df <- sdf_copy_to(sc, data.frame(text = ""))

pipelineModel <- ml_fit(nlpPipeline, empty_df)

light_model <- nlp_light_pipeline(pipelineModel)

light_result <- nlp_annotate(light_model, "John and Peter are brothers. However they don't support each other that much.")
names(light_result)
data.frame(token = unlist(light_result$token), stem = unlist(light_result$stem), lemma = unlist(light_result$lemma))
light_result <- nlp_annotate_full(light_model, "John and Peter are brothers. However they don't support each other that much.")
jsonlite::toJSON(light_result, force = TRUE)
text_list <- c("How did serfdom develop in and then leave Russia ?",
               "There will be some exciting breakthroughs in NLP this year.")

jsonlite::toJSON(nlp_annotate(light_model, text_list))

