#' @importFrom jsonlite fromJSON
stream_progress <- function(stream) {
lastProgress <- invoke(stream, "lastProgress")
if (is.null(lastProgress)) {
NULL
}
else {
lastProgress %>%
invoke("toString") %>%
fromJSON()
}
}
#' View Stream
#'
#' Opens a Shiny gadget to visualize the given stream.
#'
#' @param stream The stream to visualize.
#' @param ... Additional optional arguments.
#'
#' @examples
#' \dontrun{
#' library(sparklyr)
#' sc <- spark_connect(master = "local")
#'
#' dir.create("iris-in")
#' write.csv(iris, "iris-in/iris.csv", row.names = FALSE)
#'
#' stream_read_csv(sc, "iris-in/") %>%
#' stream_write_csv("iris-out/") %>%
#' stream_view() %>%
#' stream_stop()
#' }
#' @export
stream_view <- function(
stream,
...) {
stream_package_check("shiny")
validate <- stream_progress(stream)
interval <- 1000
shinyUI <- get("shinyUI", envir = asNamespace("shiny"))
tags <- get("tags", envir = asNamespace("shiny"))
div <- get("div", envir = asNamespace("shiny"))
HTML <- get("HTML", envir = asNamespace("shiny"))
stream_package_check("r2d3")
d3Output <- get("d3Output", envir = asNamespace("r2d3"))
renderD3 <- get("renderD3", envir = asNamespace("r2d3"))
r2d3 <- get("r2d3", envir = asNamespace("r2d3"))
ui <- shinyUI(
div(
tags$head(
tags$style(HTML("
html, body, body > div {
width: 100%;
height: 100%;
margin: 0px;
}
"))
),
d3Output("plot", width = "100%", height = "100%")
)
)
options <- list(...)
observe <- get("observe", envir = asNamespace("shiny"))
invalidateLater <- get("invalidateLater", envir = asNamespace("shiny"))
server <- function(input, output, session) {
first <- stream_progress(stream)
output$plot <- renderD3(
r2d3(
data = list(
sources = as.list(first$sources$description),
sinks = as.list(first$sink$description)
),
script = system.file("streams/stream.js", package = "sparklyr"),
container = "div",
options = options
)
)
observe({
invalidateLater(interval, session)
data <- stream_progress(stream)
session$sendCustomMessage(type = "sparklyr_stream_view", list(
timestamp = data$timestamp,
rps = list(
"in" = if (is.numeric(data$inputRowsPerSecond)) floor(data$inputRowsPerSecond) else 0,
"out" = if (is.numeric(data$processedRowsPerSecond)) floor(data$processedRowsPerSecond) else 0
)
))
})
}
runGadget <- get("runGadget", envir = asNamespace("shiny"))
runGadget(ui, server)
stream
}
#' Stream Statistics
#'
#' Collects streaming statistics, usually, to be used with \code{stream_render()}
#' to render streaming statistics.
#'
#' @param stream The stream to collect statistics from.
#' @param stats An optional stats object generated using \code{stream_stats()}.
#'
#' @return A stats object containing streaming statistics that can be passed
#' back to the \code{stats} parameter to continue aggregating streaming stats.
#'
#' @examples
#' \dontrun{
#' sc <- spark_connect(master = "local")
#' sdf_len(sc, 10) %>%
#' spark_write_parquet(path = "parquet-in")
#'
#' stream <- stream_read_parquet(sc, "parquet-in") %>%
#' stream_write_parquet("parquet-out")
#'
#' stream_stats(stream)
#' }
#'
#' @export
stream_stats <- function(stream, stats = list()) {
data <- stream_progress(stream)
if (is.null(stats$stats)) {
stats$sources <- data$sources$description
stats$sink <- data$sink$description
stats$stats <- list()
}
stats$stats[[length(stats$stats) + 1]] <- list(
timestamp = data$timestamp,
rps = list(
"in" = if (is.numeric(data$inputRowsPerSecond)) floor(data$inputRowsPerSecond) else 0,
"out" = if (is.numeric(data$processedRowsPerSecond)) floor(data$processedRowsPerSecond) else 0
)
)
stats
}
#' Render Stream
#'
#' Collects streaming statistics to render the stream as an 'htmlwidget'.
#'
#' @param stream The stream to render
#' @param collect The interval in seconds to collect data before rendering the
#' 'htmlwidget'.
#' @param stats Optional stream statistics collected using \code{stream_stats()},
#' when specified, \code{stream} should be omitted.
#' @param ... Additional optional arguments.
#'
#' @examples
#' \dontrun{
#' library(sparklyr)
#' sc <- spark_connect(master = "local")
#'
#' dir.create("iris-in")
#' write.csv(iris, "iris-in/iris.csv", row.names = FALSE)
#'
#' stream <- stream_read_csv(sc, "iris-in/") %>%
#' stream_write_csv("iris-out/")
#'
#' stream_render(stream)
#' stream_stop(stream)
#' }
#' @export
stream_render <- function(
stream = NULL,
collect = 10,
stats = NULL,
...) {
stream_package_check("r2d3")
r2d3 <- get("r2d3", envir = asNamespace("r2d3"))
if (is.null(stats)) {
stats <- stream_stats(stream)
for (i in seq_len(collect)) {
Sys.sleep(1)
stats <- stream_stats(stream, stats)
}
}
r2d3(
data = list(
sources = as.list(stats$sources),
sinks = as.list(stats$sink),
stats = stats$stats
),
script = system.file("streams/stream.js", package = "sparklyr"),
container = "div",
options = options
)
}
stream_package_check <- function(x) {
if (!(x %in% installed.packages())) {
stop(glue::glue("The '{x}' package is required for this operation.") )
}
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.