inst/pubsub/intraday-GLOBEX-from-Redis.r

suppressMessages({
    library(quantmod)                   # plotting
    library(xts) 		                # data container
    library(anytime)                    # convenient time parsing
    library(RcppRedis)                  # data cache and pub/sub
    #library(ggplot2)                   # for show_plot_gg
    #library(patchwork)                 # for show_plot_gg
})

## Parameters
defaultTZ <- "America/Chicago"
symbols <- c("BTC=F", "CL=F", "ES=F", "GC=F")
host <- "localhost"
ndays <- 2

## Callback handler for convenience
multiSymbolRedisMonitorChannels <- function(context, type="rdata", env=.GlobalEnv) {
    res <- context$listen(type)
    if (length(res) != 3 || res[[1]] != "message") return(res)
    if (exists(res[[2]], mode="function", envir=env)) {
        data <- do.call(res[[2]], as.list(res[[3]]), envir=env)
        val <- list(symbol=res[[2]], data=data)
        return(val)
    }
    res
}

most_recent_n_days <- function(rl, n=2, minobs=1500) {
    rl <- lapply(rl, function(x) {
        tt <- table(as.Date(index(x)))
        if (length(tt) < n || nrow(x) < minobs) return(x)
        cutoff <- paste(format(as.Date(names(head(tail(tt[tt>minobs], n), 1)))), "00:00:00")
        newx <- x[ index(x) >= as.POSIXct(cutoff) ]
        #msg(Sys.time(), "most recent data starting at", format(head(index(newx),1)))
        newx
    })
}

get_all_data <- function(symbols, host) {
    rl <- sapply(symbols, simplify=FALSE, FUN=function(symbol) {
        m <- redis$zrange(symbol, 0, -1)
        colnames(m) <- c("Time", "Close", "Change", "PctChange", "Volume")
        y <- xts(m[,-1,drop=FALSE], order.by=anytime(as.numeric(m[,1,drop=FALSE])))
        y
    })
}

show_plot_base <- function(symbols, rl) {
    dev.hold(1L)                        # thanks to Paul Murrell: hold the plot device ...
    op <- par(no.readonly=TRUE)
    par(mfrow=c(length(symbols), 1))
    res <- lapply(symbols, function(symbol) {
        x <- rl[[symbol]]
        lastx <- tail(coredata(x),1)
        cname <- paste(symbol,
                       format(lastx[,"Close"]),
                       round(lastx[,"Change"], 2),
                       round(lastx[,"PctChange"], 5),
                       format(Sys.time(), "%H:%M:%S"),
                       sep = "   ")
        cs <- quantmod::chart_Series(x[,"Close"], name = cname)
        ## cf issue 270
        ##    chart_Series(IBM, name=NULL)
        ##    title('IBM', font.main=3, col.main="#ff0000", line=2.5)
        plot(cs)
    })
    par(op)
    dev.flush(1L)                       # ... and flush all at once without flicker
}

## alternate using ggplot and patchwork
show_plot_gg <- function(symbols, rl) {
    res <- lapply(symbols, function(symbol) {
        x <- rl[[symbol]]
        lastx <- tail(coredata(x),1)
        cname <- paste(symbol,
                       format(lastx[,"Close"]),
                       round(lastx[,"Change"], 2),
                       round(lastx[,"PctChange"], 5),
                       format(Sys.time(), "%H:%M:%S"),
                       sep = "   ")
        xx <- data.frame(Date=index(x), coredata(x))
        ggplot(data=xx) + aes(x=Date, y=Close) +  geom_line() +
            labs(title = cname, y = "", x = "") +
            theme(plot.title = element_text(size = 9),
                  plot.margin = margin(0, 0, 0, 0, "cm"))
    })
    print(res[[1]] / res[[2]] / res[[3]])
}

show_plot <- show_plot_base
#show_plot <- show_plot_gg

msg <- function(ts, ...) {
    op <- options(digits.secs=3)
    cat(format(ts), ..., "\n")
    options(op)
}

redis <- new(Redis, host)
if (redis$ping() != "PONG") stop("No Redis server?", call. = FALSE)
x <- get_all_data(symbols, host)
x <- most_recent_n_days(x, ndays)
show_plot(symbols, x)

env <- new.env() # local environment for callbacks

## This is the callback function assigned to a symbol
.data2xts <- function(x) {
    m <- read.csv(text=x, sep=";", header=FALSE, col.names = c("Time", "Close", "Change", "PctChange", "Volume"))
    y <- xts(m[,-1,drop=FALSE], anytime(as.numeric(m[,1,drop=FALSE])))
    y
}
## With environment 'env', assign callback function for each symbol
res <- sapply(symbols, function(symbol) {
    assign(symbol, .data2xts, envir=env)  # programmatic version `ES=F` <- function(x) ....
    redis$subscribe(symbol)
})


market_closed <- TRUE
repeat {
    curr_t <- Sys.time()
    now_t <- xts(, curr_t)
    now <- .indexhour(now_t)*100 + .indexmin(now_t)
    if (now >= 1525 && now < 1710) {    # this may not get hit on futures ...
        x <- most_recent_n_days(x,ndays)
        tgt <- as.POSIXct(paste(format(as.Date(curr_t)), "17:09:59.999"))
        dt <- max(1,round(as.numeric(difftime(tgt, curr_t, units="secs")),0))
        msg(index(now_t), "after close; sleeping", dt, "seconds")
        Sys.sleep(dt)
        market_closed <- TRUE
        next
    } else if ((now >= 1710 || now < 1526) && market_closed) {
        x <- most_recent_n_days(x,ndays)
        msg(index(now_t), "market open")
        market_closed <- FALSE
    }
    rl <- multiSymbolRedisMonitorChannels(redis, env=env)  	# monitor channels in context of 'env'
    if (is.list(rl)) {
        sym <- rl[["symbol"]]
        x[[sym]] <- rbind(x[[sym]], rl[["data"]])
        z <- tail(x[[sym]],1)
        if (sym == symbols[3]) msg(Sys.time(), "data", format(index(z)), "close", z[1,"Close"], "change", z[1, "PctChange"])
    } else {
        msg(index(now_t), "null data in y")
    }
    show_plot(symbols, x)
}
# never reached but could do 'redis$unsubscribe(symbol)' here

Try the RcppRedis package in your browser

Any scripts or data that you put into this service are public.

RcppRedis documentation built on Aug. 19, 2023, 5:13 p.m.