#' @import methods
#' @import magrittr
#' @import mongolite
NULL
mongoPrimitiveOps = c("&"
, "|"
, "=="
, "!="
, ">"
, ">="
, "<"
, "<="
, "%in%"
, "+"
, "-"
, "/"
, "*"
)
exprParser <- function()
{
callingFrame <- parent.frame(2)
function(expr, dollarForMemberAccess = F)
{
visit <- function(x)
{
if (is.atomic(x))
{
return(list(type = "atomic", value = x))
}
else if (is.name(x))
{
value = deparse(x)
# period prefix indicates a mongo field (some contexts require the $ prefix)
if (startsWith(value, "."))
{
value <- sub("^\\.", ifelse(dollarForMemberAccess, "$", ""), value)
return(list(type = "member", value = value))
}
# otherwise, it's an R object, so add that actual value to the ast
else
{
obj <- get(value, envir = callingFrame)
return(list(type = "atomic", value = obj))
}
}
else if (is.call(x))
{
op <- visit(x[[1]])
opText <- deparse(x[[1]])
argCount <- length(x) - 1
# make an exception for negated literals - they're actually calls in R
# resolve them ahead of time instead of shelling out to an invalid $subtract call
isPrimitiveException <- opText == "-" & argCount == 1 & is.numeric(x[[2]])
# if it's a primitive op, or we're just proxying to a mongo expression,
# then create an ast node to render that out later
if (!isPrimitiveException & (opText %in% mongoPrimitiveOps | startsWith(opText, ".")))
{
args <- lapply(x[2:length(x)], visit)
opText <- sub("^\\.", "", opText)
return(list(type = "call", op = opText, args = args))
}
# otherwise, it's a proper R function call (and so contains no mongo field/func refs)
# so resolve the return value pre-query and sub that in as an atomic ast node
else
{
rargs <- list()
if (argCount > 0)
{
rargs <- as.list(x[2:length(x)])
}
result <- do.call(op$value, rargs, envir = callingFrame)
return(list(type = "atomic", value = result))
}
}
else
{
stop(paste0("unhandled type: ", typeof(x))) # nocov
}
}
visit(expr)
}
}
mongoAstToList <- function(ast)
{
visit <- function(node)
{
arrayExpr <- function(name, requiredArity = NA)
{
function(node)
{
ret <- list()
args <- lapply(node$args, visit)
if (!is.na(requiredArity) & length(args) != requiredArity)
{
stop(paste("Expected", requiredArity, "args for", name, "operator, got", length(args))) # nocov
}
ret[[paste0("$", name)]] <- args
ret
}
}
binaryExpr <- function(name, requiresArray = F)
{
function(node)
{
argList <- list()
rhs <- visit(node$args[[2]])
if (requiresArray & length(rhs) == 1)
{
rhs <- list(rhs)
}
argList[[paste0("$", name)]] <- rhs
ret <- list()
field <- visit(node$args[[1]])
ret[[field]] <- argList
ret
}
}
renderFunc <- function(node)
{
ret <- list()
ret[[paste0("$", node$op)]] <- visit(node$args[[1]])
ret
}
renderSubobject <- function(node)
{
lapply(node$args, visit)
}
if (node$type == "call")
{
func <- switch(node$op,
# expressions
"&" = { arrayExpr("and") },
"|" = { arrayExpr("or") },
"==" = { binaryExpr("eq") },
"!=" = { binaryExpr("ne") },
">" = { binaryExpr("gt") },
">=" = { binaryExpr("gte") },
"<" = { binaryExpr("lt") },
"<=" = { binaryExpr("lte") },
"+" = { arrayExpr("add") },
"-" = { arrayExpr("subtract", 2) },
"*" = { arrayExpr("multiply") },
"/" = { arrayExpr("divide", 2) },
"%in%" = { binaryExpr("in", T) },
"substr" = { arrayExpr("substr", 3) },
# accumulators
"sum" = { renderFunc },
"min" = { renderFunc },
"max" = { renderFunc },
"first" = { renderFunc },
"list" = { renderSubobject },
stop(paste("unhandled operator:", node$op))
)
func(node)
}
else if (node$type == "atomic" || node$type == "member")
{
node$value
}
else
{
stop(paste("unhandled type:", node$type)) # nocov
}
}
visit(ast)
}
createStage <- function(name, payload)
{
ret <- list()
ret[[paste0("$", name)]] <- payload
jsonlite::toJSON(ret, auto_unbox = T)
}
#' Represents a MongoDB aggregation pipeline query.
#' @slot stages Character vector containing the current pipeline stages, serialised as JSON.
#' @export MongoPipeline
MongoPipeline <- setClass("MongoPipeline", slots = c(stages = "character"))
#' Add a $match stage to a MongoPipeline.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param expr An expression indicating the requirements to match against.
#' @return A copy of the previous pipeline, with the new stage added.
#' @rdname mmatch-methods
#' @export
setGeneric("mmatch", function(p, expr) standardGeneric("mmatch"))
#' @rdname mmatch-methods
#' @aliases mmatch,MongoPipeline-method
setMethod("mmatch", signature(p = "MongoPipeline"),
function(p, expr)
{
parser <- exprParser()
ast <- parser(substitute(expr))
stage <- createStage("match", mongoAstToList(ast))
p@stages <- c(p@stages, stage)
p
})
#' Add a $limit stage to a MongoPipeline.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param limit The number of documents to include in the result.
#' @return A copy of the previous pipeline, with the new stage added.
#' @rdname mlimit-methods
#' @export
setGeneric("mlimit", function(p, limit) standardGeneric("mlimit"))
#' @rdname mlimit-methods
#' @aliases mlimit,MongoPipeline,numeric-method
setMethod("mlimit", signature(p = "MongoPipeline", limit = "numeric"),
function(p, limit)
{
p@stages <- c(p@stages, createStage("limit", jsonlite::unbox(limit)))
p
})
#' Add a $group stage to a MongoPipeline.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param by The grouping variable(s), as an expression.
#' @param ... Zero or more name=value pairs where name represent
#' a field to create, and values are accumulator expressions.
#' @return A copy of the previous pipeline, with the new stage added.
#' @rdname mgroup-methods
#' @export
setGeneric("mgroup", function(p, by, ...) standardGeneric("mgroup"))
#' @rdname mgroup-methods
#' @aliases mgroup,MongoPipeline-method
setMethod("mgroup", signature(p = "MongoPipeline"),
function(p, by, ...)
{
subbedArgs <- as.list(substitute(list(...)))[-1L]
subbedArgs[["_id"]] = substitute(by)
parser <- exprParser()
asts <- lapply(subbedArgs, function(a) { parser(a, dollarForMemberAccess = T) })
argList <- lapply(asts, mongoAstToList)
p@stages <- c(p@stages, createStage("group", argList))
p
})
#' Add an $unwind stage to a MongoPipeline.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param expr An expression indicating the array field to unwind.
#' @return A copy of the previous pipeline, with the new stage added.
#' @rdname munwind-methods
#' @export
setGeneric("munwind", function(p, expr) standardGeneric("munwind"))
#' @rdname munwind-methods
#' @aliases munwind,MongoPipeline-method
setMethod("munwind", signature(p = "MongoPipeline"),
function(p, expr)
{
parser <- exprParser()
arg <- parser(substitute(expr), dollarForMemberAccess = T)
p@stages <- c(p@stages, createStage("unwind", mongoAstToList(arg)))
p
})
#' Add a $sort stage to a MongoPipeline.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param ... Named arguments for sort fields, indicating whether to sort in ascending (1) or descending (-1) order.
#' @return A copy of the previous pipeline, with the new stage added.
#' @rdname msort-methods
#' @export
setGeneric("msort", function(p, ...) standardGeneric("msort"))
#' @rdname msort-methods
#' @aliases msort,MongoPipeline-method
setMethod("msort", signature(p = "MongoPipeline"),
function(p, ...)
{
namedArgs <- lapply(list(...), jsonlite::unbox)
names(namedArgs) <- sapply(names(namedArgs), function(n) sub("^\\.", "", n))
p@stages <- c(p@stages, createStage("sort", namedArgs))
p
})
#' Execute a MongoPipeline's current query.
#' @param p A \linkS4class{MongoPipeline} instance.
#' @param client A connection to the mongo collection to execute against.
#' @return A data frame containing the aggregation pipeline results.
#' @rdname mexecute-methods
#' @export
setGeneric("mexecute", function(p, client) standardGeneric("mexecute"))
#' @rdname mexecute-methods
#' @aliases mexecute,MongoPipeline-method
setMethod("mexecute", signature(p = "MongoPipeline"),
function(p, client)
{
pipeline <- paste0("[", toString(p@stages), "]")
frame <- client$aggregate(pipeline)
flattenedFrame <- jsonlite::flatten(frame)
colnames(flattenedFrame) <- gsub("^_id", "id", colnames(flattenedFrame))
flattenedFrame
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.