knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)

Setup

The first thing to do is to establish a connection with the Elasticsearch server. In our case, it is running locally on port 9200.

library(elasticquery)

con <- es_connect(
  host = "127.0.0.1", port = 9200,
  primary_index = "my-index")

This connection object is passed to queries so they know where to run.

Note that we can also specify primary_index, the index that queries we build will use by default unless we specify otherwise in the query. Here we are querying "my-index" by default.
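For illustration, here is a hedged sketch of overriding the default index for a single query. The index argument shown here is hypothetical, so check the package documentation for the actual interface.

# Hypothetical sketch: the argument for overriding the primary index
# per query may be named differently in the package's actual API.
query <- query_agg(con, index = "another-index")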

Queries

Overview

Two major types of queries are currently supported by this package.

  1. Aggregation: Documents are counted by specified fields in the data, query initiated with query_agg()
  2. Fetch: Documents are retrieved according to specified criteria, query initiated with query_fetch()

After a query is initiated, it can be built upon by piping various operations, as in the example below.
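For example, we can combine an aggregation with a filter, both of which are described in the sections that follow:

query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  run()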

Aggregation Queries

Aggregation queries are constructed by doing the following:

Initiating a Query

To initiate an aggregation query, we use the function query_agg(), and pass it our connection object.

query <- query_agg(con)

We can view the query's translation to an Elasticsearch search body string simply by printing the query.

query

Here, of course, the query is empty as we haven't specified aggregation dimensions yet.

Queries can be executed using the run() function.

run(query)

Since the query is empty, nothing is returned.

Getting a List of Queryable Fields

Before specifying fields to aggregate on, it can be helpful to see which fields are available. This can be done by passing the connection object to queryable_fields().

queryable_fields(con)

Note that aggregations make the most sense for categorical variables with bounded cardinality. Keyword fields are typically the best candidates, but even with keywords, think carefully about which fields are meaningful to aggregate.

Aggregating by Fields

Suppose we want to tabulate the frequency of all values of the field "tags" in the index. We can do this by adding agg_by_field() to our query, specifying the field name "tags".

query <- query_agg(con) %>%
  agg_by_field("tags")

The function agg_by_field(), like all subsequent query-modifying functions, takes a query object as input and returns a modified query object as output. This makes these functions suitable for piping, which is a convenient and expressive way to build queries.

To see what this new query looks like:

query

Note that aggregation queries use composite aggregation with paging, and running the query automatically issues repeated queries until paging is complete and binds the results together, saving a lot of tedious work.

We can retrieve the result of this query by calling run().

run(query)

We can continue to add more dimensions to the aggregation using pipes. For example, to count frequencies of combinations of the fields "tags" and "affectedCountriesIso":

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_field("affectedCountriesIso") %>%
  run()

Aggregating by Date Binning

Suppose we want to get daily counts for each tag in the data. We can use the function agg_by_date(), which aggregates daily by default.

Here, we aggregate on a document's field "processedOnDate".

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  run()

For finer control over the date binning, we can use functions calendar_interval() and fixed_interval().

For example, to bin by calendar week:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  run()

And to bin every 10 days:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", fixed_interval("10d")) %>%
  run()

Filtering

We can further modify an aggregation query by specifying filters. Several types of filters are currently available: range, terms, regexp, match, and simple query string filters, each described below.

Note that filters apply to both aggregation and fetch queries.

Range Filters

Range filters are specified using filter_range(), which takes the field to filter and one or both of the from and to values for the range.

For example, to take our earlier aggregation query and filter it to dates on or after 2020-03-10:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  run()

To filter on a date/time value, the format looks like the following:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate") %>%
  filter_range("processedOnDate", from = "2020-03-10T10:21:32") %>%
  run()

Terms Filters

The function filter_terms() adds a filter to a query specifying values that a field must contain for a document to be included in the aggregation.

For example, to add to our earlier query, suppose we require that "affectedCountriesIso" must contain "US" or "CA":

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  filter_range("processedOnDate", from = "2018-01-01") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  run()

Regexp Filters

The function filter_regexp() adds a filter to a query that provides a regular expression a field must match to be included in the results. Note that, unfortunately, Elasticsearch regular expressions are case sensitive.

To aggregate tags but only for documents that have a tag containing "Corona", for example:

query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_regexp("tags", ".*Corona.*") %>%
  run()

Note that here we are filtering on a field that is an array of values for each document. Because of this, the resulting aggregation will contain more tags than just those that include "Corona": we are counting all tags present in each article that contains at least one tag matching "Corona".
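If we only want counts for the matching tags themselves, one option is to filter the result after the query runs. A minimal sketch, assuming the aggregation result is a data frame with a tags column:

res <- query_agg(con) %>%
  agg_by_field("tags") %>%
  filter_regexp("tags", ".*Corona.*") %>%
  run()

# Keep only rows whose tag itself matches "Corona"
res[grepl("Corona", res$tags), ]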

Match Filters

The function filter_match() specifies a filter to only include documents where the specified field contains a match for the provided string.

For example, to further refine our aggregation to only include documents where a match for the string "disease" is found in the full text:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_date("processedOnDate", calendar_interval("1w")) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()

Simple query string filters

You can add a simple query string filter by using the function filter_sqs(). This allows you to specify a field name or vector of field names to search, along with a query string to search for in those fields.

Below we are searching fields "fullText" and "title" for the specified search string.

docs <- query_fetch(con, max = 100) %>%
  filter_sqs(c("fullText", "title"), "((health | healthcare | \\\"health care\\\" | medical) + (specialist* | provider* | professional* | practitioner* | doctor* | worker* | personnel | staff)) | physician* | \\\"general practitioner\\\" | therapist*") %>%
  select_fields(c("fullText", "title")) %>%
  run()

Boolean filter operators

All of the filter_*() functions have a boolean parameter bool that allows you to specify the boolean logic for the filter. The supported options correspond to Elasticsearch's boolean query clauses, such as "must" and "must_not". The default is "must", meaning that documents must match the specified filter to be returned.

Below, we are filtering on the field "tags", saying that we want documents with the tag "l:WPRO" but not the tag "l:CoronavirusInfection".

docs <- query_fetch(con, max = 100) %>%
  filter_terms("tags", "l:WPRO") %>%
  filter_terms("tags", "l:CoronavirusInfection", bool = "must_not") %>%
  run()

Fetch Queries

Fetch queries simply retrieve documents based on filtering criteria. All of the filtering functions specified above apply to these queries.

Initiating a Query

Similar to aggregation queries, a fetch query is initialized using query_fetch(), which takes as its primary argument the connection object.

One optional argument of note to this function is path, which specifies a directory to write documents to as they are fetched. If this is not specified, results are read into memory. If the result set looks like it will be very large, a warning encourages the user to provide a path and write to disk.

If we initialize a fetch query with no refinements, it will return all documents in the index.

For example, with our example index, which contains 10k documents:

docs <- query_fetch(con) %>%
  run()

This will fetch all 10k documents and return them as a large list to docs.

Note that fetch queries automatically take care of scrolling to retrieve potentially very large sets of documents. The scroll limit is 10k documents, so iterative queries are run to fetch documents in batches, which are pieced together upon retrieval.
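To make the mechanics concrete, here is a rough conceptual sketch of scrolling against the raw Elasticsearch API. This is not the package's internal code; it assumes the httr and jsonlite packages and uses a match-all query against "my-index".

library(httr)
library(jsonlite)

# Open a scroll context and fetch the first batch of (up to) 10k documents
res <- POST("http://127.0.0.1:9200/my-index/_search?scroll=1m",
  body = '{"size": 10000, "query": {"match_all": {}}}',
  content_type_json())
page <- content(res)
hits <- page$hits$hits

# Keep requesting batches until a page comes back empty
while (length(page$hits$hits) > 0) {
  res <- POST("http://127.0.0.1:9200/_search/scroll",
    body = toJSON(list(scroll = "1m", scroll_id = page$`_scroll_id`),
      auto_unbox = TRUE),
    content_type_json())
  page <- content(res)
  hits <- c(hits, page$hits$hits)
}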

Adding Filters to Fetch Queries

It is usually more desirable for a fetch query to pinpoint records of interest rather than retrieve all documents. This can be done using the filter functions described earlier.

For example, to fetch all documents matching the filtering criteria we specified in the final aggregation example:

docs <- query_fetch(con) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()

Checking to see how large query results will be

Sometimes we may wish to see how large a query's result will be before executing it. To do this, we can replace run() with n_docs(). To make query execution even faster, we can specify max = 0 so that no documents are fetched.

query_fetch(con, max = 0) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  n_docs()

Fetching a subset of records

There is an argument max in query_fetch() that can be used to specify that we just want to fetch the first max documents. This can be useful if we want to experiment with our query and the resulting data before doing a full fetch.

docs <- query_fetch(con, max = 10) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()

Fetching to Disk

In the previous fetch examples, the returned object docs has been a list containing the document content returned by the query.

In many cases we may wish to do a bulk download of many articles. If we specify a path argument to query_fetch(), the results will be written in batches to the specified directory.

For example, to write our last query to disk, we specify a directory in our query initialization. Also, note that to simulate scrolling, we use the size argument to specify that each iteration of the query retrieves 10 documents (instead of the default 10k). With this, we see that two files get written, one for each scroll.

tf <- tempfile()
dir.create(tf)
docs <- query_fetch(con, path = tf, size = 10) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  filter_terms("affectedCountriesIso", c("US", "CA")) %>%
  filter_match("fullText", "disease") %>%
  run()

list.files(docs)

Specifying fields to sort on

Another operation available only for fetch queries is sort_docs(), which allows you to specify fields to sort by as part of the fetch.

For example:

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  sort_docs("processedOnDate") %>%
  run()

sapply(docs, function(x) x$`_source`$processedOnDate)

You can provide a vector of field names for nested sorting, as in the sketch below.
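A minimal sketch of nested sorting, using two fields that appear elsewhere in this document (sorting first by country, then by date within country):

docs <- query_fetch(con, size = 10, max = 25) %>%
  sort_docs(c("affectedCountriesIso", "processedOnDate")) %>%
  run()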

You can also wrap fields with utility functions asc() and desc() to specify the sort order:

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  sort_docs(desc("processedOnDate")) %>%
  run()

sapply(docs, function(x) x$`_source`$processedOnDate)

Specifying fields to return

An operation available only for fetch queries, select_fields(), allows us to specify which fields should be returned for each document. This is useful if documents contain some very large fields that we don't want to include in our results.

To see which fields are available for selection:

selectable_fields(con)

For example, to return just the fields "source.id", "triggers", and "locations":

docs <- query_fetch(con, size = 10, max = 25) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  select_fields(c("source.id", "triggers", "locations")) %>%
  run()

str(docs[[1]]$`_source`, 2)

Ad hoc Queries

The interface provided by query_fetch(), query_agg(), and the accompanying filter and sort functions is meant to enable clean and precise definition of Elasticsearch queries, hiding all the messy details of scrolling, aggregation bucketing, query JSON specification, execution, etc. There may be times when this interface does not provide enough flexibility. For this scenario, the function query_str() allows you to provide a string specifying an Elasticsearch query that gets executed.

For example, suppose we would like to fetch all documents processed on or after 2020-03-10. As we have seen before, we can do this with the following:

docs <- query_fetch(con, size = 1000) %>%
  filter_range("processedOnDate", from = "2020-03-10") %>%
  run()

Suppose we want to run a query similar to this, but we need to modify the query a bit in a way that isn't available through the interface provided in this package. Let's first look at what the resulting query string is:

query_fetch(con, size = 1000) %>%
  filter_range("processedOnDate", from = "2020-03-10")

For illustration purposes, we will take this string and run it directly without modification. To run this query string, we can do the following:

str <- '{
  "size": 1000,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "processedOnDate": {
              "gte": "2020-03-10"
            }
          }
        }
      ]
    }
  }
}'

res <- query_str(con, str = str) %>%
  run()

Here we simply ran the query specified by this string. The structure of the output looks like this:

str(res, 2)

Note that this is the raw output provided by Elasticsearch and not the more convenient list output provided by query_fetch(). Also note that it only pulled 1000 documents when we know there are more results for this query. These limitations occur because query_str() does not know what kind of query specification it is going to receive, and therefore does not know how best to process the output.
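For instance, the documents in the raw response live under the standard Elasticsearch hits path; a small sketch, assuming res is the parsed response from above:

# Raw Elasticsearch responses keep documents under hits$hits
hits <- res$hits$hits
length(hits)  # at most the requested "size" (1000 here)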

To fully take advantage of the fetch and aggregation conveniences provided by this package, an additional type parameter is available to query_str(), which defaults to "unknown" but can also be set to "fetch" or "agg".

With our current example, if we add type = "fetch" to our call to query_str(), the execution now knows to handle the query as a fetch query and will take care of scrolling, etc. to fetch all of the documents.

res <- query_str(con, str = str, type = "fetch") %>%
  run()

length(res)

While this example wasn't very imaginative, suppose there is more complex filter logic you want to apply. A typical use case might be to use query_fetch() and the associated filter/sort functions to get started, modify the query string as needed, and pass the result to query_str(). Note, however, that this approach provides less protection from errors in the query specification, so expect more error messages and the occasional need to debug your query string.

Similarly, we can apply query_str() to aggregation-type queries. Let's take the following example:

query_agg(con) %>%
  agg_by_field("tags") %>%
  agg_by_field("affectedCountriesIso")

We can run this same query using query_str() as follows:

str <- '{
  "size": 0,
  "aggs": {
    "agg_results": {
      "composite": {
        "size": 1000,
        "sources": [
          {
            "tags": {
              "terms": {
                "field": "tags"
              }
            }
          },
          {
            "affectedCountriesIso": {
              "terms": {
                "field": "affectedCountriesIso"
              }
            }
          }
        ]
      }
    }
  }
}'

res <- query_str(con, str = str) %>%
  run()

str(res, 3)

As before, we see that query_str() is "dumb" in that it doesn't know we are doing an aggregation, so it doesn't iterate to fetch the remaining aggregation buckets or put the result into a data frame.

To make query_str() "smart", we can add type = "agg":

res <- query_str(con, str = str, type = "agg") %>%
  run()

res

Limitations

This package is experimental and has not undergone rigorous testing to verify the correctness of the constructed queries. Use at your own risk.

The package has been written to cover a large number of immediate use cases. However, there are many additional features and parameters of Elasticsearch that could be exposed through this interface in the future.


