knitr::opts_chunk$set( collapse = TRUE, comment = "#>" )
The first thing to do is to establish a connection with the Elasticsearch server. In our case, it is running locally on port 9200.
library(elasticquery) con <- es_connect( host = "127.0.0.1", port = 9200, primary_index = "eios-items_all")
library(elasticquery) con <- es_connect( host = "127.0.0.1", port = 9200, primary_index = "my-index")
This connection object is passed to queries so it knows where to go to run the query.
Note that we can also specify the primary_index
, which is the index used by default by queries we build unless we specify otherwise in the query. Here we are querying "my-index" by default.
Two major types of queries are currently supported by this package.
query_agg()
query_fetch()
After a query is initiated, it can built upon by piping various operations:
agg_by_field()
: specify a field to aggregate by, only for aggregation queriesagg_by_date()
: specify a date field and date binning to aggregate by, only for aggregation queriesfilter_match()
: specify a field to filter documents on according to a string match (partial for text fields, exact for keyword fields), for both aggregation and fetch queriesfilter_terms()
: specify a field to filter documents on according to a string exact match, for both aggregation and fetch queriesfilter_regexp()
: specify a field to filter documents on according to a regular expression match, for both aggregation and fetch queriesfilter_range()
: specify a field to filter documents on according to a specified range, for both aggregation and fetch queriesselect_fields()
: specify fields to select in the returned documents, only for fetch queriessort_docs()
: specify fields by which to sort the returned documents, only for fetch queries.Aggregation queries are constructed by doing the following:
query_agg()
agg_by_field()
agg_by_date()
To initiate an aggregation query, we use the function query_agg()
, and pass it our connection object.
query <- query_agg(con)
We can view the query's translation to an Elasticsearch search body string simply through printing the query.
query
Here, of course, the query is empty as we haven't specified aggregation dimensions yet.
Queries can be executed using the run()
function.
run(query)
Since the query is empty, nothing is returned.
To begin specifying fields to aggregate on, it can be helpful to get a view of what fields are available to aggregate on. This can be done by passing the connection object to `queryable_fields().
queryable_fields(con)
Note that aggregations make most sense when done against categorical variables with some bounded cardinality. Typically keywords make the most sense to aggregate against, but even with keywords, you should use care to think about what fields make sense to aggregate.
Suppose we want to tabulate the frequency of all of the fields in the index. We can do this by adding agg_by_field()
to our query, specifying the field name "tags".
query <- query_agg(con) %>% agg_by_field("tags")
The function agg_by_field()
, and all subsequent query modifying functions take a query object as its input and emit a modified query object as its output. This makes these functions suitable for piping, which is a convenient and expressive way to build queries.
To see what this new query looks like:
query
Note that aggregation queries use composite aggregation with paging, and running the query will automatically take care of recurrent queries until paging is done and bind the results together, saving a lot of tedious work.
We can retrieve the result of this query by calling run()
.
run(query)
We can continue to add more dimensions to the aggregation using pipes. For example, to count the frequency of both the fields "tags" and "affectedCountriesIso":
query_agg(con) %>% agg_by_field("tags") %>% agg_by_field("affectedCountriesIso") %>% run()
Suppose we want to get daily counts for each tag in the data. We can use a function agg_by_date()
, which by default aggregates daily.
Here, we aggregate on a document's field "processedOnDate".
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate") %>% run()
For finer control over the date binning, we can use functions calendar_interval()
and fixed_interval()
.
For example, to bin on calendar week:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate", calendar_interval("1w")) %>% run()
And to bin on every 10 days:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate", fixed_interval("10d")) %>% run()
We can further modify an aggregation query by specifying filters. Three types of filters are currently available:
Note that filters can apply to both aggregation and fetch queries.
Range filters are specifyind using filter_range()
, specifying the field to filter, and then specifying one or both of from
and to
values for the range.
For example, to take our earlier aggregation query and filter it to dates later than 2018-01-01:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate") %>% filter_range("processedOnDate", from = "2020-03-10") %>% run()
To filter on a date/time, the format is like the following:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate") %>% filter_range("processedOnDate", from = "2020-03-10T10:21:32") %>% run()
The funtion filter_terms()
adds a filter to a query that specifies certain values a field must have to be included in the aggregation.
For example, to add to our earlier query, suppose we require that "affectedCountriesIso" must contain "US" or "CA":
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate", calendar_interval("1w")) %>% filter_range("processedOnDate", from = "2018-01-01") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% run()
The function filter_regexp()
adds a filter to a query that provides a regular expression that a field must match to be included in the results. Note that unfortuately Elasticsearch regular expressions are case sensitive.
To aggregate tags but only for documents that have a tag that contain "Corona", for example:
query_agg(con) %>% agg_by_field("tags") %>% filter_regexp("tags", ".*Corona.*") %>% run()
Note that here we are filtering on a field that is an array of values for each document. Because of this, we will get more tags in our resulting aggregation than tags that include "Corona". Here, we are counting all tags that are present in each article that contains a tag matching "Corona".
The function filter_match()
specifies a filter to only include documents where the specified field contains a match for the provided string.
For example, to further refine our aggregation to only include documents where a match for the string "disease" is found in the full text:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_date("processedOnDate", calendar_interval("1w")) %>% filter_range("processedOnDate", from = "2020-03-10") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% filter_match("fullText", "disease") %>% run()
You can add a simple query string filter by using the function filter_sqs()
. This allows you to specify field name or vector of field names to search, and then a query string to search in these fields.
Below we are searching fields "fullText" and "title" for the specified search string.
docs <- query_fetch(con, max = 100) %>% filter_sqs(c("fullText", "title"), "((health | healthcare | \\\"health care\\\" | medical) + (specialist* | provider* | professional* | practitioner* | doctor* | worker* | personnel | staff)) | physician* | \\\"general practitioner\\\" | therapist*") %>% select_fields(c("fullText", "title")) %>% run()
All of the filter_*()
functions have a boolean parameter bool
that allows you to specify the boolean logic for the filter. The supported options are described here. The default is "must", meaning that the specified filter must appear in the returned documents.
Below, we are filtering on the field "tags", saying that we want documents with tags "l:WPRO", but no documents with tags "l:CoronavirusInfection".
docs <- query_fetch(con, max = 100) %>% filter_terms("tags", "l:WPRO") %>% filter_terms("tags", "l:CoronavirusInfection", bool = "must_not") %>% run()
Fetch queries simply retrieve documents based on filtering criteria. All of the filtering functions specified above apply to these queries.
Similar to aggregation queries, a fetch query is initialized using query_fetch()
, which takes as its primary argument the connection object.
One optional argument of note to this function is path
, which specifies a directory to write docuents to as they are fetched. If this is not specified, results will be read into memory. If the result set looks like it will be very large, a warning is provided that encourages the user to provide a path
and write to disk.
If we intialize a fetch query with no refinements, it will returl all documents in the index.
For example, with our example index which contains 10k documents:
docs <- query_fetch(con) %>% run()
This will fetch all 10k documents and return them as a large list to docs
.
Note that fetch queries automatically take care of scrolling to retrieve potentially very large sets of documents. The scroll limit is 10k documents, so iterative queries are run to fetch these in batches and piece them together upon retrieval.
It is probably more desirable for a fetch query to pinpoint records of interest rather than retrieve all documents. This can be done using filter queries as we specified earlier.
For example, to fetch all documents matching the filtering criteria we specified in the final aggregation example:
docs <- query_fetch(con) %>% filter_range("processedOnDate", from = "2020-03-10") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% filter_match("fullText", "disease") %>% run()
Sometimes we may wish to see how large a query is before executing it. To do this, we can replace run()
with n_docs()
. To make query execution even faster, we can specify max = 0
so that only no documents are fetched.
query_fetch(con, max = 0) %>% filter_range("processedOnDate", from = "2020-03-10") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% filter_match("fullText", "disease") %>% n_docs()
There is an argument max
in query_fetch()
that can be used to specify that we just want to fetch the first max
documents. This can be useful if we want to experiment with our query and the resulting data before doing a full fetch.
docs <- query_fetch(con, max = 10) %>% filter_range("processedOnDate", from = "2020-03-10") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% filter_match("fullText", "disease") %>% run()
In the previous fetch examples, the return object docs
has been a list format of the document content of the query.
In a many cases we may wish to do a bulk download of many articles. If we specify a path
argument to query_fetch()
, the results will be written in batches to the specified directory.
For example, to write our last query to disk, we specify a directory in our query initizilaztion. Also, note that to simulate scrolling, we specify each iteration of the query to retrieve 10 documents (instead of the default 10k documents) with the size
argument. With this, we see that two files get written, one for each scroll.
tf <- tempfile() dir.create(tf) docs <- query_fetch(con, path = tf, size = 10) %>% filter_range("processedOnDate", from = "2020-03-10") %>% filter_terms("affectedCountriesIso", c("US", "CA")) %>% filter_match("fullText", "disease") %>% run() list.files(docs)
Another operation available only for fetch queries is sort_docs()
, which allows you to specify fields to sort by as part of the fetch.
For example:
docs <- query_fetch(con, size = 10, max = 25) %>% filter_range("processedOnDate", from = "2020-03-10") %>% sort_docs("processedOnDate") %>% run() sapply(docs, function(x) x$`_source`$processedOnDate)
You can provide a vector of field names for nested sorting.
You can also wrap fields with utility functions asc()
and desc()
to specify the sort order:
docs <- query_fetch(con, size = 10, max = 25) %>% filter_range("processedOnDate", from = "2020-03-10") %>% sort_docs(desc("processedOnDate")) %>% run() sapply(docs, function(x) x$`_source`$processedOnDate)
An operation available only for fetch queries, select_fields()
, allows us to specify which fields should be returned for each document. This is useful of documents contain some fields that are very large and we don't want to include them in our results.
To see what values are acceptable for a selectable field:
selectable_fields(con)
For example, to return just the fields "source.countryIso" and "locations":
docs <- query_fetch(con, size = 10, max = 25) %>% filter_range("processedOnDate", from = "2020-03-10") %>% select_fields(c("source.id", "triggers", "locations")) %>% run() str(docs[[1]]$`_source`, 2)
The interface provided by query_fetch()
, query_agg()
, and the accompanying filter and sort functions is meant to enable clean and precise definition of Elasticsearch queries, hiding all the messy details of scrolling, aggregation bucketing, query JSON specification, execution, etc. There may be times when this interface does not quite provide enough flexibility. In this scenario, a function, query_str()
, has been created that allows you to provide a string specifying an Elasticsearch query that gets executed.
For example, suppose we would like to fetch all documents retrieved after 2020-03-10. As we have seen before, we can do this with the following:
docs <- query_fetch(con, size = 1000) %>% filter_range("processedOnDate", from = "2020-03-10") %>% run()
Suppose we want to run a query similar to this, but we need to modify the query a bit in a way that isn't available through the interface provided in this package. Let's first look at what the resulting query string is:
query_fetch(con, size = 1000) %>% filter_range("processedOnDate", from = "2020-03-10")
For illustration purposes, we will take this string and run it directly without modification. To run this query string, we can do the following:
str <- '{ "size": 1000, "query": { "bool": { "filter": [ { "range": { "processedOnDate": { "gte": "2020-03-10" } } } ] } } }' res <- query_str(con, str = str) %>% run()
Here we simply specified to run the query specified by this string. The structure of the output looks like this:
str(res, 2)
Note that this is the raw output provided by Elasticsearch and not the more convenient list output provided by query_fetch()
. Also note that it only pulled 1000 documents when we know there are more results in this query. These limitations occur because query_str()
does not know what kind of query specificaiton it is going to receive, and therefore does not know how to best process the output.
To fully take advantage of the fetch and aggregation conveniences provided by this package, an additional type
parameter is available to query_str()
which by default is "unkown", but can also be set to "fetch" or "agg".
With our current example, if we add type = "fetch"
to our call to query_str()
, the execution now knows to handle the query as a fetch query and will take care of scrolling, etc. to fetch all of the documents.
res <- query_str(con, str = str, type = "fetch") %>% run() length(res)
While this example wasn't very imaginative, suppose there is more complex filter logic you want to apply. A typical use case might be to use query_fetch()
and associated filter/sort functions to get you started, modify the query string as needed, and pass that to query_str()
. Note, however, that there is less protection in this scenario from errors in the query specification, so you can expect to see more error messages which will result in the need to debug your query string.
Similarly, we can apply query_str()
to aggregation-type queries. Let's take the following example:
query_agg(con) %>% agg_by_field("tags") %>% agg_by_field("affectedCountriesIso")
We can run this same query using query_str()
as follows:
str <- '{ "size": 0, "aggs": { "agg_results": { "composite": { "size": 1000, "sources": [ { "tags": { "terms": { "field": "tags" } } }, { "affectedCountriesIso": { "terms": { "field": "affectedCountriesIso" } } } ] } } } }' res <- query_str(con, str = str) %>% run() str(res, 3)
As before, we see that query_str()
is "dumb" in that it doesn't know we are doing an aggregation and doesn't know to do the smart iteration to fetch the rest of the aggregation buckets as well as putting the result into a data frame.
To make query_str()
"smart", we can add type = "agg"
:
res <- query_str(con, str = str, type = "agg") %>% run() res
This package is experimental and has not undergone rigorous testing to verify the correctness of the constructed queries. Use at your own risk.
The package has been written to cover a large number of immediate use cases. However, there are many additional features and parameters of Elasticsearch that could be exposed through this interface in the future.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.