#' MongoDB client
#'
#' Connect to a MongoDB collection. Returns a \code{mongo} connection object with
#' methods listed below.
#'
#' @export
#' @aliases mongolite
#' @param url address of the mongodb server in mongo connection string
#' \href{http://docs.mongodb.org/manual/reference/connection-string/}{URI format}.
#' @param db name of database
#' @param collection name of collection
#' @param verbose emit some more output
#' @return Upon success returns a pointer to a collection on the server.
#' The collection can be interfaced using the methods described below.
#' @examples # Connect to mongolabs
#' con <- mongo("mtcars", url = "mongodb://readwrite:test@ds043942.mongolab.com:43942/jeroen_test")
#' if(con$count() > 0) con$drop()
#' con$insert(mtcars)
#' stopifnot(con$count() == nrow(mtcars))
#'
#' # Query data
#' mydata <- con$find()
#' stopifnot(all.equal(mydata, mtcars))
#' con$drop()
#'
#' # Automatically disconnect when connection is removed
#' rm(con)
#' gc()
#'
#' \dontrun{
#' # dplyr example
#' library(nycflights13)
#'
#' # Insert some data
#' m <- mongo(collection = "nycflights")
#' m$drop()
#' m$insert(flights)
#'
#' # Basic queries
#' m$count('{"month":1, "day":1}')
#' jan1 <- m$find('{"month":1, "day":1}')
#'
#' # Sorting
#' jan1 <- m$find('{"month":1,"day":1}', sort='{"distance":-1}')
#' head(jan1)
#'
#' # Sorting on large data requires index
#' m$index(add = "distance")
#' allflights <- m$find(sort='{"distance":-1}')
#'
#' # Select columns
#' jan1 <- m$find('{"month":1,"day":1}', fields = '{"_id":0, "distance":1, "carrier":1}')
#'
#' # List unique values
#' m$distinct("carrier")
#' m$distinct("carrier", '{"distance":{"$gt":3000}}')
#'
#' # Tabulate
#' m$aggregate('[{"$group":{"_id":"$carrier", "count": {"$sum":1}, "average":{"$avg":"$distance"}}}]')
#'
#' # Map-reduce (binning)
#' hist <- m$mapreduce(
#' map = "function(){emit(Math.floor(this.distance/100)*100, 1)}",
#' reduce = "function(id, counts){return Array.sum(counts)}"
#' )
#'
#' # Stream jsonlines into a connection
#' tmp <- tempfile()
#' m$export(file(tmp))
#'
#' # Remove the collection
#' m$drop()
#'
#' # Import from jsonlines stream from connection
#' dmd <- mongo("diamonds")
#' dmd$import(url("http://jeroenooms.github.io/data/diamonds.json"))
#' dmd$count()
#'
#' # Remove the collection
#' dmd$drop()
#' }
#' @section Methods:
#' \describe{
#' \item{\code{aggregate(pipeline = '{}', handler = NULL, pagesize = 1000)}}{Execute a pipeline using the Mongo aggregation framework.}
#' \item{\code{count(query = '{}')}}{Count the number of records matching a given \code{query}. Default counts all records in collection.}
#' \item{\code{distinct(key, query = '{}')}}{List unique values of a field given a particular query.}
#' \item{\code{drop()}}{Delete entire collection with all data and metadata.}
#' \item{\code{export(con = stdout(), bson = FALSE)}}{Streams all data from collection to a \code{\link{connection}} in \href{http://ndjson.org}{jsonlines} format (similar to \href{http://docs.mongodb.org/v2.6/reference/mongoexport/}{mongoexport}). Alternatively when \code{bson = TRUE} it outputs the binary \href{http://bsonspec.org/faq.html}{bson} format (similar to \href{http://docs.mongodb.org/manual/reference/program/mongodump/}{mongodump}).}
#' \item{\code{find(query = '{}', fields = '{"_id" : 0}', sort = '{}', skip = 0, limit = 0, handler = NULL, pagesize = 1000)}}{Retrieve \code{fields} from records matching \code{query}. Default \code{handler} will return all data as a single dataframe.}
#' \item{\code{import(con, bson = FALSE)}}{Stream import data in \href{http://ndjson.org}{jsonlines} format from a \code{\link{connection}}, similar to the \href{http://docs.mongodb.org/v2.6/reference/mongoimport/}{mongoimport} utility. Alternatively when \code{bson = TRUE} it assumes the binary \href{http://bsonspec.org/faq.html}{bson} format (similar to \href{http://docs.mongodb.org/manual/reference/program/mongorestore/}{mongorestore}).}
#' \item{\code{index(add = NULL, remove = NULL)}}{List, add, or remove indexes from the collection. The \code{add} and \code{remove} arguments can either be a field name or json object. Returns a dataframe with current indexes.}
#' \item{\code{info()}}{Returns collection statistics and server info (if available).}
#' \item{\code{insert(data, pagesize = 1000, ...)}}{Insert rows into the collection. Argument 'data' must be a data-frame, named list (for single record) or character vector with json strings (one string for each row). For lists and data frames, arguments in \code{...} get passed to \code{\link[jsonlite:toJSON]{jsonlite::toJSON}}}
#' \item{\code{iterate(query = '{}', fields = '{"_id":0}', sort = '{}', skip = 0, limit = 0)}}{Runs query and returns iterator to read single records one-by-one.}
#' \item{\code{mapreduce(map, reduce, query = '{}', sort = '{}', limit = 0, out = NULL, scope = NULL)}}{Performs a map reduce query. The \code{map} and \code{reduce} arguments are strings containing a JavaScript function. Set \code{out} to a string to store results in a collection instead of returning.}
#' \item{\code{remove(query, multiple = FALSE)}}{Remove record(s) matching \code{query} from the collection. The \code{query} argument has no default and must be supplied.}
#' \item{\code{rename(name, db = NULL)}}{Change the name or database of a collection. Changing name is cheap, changing database is expensive.}
#' \item{\code{update(query, update = '{"$set":{}}', upsert = FALSE, multiple = FALSE)}}{Replace or modify matching record(s) with value of the \code{update} argument.}
#' }
#' @references Jeroen Ooms (2014). The \code{jsonlite} Package: A Practical and Consistent Mapping Between JSON Data and \R{} Objects. \emph{arXiv:1403.2805}. \url{http://arxiv.org/abs/1403.2805}
mongo <- function(collection = "test", db = "test", url = "mongodb://localhost", verbose = TRUE){
  # Open the client first: it is needed to read a database name that may be
  # embedded in the connection URL.
  client <- mongo_client_new(url)

  # Emulate the missing 'mongoc_client_get_default_database': when the caller
  # did not choose a database (argument omitted or NULL), prefer the database
  # named in the URL, if there is one.
  if (missing(db) || is.null(db)) {
    url_db <- mongo_get_default_database(client)
    if (!is.null(url_db)) {
      db <- url_db
    }
  }

  col <- mongo_collection_new(client, collection, db)

  # Send a trivial command so that connection problems surface immediately.
  mongo_collection_command_simple(col, '{"ping":1}')

  # Remember how to rebuild the handles; fall back to the requested name when
  # the server-side name cannot be read.
  resolved_name <- tryCatch(
    mongo_collection_name(col),
    error = function(e) collection
  )
  mongo_object(col, client, verbose = verbose,
               list(name = resolved_name, db = db, url = url))
}
mongo_object <- function(col, client, verbose, orig){
  # Build the reference object returned by mongo().
  #
  # col, client: collection/client handles (from mongo_collection_new() and
  #              mongo_client_new()).
  # verbose:     forwarded to the streaming helpers; also controls the
  #              reconnect message in check_col().
  # orig:        list(name, db, url) used to recreate the handles; rename()
  #              updates it.
  #
  # All methods are closures created inside local(), whose parent is this
  # function's frame. That lets them share, and reassign via <<-, the
  # variables 'col', 'client' and 'orig' defined here.

  # Check if the ptr has died and automatically recreate it from 'orig'.
  check_col <- function(){
    if(null_ptr(col)){
      if(verbose)
        message("Trying to reconnect with mongo...")
      client <<- mongo_client_new(orig$url)
      col <<- mongo_collection_new(client, orig$name, orig$db)
    }
  }

  # The reference object: an environment holding only the method closures.
  self <- local({
    # Insert a data frame (streamed by mongo_stream_out), a named list (one
    # record, serialized with jsonlite::toJSON) or a character vector of
    # JSON object strings (one per record).
    insert <- function(data, pagesize = 1000, ...){
      check_col()
      if(is.data.frame(data)){
        mongo_stream_out(data, col, pagesize = pagesize, verbose = verbose, ...)
      } else if(is.list(data) && !is.null(names(data))){
        mongo_collection_insert_page(col, jsonlite::toJSON(data, ...))
      } else if(is.character(data)) {
        # Reject strings that are not valid JSON at all ...
        if(!all(is_valid <- vapply(data, jsonlite::validate, logical(1), USE.NAMES = FALSE))){
          el <- paste(which(!is_valid), collapse = ", ")
          stop("Argument 'data' is a character vector but contains invalid JSON at elements: ", el)
        }
        # ... and valid JSON that is not an object (e.g. arrays or scalars).
        if(!all(is_valid <- grepl("^\\s*\\{", data))){
          el <- paste(which(!is_valid), collapse = ", ")
          stop("Argument 'data' contains strings that are not JSON objects at elements: ", el)
        }
        mongo_collection_insert_page(col, data)
      } else {
        stop("Argument 'data' must be a data frame, named list, or character vector with json strings")
      }
    }

    # Run a query and collect the results through mongo_stream_in().
    find <- function(query = '{}', fields = '{"_id":0}', sort = '{}', skip = 0, limit = 0, handler = NULL, pagesize = 1000){
      check_col()
      cur <- mongo_collection_find(col, query = query, sort = sort, fields = fields, skip = skip, limit = limit)
      mongo_stream_in(cur, handler = handler, pagesize = pagesize, verbose = verbose)
    }

    # Run a query and return an iterator over the cursor.
    iterate <- function(query = '{}', fields = '{"_id":0}', sort = '{}', skip = 0, limit = 0) {
      check_col()
      cur <- mongo_collection_find(col, query = query, sort = sort, fields = fields, skip = skip, limit = limit)
      # make sure 'col' does not go out of scope to prevent gc
      mongo_iterator(cur, col)
    }

    # Write the collection to a connection as JSON lines, or as BSON.
    export <- function(con = stdout(), bson = FALSE){
      check_col()
      if(isTRUE(bson)){
        mongo_dump(col, con, verbose = verbose)
      } else {
        mongo_export(col, con, verbose = verbose)
      }
    }

    # Read records from a connection as JSON lines, or as BSON.
    import <- function(con, bson = FALSE){
      check_col()
      if(isTRUE(bson)){
        mongo_restore(col, con, verbose = verbose)
      } else {
        mongo_import(col, con, verbose = verbose)
      }
    }

    # Execute an aggregation pipeline and collect the results.
    aggregate <- function(pipeline = '{}', handler = NULL, pagesize = 1000){
      check_col()
      cur <- mongo_collection_aggregate(col, pipeline)
      mongo_stream_in(cur, handler = handler, pagesize = pagesize, verbose = verbose)
    }

    count <- function(query = '{}'){
      check_col()
      mongo_collection_count(col, query)
    }

    # 'query' intentionally has no default so records are never removed by
    # accident.
    remove <- function(query, multiple = FALSE){
      check_col()
      mongo_collection_remove(col, query, multiple)
    }

    drop <- function(){
      check_col()
      mongo_collection_drop(col)
    }

    update <- function(query, update = '{"$set":{}}', upsert = FALSE, multiple = FALSE){
      check_col()
      mongo_collection_update(col, query, update, upsert, multiple)
    }

    # Map-reduce; when 'out' is NULL the inline "results" field is returned,
    # otherwise the raw command output is returned.
    mapreduce <- function(map, reduce, query = '{}', sort = '{}', limit = 0, out = NULL, scope = NULL){
      check_col()
      cur <- mongo_collection_mapreduce(col, map = map, reduce = reduce, query = query,
        sort = sort, limit = limit, out = out, scope = scope)
      results <- mongo_stream_in(cur, verbose = FALSE)
      if(is.null(out))
        results[[1, "results"]]
      else
        results
    }

    distinct <- function(key, query = '{}'){
      check_col()
      out <- mongo_collection_distinct(col, key, query)
      jsonlite:::simplify(out$values)
    }

    # Collection name, stats (NULL if unavailable) and server status.
    info <- function(){
      check_col()
      list(
        name = mongo_collection_name(col),
        stats = tryCatch(mongo_collection_stats(col), error = function(e) NULL),
        server = mongo_client_server_status(client)
      )
    }

    # Rename and/or move the collection, then update 'orig' so a later
    # reconnect in check_col() targets the new location. Returns 'orig'.
    rename <- function(name, db = NULL){
      check_col()
      mongo_collection_rename(col, db, name)
      orig <<- list(
        name = tryCatch(mongo_collection_name(col), error = function(e){name}),
        # Plain if/else instead of ifelse(): ifelse() fails when orig$db is
        # NULL and is not meant for scalar choices.
        db = if(is.null(db)) orig$db else db,
        url = orig$url
      )
      orig
    }

    # Optionally add and/or drop an index, then list the current indexes.
    index <- function(add = NULL, remove = NULL){
      check_col()
      if(length(add) > 0)
        mongo_collection_create_index(col, add)
      if(length(remove) > 0)
        mongo_collection_drop_index(col, remove)
      mongo_collection_find_indexes(col)
    }

    environment()
  })
  # Lock the method environment and its bindings so the methods cannot be
  # replaced or extended by users.
  lockEnvironment(self, TRUE)
  structure(self, class=c("mongo", "jeroen", class(self)))
}
#' @export
print.mongo <- function(x, ...){
  # 'x' is the locked method environment; its parent holds the shared state
  # ('col', 'check_col'). Reconnect first if needed, then print with a title
  # showing the collection name.
  state <- parent.env(x)
  state$check_col()
  title <- paste0("<Mongo collection> '", mongo_collection_name(state$col), "'")
  print.jeroen(x, title = title)
  # Print methods conventionally return their argument invisibly.
  invisible(x)
}
#setGeneric("serialize")
#setOldClass("jeroen")
#setMethod("serialize", "jeroen", function(object, connection){
# if(!missing(connection)) {
# writeBin(bson_to_raw(object), connection)
# } else {
# bson_to_raw(object);
# }
#});
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.