library(shiny)
library(shinySignals)
library(dplyr)
library(bubbles)
# An empty prototype of the data frame we want to create
prototype <- data.frame(date = character(), time = character(),
size = numeric(), r_version = character(), r_arch = character(),
r_os = character(), package = character(), version = character(),
country = character(), ip_id = character(), received = numeric())
# Connects to streaming log data for cran.rstudio.com and
# returns a reactive expression that serves up the cumulative
# results as a data frame
packageStream <- function(session = getDefaultReactiveDomain()) {
# Connect to data source
sock <- socketConnection("cransim.rstudio.com", 6789, blocking = FALSE, open = "r")
# Clean up when session is over
session$onSessionEnded(function() {
close(sock)
})
# Returns new lines
newLines <- reactive({
invalidateLater(1000, session)
readLines(sock)
})
# Parses newLines() into data frame
reactive({
if (length(newLines()) == 0)
return()
read.csv(textConnection(newLines()), header=FALSE, stringsAsFactors=FALSE,
col.names = names(prototype)
) %>% mutate(received = as.numeric(Sys.time()))
})
}
# Accumulates pkgStream rows over time; throws out any older than timeWindow
# (assuming the presence of a "received" field)
packageData <- function(pkgStream, timeWindow) {
shinySignals::reducePast(pkgStream, function(memo, value) {
rbind(memo, value) %>%
filter(received > as.numeric(Sys.time()) - timeWindow)
}, prototype)
}
# Count the total nrows of pkgStream
downloadCount <- function(pkgStream) {
shinySignals::reducePast(pkgStream, function(memo, df) {
if (is.null(df))
return(memo)
memo + nrow(df)
}, 0)
}
# Use a bloom filter to probabilistically track the number of unique
# users we have seen; using bloom filter means we will not have a
# perfectly accurate count, but the memory usage will be bounded.
userCount <- function(pkgStream) {
# These parameters estimate that with 5000 unique users added to
# the filter, we'll have a 1% chance of false positive on the next
# user to be queried.
bloomFilter <- BloomFilter$new(5000, 0.01)
total <- 0
reactive({
df <- pkgStream()
if (!is.null(df) && nrow(df) > 0) {
# ip_id is only unique on a per-day basis. To make them unique
# across days, include the date. And call unique() to make sure
# we don't double-count dupes in the current data frame.
ids <- paste(df$date, df$ip_id) %>% unique()
# Get indices of IDs we haven't seen before
newIds <- !sapply(ids, bloomFilter$has)
# Add the count of new IDs
total <<- total + length(newIds)
# Add the new IDs so we know for next time
sapply(ids[newIds], bloomFilter$set)
}
total
})
}
# Quick and dirty bloom filter. The hashing "functions" are based on choosing
# random sets of bytes out of a single MD5 hash. Seems to work well for normal
# values, but has not been extensively tested for weird situations like very
# small n or very large p.
library(digest)
library(bit)
BloomFilter <- setRefClass("BloomFilter",
fields = list(
.m = "integer",
.bits = "ANY",
.k = "integer",
.bytesNeeded = "integer",
.bytesToTake = "matrix"
),
methods = list(
# @param n - Set size
# @param p - Desired false positive probability (e.g. 0.01 for 1%)
initialize = function(n = 10000, p = 0.001) {
m = (as.numeric(n) * log(1 / p)) / (log(2)^2)
.m <<- as.integer(m)
.bits <<- bit(.m)
.k <<- max(1L, as.integer(round((as.numeric(.m)/n) * log(2))))
# This is how many *bytes* of data we need for *each* of the k indices we need to
# generate
.bytesNeeded <<- as.integer(ceiling(log2(.m) / 8))
.bytesToTake <<- sapply(rep_len(.bytesNeeded, .k), function(byteCount) {
# 16 is number of bytes an md5 hash has
sample.int(16, byteCount, replace = FALSE)
})
},
.hash = function(x) {
hash <- digest(x, "md5", serialize = FALSE, raw = TRUE)
sapply(1:.k, function(i) {
val <- rawToInt(hash[.bytesToTake[,i]])
# Scale down to fit into the desired range
as.integer(val * (as.numeric(.m) / 2^(.bytesNeeded*8)))
})
},
has = function(x) {
all(.bits[.hash(x)])
},
set = function(x) {
.bits[.hash(x)] <<- TRUE
}
)
)
rawToInt <- function(bytes) {
Reduce(function(left, right) {
bitwShiftL(left, 8) + right
}, as.integer(bytes), 0L)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.