# scidb package internal utilities
# Non-exported utility functions
#' Unpack and return a SciDB query expression as a data frame
#' @param db scidb database connection object
#' @param query A SciDB query expression or scidb object
#' @param ... optional extra arguments (see below)
#' @note optional extra arguments:
#' \itemize{
#' \item{binary}{ optional logical value, if \code{FALSE} use iquery text transfer, otherwise binary transfer, defaults \code{TRUE}}
#' \item{buffer}{ integer initial parse buffer size in bytes, adaptively resized as needed: larger buffers can be faster but consume more memory, default size is 100000L.}
#' \item{only_attributes}{ optional logical value, \code{TRUE} means don't retrieve dimension coordinates, only return attribute values; defaults to \code{FALSE}.}
#' \item{schema}{ optional result schema string, only applies when \code{query} is not a SciDB object. Supplying this avoids one extra metadata query to determine result schema. Defaults to \code{schema(query)}.}
#' }
#' @keywords internal
#' @importFrom curl new_handle handle_setheaders handle_setopt curl_fetch_memory handle_setform form_file
#' @importFrom data.table data.table
#' @import bit64
scidb_unpack_to_dataframe = function(db, query, ...)
{
# Overview: run `query` on SciDB via shim, stream the result back in the
# requested binary format, parse it in chunks with the C parser
# (C_scidb_parse), and assemble an R data frame.
DEBUG = FALSE
# INT64: when TRUE, SciDB int64 attributes are returned as bit64::integer64
INT64 = attr(db, "connection")$int64
DEBUG = getOption("scidb.debug", FALSE)
# AIO: use accelerated_io_tools-style saving (dimensions included by the save)
AIO = getOption("scidb.aio", FALSE)
RESULT_SIZE_LIMIT = getOption("scidb.result_size_limit", 256)
if (DEBUG) {
# NOTE(review): this stops when NO session exists; the message wording
# suggests the opposite condition -- confirm intended check.
if (is.null(attr(db, "connection")$session)) stop("[Shim session] unexpected in long running shim session")
}
# Initial C parse buffer size in bytes; adaptively re-estimated per chunk below
buffer = 100000L
args = list(...)
if (is.null(args$only_attributes)) args$only_attributes = FALSE
if (is.null(args$binary)) args$binary = TRUE
if (!is.null(args$buffer))
{
# Accept a user-supplied buffer size, capped at 1e9 bytes; invalid values
# (non-numeric) fall back to the default
argsbuf = tryCatch(as.integer(args$buffer), warning=function(e) NA)
if (!is.na(argsbuf) && argsbuf <= 1e9) buffer = as.integer(argsbuf)
}
if (!inherits(query, "scidb"))
{
# make a scidb object out of the query, optionally using a supplied schema to skip metadata query
if (is.null(args$schema)) query = scidb(db, query)
else query = scidb(db, query, schema=args$schema)
}
# Result metadata: data frames describing attributes and dimensions
attributes = schema(query, "attributes")
dimensions = schema(query, "dimensions")
query = query@name
# Text transfer path: delegate entirely to iquery
if(! args$binary) return(iquery(db, query, binary=FALSE, `return`=TRUE))
if (args$only_attributes)
{
internal_attributes = attributes
internal_query = query
} else
{
# Dimensions are requested too; they will be materialized as extra
# int64 attributes (via apply) unless AIO handles them itself.
dim_names = dimensions$name
attr_names = attributes$name
all_names = c(dim_names, attr_names)
internal_query = query
if (length(all_names) != length(unique(all_names)))
{
# Cast to completeley unique names to be safe:
cast_dim_names = make.names_(dim_names)
cast_attr_names = make.unique_(cast_dim_names, make.names_(attributes$name))
cast_schema = sprintf("<%s>[%s]", paste(paste(cast_attr_names, attributes$type, sep=":"), collapse=","), paste(cast_dim_names, collapse=","))
internal_query = sprintf("cast(%s, %s)", internal_query, cast_schema)
all_names = c(cast_dim_names, cast_attr_names)
dim_names = cast_dim_names
}
# Apply dimensions as attributes, using unique names. Manually construct the list of resulting attributes:
dimensional_attributes = data.frame(name=dimensions$name, type="int64", nullable=FALSE) # original dimension names (used below)
internal_attributes = rbind(attributes, dimensional_attributes)
if (AIO == FALSE)
{
dim_apply = paste(dim_names, dim_names, sep=",", collapse=",")
internal_query = sprintf("apply(%s, %s)", internal_query, dim_apply)
}
}
# Build the shim binary save format string, e.g. "(double null,int64)"
ns = rep("", length(internal_attributes$nullable))
ns[internal_attributes$nullable] = "null"
format_string = paste(paste(internal_attributes$type, ns), collapse=",")
format_string = sprintf("(%s)", format_string)
if (DEBUG) message("Data query ", internal_query)
if (DEBUG) message("Format ", format_string)
# Execute the query, saving output server-side in the binary format
sessionid = scidbquery(
db,
internal_query,
save=format_string,
result_size_limit=RESULT_SIZE_LIMIT,
atts_only=ifelse(args$only_attributes, TRUE, ifelse(AIO, FALSE, TRUE)))
if (!is.null(attr(db, "connection")$session)) { # if session already exists
release = 0
} else { # need to get new session every time
release = 1;
}
if (release) on.exit( SGET(db, "/release_session", list(id=sessionid), err=FALSE), add=TRUE)
dt2 = proc.time()
# Fetch the whole saved result into memory (n=0 means read everything)
uri = URI(db, "/read_bytes", list(id=sessionid, n=0))
h = new_handle()
handle_setheaders(h, .list=list(`Authorization`=digest_auth(db, "GET", uri)))
handle_setopt(h, .list=list(ssl_verifyhost=as.integer(getOption("scidb.verifyhost", FALSE)),
ssl_verifypeer=0))
resp = curl_fetch_memory(uri, h)
if (resp$status_code > 299) stop("HTTP error", resp$status_code)
if (DEBUG) message("Data transfer time ", (proc.time() - dt2)[3])
dt1 = proc.time()
len = length(resp$content)
# p: current byte offset into the downloaded buffer
p = 0
ans = c()
cnames = c(internal_attributes$name, "lines", "p") # we are unpacking to a SciDB array, ignore dims
n = nrow(internal_attributes)
rnames = c()
# Reject SciDB types the C parser does not understand
typediff = setdiff(internal_attributes$type, names(.scidbtypes))
if(length(typediff) > 0)
{
stop(typediff, " SciDB type not supported. Try converting to string in SciDB or use a binary=FALSE data transfer")
}
# Chunked parse loop: C_scidb_parse returns n attribute vectors plus the
# number of parsed lines and the new byte offset.
while (p < len)
{
dt2 = proc.time()
tmp = .Call(C_scidb_parse, as.integer(buffer), internal_attributes$type,
internal_attributes$nullable, resp$content, as.double(p), as.integer(INT64))
names(tmp) = cnames
lines = tmp[[n+1]]
p_old = p
p = tmp[[n+2]]
if (DEBUG) message(" R buffer ", p, "/", len, " bytes parsing time ", round( (proc.time() - dt2)[3], 4))
dt2 = proc.time()
if (lines > 0)
{
if ("binary" %in% internal_attributes$type)
{
# binary attributes: return a plain list right away (no data frame)
if (DEBUG) message(" R rbind/df assembly time ", round( (proc.time() - dt2)[3], 4))
ans = lapply(1:n, function(j) tmp[[j]][1:lines])
names(ans) = internal_attributes$name
return(ans)
}
# Trim the parser's over-allocated vectors down to the parsed rows
len_out = length(tmp[[1]])
if (lines < len_out) tmp = lapply(tmp[1:n], function(x) x[1:lines])
# adaptively re-estimate a buffer size
avg_bytes_per_line = ceiling( (p - p_old) / lines)
buffer = min(getOption("scidb.buffer_size"), ceiling(1.3 * (len - p) / avg_bytes_per_line)) # Engineering factors
# Assemble the data frame
ans = data.table::rbindlist(list(ans, tmp[1:n]))
# if (is.null(ans)) ans = data.table::data.table(data.frame(tmp[1:n], stringsAsFactors=FALSE, check.names=FALSE))
# else ans = rbind(ans, data.table::data.table(data.frame(tmp[1:n], stringsAsFactors=FALSE, check.names=FALSE)))
}
if (DEBUG) message(" R rbind/df assembly time ", round( (proc.time() - dt2)[3], 4))
}
# Empty result: build a zero-row result with correctly-typed columns
if (is.null(ans))
{
xa = attributes$name
xd = NULL
classes = list()
classes_dimensions = NULL
if (!args$only_attributes) {
xd = dimensions$name
classes_dimensions = rep("numeric", length(xd))
}
has_binary = FALSE
# Map each SciDB attribute type to the R column class it would produce
for(i in 1:nrow(attributes)) {
t = attributes$type[i]
if(t == 'bool') {
classes = c(classes, 'logical')
} else if(t == 'binary') {
classes = c(classes, 'list')
has_binary = TRUE
} else if(t == 'datetime') {
classes[[length(classes) + 1]] = as.character(c('POSIXct', 'POSIXt'))
} else if(t == 'string' || t == 'char') {
classes = c(classes, 'character')
} else if(t %in% c('int8', 'uint8', 'int16', 'uint16', 'int32')) {
classes = c(classes, 'integer')
} else {
classes = c(classes, 'numeric')
}
}
n = length(xd) + length(xa)
ans = vector(mode="list", length=n)
if (has_binary) {
# C_scidb_parse leaves dimensions at the end,
# for "binary" leave dimensions as they are
classes = c(classes, classes_dimensions)
names(ans) = make.names_(c(xa, xd))
for(i in 1:length(ans))
class(ans[[i]]) = classes[[i]]
}
else {
classes = c(classes_dimensions, classes)
names(ans) = make.names_(c(xd, xa))
class(ans) = "data.frame"
for(i in 1:ncol(ans)) {
# Workaround for POSIXct class
if (length(classes[[i]]) == 2
&& all.equal(classes[[i]], as.character(c("POSIXct", "POSIXt"))))
class(ans[, i]) = 'numeric'
class(ans[, i]) = classes[[i]]
}
}
return(ans)
}
if (DEBUG) message("Total R parsing time ", round( (proc.time() - dt1)[3], 4))
ans = as.data.frame(ans, check.names=FALSE)
# Restore 64-bit integer class on int64 columns when requested
if (INT64)
{
for (i64 in which(internal_attributes$type %in% "int64")) oldClass(ans[, i64]) = "integer64"
}
# Handle datetime (integer POSIX time)
for (idx in which(internal_attributes$type %in% "datetime")) ans[, idx] = as.POSIXct(ans[, idx], origin="1970-1-1")
if (args$only_attributes) # permute cols, see issue #125
{
colnames(ans) = make.names_(attributes$name)
}
else
{
# Dimensions were appended as trailing attributes by apply(); move them
# to the front so output is (dimensions, attributes)
nd = length(dimensions$name)
i = ncol(ans) - nd
ans = ans[, c( (i+1):ncol(ans), 1:i)]
colnames(ans) = make.names_(c(dimensions$name, attributes$name))
}
ans
}
#' Convenience function for digest authentication.
#' @param db a scidb database connection object
#' @param method digest method
#' @param uri uri
#' @param realm realm
#' @param nonce nonce
#' @keywords internal
#' @importFrom digest digest
digest_auth = function(db, method, uri, realm="", nonce="123456")
{
  .scidbenv = attr(db, "connection")
  # Non-digest authentication schemes get no Authorization header from here
  if (!is.null(.scidbenv$authtype))
  {
    if (.scidbenv$authtype != "digest") return("")
  }
  # Reduce the full URI to the final "/segment" (greedy match up to last slash)
  uri = gsub(".*/", "/", uri)
  # Stored credentials as "user:password"; default to empty user/password
  userpwd = .scidbenv$digest
  if (is.null(userpwd)) userpwd = ":"
  up = strsplit(userpwd, ":")[[1]]
  user = up[1]
  pwd = up[2]
  if (is.na(pwd)) pwd = ""
  # Fix: algo="md5" was previously passed inside sprintf() (where sprintf
  # silently ignored it); the code only worked because digest()'s default
  # algorithm happens to be "md5". Pass it to digest() explicitly, matching
  # the `response` computation below.
  ha1 = digest(sprintf("%s:%s:%s", user, realm, pwd), algo="md5", serialize=FALSE)
  ha2 = digest(sprintf("%s:%s", method, uri), algo="md5", serialize=FALSE)
  # Fixed client nonce/count; shim does not validate these
  cnonce = "MDc1YmFhOWFkY2M0YWY2MDAwMDBlY2JhMDAwMmYxNTI="
  nc = "00000001"
  qop = "auth"
  response = digest(sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, qop, ha2), algo="md5", serialize=FALSE)
  sprintf('Digest username="%s", realm=%s, nonce="%s", uri="%s", cnonce="%s", nc=%s, qop=%s, response="%s"', user, realm, nonce, uri, cnonce, nc, qop, response)
}
# Internal warning function
# Internal warning function: emit each canned message at most once per R
# session. Implemented as a closure over `state`; a message key is removed
# from `state` after it has been displayed.
warnonce = (function() {
  state = list(
    count="Use the AFL op_count macro operator for an exact count of data rows.",
    nonum="Note: The R sparse Matrix package does not support certain value types like\ncharacter strings"
  )
  function(warn) {
    # NULL for unknown keys and for keys already shown
    if (!is.null(state[[warn]])) {
      message(state[[warn]])
      # Fix: previous code superassigned a temporary (`s <<- state`), leaking
      # an `s` binding into the closure's enclosing environment. Subscripted
      # superassignment removes the key from `state` directly.
      state[[warn]] <<- NULL
    }
  }
}) ()
# Some versions of RCurl seem to contain a broken URLencode function.
# Percent-encode a URL. Workaround for broken URLencode in some old RCurl/R
# versions: every character outside the unreserved set (and, unless
# `reserved=TRUE`, the reserved set ";/?:@=&") is replaced by %-escaped
# lowercase hex bytes of its UTF-8 encoding.
oldURLencode = function (URL, reserved = FALSE)
{
  safe = "-ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789$_.+!*'(),"
  if (!reserved) safe = paste0(safe, ";/?:@=&")
  pattern = paste0("[^", safe, "]")
  chars = strsplit(URL, "")[[1L]]
  hits = grep(pattern, chars)
  if (length(hits)) {
    # charToRaw yields one hex byte per UTF-8 byte; "%" recycles over them
    chars[hits] = vapply(chars[hits], function(ch)
      paste0("%", as.character(charToRaw(ch)), collapse = ""), character(1))
  }
  paste(chars, collapse = "")
}
# Internal function
# Internal function: create a SciDB TEMP array named `name` with `schema`.
# The create_array() temporary-flag syntax varies with SciDB version:
# releases >= 14.12 take the boolean "true", older ones the string 'TEMP'.
create_temp_array = function(db, name, schema)
{
  temp_token = if (at_least(attr(db, "connection")$scidb.version, "14.12")) "true" else "'TEMP'"
  iquery(db, sprintf("create_array(%s, %s, %s)", name, schema, temp_token), `return`=FALSE)
}
# Internal function
# Internal function: render a named list of operator settings as a string.
# SciDB >= 19 uses key:value (strings quoted, multi-values parenthesized);
# older servers use 'key=value' quoted items. Items are joined with `sep`.
get_setting_items_str = function(db, settings, sep=',') {
  v19 = at_least(attr(db, "connection")$scidb.version, "19.0")
  render_item = function(key, value) {
    if (v19) {
      if (is.character(value)) value = sprintf("'%s'", value) # Quote string value(s)
      body = if (length(value) > 1) sprintf("(%s)", paste(value, collapse = ',')) else value
      sprintf("%s:%s", key, body)
    } else {
      body = if (length(value) > 1) paste(value, collapse = ',') else value
      sprintf("'%s=%s'", key, body)
    }
  }
  paste(mapply(render_item, names(settings), settings), collapse = sep)
}
#' An important internal convenience function that returns a scidb object. If
#' eval=TRUE, a new SciDB array is created and the returned scidb object refers to
#' that. Otherwise, the returned scidb object represents a SciDB array promise.
#'
#' @param db scidb connection object
#' @param expr (character) A SciDB expression or array name
#' @param eval (logical) If TRUE evaluate expression and assign to new SciDB array.
#' If FALSE, infer output schema but don't evaluate.
#' @param name (optional character) If supplied, name for stored array when eval=TRUE
#' @param gc (optional logical) If TRUE, tie SciDB object to garbage collector.
#' @param depend (optional list) An optional list of other scidb objects
#' that this expression depends on (preventing their garbage collection
#' if other references to them go away).
#' @param schema (optional) used to create SciDB temp arrays
#' (requires scidb >= 14.8)
#' @param temp (optional) used to create SciDB temp arrays
#' (requires scidb >= 14.8)
#' @return A \code{scidb} array object
#' @note Only AFL supported.
`.scidbeval` = function(db, expr, eval=FALSE, name, gc=TRUE, depend, schema, temp)
{
  # Normalize optional arguments
  if (missing(depend)) depend = c()
  if (missing(schema)) schema = ""
  if (missing(temp)) temp = FALSE
  if (!is.list(depend)) depend = list(depend)
  # Address bug #45: an expression with no "(" is a bare array name, which
  # must be evaluated (stored) rather than treated as an AFL promise.
  if (!grepl("\\(", expr, perl=TRUE)) eval = TRUE
  if (!eval)
  {
    # Lazy path: wrap the expression as a SciDB array promise
    ans = scidb(db, expr, gc=gc)
    # Record dependencies so their garbage collection is deferred
    if (length(depend) > 0) assign("depend", depend, envir=ans@meta)
    return(ans)
  }
  # Eager path: store the expression into a (possibly new temp) array
  if (missing(name) || is.null(name))
  {
    newarray = tmpnam(db)
    if (temp) create_temp_array(db, newarray, schema)
  }
  else newarray = name
  scidbquery(db, sprintf("store(%s,%s)", expr, newarray), stream=0L)
  ans = scidb(db, newarray, gc=gc)
  if (temp) ans@meta$temp = TRUE
  ans
}
# Produce syntactically valid, unique names from x, using underscores in
# place of the dots that make.names() introduces.
make.names_ = function(x)
{
  valid = make.names(x, unique=TRUE)
  gsub(".", "_", valid, fixed=TRUE)
}
# x is vector of existing values
# y is vector of new values
# returns a set the same size as y with non-conflicting value names
# x is vector of existing values
# y is vector of new values
# returns a set the same size as y with non-conflicting value names
# (underscores are round-tripped through dots so make.names can dedupe)
make.unique_ = function(x, y)
{
  combined = make.names(gsub("_", ".", c(x, y)), unique=TRUE)
  fresh = utils::tail(combined, length(y))
  gsub(".", "_", fresh, fixed=TRUE)
}
# Make a name from a prefix and a unique SciDB identifier.
# Return the unique SciDB connection identifier stored on the db object's
# "connection" attribute; error if the object is not connected.
getuid = function(db)
{
  con = attr(db, "connection")
  if (is.null(con$id)) stop("Not connected...try scidbconnect")
  con$id
}
# Make a unique SciDB array name from a prefix, a tempfile-derived salt and
# the connection's unique identifier.
tmpnam = function(db, prefix="R_array")
{
  stopifnot(inherits(db, "afl"))
  unique_part = basename(tempfile(pattern=prefix))
  paste0(unique_part, getuid(db))
}
# Return a shim session ID or error
# Request a new shim session ID, stripping any trailing CR/LF from the
# response; error when the service returns nothing.
getSession = function(db)
{
  sid = SGET(db, "/new_session")
  if (length(sid)<1) stop("SciDB http session error; are you connecting to a valid SciDB host?")
  gsub("[\r\n]", "", sid)
}
# Supply the base SciDB URI from the global host, port and auth
# parameters stored in the "connection" environment in the db object
# Every function that needs to talk to the shim interface should use
# this function to supply the URI.
# Arguments:
# db scidb database connection object
# resource (string): A URI identifying the requested service
# args (list): A list of named query parameters
# Build the base shim URI from host/port/auth stored on the db object's
# "connection" attribute. Credentials present in the connection are appended
# as query parameters, which also forces the https scheme.
URI = function(db, resource="", args=list())
{
  con = attr(db, "connection")
  if (is.null(con$host)) stop("Not connected...try scidbconnect")
  # Append stored credentials to the query parameters
  if (!is.null(con$auth)) args = c(args, list(auth=con$auth))
  if (!is.null(con$password)) args = c(args, list(password=con$password))
  if (!is.null(con$username)) args = c(args, list(user=con$username))
  if (!is.null(con$admin) && con$admin) args = c(args, list(admin=1))
  proto = paste(con$protocol, "//", sep=":")
  # Never send credentials over plain http
  if (any(c("password", "auth") %in% names(args))) proto = "https://"
  # A NULL port means shim was forwarded to a URL, so the host alone suffices
  base = if (is.null(con$port)) paste0(proto, con$host) else paste0(proto, con$host, ":", con$port)
  out = paste(base, resource, sep="/")
  if (length(args) > 0) {
    query_string = paste(paste(names(args), args, sep="="), collapse="&")
    out = paste(out, query_string, sep="?")
  }
  out
}
# HTTP GET against the shim service.
# db:       scidb database connection object
# resource: service path (a leading "/" is added when missing)
# args:     named list of query parameters
# err:      if TRUE, stop() on HTTP status > 299 (including the body for 4xx/5xx)
# binary:   if TRUE return the raw response body, otherwise a character string
SGET = function(db, resource, args=list(), err=TRUE, binary=FALSE)
{
  if (substr(resource, 1, 1) != "/") resource = paste0("/", resource)
  url = oldURLencode(URI(db, resource, args))
  # "+" survives oldURLencode but must be escaped in query strings
  url = gsub("\\+", "%2B", url, perl=TRUE)
  handle = new_handle()
  handle_setheaders(handle, .list=list(Authorization=digest_auth(db, "GET", url)))
  handle_setopt(handle, .list=list(
    ssl_verifyhost=as.integer(getOption("scidb.verifyhost", FALSE)),
    ssl_verifypeer=0))
  reply = curl_fetch_memory(url, handle)
  if (err && reply$status_code > 299)
  {
    msg = sprintf("HTTP error %s", reply$status_code)
    if (reply$status_code >= 400) msg = sprintf("%s\n%s", msg, rawToChar(reply$content))
    stop(msg)
  }
  if (binary) reply$content else rawToChar(reply$content)
}
# Normally called with raw data and args=list(id=whatever)
# Upload data to the shim service. Normally called with raw `data` and
# args=list(id=<session id>). Returns the shim response (the server-side
# file name of the uploaded data) as a character string.
# db:   scidb database connection object
# data: raw vector (character data are coerced to raw on the file-upload path)
# args: named list of query parameters
# err:  if TRUE, stop() on HTTP status > 299
POST = function(db, data, args=list(), err=TRUE)
{
  # check for new shim simple post option (/upload), otherwise use
  # multipart/file upload (/upload_file)
  shimspl = strsplit(attr(db, "connection")$scidb.version, "\\.")[[1]]
  # NOTE(review): fallbacks of 16 (error) vs 8 (warning) look inconsistent
  # between year and month; preserved as-is -- confirm intended defaults.
  shim_yr = tryCatch(as.integer(gsub("[A-z]", "", shimspl[1])), error=function(e) 16, warning=function(e) 8)
  shim_mo = tryCatch(as.integer(gsub("[A-z]", "", shimspl[2])), error=function(e) 16, warning=function(e) 8)
  if (is.na(shim_yr)) shim_yr = 16
  if (is.na(shim_mo)) shim_mo = 8
  simple = (shim_yr >= 15 && shim_mo >= 7) || shim_yr >= 16
  if (simple)
  {
    # Simple POST of the raw bytes to /upload
    uri = URI(db, "/upload", args)
    uri = oldURLencode(uri)
    uri = gsub("\\+", "%2B", uri, perl=TRUE)
    h = new_handle()
    handle_setheaders(h, .list=list(Authorization=digest_auth(db, "POST", uri)))
    handle_setopt(h, .list=list(ssl_verifyhost=as.integer(getOption("scidb.verifyhost", FALSE)),
      ssl_verifypeer=0, post=TRUE, postfieldsize=length(data), postfields=data))
    ans = curl_fetch_memory(uri, h)
    if (ans$status_code > 299 && err) stop("HTTP error ", ans$status_code)
    return(rawToChar(ans$content))
  }
  # Legacy multipart file upload to /upload_file
  uri = URI(db, "/upload_file", args)
  uri = oldURLencode(uri)
  uri = gsub("\\+", "%2B", uri, perl=TRUE)
  h = new_handle()
  handle_setheaders(h, .list=list(Authorization=digest_auth(db, "POST", uri)))
  handle_setopt(h, .list=list(ssl_verifyhost=as.integer(getOption("scidb.verifyhost", FALSE)),
    ssl_verifypeer=0))
  tmpf = tempfile()
  # Fix: clean the temporary file up even when curl_fetch_memory errors
  # (previously unlink() was only reached on success).
  on.exit(unlink(tmpf), add=TRUE)
  if (is.character(data)) data = charToRaw(data)
  writeBin(data, tmpf)
  handle_setform(h, file=form_file(tmpf))
  ans = curl_fetch_memory(uri, h)
  # Fix: add the missing space after "HTTP error" (consistent with the
  # /upload branch above).
  if (ans$status_code > 299 && err) stop("HTTP error ", ans$status_code)
  return(rawToChar(ans$content))
}
# Basic low-level query. Returns query id. This is an internal function.
# db: scidb database connection object
# query: a character query string
# save: Save format query string or NULL.
# session: if you already have a SciDB http session, set this to it, otherwise NULL
# resp(logical): return http response
# stream: Set to 0L or 1L to control streaming (NOT USED)
# prefix: optional AFL statement to prefix query in the same connection context.
# Example values of save: "dcsv", "csv+", "(double NULL, int32)"
#
# Returns the HTTP session in each case
scidbquery = function(db, query, save=NULL, result_size_limit=NULL, session=NULL, resp=FALSE, stream, prefix=attributes(db)$connection$prefix, atts_only=TRUE)
{
DEBUG = FALSE
STREAM = 0L
DEBUG = getOption("scidb.debug", FALSE)
# NOTE(review): STREAM is computed here but never forwarded -- the request
# below always sends stream=0L (the header comment says stream is NOT USED).
if (missing(stream))
{
STREAM = 0L
} else STREAM = as.integer(stream)
# release stays 0 throughout: the session is released by the caller, not here
release = 0
# Prefer an existing long-running shim session stored on the connection
if (!is.null(attr(db, "connection")$session)) {
session = attr(db, "connection")$session
} else {
if (DEBUG) cat("[Shim session] created new session\n")
}
sessionid = session
if (is.null(session))
{
sessionid = getSession(db) # Obtain a session from shim
}
if (is.null(save)) save=""
if (is.null(result_size_limit)) result_size_limit=""
if (DEBUG)
{
message(query, "\n")
t1 = proc.time()
}
# Issue the query; on error or interrupt, cancel it server-side first
ans = tryCatch(
{
# Assemble /execute_query parameters
args = list(id=sessionid, afl=0L, query=query, stream=0L)
args$release = release
# Optional AFL prefix statements (global option plus connection prefix)
args$prefix = c(getOption("scidb.prefix"), prefix)
if (!is.null(args$prefix)) args$prefix = paste(args$prefix, collapse=";")
args$save = save
args$result_size_limit = result_size_limit
if (!is.null(args$save)) args$atts_only=ifelse(atts_only, 1L, 0L)
do.call("SGET", args=list(db=db, resource="/execute_query", args=args))
}, error=function(e)
{
# Cancel the running query; note release is always 0 here so the
# release_session call below is effectively dead code
SGET(db, "/cancel", list(id=sessionid), err=FALSE)
if (release) SGET(db, "/release_session", list(id=sessionid), err=FALSE)
e$call = NULL
stop(e)
}, interrupt=function(e)
{
SGET(db, "/cancel", list(id=sessionid), err=FALSE)
if (release) SGET(db, "/release_session", list(id=sessionid), err=FALSE)
stop("cancelled")
}, warning=invisible)
if (DEBUG) message("Query time ", round( (proc.time() - t1)[3], 4))
# Return the session (optionally with the raw HTTP response)
if (resp) return(list(session=sessionid, response=ans))
sessionid
}
# Upload a sparse (Matrix package, dgCMatrix-style) matrix X to SciDB array
# `name`. The data are uploaded as (i, j, val) triples into a 1-d array and
# then redimensioned into the target 2-d schema.
# Only double-precision values are supported.
.Matrix2scidb = function(db, X, name, rowChunkSize=1000, colChunkSize=1000, start=c(0, 0), gc=TRUE, ...)
{
D = dim(X)
rowOverlap = 0L
colOverlap = 0L
# Normalize starting coordinates to exactly two integers
if (missing(start)) start=c(0, 0)
if (length(start) < 1) stop ("Invalid starting coordinates")
if (length(start) > 2) start = start[1:2]
if (length(start) < 2) start = c(start, 0)
start = as.integer(start)
# Map the R storage type of the value slot to a SciDB type
type = .scidbtypes[[typeof(X@x)]]
if (is.null(type)) {
stop(paste("Unupported data type. The package presently supports: ",
paste(.scidbtypes, collapse=" "), ".", sep=""))
}
if (type != "double") stop("Sorry, the package only supports double-precision sparse matrices right now.")
# Target 2-d schema and intermediate 1-d load schema
schema = sprintf(
"< val : %s null> [i=%.0f:%.0f,%.0f,%.0f, j=%.0f:%.0f,%.0f,%.0f]", type, start[[1]],
nrow(X)-1+start[[1]], min(nrow(X), rowChunkSize), rowOverlap, start[[2]], ncol(X)-1+start[[2]],
min(ncol(X), colChunkSize), colOverlap)
schema1d = sprintf("<i:int64 null, j:int64 null, val : %s null>[idx=0:*,100000,0]", type)
# Obtain a session from shim for the upload process
if (!is.null(attr(db, "connection")$session)) { # if session already exists
session = attr(db, "connection")$session
release = 0
} else { # need to get new session every time
session = getSession(db)
if (length(session)<1) stop("SciDB http session error")
release = 1;
}
if (release) on.exit( SGET(db, "/release_session", list(id=session), err=FALSE), add=TRUE)
# Compute the indices and assemble message to SciDB in the form
# double, double, double for indices i, j and data val.
# (column indices recovered from the CSC column-pointer slot X@p)
dp = diff(X@p)
j = rep(seq_along(dp), dp) - 1
# Upload the data
bytes = .Call(C_scidb_raw, as.vector(t(matrix(c(X@i + start[[1]], j + start[[2]], X@x), length(X@x)))))
ans = POST(db, bytes, list(id=session))
ans = gsub("\n", "", gsub("\r", "", ans))
# redimension into a matrix
query = sprintf("store(redimension(input(%s,'%s',-2,'(double null,double null,double null)'),%s),%s)", schema1d, ans, schema, name)
iquery(db, query)
scidb(db, name, gc=gc)
}
# raw value to special 1-element SciDB array
# raw value to special 1-element SciDB array
# Uploads the raw vector X and stores it as a single binary-typed cell in
# array `name`. Optional `temp=TRUE` in ... creates a SciDB temp array first.
raw2scidb = function(db, X, name, gc=TRUE, ...)
{
  if (!is.raw(X)) stop("X must be a raw value")
  extra = list(...)
  # Obtain a session from shim for the upload process: reuse a long-running
  # connection session when present, otherwise create (and later release) one
  if (!is.null(attr(db, "connection")$session)) {
    session = attr(db, "connection")$session
    release = 0
  } else {
    session = getSession(db)
    if (length(session)<1) stop("SciDB http session error")
    release = 1
  }
  if (release) on.exit( SGET(db, "/release_session", list(id=session), err=FALSE), add=TRUE)
  # Upload the bytes; the response names the server-side file
  payload = .Call(C_scidb_raw, X)
  upload = POST(db, payload, list(id=session))
  upload = gsub("[\r\n]", "", upload)
  schema = "<val:binary null>[i=0:0,1,0]"
  if (!is.null(extra$temp))
  {
    if (extra$temp) create_temp_array(db, name, schema)
  }
  query = sprintf("store(input(%s,'%s',-2,'(binary null)'),%s)", schema, upload, name)
  iquery(db, query)
  scidb(db, name, gc=gc)
}
# Internal utility function used to format numbers
# Internal utility function used to format numbers: render each element of w
# without scientific notation ("%.0f"), passing characters through unchanged
# and printing "*" for infinite values.
noE = function(w) sapply(w,
  function(v)
  {
    if (is.infinite(v)) {
      "*"
    } else if (is.character(v)) {
      v
    } else {
      sprintf("%.0f", v)
    }
  })
