Nothing
# Internal (non-exported) state and helper functions of the rredis package.
# .redisEnv is the package-level state environment. Its 'current' entry
# names the environment that connection helpers use by default; at load
# time that is .redisEnv itself.
.redisEnv <- new.env()
assign('current', .redisEnv, envir=.redisEnv)
# Return the connection object recorded in a Redis state environment.
#   e: environment holding the connection state. When omitted, the
#      environment referenced by .redisEnv$current is used.
# Returns the value stored under 'con' in e.
# Stops with an error when e itself holds no 'con' entry.
.redis <- function(e)
{
  if(missing(e)) e <- .redisEnv$current
  # inherits=FALSE: only a 'con' stored directly in e counts. Without it,
  # an unrelated 'con' visible from an enclosing environment would pass the
  # check and e$con would silently yield NULL.
  if(!exists('con', envir=e, inherits=FALSE))
    stop('Not connected, try using redisConnect()')
  e$con
}
# Open a blocking, binary, read/append socket connection to host:port and
# record its state in 'envir'.
#   host:    character host name
#   port:    numeric port
#   nodelay: logical; when TRUE, try to change the socket's Nagle setting
#            through the package's compiled SOCK_NAGLE routine
#   timeout: socket timeout passed on to socketConnection
#   envir:   environment receiving the connection state
# Returns the new connection object.
.openConnection <- function(host, port, nodelay=FALSE, timeout=2678399L, envir=rredis:::.redisEnv$current)
{
  stopifnot(is.character(host), is.numeric(port), is.logical(nodelay))

  # The file descriptor of the new socket is found crudely: snapshot the
  # open descriptors before and after connecting and take the difference.
  # (rownames(showConnections(all=TRUE)) does not work for this.)
  before <- if(nodelay) .Call("OPEN_FD", PACKAGE="rredis")

  con <- socketConnection(host, port, open="a+b",
                          blocking=TRUE, timeout=timeout)

  fd <- NULL
  if(nodelay)
  {
    after <- .Call("OPEN_FD", PACKAGE="rredis")
    fd <- as.integer(setdiff(after, before))
    if(length(fd) > 0)
    {
      status <- .Call("SOCK_NAGLE", fd, 1L, PACKAGE="rredis")
      if(status != 1)
      {
        nodelay <- FALSE
        warning("Unable to set nodelay.")
      }
    }
  }

  # Stash the state describing this connection. 'count' supports pipelined
  # communication: it tracks the number of pending getResponse calls.
  state <- list(fd=fd, con=con, host=host, port=port,
                nodelay=nodelay, count=0)
  for(key in names(state)) assign(key, state[[key]], envir=envir)

  con
}
# Close the connection object s. Kept as its own function so that all
# connection shutdown goes through a single place.
.closeConnection <- function(s)
{
  close(con = s)
}
# .redisError is the common handler for serious errors. It resets the
# current Redis server connection (close, then reopen with the recorded
# host, port and nodelay settings), optionally prints the condition 'e',
# and finally signals an error carrying 'msg'.
#   msg: message for the signalled error
#   e:   optional condition or value to print before stopping
.redisError <- function(msg, e=NULL)
{
  state <- .redisEnv$current
  .closeConnection(.redis())
  # Reconnecting may itself stop with an error if the connect fails.
  .openConnection(host=state$host, port=state$port,
                  nodelay=state$nodelay, envir=state)
  if(!is.null(e)) print(as.character(e))
  stop(msg)
}
# Ping-pong: send a PING command over the current connection and return
# whatever .redisCmd returns for it.
.redisPP <- function()
{
  ping <- .raw('PING')
  .redisCmd(ping)
}
# Convert a value to the raw bytes that are sent to Redis.
#  - Values carrying the "redis string value" attribute are converted with
#    charToRaw; if that conversion fails the value is left unchanged.
#  - Raw vectors are passed through as they are.
#  - Everything else is serialized in binary form.
.cerealize <- function(value)
{
  tagged <- !is.null(attr(value, "redis string value", exact=TRUE))
  if(tagged)
  {
    value <- tryCatch(charToRaw(value), error=function(e) value)
  }
  if(is.raw(value)) return(value)
  serialize(value, connection=NULL, ascii=FALSE)
}
# Burn (discard) any data waiting in the receive buffer of the current
# connection, then reset the connection and signal an error through
# .redisError. Used after interrupt conditions and malformed replies.
#   e: condition or value describing what went wrong; passed on to
#      .redisError, which prints it.
.burn <- function(e)
{
  con <- .redis()
  while(socketSelect(list(con), timeout=1L))
  {
    # A socket that reports readable but yields no bytes has reached end of
    # stream (peer closed); stop draining instead of spinning forever.
    if(length(readBin(con, raw(), 1000000L)) == 0L) break
  }
  .redisError("Interrupted communincation with Redis",e)
}
#
# .raw is a strict shorthand for charToRaw: any warning or error raised
# during the conversion is re-signalled through stop().
#
.raw <- function(word)
{
  escalate <- function(cond) stop(cond)
  tryCatch(charToRaw(word), warning=escalate, error=escalate)
}
# Expose the basic Redis interface to the user.
#   CMD: the command name, converted to raw with .raw
#   ...: command arguments; character values are sent as raw text, all
#        other values go through .cerealize
#   raw: when TRUE, raw=TRUE is forwarded to .redisCmd
# Returns the result of the underlying .redisCmd call.
redisCmd <- function(CMD, ..., raw=FALSE)
{
  encode <- function(x)
  {
    if(is.character(x)) return(charToRaw(x))
    .cerealize(x)
  }
  parts <- c(list(.raw(CMD)), lapply(list(...), encode))
  if(raw) parts <- c(parts, raw=TRUE)
  do.call('.redisCmd', parts)
}
# .redisCmd corresponds to the Redis "multi bulk" protocol. It
# expects an argument list of command elements. Arguments that
# are not of type raw are serialized (via .cerealize).
# Examples:
# .redisCmd(.raw('INFO'))
# .redisCmd(.raw('SET'),.raw('X'), runif(5))
#
# An argument named 'raw' is not sent to the server; it is removed from the
# argument list and handed to .getResponse as its 'raw' option.
#
# Return value: when the current environment has no truthy 'pipeline' entry,
# the result of .getResponse; otherwise the pending-response counter is
# incremented and NULL is returned invisibly.
#
# We use match.call here instead of, for example, as.list() to try to
# avoid making unnecessary copies of (potentially large) function arguments.
#
# TODO
# We can further improve this by writing a shadow serialization routine that
# quickly computes the length of a serialized object without serializing it.
# Then, we could serialize directly to the connection, avoiding the temporary
# copy (which is limited to 2GB due to R indexing).
.redisCmd <- function(...)
{
env <- .redisEnv$current
con <- .redis()
# Check to see if a rename list exists and use it if it does...we also
# define a little helper function to handle replacing the command.
# The rename list must have the form:
# list(OLDCOMMAND="NEWCOMMAND", SOME_OTHER_CMD="SOME_OTHER_NEW_CMD",...)
# Note that the list is looked up in .redisEnv, not in 'env'.
rep <- c()
if(exists("rename",envir=.redisEnv)) rep = get("rename",envir=.redisEnv)
# f is the unevaluated call; element 1 is the function name, the remaining
# elements are the argument expressions.
f <- match.call()
# Check for raw option (which means don't deserialize returned results)
# NOTE(review): f[[wr]] is the argument as written in the call, not its
# evaluated value; a literal TRUE/FALSE is unaffected, but confirm that no
# caller passes a non-literal expression here.
raw <- FALSE
if(any("raw" %in% names(f)))
{
wr <- which(names(f)=="raw")
raw <- f[[wr]]
f <- f[-wr]
}
# Number of command elements to transmit (the call minus the function name).
n <- length(f) - 1
# Multi bulk header: '*<number of elements>\r\n'
hdr <- paste('*', as.character(n), '\r\n',sep='')
writeBin(.raw(hdr), con)
tryCatch({
for(j in seq_len(n)) {
# Each argument expression is evaluated in sys.frame(-1), one frame up the
# call stack. Only the first element (the command name) is passed through
# the rename list.
if(j==1)
v <- .renameCommand(eval(f[[j+1]],envir=sys.frame(-1)), rep)
else
v <- eval(f[[j+1]],envir=sys.frame(-1))
if(!is.raw(v)) v <- .cerealize(v)
# Each element is framed as '$<byte length>\r\n', the bytes, then '\r\n'.
l <- length(v)
hdr <- paste('$', as.character(l), '\r\n', sep='')
writeBin(.raw(hdr), con)
writeBin(v, con)
writeBin(.raw('\r\n'), con)
}
},
# Any error while evaluating or writing an element resets the connection
# through .redisError; an interrupt drains the receive buffer via .burn.
error=function(e) {.redisError("Invalid argument");invisible()},
interrupt=function(e) .burn(e)
)
# In pipelined mode the reply is not read here; only the number of
# outstanding replies is tracked in env$count.
pipeline <- FALSE
if(exists('pipeline',envir=env)) pipeline <- get('pipeline',envir=env)
if(!pipeline)
return(.getResponse(raw=raw))
tryCatch(
env$count <- env$count + 1,
error = function(e) assign('count', 1, envir=env)
)
invisible()
}
# Apply the command rename list to a raw-encoded command name.
#   x:   the command name as a raw vector
#   rep: NULL, or a named list/vector mapping original command names to
#        their replacements
# Returns the raw-encoded replacement when the name has an entry in rep,
# otherwise x unchanged.
.renameCommand <- function(x, rep)
{
  if(!is.null(rep))
  {
    cmd <- rawToChar(x)
    if(cmd %in% names(rep)) x <- charToRaw(rep[[cmd]])
  }
  x
}
# Read and decode one reply from the current Redis connection.
#   raw: when TRUE, bulk payloads are returned as raw vectors instead of
#        being unserialized.
# Reply types, selected by the first character of the reply line:
#   '-'  error reply: signalled as an R error with the server's message
#   '+'  status reply: returned as a character string
#   ':'  integer reply: numeric if option 'redis:num' is set, otherwise a
#        string tagged with the "redis string value" attribute
#   '$'  bulk reply: NULL for a negative length, otherwise the payload
#        (raw, unserialized, or - if unserializing fails - a string tagged
#        with the "redis string value" attribute)
#   '*'  multi bulk reply: a list of recursively read replies, or NULL
# Empty or malformed reply lines drain the connection through .burn, which
# resets it and signals an error. An interrupt does the same.
.getResponse <- function(raw=FALSE)
{
  env <- .redisEnv$current
  # Turn a complete bulk payload into the value handed back to the caller.
  # Used for both the single-read and the multi-read path so that the two
  # always agree (including the "redis string value" tag on plain strings).
  decode <- function(payload)
  {
    if(raw) return(payload)
    tryCatch(unserialize(payload),
             error=function(e)
             {
               un <- rawToChar(payload)
               attr(un, "redis string value") <- TRUE
               un
             })
  }
  tryCatch({
    con <- .redis()
    l <- readLines(con=con, n=1)
    if(length(l)==0) .burn("Empty")
    # One fewer reply is pending (relevant for pipelined communication).
    tryCatch(
      env$count <- max(env$count - 1,0),
      error = function(e) assign('count', 0, envir=env)
    )
    s <- substr(l, 1, 1)
    if (nchar(l) < 2) {
      if(s == "+") {
        # '+' is a valid return message for at least one cmd (RANDOMKEY)
        return("")
      }
      .burn("Invalid")
    }
    switch(s,
           '-' = stop(substr(l,2,nchar(l))),
           '+' = substr(l,2,nchar(l)),
           ':' = {
             if(!is.null(getOption('redis:num'))) return(as.numeric(substr(l,2,nchar(l))))
             un <- substr(l,2,nchar(l))
             attr(un, "redis string value") <- TRUE
             un
           },
           '$' = {
             n <- as.numeric(substr(l,2,nchar(l)))
             if (n < 0) {
               return(NULL)
             }
             dat <- tryCatch(readBin(con, 'raw', n=n),
                             error=function(e) .redisError(e$message))
             m <- length(dat)
             payload <- dat
             if(m < n)
             {
               # The message was not fully received in one pass for whatever
               # reason. We allocate a list to hold incremental messages and
               # then concatenate it. This performance enhancement was
               # adapted from the Rbig server package, written by Steve
               # Weston and Pat Shields.
               rlen <- 50
               j <- 1
               r <- vector('list',rlen)
               r[j] <- list(dat)
               while(m<n)
               {
                 dat <- tryCatch(readBin(con, 'raw', n=(n-m)),
                                 error=function (e) .redisError(e$message))
                 # An empty read means no more data will arrive; without
                 # this check the loop would never terminate.
                 if(length(dat)==0)
                   .redisError("Connection closed while reading response from Redis")
                 j <- j + 1
                 if(j>rlen)
                 {
                   rlen <- 2*rlen
                   length(r) <- rlen
                 }
                 r[j] <- list(dat)
                 m <- m + length(dat)
               }
               length(r) <- j
               payload <- do.call(c,r)
             }
             l <- readLines(con,n=1)  # Trailing \r\n
             decode(payload)
           },
           '*' = {
             numVars <- as.integer(substr(l,2,nchar(l)))
             if(numVars > 0L) {
               replicate(numVars, .getResponse(raw=raw), simplify=FALSE)
             } else NULL
           },
           stop('Unknown message type')
    )
  }, interrupt=function(e) .burn(e)
  )
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.