###
### @P4COPYRIGHT_SINCE:2022@
###
#'
#' File: shim.R
#' Implementation layer for calls to the Shim, particularly S3 methods
#' on class "shim"
#'
#' @seealso GetServerVersion()
#' @noRd
GetServerVersion.shim <- function(db)
{
  ## Query the Shim's /version endpoint and record the parsed version on
  ## the connection environment.
  version_text <- SGET.shim(db, "/version")
  attr(db, "connection")$scidb.version <- .ParseVersion(version_text)
  ## Reaching this point means the request succeeded, so the configured
  ## host/port belong to a Shim: tag both the connection and db accordingly.
  class(attr(db, "connection")) <- "shimconn"
  class(db) <- c("shim", "afl")
  db
}
#' Open a new Shim session and store it on the connection environment.
#'
#' Runs a trivial query through scidbquery.shim() to obtain a session, then
#' records the session ID and an ID for the connection (the last line of
#' the query response) on attr(db, "connection"). A finalizer is registered
#' on the connection environment so the session is closed when it is
#' garbage-collected or R exits.
#' @param db scidb connection object whose "connection" attribute is an
#'   environment
#' @param auth_type required when the connection has a username; "scidb"
#'   switches the protocol to https, any other value builds an HTTP digest
#'   credential "user:password"
#' @return NULL (invisibly); called for its side effects on the connection
#' @seealso NewSession()
#' @noRd
NewSession.shim <- function(db, auth_type=NULL)
{
  conn <- attr(db, "connection")
  ## Check for login using either scidb or HTTP digest authentication
  if (has.chars(conn$username)) {
    stopifnot(has.chars(auth_type))
    conn$authtype <- auth_type
    if (auth_type == "scidb") {
      conn$protocol <- "https"
    } else {
      ## HTTP basic digest auth
      conn$digest <- paste(conn$username, conn$password, sep=":")
    }
  }
  ## Report failures with the underlying message text; pasting the raw
  ## condition object into stop() produced a garbled message.
  connection_error <- function(e) {
    stop("Connection error ", conditionMessage(e), call.=FALSE)
  }
  ## scidbquery.shim() creates a new session, executes the query, and returns
  ## list(response=, session=), where response is the Shim's HTTP response
  ## body (it contains the query ID). The query result itself is ignored.
  x <- tryCatch(scidbquery.shim(db, query="list('libraries')", resp=TRUE),
                error=connection_error,
                warning=invisible)
  ## Close the session when db's "connection" env gets garbage-collected.
  reg.finalizer(conn, .CloseShimSession, onexit=TRUE)
  ## The query ID is the last CRLF-separated line of the response.
  query_id <- tryCatch(strsplit(x$response, split="\\r\\n")[[1]],
                       error=connection_error,
                       warning=invisible)
  if (length(query_id) < 1) {
    stop("Connection error: empty response from the Shim", call.=FALSE)
  }
  conn$session <- x$session
  ## Give the connection a unique ID - in this case just use the query ID.
  conn$id <- query_id[[length(query_id)]]
  ## Should not use password going forward (session is stored)
  conn$password <- NULL
}
#' Make sure the connection has an open Shim session.
#' A new session is opened via NewSession.shim() when the connection, its
#' session, or its id is missing; otherwise nothing happens.
#' @param db scidb connection object
#' @param ... ignored
#' @seealso EnsureSession()
#' @noRd
EnsureSession.shim <- function(db, ...)
{
  conn <- attr(db, "connection")
  has_session <- !is.null(conn) && !is.null(conn$session) && !is.null(conn$id)
  if (!has_session) NewSession.shim(db)
}
#' Re-authentication hook for the Shim backend.
#' The Shim needs no re-authentication step, so this is a no-op.
#' @param db scidb connection object (unused)
#' @param password new password (unused)
#' @param defer unused
#' @return NULL
#' @seealso Reauthenticate()
#' @noRd
Reauthenticate.shim <- function(db, password, defer=FALSE)
{
  NULL
}
#' Close the Shim session held by a connection.
#' Delegates to .CloseShimSession() with the connection environment.
#' @param db scidb connection object
#' @seealso Close()
#' @noRd
Close.shim <- function(db)
{
  conn <- attr(db, "connection")
  .CloseShimSession(conn)
}
#' Close the Shim session.
#' Registered (by NewSession.shim) as a finalizer on attr(db, "connection"),
#' so the session is released when the connection is garbage-collected; also
#' called directly by Close.shim.
#' @param conn the connection environment, usually obtained from
#' attr(db, "connection")
#' @return NULL
#' @noRd
.CloseShimSession <- function(conn)
{
  session_id <- conn$session
  if (is.null(session_id)) {
    msg.trace("[Shim session] Session already closed. Nothing to do here.")
    return()
  }
  msg.debug("[Shim session] automatically cleaning up db session ", session_id)
  SGET.shim(conn, "/release_session", list(id=session_id), err=FALSE)
  ## conn is an environment, so clearing the session here is visible to
  ## every holder of the connection and prevents a double release.
  conn$session <- NULL
}
#' Run a query for its side effects, discarding any result.
#' @param db scidb connection object
#' @param query_or_scidb query string or scidb object
#' @param ... passed on to scidbquery.shim() (e.g. session=)
#' @return NULL, invisibly
#' @seealso Execute()
#' @seealso scidbquery.shim()
#' @noRd
Execute.shim <- function(db, query_or_scidb, ...)
{
  query <- .GetQueryString(query_or_scidb)
  scidbquery.shim(db, query, ...)
  invisible()
}
#' Run a query through the Shim, optionally returning its result.
#' @param db scidb connection object
#' @param query_or_scidb query string or scidb object
#' @param return if TRUE, return the query result; otherwise run the query
#'   only for its side effects
#' @param binary if TRUE (and return=TRUE), fetch the result via
#'   BinaryQuery.shim()
#' @param arrow if TRUE (and return=TRUE), fetch the result via
#'   scidb_arrow_to_dataframe.shim(); mutually exclusive with binary
#' @param ... passed to the result fetcher (BinaryQuery.shim(),
#'   scidb_arrow_to_dataframe.shim() or .Csv2df())
#' @return the query result when return=TRUE, otherwise NULL (invisibly)
#' @seealso iquery(), Query()
#' @noRd
Query.shim = function(db, query_or_scidb,
                      `return`=FALSE, binary=TRUE, arrow=FALSE, ...)
{
  DEBUG = getOption("scidb.debug", FALSE)
  query = .GetQueryString(query_or_scidb)
  if (binary && arrow) {
    stop("Only one of `binary` and `arrow` can be true")
  }
  ## scidbquery.shim() reuses the connection's persistent session when there
  ## is one. Otherwise it opens a fresh session that nothing else releases,
  ## so this function must release it once done.
  release = is.null(attr(db, "connection")$session)
  release_session = function(sessionid) {
    if (release) {
      SGET.shim(db, "/release_session", list(id=sessionid), err=FALSE)
    }
  }
  if (!`return`) {
    release_session(scidbquery.shim(db, query))
    return(invisible())
  }
  if (arrow) return(scidb_arrow_to_dataframe.shim(db, query, ...))
  if (binary) return(BinaryQuery.shim(db, query, ...))
  ## NOTE(review): with warning=invisible, a warning anywhere below aborts the
  ## fetch and the warning condition object is returned instead of data.
  ans = tryCatch(
  {
    ## SciDB save syntax changed in 15.12
    if (at_least(attr(db, "connection")$scidb.version, 15.12)) {
      save_format = "csv+:l"
    } else {
      save_format = "csv+"
    }
    sessionid = scidbquery.shim(db, query, save=save_format)
    dt1 = proc.time()
    result = tryCatch(
      ## n=0 asks /read_lines for all of the output
      SGET.shim(db, "/read_lines", list(id=sessionid, n=0L)),
      error=function(e)
      {
        ## err=FALSE so a failing cancel cannot mask the original error or
        ## skip releasing the session.
        SGET.shim(db, "/cancel", list(id=sessionid), err=FALSE)
        release_session(sessionid)
        stop(e)
      })
    release_session(sessionid)
    if (DEBUG) message("Data transfer time ", round((proc.time() - dt1)[3], 4))
    dt1 = proc.time()
    ## Convert the CSV data to an R dataframe
    ret = .Csv2df(result, ...)
    if (DEBUG) message("R parsing time ", round((proc.time() - dt1)[3], 4))
    ret
  },
  warning=invisible)
  return(ans)
}
#' Run a query and fetch its result in SciDB binary format.
#' @param db scidb connection object
#' @param query_or_scidb query string or scidb object
#' @param binary if FALSE, fall back to a CSV transfer through iquery();
#'   NULL means TRUE
#' @param buffer_size passed to .Binary2df()
#' @param only_attributes if TRUE, return attributes only (no dimension
#'   columns); NULL means TRUE exactly when lazyeval() reports a 'dataframe'
#'   distribution
#' @param schema optional result schema string, used to skip a metadata query
#' @param ... unused
#' @return a data frame with dimension columns first (unless
#'   only_attributes), or the raw list from .Binary2df() when any attribute
#'   has type "binary"
#' @seealso BinaryQuery()
#' @noRd
BinaryQuery.shim = function(db, query_or_scidb,
                            binary=NULL, buffer_size=NULL,
                            only_attributes=NULL, schema=NULL, ...)
{
  INT64 = attr(db, "connection")$int64
  DEBUG = getOption("scidb.debug", FALSE)
  AIO = getOption("scidb.aio", FALSE)
  RESULT_SIZE_LIMIT = getOption("scidb.result_size_limit", 256)
  if (DEBUG) {
    if (is.null(attr(db, "connection")$session)) {
      stop("[Shim session] unexpected in long running shim session")
    }
  }
  lazyeval_ret = lazyeval(db, query_or_scidb)
  if (is.null(only_attributes)) {
    dist = lazyeval_ret$distribution
    only_attributes = if (is.na(dist)) FALSE else dist == "dataframe"
  }
  if (is.null(binary)) binary = TRUE
  if (!inherits(query_or_scidb, "scidb"))
  {
    stopifnot(is.character(query_or_scidb))
    query = query_or_scidb
    ## Make a scidb object out of the query, preferring a supplied schema,
    ## then a lazily evaluated one, to skip the metadata query.
    if (!is.null(schema)) {
      query_or_scidb = scidb(db, query, schema=schema)
    } else if (is.na(lazyeval_ret$schema)) {
      query_or_scidb = scidb(db, query)
    } else {
      query_or_scidb = scidb(db, query, schema=lazyeval_ret$schema)
    }
  }
  ## query_or_scidb must be a scidb object by this point
  stopifnot(inherits(query_or_scidb, "scidb"))
  scidb_obj = query_or_scidb
  attributes = schema(scidb_obj, "attributes")
  dimensions = schema(scidb_obj, "dimensions")
  query = scidb_obj@name
  if (!binary) return(iquery(db, query, binary=FALSE, `return`=TRUE, arrow=FALSE))
  if (only_attributes)
  {
    internal_attributes = attributes
    internal_query = query
  } else {
    dim_names = dimensions$name
    attr_names = attributes$name
    all_names = c(dim_names, attr_names)
    internal_query = query
    if (length(all_names) != length(unique(all_names)))
    {
      # Cast to completely unique names to be safe:
      cast_dim_names = make.names_(dim_names)
      cast_attr_names = make.unique_(cast_dim_names, make.names_(attributes$name))
      cast_schema = sprintf("<%s>[%s]", paste(paste(cast_attr_names, attributes$type, sep=":"), collapse=","), paste(cast_dim_names, collapse=","))
      internal_query = sprintf("cast(%s, %s)", internal_query, cast_schema)
      all_names = c(cast_dim_names, cast_attr_names)
      dim_names = cast_dim_names
    }
    # Dimensions are returned as trailing int64 attributes (original names;
    # columns are renamed below anyway).
    dimensional_attributes = data.frame(name=dimensions$name, type="int64", nullable=FALSE)
    internal_attributes = rbind(attributes, dimensional_attributes)
    if (AIO == FALSE)
    {
      # Without AIO, materialize dimensions as attributes via apply()
      dim_apply = paste(gsub("^\\$", "X_", dim_names), dim_names, sep=",", collapse=",")
      internal_query = sprintf("apply(%s, %s)", internal_query, dim_apply)
    }
  }
  ns = rep("", length(internal_attributes$nullable))
  ns[internal_attributes$nullable] = "null"
  format_string = paste(paste(internal_attributes$type, ns), collapse=",")
  format_string = sprintf("(%s)", format_string)
  ## Start running the query
  if (DEBUG) message("Data query ", internal_query)
  if (DEBUG) message("Format ", format_string)
  sessionid = scidbquery.shim(
    db,
    internal_query,
    save=format_string,
    result_size_limit=RESULT_SIZE_LIMIT,
    atts_only=ifelse(only_attributes, TRUE, ifelse(AIO, FALSE, TRUE)))
  if (is.null(attr(db, "connection")$session)) {
    ## scidbquery.shim() opened a temporary session for us; release it.
    on.exit(SGET.shim(db, "/release_session", list(id=sessionid), err=FALSE),
            add=TRUE)
  }
  ## Fetch the query output data in binary format
  dt2 = proc.time()
  uri = URI(db, "/read_bytes", list(id=sessionid, n=0))
  h = .create_curl_handle.shim(db, request_type="GET", uri=uri)
  .TraceHttpRequestSent(h, "GET", uri)
  resp = curl_fetch_memory(uri, h)
  .TraceHttpResponseReceived(h, "GET", uri, resp)
  if (resp$status_code > 299) stop("HTTP error ", resp$status_code)
  if (DEBUG) message("Data transfer time ", round((proc.time() - dt2)[3], 4))
  ## Unpack the binary data into a data.table
  dt1 = proc.time()
  ans = .Binary2df(resp$content, internal_attributes, buffer_size, INT64)
  if (DEBUG) message("Total R parsing time ", round( (proc.time() - dt1)[3], 4))
  ## If there was no data, return an empty dataframe with the query's schema
  if (is.null(ans)) {
    return(.Schema2EmptyDf(attributes,
                           if (only_attributes) NULL else dimensions))
  }
  ## Special behavior for "binary" datatype: if any column has type "binary",
  ## return the list as-is (binary value first, no column permutation).
  ## See issue #163 and the test for issue #163 in tests/a.R .
  if (typeof(ans) == "list" && "binary" %in% internal_attributes$type) {
    return(ans)
  }
  ## Permute dimension cols to the front, see issue #125
  if (only_attributes) {
    colnames(ans) = make.names_(attributes$name)
  } else {
    nd = length(dimensions$name)
    i = ncol(ans) - nd
    ans = ans[, c( (i+1):ncol(ans), 1:i)]
    colnames(ans) = make.names_(c(dimensions$name, attributes$name))
  }
  return(ans)
}
#' Upload an R object to SciDB, choosing the uploader by the payload's class.
#' raw -> .raw2scidb.shim, data.frame -> .df2scidb.shim,
#' dgCMatrix -> .matrix2scidb.shim, anything else -> .matvec2scidb.shim.
#' @param db scidb connection object
#' @param payload object to upload
#' @param name target array name; a temporary name is generated when empty
#' @param ... passed to the chosen uploader
#' @return the uploader's result (a scidb object)
#' @seealso Upload()
#' @noRd
Upload.shim <- function(db, payload, name=NULL, ...)
{
  if (!has.chars(name)) name <- tmpnam(db)
  uploader <- if (inherits(payload, "raw")) {
    .raw2scidb.shim
  } else if (inherits(payload, "data.frame")) {
    .df2scidb.shim
  } else if (inherits(payload, "dgCMatrix")) {
    .matrix2scidb.shim
  } else {
    .matvec2scidb.shim
  }
  uploaded <- uploader(db, payload, name=name, ...)
  uploaded
}
#' Return a shim session ID, or error
#' @param db_or_conn scidb database connection object,
#' _or_ its "connection" attribute
#' @return a string representing the ID of the session; stops if the Shim's
#' /new_session response is empty once CR/LF characters are removed
#' @noRd
getSession.shim = function(db_or_conn)
{
  session = SGET.shim(db_or_conn, "/new_session")
  ## Strip line endings so only the session ID remains.
  session = gsub("[\r\n]", "", session)
  ## SGET.shim() returns a single string, so an unusable reply shows up as
  ## an empty string rather than a zero-length vector.
  if (length(session) < 1 || !nzchar(session[[1]])) {
    stop("SciDB http session error; are you connecting to a valid SciDB host?")
  }
  session
}
#' Issue an HTTP GET request.
#' @param db_or_conn scidb database connection object,
#' _or_ its "connection" attribute
#' @param resource (string) A URI path for the requested service; a leading
#' "/" is added when missing
#' @param args (list) A list of named query parameters
#' @param err (boolean) If TRUE, stop when the server returns a status above
#' 299
#' @param binary (boolean) If TRUE, return the raw response bytes,
#' else convert the result to character
#' @return if binary is TRUE, the raw bytes of the response from the server;
#' if binary is FALSE, the text of the response from the server.
#' Stops with "HTTP error ${status}" (followed by the response body for
#' statuses of 400 and above) when err is TRUE and the status is above 299.
#' @keywords internal
SGET.shim <- function(db_or_conn, resource, args=list(), err=TRUE, binary=FALSE)
{
  if (substr(resource, 1, 1) != "/") resource <- paste0("/", resource)
  ## Encode the URL, then escape literal '+' so it is not read as a space.
  uri <- gsub("\\+", "%2B", oldURLencode(URI(db_or_conn, resource, args)),
              perl=TRUE)
  handle <- .create_curl_handle.shim(db_or_conn, request_type="GET", uri=uri)
  .TraceHttpRequestSent(handle, "GET", uri)
  response <- curl_fetch_memory(uri, handle)
  .TraceHttpResponseReceived(handle, "GET", uri, response)
  status <- response$status_code
  if (status > 299 && err)
  {
    msg <- sprintf("HTTP error %s", status)
    if (status >= 400) msg <- sprintf("%s\n%s", msg, rawToChar(response$content))
    stop(msg)
  }
  if (binary) response$content else rawToChar(response$content)
}
#' Construct a URI for connecting to the Shim.
#' This override appends the connection's auth, password, user and admin
#' settings (when present) as URL query parameters, because the Shim takes
#' them there, and then defers to URI.default().
#' @param db_or_conn scidb database connection object
#' _or_ its "connection" attribute
#' @param resource (string): A URI identifying the requested service (path)
#' @param args (list): A list of named query parameters to put at the end
#' of the constructed URI
#' @return (string) the new URI/URL
#' @seealso URI(), URI.default()
#' @keywords internal
URI.shim <- URI.shimconn <- function(db_or_conn, resource="", args=list())
{
  conn <- .GetConnectionEnv(db_or_conn)
  ## Collect credentials/admin flags in the order the Shim has always
  ## received them: auth, password, user, admin.
  extra <- list()
  if (!is.null(conn$auth)) extra$auth <- conn$auth
  if (!is.null(conn$password)) extra$password <- conn$password
  if (!is.null(conn$username)) extra$user <- conn$username
  if (!is.null(conn$admin) && conn$admin) extra$admin <- 1
  if (length(extra) > 0) args <- c(args, extra)
  uri <- URI.default(db_or_conn, resource, args)
  return(uri)
}
#' Basic low-level query through the Shim's /execute_query endpoint.
#' This is an internal function.
#' @param db_or_conn a scidb database connection object,
#' _or_ its "connection" attribute
#' @param query a character query string
#' @param save format string for save() query, or NULL for none.
#' Example values: "dcsv", "csv+", "(double NULL, int32)"
#' @param result_size_limit value sent as the result_size_limit parameter,
#' or NULL for none
#' @param session a Shim session ID to use when the connection has no
#' persistent session, otherwise NULL (a new session is then requested).
#' A persistent session on the connection always takes precedence.
#' @param resp if TRUE, also return the HTTP response body
#' @param prefix optional AFL statement(s) to prefix the query in the same
#' connection context; defaults to the connection's prefix. The
#' "scidb.prefix" option, if set, is prepended.
#' @param atts_only sent to the Shim as atts_only (1/0)
#' @return list(session=, response=) if resp==TRUE, otherwise just the
#' session ID the query ran in. The session is never released here.
#' @keywords internal
scidbquery.shim = function(db_or_conn, query,
                           save=NULL,
                           result_size_limit=NULL,
                           session=NULL,
                           resp=FALSE,
                           prefix=NULL,
                           atts_only=TRUE)
{
  trace <- .TraceEnterInternalFn("scidbquery", query=query, save=save,
                                 result_size_limit=result_size_limit,
                                 session=session, resp=resp,
                                 prefix=prefix, atts_only=atts_only)
  on.exit(.TraceExit(trace, returnValue()), add=TRUE)
  conn <- .GetConnectionEnv(db_or_conn)
  prefix <- prefix %||% conn$prefix
  DEBUG = getOption("scidb.debug", FALSE)
  ## Sessions are never released here (release=0 is sent to the Shim);
  ## callers that obtain a temporary session release it themselves.
  release = 0
  if (!is.null(conn$session)) {
    session = conn$session
  }
  sessionid = session
  if (is.null(sessionid)) {
    msg.debug("[Shim session] created new session")
    sessionid = getSession.shim(conn)
  }
  if (is.null(save)) save=""
  if (is.null(result_size_limit)) result_size_limit=""
  if (DEBUG)
  {
    message(query)
    t1 = proc.time()
  }
  ans = tryCatch(
  {
    ## Parameter order determines the order of the URL query parameters.
    args = list(id=sessionid, afl=0L, query=query)
    args$release = release
    args$prefix = c(getOption("scidb.prefix"), prefix)
    if (!is.null(args$prefix)) args$prefix = paste(args$prefix, collapse=";")
    args$save = save
    args$result_size_limit = result_size_limit
    if (!is.null(args$save)) args$atts_only=ifelse(atts_only, 1L, 0L)
    do.call("SGET.shim",
            args=list(conn, resource="/execute_query", args=args))
  }, error=function(e)
  {
    SGET.shim(conn, "/cancel", list(id=sessionid), err=FALSE)
    if (release) {
      SGET.shim(conn, "/release_session", list(id=sessionid), err=FALSE)
    }
    e$call = NULL
    stop(e)
  }, interrupt=function(e)
  {
    SGET.shim(conn, "/cancel", list(id=sessionid), err=FALSE)
    if (release) {
      SGET.shim(conn, "/release_session", list(id=sessionid), err=FALSE)
    }
    stop("cancelled")
  }, warning=invisible)
  if (DEBUG) message("Query time ", round( (proc.time() - t1)[3], 4))
  if (resp) return(list(session=sessionid, response=ans))
  sessionid
}
#' Return a SciDB query expression as a data frame (Arrow transfer)
#' @param db scidb database connection object
#' @param query A SciDB query expression or scidb object
#' @param ... optional extra arguments (see below)
#' @note option extra arguments
#' \itemize{
#' \item{only_attributes}{ optional logical value, TRUE if only attributes should be returned}
#' \item{schema}{ optional result schema string }
#' }
#' @return a data frame; unless only_attributes, its columns are ordered
#' dimensions first, then attributes
#' @keywords internal
scidb_arrow_to_dataframe.shim = function(db, query, ...) {
  trace <- .TraceEnterInternalFn("scidb_arrow_to_dataframe",
                                 query=query, ...)
  on.exit(.TraceExit(trace, returnValue()), add=TRUE)
  DEBUG = getOption("scidb.debug", FALSE)
  RESULT_SIZE_LIMIT = getOption("scidb.result_size_limit", 256)
  AIO = getOption("scidb.aio", TRUE)
  if (!AIO) {
    stop("AIO Must be TRUE for Arrow")
  }
  args = list(...)
  ## Use [[ ]] rather than $ so a differently named extra argument is never
  ## picked up through partial name matching.
  lazyeval_ret = lazyeval(db, query)
  only_attributes = args[["only_attributes"]]
  if (is.null(only_attributes)) {
    dist = lazyeval_ret$distribution
    only_attributes = if (is.na(dist)) FALSE else dist == "dataframe"
  }
  ## Get the atts and dims so we can filter the results
  if (!inherits(query, "scidb"))
  {
    ## Make a scidb object out of the query, optionally using a supplied
    ## schema to skip the metadata query.
    schema_str = args[["schema"]]
    if (!is.null(schema_str)) {
      query = scidb(db, query, schema=schema_str)
    } else if (is.na(lazyeval_ret$schema)) {
      query = scidb(db, query)
    } else {
      query = scidb(db, query, schema=lazyeval_ret$schema)
    }
  }
  attributes = schema(query, "attributes")
  dimensions = schema(query, "dimensions")
  query = query@name
  if (DEBUG) message("Data query ", query)
  sessionid = scidbquery.shim(
    db,
    query,
    save="arrow",
    result_size_limit=RESULT_SIZE_LIMIT,
    atts_only=ifelse(only_attributes, TRUE, ifelse(AIO, FALSE, TRUE)))
  if (is.null(attr(db, "connection")$session)) {
    ## scidbquery.shim() opened a temporary session for us; release it.
    on.exit(SGET.shim(db, "/release_session", list(id=sessionid), err=FALSE),
            add=TRUE)
  }
  dt2 = proc.time()
  uri = URI(db, "/read_bytes", list(id=sessionid, n=0))
  h = .create_curl_handle.shim(db, request_type="GET", uri=uri)
  .TraceHttpRequestSent(h, "GET", uri)
  resp = curl_fetch_memory(uri, h)
  .TraceHttpResponseReceived(h, "GET", uri, resp)
  if (resp$status_code > 299) stop("HTTP error ", resp$status_code)
  if (DEBUG) message("Data transfer time ", round((proc.time() - dt2)[3], 4))
  if (DEBUG) message("Data size ", length(resp$content))
  dt1 = proc.time()
  res <- .Arrow2df(resp$content)
  if (DEBUG) message("Total R parsing time ", round( (proc.time() - dt1)[3], 4))
  ## Reorder columns: dimensions first, then attributes
  if (!only_attributes) {
    res <- res[, c(dimensions$name, attributes$name)]
  }
  return(res)
}
#' Internal function to upload an R raw value to a special 1-element SciDB array
#' @param db scidb database connection
#' @param X a raw value
#' @param name (character) SciDB array name
#' @param gc (boolean) set to \code{TRUE} to connect SciDB array to R's garbage collector
#' @param ... optional extra arguments
#' \itemize{
#' \item {temp:} {(boolean) create a temporary SciDB array}
#' }
#' @return a \code{\link{scidb}} object
#' @keywords internal
.raw2scidb.shim = function(db, X, name, gc=TRUE, ...)
{
  trace <- .TraceEnterInternalFn("raw2scidb", X=X, name=name, gc=gc, ...)
  on.exit(.TraceExit(trace, returnValue()), add=TRUE)
  if (!is.raw(X)) stop("X must be a raw value")
  opts <- list(...)
  session <- attr(db, "connection")$session
  if (is.null(session)) {
    ## No persistent session: borrow a temporary one, released on exit.
    session <- getSession.shim(db)
    if (length(session) < 1) stop("SciDB http session error")
    on.exit(SGET.shim(db, "/release_session", list(id=session), err=FALSE),
            add=TRUE)
  }
  bytes <- .Call(C_scidb_raw, X)
  ## The Shim's reply (minus CR/LF) is used as the input() file path below.
  uploaded <- .Post.shim(db, bytes, list(id=session))
  uploaded <- gsub("\n", "", gsub("\r", "", uploaded))
  schema <- "<val:binary null>[i=0:0:0:1]"
  if (!is.null(opts$temp) && opts$temp) create_temp_array(db, name, schema)
  iquery(db, sprintf("store(input(%s,'%s',-2,'(binary null)'),%s)",
                     schema, uploaded, name))
  scidb(db, name, gc=gc)
}
#' Internal function to upload an R data frame to SciDB
#' @param db scidb database connection
#' @param X a data frame
#' @param name SciDB array name
#' @param types SciDB attribute types (passed to .PreprocessDfTypes())
#' @param use_aio_input if TRUE and the aio_input operator is available
#' (listed in names(db)), load with aio_input() instead of input()
#' @param chunk_size optional value passed to the aio_input operator see https://github.com/Paradigm4/accelerated_io_tools;
#' also used as the chunk size of the input() schema
#' @param gc set to \code{TRUE} to connect SciDB array to R's garbage collector
#' @param temp if TRUE, store into a SciDB temporary array instead of a
#' regular versioned array
#' @param start optional dimension start value for the input() schema
#' @param ... only forwarded to tracing
#' @return a \code{\link{scidb}} object
#' @keywords internal
.df2scidb.shim = function(db, X,
                          name=tmpnam(db),
                          types=NULL,
                          use_aio_input=FALSE,
                          chunk_size=NULL,
                          gc=TRUE,
                          temp=FALSE,
                          start=NULL,
                          ...)
{
  trace <- .TraceEnterInternalFn("df2scidb", X=X,
                                 name=name, types=types,
                                 use_aio_input=use_aio_input,
                                 chunk_size=chunk_size,
                                 gc=gc,
                                 temp=temp,
                                 start=start,
                                 ...)
  on.exit(.TraceExit(trace, returnValue()), add=TRUE)
  if (!is.data.frame(X)) stop("X must be a data frame")
  ## Preprocess the dataframe to prepare it for upload
  processed = .PreprocessDfTypes(X, types, use_aio_input)
  X = processed$df
  ncolX = ncol(X)
  nrowX = nrow(X)
  anames = names(X)
  typ = processed$attr_types
  aio_apply_args = processed$aio_apply_args
  ## Serialize the dataframe to TSV format
  data = charToRaw(.TsvWrite(X, file=.Primitive("return")))
  ## Obtain a session from the SciDB http service for the upload process
  session = attr(db, "connection")$session
  if (is.null(session)) {
    ## No persistent session: use a temporary one, released on exit.
    session = getSession.shim(db)
    if (length(session)<1) stop("SciDB http session error")
    on.exit(SGET.shim(db, "/release_session", list(id=session), err=FALSE),
            add=TRUE)
  }
  ## The Shim's reply (minus CR/LF) is the uploaded file path for the loader.
  tmp = .Post.shim(db, data, list(id=session))
  tmp = gsub("\n", "", gsub("\r", "", tmp))
  ## Generate a load_tools query
  aio = any(grepl("aio_input", names(db)))
  if (use_aio_input && aio) {
    aioSettings = list(num_attributes = ncolX)
    if (!is.null(chunk_size)) {
      aioSettings[['chunk_size']] = chunk_size
    }
    LOAD = sprintf("project(apply(aio_input('%s', %s),%s),%s)",
                   tmp,
                   get_setting_items_str(db, aioSettings),
                   aio_apply_args,
                   paste(anames, collapse=","))
  } else {
    LOAD = sprintf("input(%s, '%s', -2, 'tsv')",
                   dfschema(anames, typ, nrowX, chunk=chunk_size, start=start),
                   tmp)
  }
  if (temp) {
    ## Use scidb temporary array instead of regular versioned array
    targetArraySchema = lazyeval(db, LOAD)$schema
    create_temp_array(db, name, schema = targetArraySchema)
  }
  query = sprintf("store(%s,%s)", LOAD, name)
  Execute(db, query, session=session)
  return(scidb(db, name, gc=gc))
}
#' Internal function to upload an R sparse matrix into SciDB
#' @param db scidb database connection
#' @param X a sparse matrix (dgCMatrix); only double values are supported
#' @param name (character) SciDB array name
#' @param rowChunkSize,colChunkSize (int) chunk sizes of the target array's
#' row and column dimensions (capped at the matrix dimensions)
#' @param start (int) dimension start values; padded with 0 to length 2,
#' extra values are ignored
#' @param temp (boolean) create a temporary SciDB array
#' @param gc (boolean) set to \code{TRUE} to connect SciDB array to R's garbage collector
#' @param ... only forwarded to tracing
#' @return a \code{\link{scidb}} object
#' @keywords internal
.matrix2scidb.shim = function(db, X, name,
                              rowChunkSize=1000,
                              colChunkSize=1000,
                              start=NULL,
                              temp=FALSE,
                              gc=TRUE,
                              ...)
{
  trace <- .TraceEnterInternalFn(".matrix2scidb.shim", X=X, name=name,
                                 rowChunkSize=rowChunkSize,
                                 colChunkSize=colChunkSize,
                                 start=if (missing(start)) NULL else start,
                                 temp=temp, gc=gc, ...)
  on.exit(.TraceExit(trace, returnValue()), add=TRUE)
  ## Normalise start to exactly two integers, padding with zeros.
  start <- as.integer(c(start, 0, 0)[1:2])
  type <- .scidbtypes[[typeof(X@x)]]
  if (is.null(type)) {
    stop(paste0("Unsupported data type. The package presently supports: ",
                paste(.scidbtypes, collapse=" "), "."))
  }
  if (type != "double") stop("Sorry, the package only supports double-precision sparse matrices right now.")
  n_row <- nrow(X)
  n_col <- ncol(X)
  schema <- sprintf(
    "<val:%s null>[i=%.0f:%.0f:0:%.0f; j=%.0f:%.0f:0:%.0f]",
    type,
    start[[1]], n_row - 1 + start[[1]], min(n_row, rowChunkSize),
    start[[2]], n_col - 1 + start[[2]], min(n_col, colChunkSize))
  schema1d <- sprintf("<i:int64 null, j:int64 null, val:%s null>[idx=0:*:0:100000]", type)
  ## Expand the column pointers (X@p) into 0-based column indices, then send
  ## (i, j, val) triples interleaved row by row as doubles.
  col_counts <- diff(X@p)
  col_idx <- rep(seq_along(col_counts), col_counts) - 1
  triples <- cbind(X@i + start[[1]], col_idx + start[[2]], X@x)
  bytes <- .Call(C_scidb_raw, as.vector(t(triples)))
  session <- attr(db, "connection")$session
  if (is.null(session)) {
    ## No persistent session: borrow a temporary one, released on exit.
    session <- getSession.shim(db)
    if (length(session) < 1) stop("SciDB http session error")
    on.exit(SGET.shim(db, "/release_session", list(id=session), err=FALSE),
            add=TRUE)
  }
  uploaded <- .Post.shim(db, bytes, list(id=session))
  uploaded <- gsub("\n", "", gsub("\r", "", uploaded))
  ## Use a scidb temporary array instead of a regular versioned array.
  if (temp) create_temp_array(db, name, schema = schema)
  ## Load the triples as a 1-d array and redimension into the matrix.
  iquery(db, sprintf("store(redimension(input(%s,'%s',-2,'(double null,double null,double null)'),%s),%s)",
                     schema1d, uploaded, schema, name))
  scidb(db, name, gc=gc)
}
#' Internal function to upload an R vector, dense n-d array or matrix to SciDB
#'
#' Vectors are uploaded block by block through .multipart_post_load.shim();
#' matrices and n-d arrays are uploaded in a single POST and loaded with
#' input(), wrapped in reshape() unless reshape=FALSE.
#' @param db scidb database connection
#' @param X a vector, dense n-d array or matrix
#' @param name (character) SciDB array name
#' @param start (int) dimension start value; a single value is used for both
#' dimensions. Ignored for n-d arrays (more than 2 dims), which start at 0.
#' @param gc (boolean) set to TRUE to connect SciDB array to R's garbage collector
#' @param temp (boolean) create a temporary SciDB array
#' @param ... optional extra arguments.
#' \itemize{
#' \item {attr: } {attribute name}
#' \item {reshape: } {(boolean) to control reshape; always FALSE for vectors}
#' \item {type: } {(character) desired data type - however, limited type conversion available}
#' \item {max_byte_size: } {(int) maximum size of each block (in bytes) while uploading vectors in
#' a multi-part fashion. Minimum block size is 8 bytes. }
#' }
#' @return a \code{\link{scidb}} object
#' @keywords internal
.matvec2scidb.shim = function(db, X,
name=tmpnam(db),
start=NULL,
gc=TRUE,
temp=FALSE, ...)
{
trace <- .TraceEnterInternalFn("matvec2scidb", X=X, name=name,
start=if (missing(start)) NULL else start,
gc=gc, temp=temp, ...)
on.exit(.TraceExit(trace, returnValue()), add=TRUE)
# Check for a bunch of optional hidden arguments
args = list(...)
attr_name = "val"
if (!is.null(args$attr)) attr_name = as.character(args$attr) # attribute name
processed = .PreprocessArrayType(X, type=args$type)
X = processed$array
## attr_type: the type of the attribute
attr_type = processed$attr_type
## load_type: the type used internally for loading the data
load_type = processed$load_type
do_reshape = TRUE
if (!is.null(args$reshape)) do_reshape = as.logical(args$reshape) # control reshape
# Chunk size per dimension, capped at 1000. For a vector nrow()/ncol() are
# NULL, so min() yields 1000 for both entries.
chunkSize = c(min(1000L, nrow(X)), min(1000L, ncol(X)))
chunkSize = as.numeric(chunkSize)
# chunkSize is built with two entries above, so this appears to be a no-op.
if (length(chunkSize) == 1) chunkSize = c(chunkSize, chunkSize)
overlap = c(0, 0)
# Default start is 0 for both dimensions; a single value is reused for both.
if (is.null(start)) start = c(0, 0)
start = as.numeric(start)
if (length(start) ==1) start = c(start, start)
D = dim(X)
start = as.integer(start)
overlap = as.integer(overlap)
# Dimension name for vectors; presumably renamed by make.unique_() if it
# clashes with attr_name -- TODO confirm make.unique_ semantics.
dimname = make.unique_(attr_name, "i")
DEBUG = getOption("scidb.debug", FALSE)
if (is.null(D))
{
# X is a vector
do_reshape = FALSE
chunkSize = min(chunkSize[[1]], length(X))
X = as.matrix(X)
# Number of elements per uploaded block, bounded by max_byte_size
# (argument, else the scidb.max_byte_size option, else 500e6 bytes).
block_size = .get_multipart_post_load_block_size(
data = X,
debug = DEBUG,
max_byte_size = if(is.null(args$max_byte_size)) getOption('scidb.max_byte_size', 500*(10^6)) else args$max_byte_size)
# Define schema for an initial SciDB upload and provide a template to
# load subsequent blocks of the vector
schema = sprintf(
"< %s : %s null> [%s=%.0f:%.0f,%.0f,%.0f]",
attr_name, attr_type,
dimname,
start[[1]],
nrow(X) - 1 + start[[1]],
min(nrow(X), chunkSize),
overlap[[1]])
load_schema = schema
# Define a temporary schema for multi-part loading of blocks of the vector
# (same dimension, extent limited to one block).
temp_schema = sprintf(
"< %s : %s null> [%s=%.0f:%.0f,%.0f,%.0f]",
attr_name, attr_type,
dimname,
start[[1]],
min((nrow(X) - 1 + start[[1]]), (block_size - 1 + start[[1]])),
min(nrow(X), chunkSize),
overlap[[1]])
} else if (length(D) > 2) {
# X is a dense n-d array: chunks of floor(1e7^(1/ndim)) per dimension, and
# dimensions always start at 0 (the start argument is ignored here).
ndim = length(D)
chunkSize = rep(floor(10e6 ^ (1 / ndim)), ndim)
start = rep(0, ndim)
end = D - 1
dimNames = make.unique_(attr_name, paste("i", 1:length(D), sep=""))
schema = sprintf(
"< %s : %s null >[%s]",
attr_name, attr_type,
paste(sprintf( "%s=%.0f:%.0f,%.0f,0", dimNames, start, end, chunkSize),
collapse=","))
# Data is loaded as a flat 1-d array and reshaped into `schema` below.
load_schema = sprintf(
"<%s:%s null>[__row=1:%.0f,1000000,0]", attr_name, attr_type, length(X))
} else {
# X is a matrix
schema = sprintf(
"< %s : %s null> [i=%.0f:%.0f,%.0f,%.0f, j=%.0f:%.0f,%.0f,%.0f]",
attr_name, attr_type,
start[[1]], nrow(X) - 1 + start[[1]], chunkSize[[1]], overlap[[1]],
start[[2]], ncol(X) - 1 + start[[2]], chunkSize[[2]], overlap[[2]])
# Data is loaded as a flat 1-d array and reshaped into `schema` below.
load_schema = sprintf("<%s:%s null>[__row=1:%.0f,1000000,0]",
attr_name, attr_type, length(X))
}
if (!is.array(X)) stop ("X must be an array or vector")
td1 = proc.time()
# Obtain a session from shim for the upload process
if (!is.null(attr(db, "connection")$session)) { # if session already exists
session = attr(db, "connection")$session
release = 0
} else { # need to get new session every time
session = getSession.shim(db)
if (length(session)<1) stop("SciDB http session error")
release = 1;
}
if (release) {
on.exit(SGET.shim(db, "/release_session", list(id=session), err=FALSE),
add=TRUE)
}
# Time spent acquiring the session, included in the reported upload time.
shimcon_time = round((proc.time() - td1)[3], 4)
if(is.null(D)) {
# Vector: upload and load block by block.
.multipart_post_load.shim(db, session,
X, name,
load_schema, schema, load_type,
temp_schema, block_size,
temp, DEBUG, shimcon_time)
} else {
# Matrix / n-d array: serialize in row-major order (aperm) and upload once.
td2 = proc.time()
bytes = .Call(C_scidb_raw, as.vector(aperm(X)))
ans = .Post.shim(db, bytes, list(id=session))
ans = gsub("\n", "", gsub("\r", "", ans))
post_time = round((proc.time() - td2)[3], 4)
if (DEBUG)
{
message("Data upload time ", (shimcon_time + post_time))
}
# Create a temporary array 'name'
if(temp){ # Use scidb temporary array instead of regular versioned array
targetArraySchema = schema
create_temp_array(db, name, schema = targetArraySchema)
}
# Load query
if (do_reshape) {
query = sprintf("store(reshape(input(%s,'%s', -2, '(%s null)'),%s),%s)",
load_schema, ans, load_type, schema, name)
} else {
query = sprintf("store(input(%s,'%s', -2, '(%s null)'),%s)",
load_schema, ans, load_type, name)
}
iquery(db, query)
}
scidb(db, name, gc=gc)
}
## Upload a vector to SciDB in blocks of `block_size` elements.
## The first block is stored into `name` using `load_schema`; each later
## block is loaded with `temp_schema` and appended along the array's
## dimension. `dim_name` is that dimension's name; when NULL it is taken from
## the first dimension declared in `temp_schema` (falling back to "i"), since
## the caller may rename the dimension to avoid clashing with the attribute.
## Returns NULL; debug timing covers session setup, serialization and POSTs.
.multipart_post_load.shim <- function(db, session,
                                      data, name,
                                      load_schema, schema, type,
                                      temp_schema, block_size,
                                      temp, debug, shimcon_time,
                                      dim_name=NULL)
{
  if (is.null(dim_name)) {
    m <- regmatches(temp_schema,
                    regexec("\\[[[:space:]]*([^=[:space:]]+)[[:space:]]*=",
                            temp_schema))[[1]]
    dim_name <- if (length(m) == 2) m[[2]] else "i"
  }
  total_length = as.numeric(length(data))
  if (temp) {
    ## Use scidb temporary array instead of regular versioned array
    create_temp_array(db, name, schema = schema)
  }
  ## Accumulated serialization + upload time, for the debug message.
  post_time = 0
  n_blocks = ceiling(total_length / block_size)
  for (begin in seq(1, total_length, block_size)) {
    end = min((begin + block_size - 1), total_length)
    td = proc.time()
    data_part = as.matrix(data[begin:end])
    bytes = .Call(C_scidb_raw, as.vector(aperm(data_part)))
    ans = .Post.shim(db, bytes, list(id=session))
    ans = gsub("\n", "", gsub("\r", "", ans))
    post_time = post_time + round((proc.time() - td)[3], 4)
    if (debug) message("Uploading block ", ceiling(begin/block_size), " of ", n_blocks)
    if (begin == 1) {
      query = sprintf("store(input(%s,'%s', -2, '(%s null)'), %s)", load_schema, ans, type, name)
      iquery(db, query)
    } else {
      query = sprintf("input(%s,'%s', -2, '(%s null)')", temp_schema, ans, type)
      iquery(db, sprintf("append(%s, %s, %s)", query, name, dim_name))
    }
  }
  if (debug) message("Data upload time ", shimcon_time + post_time)
  return(NULL)
}
## Decide whether the Shim supports the simple POST /upload endpoint, given
## its "YY.MM..." version string: true for 15.7+ within 15, and for 16+.
## A year or month that cannot be parsed defaults to 16 / 8 respectively,
## i.e. a modern Shim is assumed.
.shim_supports_simple_upload = function(version)
{
  parts = if (length(version) > 0) {
    strsplit(as.character(version[[1]]), "\\.")[[1]]
  } else {
    character(0)
  }
  leading_int = function(s, default) {
    ## First run of digits in s, e.g. "11rc" -> 11; NA/none -> default.
    v = suppressWarnings(as.integer(sub("^[^0-9]*([0-9]+).*$", "\\1", s)))
    if (length(v) != 1 || is.na(v)) default else v
  }
  yr = leading_int(parts[1], 16L)
  mo = leading_int(parts[2], 8L)
  (yr >= 15 && mo >= 7) || yr >= 16
}
## POST data to the Shim and return the response body as text.
## Normally called with raw data and args=list(id=whatever).
## Newer Shims accept a simple POST to /upload; older ones need a multipart
## file upload to /upload_file (the data is staged in a temp file, which is
## always removed). Stops with "HTTP error <status>" on a status above 299
## when err is TRUE.
.Post.shim = function(db, data, args=list(), err=TRUE)
{
  simple = .shim_supports_simple_upload(attr(db, "connection")$scidb.version)
  uri = URI(db, if (simple) "/upload" else "/upload_file", args)
  uri = oldURLencode(uri)
  uri = gsub("\\+", "%2B", uri, perl=TRUE)
  if (simple)
  {
    h = .create_curl_handle.shim(db, request_type="POST",
                                 uri=uri, postdata=data)
    .TraceHttpRequestSent(h, "POST", uri, data)
    ans = curl_fetch_memory(uri, h)
    .TraceHttpResponseReceived(h, "POST", uri, ans)
  } else {
    h = .create_curl_handle.shim(db, request_type="POST", uri=uri)
    tmpf = tempfile()
    ## Remove the staging file even if the request fails.
    on.exit(unlink(tmpf), add=TRUE)
    ## Trace the request before the text data gets converted to raw data
    .TraceHttpRequestSent(h, "POST", uri, data)
    if (is.character(data)) data = charToRaw(data)
    writeBin(data, tmpf)
    handle_setform(h, file=form_file(tmpf))
    ans = curl_fetch_memory(uri, h)
    .TraceHttpResponseReceived(h, "POST", uri, ans)
  }
  if (ans$status_code > 299 && err) stop("HTTP error ", ans$status_code)
  return(rawToChar(ans$content))
}
#' Build a curl handle for a Shim request
#'
#' Allocates a fresh curl handle, then applies the headers and options
#' returned by `.formulate_curl_handle_and_opts.shim()` for this request.
#' Those may include extra curl options the user configured.
#'
#' @param db_or_conn a scidb database connection object,
#'   _or_ its "connection" attribute
#' @param request_type string for type of request (e.g. "POST", "GET")
#' @param uri base SciDB URI
#' @param postdata raw data to be posted. default is `NULL`
#' @return a curl handle with the computed headers and options applied
#' @keywords internal
#' @importFrom curl new_handle handle_setheaders handle_setopt
.create_curl_handle.shim <- function(db_or_conn, request_type, uri, postdata=NULL)
{
  handle = new_handle()
  spec = .formulate_curl_handle_and_opts.shim(db_or_conn,
                                              request_type=request_type,
                                              uri=uri,
                                              postdata=postdata)
  handle_setheaders(handle, .list=spec[["headers"]])
  handle_setopt(handle, .list=spec[["options"]])
  handle
}
#' Assemble the headers and curl options for a Shim request
#'
#' - Headers: only `Authorization`, produced by `.digest_auth.shim()`.
#' - Base options: `ssl_verifyhost` / `ssl_verifypeer` come from the
#'   `scidb.verifyhost` / `scidb.verifypeer` R options (default 0, coerced to
#'   integer). `http_version` is fixed at 2.
#' - When `postdata` is given, `post`, `postfields` and `postfieldsize` are
#'   added.
#' - If `check_curl_options()` returns exactly TRUE, the list in the
#'   `scidb.curl_options` R option is appended after everything above.
#'
#' NOTE(review): in libcurl's CURLOPT_HTTP_VERSION enum, 2 is
#' CURL_HTTP_VERSION_1_1 (HTTP/2 is CURL_HTTP_VERSION_2_0 = 3). Confirm that
#' HTTP/1.1 is what is intended here.
#' NOTE(review): user options are appended, not merged. If they repeat a
#' default name, both entries reach handle_setopt(). Confirm whether
#' check_curl_options() is meant to prevent overriding the defaults.
#'
#' @param db_or_conn a scidb database connection object,
#'   _or_ its "connection" attribute
#' @param request_type string for type of request (e.g. "POST", "GET")
#' @param uri base SciDB URI
#' @param postdata raw data to be posted. default is `NULL`
#' @return named list with elements "headers" and "options", each a list
#' @keywords internal
.formulate_curl_handle_and_opts.shim = function(db_or_conn, request_type, uri, postdata=NULL)
{
  headers = list(
    Authorization = .digest_auth.shim(db_or_conn, request_type, uri)
  )
  opts = list(
    ssl_verifyhost = as.integer(getOption("scidb.verifyhost", 0)),
    ssl_verifypeer = as.integer(getOption("scidb.verifypeer", 0)),
    http_version = 2
  )
  if (!is.null(postdata)) {
    opts = c(opts, list(post = TRUE,
                        postfields = postdata,
                        postfieldsize = length(postdata)))
  }
  # Validate any user-supplied options before appending them
  if (identical(check_curl_options(), TRUE)) {
    opts = c(opts, getOption("scidb.curl_options"))
  }
  list(headers = headers, options = opts)
}
#' Convenience function for digest authentication.
#'
#' Builds the value of an HTTP `Authorization: Digest ...` header (MD5,
#' qop=auth) from the connection's `digest` field, which holds
#' "username:password". Realm and nonce are fixed defaults and the cnonce is a
#' constant; they are not taken from a server challenge. The password is
#' everything after the FIRST colon, so passwords that contain ":" are kept
#' intact.
#'
#' @param db_or_conn a scidb database connection object,
#'   _or_ its "connection" attribute
#' @param method HTTP method used in the digest (e.g. "GET", "POST")
#' @param uri request URI; only the part from its last "/" onward is digested
#' @param realm realm
#' @param nonce nonce
#' @return the Authorization header value, or "" when the connection's
#'   `authtype` is set to something other than "digest"
#' @keywords internal
#' @importFrom digest digest
.digest_auth.shim = function(db_or_conn, method, uri, realm="", nonce="123456")
{
  .scidbenv = .GetConnectionEnv(db_or_conn)
  if (!is.null(.scidbenv$authtype))
  {
    if (.scidbenv$authtype != "digest") return("")
  }
  # Keep only the last path segment (with any query string)
  uri = gsub(".*/", "/", uri)
  userpwd = .scidbenv$digest
  if (is.null(userpwd)) userpwd=":"
  # Split at the first colon only: "user:pa:ss" -> "user" and "pa:ss"
  sep = regexpr(":", userpwd, fixed=TRUE)
  if (is.na(sep) || sep < 0) {
    user = userpwd
    pwd = ""
  } else {
    user = substr(userpwd, 1, sep - 1)
    pwd = substr(userpwd, sep + 1, nchar(userpwd))
  }
  ha1=digest(sprintf("%s:%s:%s", user, realm, pwd), algo="md5", serialize=FALSE)
  ha2=digest(sprintf("%s:%s", method, uri), algo="md5", serialize=FALSE)
  cnonce="MDc1YmFhOWFkY2M0YWY2MDAwMDBlY2JhMDAwMmYxNTI="
  nc="00000001"
  qop="auth"
  response=digest(sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, qop, ha2), algo="md5", serialize=FALSE)
  sprintf('Digest username="%s", realm=%s, nonce="%s", uri="%s", cnonce="%s", nc=%s, qop=%s, response="%s"', user, realm, nonce, uri, cnonce, nc, qop, response)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.