#' Returns TRUE if version string x is greater than or equal to than version y
#' @param x version string like "12.1", "15.12", etc. (non-numeric ignored)
#' @param y version string like "12.1", "15.12", etc. (non-numeric ignored)
#' @return logical TRUE if x is greater than or equal to y
at_least = function(x, y)
{
  # Convert "major.minor[...]" to major + minor/100; a ".0" is appended so a
  # bare major version still yields two components. Non-numeric suffixes and
  # anything after "-" are stripped.
  parse_version = function(v) {
    parts = strsplit(sprintf("%s.0", v), "\\.")[[1]]
    parts = gsub("-.*", "", gsub("[A-z].*", "", parts))
    nums = as.numeric(parts)
    nums[1] + nums[2] / 100
  }
  parse_version(x) >= parse_version(y)
}
# Used in delayed assignment of scidb object schema
# Used in delayed assignment of scidb object schema: ask SciDB (via show())
# for the schema of expression `name` without evaluating it.
lazyeval = function(db, name)
{
  escaped = gsub("'", "\\\\'", name, perl=TRUE)
  shown = iquery(db, sprintf("show('filter(%s, true)', 'afl')", escaped), `return`=TRUE, binary=FALSE)
  # NOTE that we need binary=FALSE here to avoid a terrible recursion
  list(schema = gsub("^.*<", "<", shown$schema, perl=TRUE))
}
#' Internal function to upload an R data frame to SciDB
#' @param db scidb database connection
#' @param X a data frame
#' @param name SciDB array name
#' @param chunk_size optional value passed to the aio_input operator see https://github.com/Paradigm4/accelerated_io_tools
#' @param types SciDB attribute types
#' @param gc set to \code{TRUE} to connect SciDB array to R's garbage collector
#' @return a \code{\link{scidb}} object
#' @keywords internal
df2scidb = function(db, X,
name=tmpnam(db),
types=NULL,
use_aio_input=FALSE,
chunk_size,
gc, format, temp=FALSE)
{
if (!is.data.frame(X)) stop("X must be a data frame")
if (missing(gc)) gc = TRUE
# NOTE(review): `nullable` is assigned but never used below
nullable = TRUE
# Sanitize column names into valid SciDB attribute names (dots -> underscores)
anames = make.names(names(X), unique=TRUE)
anames = gsub("\\.", "_", anames, perl=TRUE)
if (length(anames) != ncol(X)) anames = make.names(1:ncol(X))
if (!all(anames == names(X))) warning("Attribute names have been changed")
# Default type is string
typ = rep("string", ncol(X))
dcast = anames
if (!is.null(types)) {
for (j in 1:ncol(X)) typ[j] = types[j]
}
# Convert each column to its TSV text representation with SciDB "null"
# markers, inferring the SciDB type when `types` was not supplied
for (j in 1:ncol(X)) {
if ((! grepl("^int", typ[j])) && "numeric" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "double"
# Denormalized values (below double.xmin) are flushed to 0
X[, j] = gsub("NA", "null",
sprintf("%.17g",
ifelse(X[, j] > 0 & X[, j] < .Machine$double.xmin, 0,
ifelse(X[, j] < 0 & X[, j] > -.Machine$double.xmin, 0, X[, j]))))
}
else if (grepl("^int", typ[j]) || "integer" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "int32"
X[, j] = gsub("NA", "null", sprintf("%d", X[, j]))
}
# NOTE(review): the grepl("^int", typ[j]) condition here duplicates the
# branch above, so this branch is only reached for integer64 columns when
# `types` did not specify an int type -- confirm intended.
else if (grepl("^int", typ[j]) || "integer64" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "int64"
X[, j] = gsub("NA", "null", sprintf("%s", X[, j]))
}
else if ("logical" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "bool"
X[, j] = gsub("na", "null", tolower(sprintf("%s", X[, j])))
}
else if ("character" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "string"
X[is.na(X[, j]), j] = "null"
}
else if ("factor" %in% class(X[, j]))
{
if(is.null(types)) typ[j] = "string"
isna = is.na(X[, j])
X[, j] = sprintf("%s", X[, j])
if (any(isna)) X[isna, j] = "null"
}
else if ("Date" %in% class(X[, j]) || "POSIXct" %in% class(X[, j]))
{
warning("Converting R Date/POSIXct to SciDB datetime as UTC time. Subsecond times rounded to seconds.")
X[, j] = round(as.double(as.POSIXct(X[, j], tz="UTC")))
X[, j] = gsub("NA", "null", sprintf("%d", X[, j]))
if(is.null(types)) typ[j] = "datetime"
}
}
# Build the per-column cast expressions used by the aio_input path
for (j in 1:ncol(X))
{
if (typ[j] == "datetime") dcast[j] = sprintf("%s, datetime(a%d)", anames[j], j - 1)
else if (typ[j] == "string") dcast[j] = sprintf("%s, a%d", anames[j], j - 1)
else dcast[j] = sprintf("%s, dcast(a%d, %s(null))", anames[j], j - 1, typ[j])
}
# NOTE(review): `args` (a schema-ish string) appears unused below
args = sprintf("<%s>", paste(anames, ":", typ, " null", collapse=","))
# Obtain a session from the SciDB http service for the upload process
if (!is.null(attr(db, "connection")$session)) { # if session already exists
session = attr(db, "connection")$session
release = 0
} else { # need to get new session every time
session = getSession(db)
if (length(session)<1) stop("SciDB http session error")
release = 1;
}
if (release) on.exit( SGET(db, "/release_session", list(id=session), err=FALSE), add=TRUE)
ncolX = ncol(X)
nrowX = nrow(X)
# Serialize the frame to TSV text in memory (file=return makes the local
# fwrite return the string directly) and upload it
if(missing(format)) X = charToRaw(fwrite(X, file=return))
else X = charToRaw(fwrite(X, file=return, format=format))
tmp = POST(db, X, list(id=session))
tmp = gsub("\n", "", gsub("\r", "", tmp))
# Generate a load_tools query
aio = length(grep("aio_input", names(db))) > 0
atts = paste(dcast, collapse=",")
if (use_aio_input && aio)
{
aioSettings = list(num_attributes = ncolX)
if(!missing(chunk_size))
aioSettings[['chunk_size']] = chunk_size
LOAD = sprintf("project(apply(aio_input('%s', %s),%s),%s)", tmp,
get_setting_items_str(db, aioSettings), atts, paste(anames, collapse=","))
} else
{
if (missing(chunk_size))
LOAD = sprintf("input(%s, '%s', -2, 'tsv')", dfschema(anames, typ, nrowX), tmp)
else
LOAD = sprintf("input(%s, '%s', -2, 'tsv')", dfschema(anames, typ, nrowX, chunk_size), tmp)
}
## Create a temporary array 'name'
if(temp){ # Use scidb temporary array instead of regular versioned array
targetArraySchema = lazyeval(db, LOAD)$schema
create_temp_array(db, name, schema = targetArraySchema)
}
##
query = sprintf("store(%s,%s)", LOAD, name)
scidbquery(db, query, session=session, stream=0L)
scidb(db, name, gc=gc)
}
#' Fast write.table/textConnection substitute
#'
#' Conversions are vectorized and the entire output is buffered in memory and written in
#' one shot. Great option for replacing writing to a textConnection (much much faster).
#' Not such a great option for writing to files, marginal difference from write.table and
#' obviously much greater memory use.
#' @param x a data frame
#' @param file a connection or \code{return} to return character output directly (fast)
#' @param sep column separator
#' @param format optional fprint-style column format specifyer
#' @return Use for the side effect of writing to the connection returning \code{NULL}, or
#' return a character value when \code{file=return}.
#' @importFrom utils write.table
#' @keywords internal
# Fast write.table/textConnection substitute: format the whole data frame
# with one vectorized sprintf and emit it in a single write. When
# file=return (a function), the formatted text is returned directly; when
# file=NULL or the frame has more than 97 columns (sprintf's argument limit
# territory), fall back to write.table through a text connection.
fwrite = function(x, file=stdout(), sep="\t", format=paste(rep("%s", ncol(x)), collapse=sep))
{
  if (length(format) > 1) format = paste(format, collapse=sep)
  foo = NULL
  rm(list="foo") # avoid package R CMD check warnings of undeclared variable
  if (!is.data.frame(x)) stop("x must be a data.frame")
  if (is.null(file) || ncol(x) > 97) # use slow write.table method
  {
    sink = textConnection("foo", open="w")
    write.table(x, sep=sep, col.names=FALSE, row.names=FALSE, file=sink, quote=FALSE)
    close(sink)
    return(paste(foo, collapse="\n"))
  }
  # One sprintf call per row set: columns become sprintf arguments
  body = paste(do.call("sprintf", args=c(format, as.list(x))), collapse="\n")
  if (is.function(file)) return(body)
  write(body, file=file)
  invisible()
}
# Upload a dense R vector, matrix, or n-d array X to SciDB array `name`.
# Data are uploaded as a flat binary stream into a 1-d load schema and
# (except for vectors) reshaped into the final schema server-side.
# Hidden ... arguments: attr (attribute name), reshape (logical),
# type (SciDB type override).
matvec2scidb = function(db, X,
name=tmpnam(db),
start,
gc=TRUE, ...)
{
# Check for a bunch of optional hidden arguments
args = list(...)
attr_name = "val"
if (!is.null(args$attr)) attr_name = as.character(args$attr) # attribute name
do_reshape = TRUE
if ("factor" %in% class(X)) X = as.character(X)
# type: binary transfer type; force_type: type declared in the schema
type = force_type = .Rtypes[[typeof(X)]]
if ("Date" %in% class(X))
{
X = as.double(as.POSIXct(X, tz="UTC")) # XXX warn UTC?
force_type = "datetime"
}
if ("integer64" %in% class(X)) type = force_type = "int64"
if (is.null(type)) {
stop(paste("Unupported data type. The package supports: ",
paste(unique(names(.Rtypes)), collapse=" "), ".", sep=""))
}
if (!is.null(args$reshape)) do_reshape = as.logical(args$reshape) # control reshape
if (!is.null(args$type)) force_type = as.character(args$type) # limited type conversion
# Default chunk sizes, capped by the actual extents
chunkSize = c(min(1000L, nrow(X)), min(1000L, ncol(X)))
chunkSize = as.numeric(chunkSize)
if (length(chunkSize) == 1) chunkSize = c(chunkSize, chunkSize)
overlap = c(0, 0)
if (missing(start)) start = c(0, 0)
start = as.numeric(start)
if (length(start) ==1) start = c(start, start)
D = dim(X)
start = as.integer(start)
overlap = as.integer(overlap)
# Dimension name must not collide with the attribute name
dimname = make.unique_(attr_name, "i")
if (is.null(D))
{
# X is a vector
do_reshape = FALSE
chunkSize = min(chunkSize[[1]], length(X))
X = as.matrix(X)
schema = sprintf(
"< %s : %s null> [%s=%.0f:%.0f,%.0f,%.0f]", attr_name, force_type, dimname, start[[1]],
nrow(X) - 1 + start[[1]], min(nrow(X), chunkSize), overlap[[1]])
load_schema = schema
} else if (length(D) > 2)
{
# X is a dense n-d array
ndim = length(D)
# Aim for roughly 10e6 cells per chunk spread across all dimensions
chunkSize = rep(floor(10e6 ^ (1 / ndim)), ndim)
start = rep(0, ndim)
end = D - 1
dimNames = make.unique_(attr_name, paste("i", 1:length(D), sep=""))
schema = sprintf("< %s : %s null >[%s]", attr_name, force_type, paste(sprintf( "%s=%.0f:%.0f,%.0f,0", dimNames, start, end, chunkSize), collapse=","))
load_schema = sprintf("<%s:%s null>[__row=1:%.0f,1000000,0]", attr_name, force_type, length(X))
} else {
# X is a matrix
schema = sprintf(
"< %s : %s null> [i=%.0f:%.0f,%.0f,%.0f, j=%.0f:%.0f,%.0f,%.0f]", attr_name, force_type, start[[1]],
nrow(X) - 1 + start[[1]], chunkSize[[1]], overlap[[1]], start[[2]], ncol(X) - 1 + start[[2]],
chunkSize[[2]], overlap[[2]])
load_schema = sprintf("<%s:%s null>[__row=1:%.0f,1000000,0]", attr_name, force_type, length(X))
}
if (!is.array(X)) stop ("X must be an array or vector")
DEBUG = getOption("scidb.debug", FALSE)
td1 = proc.time()
# Obtain a session from shim for the upload process
if (!is.null(attr(db, "connection")$session)) { # if session already exists
session = attr(db, "connection")$session
release = 0
} else { # need to get new session every time
session = getSession(db)
if (length(session)<1) stop("SciDB http session error")
release = 1;
}
if (release) on.exit( SGET(db, "/release_session", list(id=session), err=FALSE), add=TRUE)
# Upload the data
# (aperm transposes so the bytes arrive in SciDB row-major order)
bytes = .Call(C_scidb_raw, as.vector(aperm(X)))
ans = POST(db, bytes, list(id=session))
ans = gsub("\n", "", gsub("\r", "", ans))
if (DEBUG)
{
message("Data upload time ", (proc.time() - td1)[3], "\n")
}
# Load query
if (do_reshape)
{
query = sprintf("store(reshape(input(%s,'%s', -2, '(%s null)'),%s),%s)", load_schema, ans, type, schema, name)
} else
{
query = sprintf("store(input(%s,'%s', -2, '(%s null)'),%s)", load_schema, ans, type, name)
}
iquery(db, query)
scidb(db, name, gc=gc)
}
# (The following extraction artifacts from the hosting website were commented
# out; they are not part of the package source.)
# Any scripts or data that you put into this service are public.