stream_class <- function(stream) {
  class(stream) <- c("spark_stream", class(stream))
  stream
}
stream_status <- function(stream) {
  invoke(stream, "status") %>%
    invoke("json") %>%
    jsonlite::fromJSON()
}
#' @export
print.spark_stream <- function(x, ...) {
  id <- stream_id(x)
  status <- stream_status(x)
  active <- invoke(x, "isActive")

  cat(
    paste(
      paste("Stream:", id),
      paste("Status:", status$message),
      paste("Active:", active),
      "",
      sep = "\n"
    )
  )

  invisible(x)
}
#' Stops a Spark Stream
#'
#' Stops processing data from a Spark stream.
#'
#' @param stream The Spark stream object to be stopped.
#'
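#' @examples
#' \dontrun{
#' # a minimal sketch: the file paths below are illustrative placeholders
#' sc <- spark_connect(master = "local")
#' sdf_len(sc, 10) %>%
#'   spark_write_parquet(path = "parquet-in")
#'
#' stream <- stream_read_parquet(sc, "parquet-in") %>%
#'   stream_write_parquet("parquet-out")
#'
#' stream_stop(stream)
#' }
#'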
#' @export
stream_stop <- function(stream) {
  # mark any associated RStudio job as complete
  if (!is.null(stream$job)) rstudio_jobs_api()$add_job_progress(stream$job, 100L)

  stopped <- invoke(stream, "stop")
  stream_unregister(stream)

  invisible(stopped)
}
stream_validate <- function(stream) {
  # resolve the validation timeout (in seconds) from the connection's config
  sc <- spark_connection(stream)
  waitSeconds <- cast_scalar_integer(
    spark_config_value(sc$config, "sparklyr.stream.validate.timeout", 3)
  ) %>%
    as.difftime(units = "secs")

  # wait until the stream reports it is waiting for data, or the timeout elapses
  commandStart <- Sys.time()
  while (!grepl("waiting", stream_status(stream)$message) &&
    commandStart + waitSeconds > Sys.time()) {
    Sys.sleep(0.1)
  }

  # surface any exception raised by the streaming query
  exception <- invoke(stream, "exception")
  if (!invoke(exception, "isEmpty")) {
    cause <- invoke(exception, "get") %>%
      invoke("cause") %>%
      invoke("getMessage")
    if (identical(cause, NULL)) {
      cause <- invoke(exception, "get") %>%
        invoke("cause") %>%
        invoke("toString")
    }
    stop(cause)
  }

  stream
}
#' Spark Stream Interval Trigger
#'
#' Creates a Spark structured streaming trigger to execute
#' over the specified interval.
#'
#' @param interval The execution interval specified in milliseconds.
#'
#' @seealso \code{\link{stream_trigger_continuous}}
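#'
#' @examples
#' \dontrun{
#' # a sketch assuming writable local paths; the trigger makes the sink
#' # check for new input once per second
#' sc <- spark_connect(master = "local")
#' stream_generate_test(path = "source")
#' stream <- stream_read_csv(sc, "source") %>%
#'   stream_write_csv("dest", trigger = stream_trigger_interval(interval = 1000))
#' stream_stop(stream)
#' }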
#' @export
stream_trigger_interval <- function(interval = 1000) {
  structure(
    list(interval = interval),
    class = "stream_trigger_interval"
  )
}
#' Spark Stream Continuous Trigger
#'
#' Creates a Spark structured streaming trigger to execute
#' continuously. This mode is the most performant but not all operations
#' are supported.
#'
#' @param checkpoint The checkpoint interval specified in milliseconds.
#'
#' @seealso \code{\link{stream_trigger_interval}}
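#'
#' @examples
#' \dontrun{
#' # continuous processing needs Spark 2.3+ and a supported source/sink
#' # pair such as Kafka; the broker and topic names below are placeholders
#' stream <- stream_read_kafka(
#'   sc,
#'   options = list("kafka.bootstrap.servers" = "localhost:9092", "subscribe" = "topic-in")
#' ) %>%
#'   stream_write_kafka(
#'     options = list("kafka.bootstrap.servers" = "localhost:9092", "topic" = "topic-out"),
#'     trigger = stream_trigger_continuous(checkpoint = 5000)
#'   )
#' }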
#' @export
stream_trigger_continuous <- function(checkpoint = 5000) {
  structure(
    list(interval = checkpoint),
    class = "stream_trigger_continuous"
  )
}
stream_trigger_create <- function(trigger, sc) {
  UseMethod("stream_trigger_create")
}
#' @export
stream_trigger_create.stream_trigger_interval <- function(trigger, sc) {
  invoke_static(sc, "org.apache.spark.sql.streaming.Trigger", "ProcessingTime", as.integer(trigger$interval))
}
#' @export
stream_trigger_create.stream_trigger_continuous <- function(trigger, sc) {
  invoke_static(sc, "org.apache.spark.sql.streaming.Trigger", "Continuous", as.integer(trigger$interval))
}
#' Spark Stream's Name
#'
#' Retrieves the name of the Spark stream if available.
#'
#' @param stream The Spark stream object.
#'
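#' @examples
#' \dontrun{
#' # a sketch: memory sinks are named, so the name registered by
#' # stream_write_memory() is returned; paths and names are placeholders
#' sc <- spark_connect(master = "local")
#' sdf_len(sc, 10) %>%
#'   spark_write_parquet(path = "parquet-in")
#'
#' stream <- stream_read_parquet(sc, "parquet-in") %>%
#'   stream_write_memory(name = "stream_example")
#'
#' stream_name(stream)
#' stream_stop(stream)
#' }
#'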
#' @export
stream_name <- function(stream) {
  invoke(stream, "name")
}
#' Spark Stream's Identifier
#'
#' Retrieves the identifier of the Spark stream.
#'
#' @param stream The Spark stream object.
#'
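#' @examples
#' \dontrun{
#' # a sketch assuming the placeholder paths below are writable
#' sc <- spark_connect(master = "local")
#' sdf_len(sc, 10) %>%
#'   spark_write_parquet(path = "parquet-in")
#'
#' stream <- stream_read_parquet(sc, "parquet-in") %>%
#'   stream_write_parquet("parquet-out")
#'
#' stream_id(stream)
#' stream_stop(stream)
#' }
#'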
#' @export
stream_id <- function(stream) {
  invoke(stream, "id") %>% invoke("toString")
}
sdf_collect_stream <- function(x, ...) {
  sc <- spark_connection(x)
  args <- list(...)
  n <- args$n

  # write the stream into an in-memory sink so it can be queried as a table
  memory <- stream_write_memory(
    x,
    trigger = stream_trigger_interval(interval = 0)
  )

  data <- data.frame()

  # poll the in-memory table until rows arrive or the timeout elapses
  waitSeconds <- cast_scalar_integer(
    spark_config_value(sc$config, "sparklyr.stream.collect.timeout", 5)
  )

  commandStart <- Sys.time()
  while (nrow(data) == 0 && commandStart + waitSeconds > Sys.time()) {
    data <- tbl(sc, stream_name(memory)) %>%
      spark_dataframe() %>%
      sdf_collect(n = n)

    Sys.sleep(0.1)
  }

  stream_stop(memory)
  data
}
stream_generate_test_entry <- function(df,
                                       path,
                                       distribution,
                                       iterations,
                                       interval,
                                       idxDataStart = 1,
                                       idxDataEnd = 1,
                                       idxDist = 1,
                                       idxFile = 1) {
  later <- get("later", envir = asNamespace("later"))

  if (idxFile > iterations) {
    return()
  }

  # cycle through the distribution of row counts
  if (idxDist > length(distribution)) idxDist <- 1

  # advance the window of rows to write, wrapping around the source data
  idxDataStart <- idxDataEnd %% nrow(df)
  idxDataEnd <- idxDataStart + distribution[idxDist]

  # recycle rows from df until the window can be filled
  dfCycle <- df
  while (nrow(dfCycle) < idxDataEnd) {
    dfCycle <- rbind(dfCycle, df)
  }

  rows <- dfCycle[idxDataStart:idxDataEnd, ]
  write.csv(rows, file.path(path, paste0("stream_", idxFile, ".csv")), row.names = FALSE)
  Sys.sleep(interval)

  idxDist <- idxDist + 1
  idxFile <- idxFile + 1

  # schedule the next file write asynchronously
  later(function() {
    stream_generate_test_entry(
      df,
      path,
      distribution,
      iterations,
      interval,
      idxDataStart,
      idxDataEnd,
      idxDist,
      idxFile
    )
  }, interval)
}
#' Generate Test Stream
#'
#' Generates a local test stream, useful when testing streams locally.
#'
#' @param df The data frame used as a source of rows for the stream; it will
#' be coerced to a data frame if needed. Defaults to a sequence of one
#' thousand entries.
#' @param path Path to save the stream of files to, defaults to \code{"source"}.
#' @param distribution The distribution of rows to use over each iteration,
#' defaults to a binomial distribution. The stream will cycle through the
#' distribution if needed.
#' @param iterations Number of iterations to execute before stopping, defaults
#' to fifty.
#' @param interval The interval in seconds used to write the stream, defaults
#' to one second.
#'
#' @details This function requires the \code{later} package to be installed.
#'
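#' @examples
#' \dontrun{
#' # writes five CSV files, one per second, into a temporary folder
#' stream_generate_test(
#'   df = data.frame(x = 1:100),
#'   path = tempfile("test_stream_"),
#'   iterations = 5
#' )
#' }
#'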
#' @importFrom stats dbinom
#' @export
stream_generate_test <- function(df = 1:1000,
                                 path = "source",
                                 distribution = floor(10 + 1e5 * stats::dbinom(1:20, 20, 0.5)),
                                 iterations = 50,
                                 interval = 1) {
  if (!requireNamespace("later", quietly = TRUE)) {
    stop("'stream_generate_test()' requires the 'later' package.")
  }

  if (!is.data.frame(df)) df <- as.data.frame(df)
  if (!dir.exists(path)) dir.create(path, recursive = TRUE)

  stream_generate_test_entry(df, path, distribution, iterations, interval)
}
#' Find Stream
#'
#' Finds and returns a stream based on the stream's identifier.
#'
#' @param sc The associated Spark connection.
#' @param id The stream identifier to find.
#'
#' @examples
#' \dontrun{
#' sc <- spark_connect(master = "local")
#' sdf_len(sc, 10) %>%
#'   spark_write_parquet(path = "parquet-in")
#'
#' stream <- stream_read_parquet(sc, "parquet-in") %>%
#'   stream_write_parquet("parquet-out")
#'
#' stream_id <- stream_id(stream)
#' stream_find(sc, stream_id)
#' }
#'
#' @export
stream_find <- function(sc, id) {
  spark_session(sc) %>%
    invoke("streams") %>%
    invoke("get", id) %>%
    stream_class()
}
#' Watermark Stream
#'
#' Ensures a stream has a watermark defined, which is required for some
#' operations over streams.
#'
#' @param x An object coercible to a Spark Streaming DataFrame.
#' @param column The name of the column that contains the event time of the row;
#' if the column is missing, a column with the current time will be added.
#' @param threshold The minimum delay to wait for data to arrive late, defaults
#' to ten minutes.
#'
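#' @examples
#' \dontrun{
#' # a sketch assuming a writable "source" path; adds a "timestamp"
#' # column (if missing) plus a ten-minute watermark
#' sc <- spark_connect(master = "local")
#' stream_generate_test(path = "source")
#' stream_read_csv(sc, "source") %>%
#'   stream_watermark()
#' }
#'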
#' @export
stream_watermark <- function(x, column = "timestamp", threshold = "10 minutes") {
  sdf <- spark_dataframe(x)
  sc <- spark_connection(x)

  # add an event-time column with the current timestamp when one is missing
  if (!column %in% invoke(sdf, "columns")) {
    sdf <- sdf %>%
      invoke(
        "withColumn",
        column,
        invoke_static(sc, "org.apache.spark.sql.functions", "expr", "current_timestamp()")
      )
  }

  sdf %>%
    invoke("withWatermark", column, threshold) %>%
    sdf_register()
}
to_milliseconds <- function(dur) {
  if (is.numeric(dur)) {
    dur
  } else if (is.character(dur)) {
    m <- regmatches(dur, regexec("^([0-9]+)\\s*([[:alpha:]]+)$", dur))[[1]]
    conversions <- list(
      ms = 1L,
      msec = 1L,
      msecs = 1L,
      millisecond = 1L,
      milliseconds = 1L,
      s = 1000L,
      sec = 1000L,
      secs = 1000L,
      second = 1000L,
      seconds = 1000L,
      m = 60000L,
      min = 60000L,
      mins = 60000L,
      minute = 60000L,
      minutes = 60000L,
      h = 3600000L,
      hr = 3600000L,
      hrs = 3600000L,
      hour = 3600000L,
      hours = 3600000L,
      d = 86400000L,
      day = 86400000L,
      days = 86400000L
    )
    if (length(m) == 0 || !(m[[3]] %in% names(conversions))) {
      stop(dur, " is not a valid time duration specification")
    }
    as.integer(m[[2]]) * conversions[[m[[3]]]]
  } else {
    stop(
      "time duration specification must be an integer (representing number of ",
      "milliseconds) or a valid time duration string (e.g., '5s')"
    )
  }
}
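# For example: to_milliseconds(1500) returns 1500 unchanged,
# to_milliseconds("5s") returns 5000, and to_milliseconds("2 min")
# returns 120000; anything else (e.g., "5 lightyears") raises an error.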
#' Apply lag function to columns of a Spark Streaming DataFrame
#'
#' Given a streaming Spark dataframe as input, this function will return another
#' streaming dataframe that contains all columns in the input and column(s) that
#' are shifted behind by the offset(s) specified in `cols` (see example).
#'
#' @param x An object coercible to a Spark Streaming DataFrame.
#' @param cols A list of expressions, one for each column to create; each new
#' column will contain the value of a previous entry from the source column.
#' @param thresholds Optional named list of timestamp column(s) and
#' corresponding time duration(s) for determining whether a previous record
#' is sufficiently recent relative to the current record.
#' If any of the time difference(s) between the current and a previous
#' record is greater than the maximal duration allowed, then the previous
#' record is discarded and will not be part of the query result.
#' The durations can be specified with numeric types (which will be
#' interpreted as the maximal difference allowed, in milliseconds, between 2
#' UNIX timestamps) or time duration strings such as "5s", "5sec", "5min",
#' "5hour", etc.
#' Any timestamp column in `x` that is not of timestamp or date Spark SQL
#' types will be interpreted as number of milliseconds since the UNIX epoch.
#'
#' @examples
#' \dontrun{
#'
#' library(sparklyr)
#'
#' sc <- spark_connect(master = "local", version = "2.2.0")
#'
#' streaming_path <- tempfile("days_df_")
#' days_df <- dplyr::tibble(
#'   today = weekdays(as.Date(seq(7), origin = "1970-01-01"))
#' )
#' num_iters <- 7
#' stream_generate_test(
#'   df = days_df,
#'   path = streaming_path,
#'   distribution = rep(nrow(days_df), num_iters),
#'   iterations = num_iters
#' )
#'
#' stream_read_csv(sc, streaming_path) %>%
#'   stream_lag(cols = c(yesterday = today ~ 1, two_days_ago = today ~ 2)) %>%
#'   collect() %>%
#'   print(n = 10L)
#' }
#'
#' @export
stream_lag <- function(x, cols, thresholds = NULL) {
  sc <- spark_connection(x)
  spark_require_version(sc, "2.0.0")
  if (!sdf_is_streaming(x)) {
    stop("expected a streaming dataframe as input")
  }

  exprs <- rlang::exprs(!!!cols)
  src_cols <- NULL
  offsets <- NULL
  dst_cols <- NULL
  # each element of `cols` must be of the form <dst column> = <src column> ~ <offset>
  for (i in seq_along(exprs)) {
    dst <- names(exprs[i])
    expr <- exprs[[i]]
    if (length(expr) != 3 || as.character(expr[[1]]) != "~" ||
      !is.numeric(expr[[3]])) {
      stop(
        "expected `cols` to be a comma-separated list of expressions of form ",
        "<destination column> = <source column> ~ <offset>"
      )
    }
    src_cols <- c(src_cols, as.character(expr[[2]]))
    offsets <- c(offsets, as.integer(expr[[3]]))
    dst_cols <- c(dst_cols, dst)
  }

  invoke_static(
    sc,
    "sparklyr.StreamUtils",
    "lag",
    x %>% spark_dataframe(),
    as.list(src_cols),
    as.list(offsets),
    as.list(dst_cols),
    as.list(names(thresholds)),
    lapply(unname(thresholds), to_milliseconds)
  ) %>%
    sdf_register()
}