###
### @P4COPYRIGHT_SINCE:2022@
###

# File: httpapi.R
# Implementation layer for calls to the shim-less SciDB HTTP API
#   available in SciDB 22.6 and later.

# We support these versions of the HTTP API. If SciDB adds more versions,
# add them here like c(1,2,3). Remove versions from the list when the SciDBR
# code no longer supports them.
httpapi_supported_versions <- c(1)
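
## For example, if a server reports apiMinVersion=1 and apiVersion=3 while
## this client supports c(1, 2), GetServerVersion.httpapi() below negotiates
## max(intersect(1:3, c(1, 2))) == 2.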

#' @seealso GetServerVersion()
#' @importFrom rlang env_clone
#' @noRd
GetServerVersion.httpapi <- function(db)
{
  conn <- attr(db, "connection")
  
  ## Clone the connection and unset the username/password, because:
  ## (1) the /version endpoint doesn't need a username/password;
  ## (2) the /version endpoint doesn't perform authentication or return cookies;
  ## (3) if we passed a username/password through .HttpRequest, it would 
  ##     unset conn's password because it assumes the request will
  ##     authenticate and return cookies.
  conn_clone <- rlang::env_clone(conn)
  conn_clone$username <- NULL
  conn_clone$password <- NULL
  resp <- .HttpRequest(conn_clone, "GET", .EndpointUri(db, "version"))
  stopifnot(resp$status_code == 200)  # 200 OK
  json <- jsonlite::fromJSON(rawToChar(resp$content))
  
  conn$scidb.version <- .ParseVersion(json[["scidbVersion"]])

  ## The API versions that the server supports
  server_api_versions <- json[["apiMinVersion"]]:json[["apiVersion"]]
  ## The API versions that this client and the server both support
  compatible_versions <- intersect(server_api_versions, 
                                   httpapi_supported_versions)
  if (length(compatible_versions) == 0) {
    stop("No compatible API versions found with this server.",
         " The server supports versions ", server_api_versions, 
         " and I support versions ", httpapi_supported_versions)
  }
  ## The newest API version that both the client and server support
  conn$httpapi_version <- max(compatible_versions)
  
  ## If we got this far, the request succeeded - so the connected host and port
  ## support the SciDB httpapi (no Shim).
  class(conn) <- "httpapiconn"
  class(db) <- c("httpapi", "afl")
  
  ## We need to return db because we have modified its class,
  ## and db is pass-by-value (it isn't an env)
  return(db)
}

#' @seealso NewSession()
#' @noRd
NewSession.httpapi <- function(db, ...)
{
  conn <- .GetConnectionEnv(db)
  if (is.null(conn)) stop("no connection environment")
  if (!at_least(conn$scidb.version, 22.6)) {
    stop("HTTP API is only supported for SciDB server version 22.6 and later;",
         " this server has version ", conn$scidb.version)
  }

  ## Post to /api/sessions to create a new session  
  resp <- .HttpRequest(conn, "POST", .EndpointUri(conn, "sessions"))
  stopifnot(resp$status_code == 201)  # 201 Created
  json <- jsonlite::fromJSON(rawToChar(resp$content))
  
  ## Response JSON contains the session ID
  sid <- json[["sid"]]

  ## Parse the Location and Link headers and store them
  conn$location <- resp$parsed_headers[["location"]]
  conn$links <- .ParseLinkHeaders(resp$parsed_headers)
  
  ## Give the connection the session ID.
  conn$session <- sid
  ## Give the connection a unique ID - in this case just reuse the session ID.
  ## But since temporary array names contain the connection ID,
  ## replace all non-identifier characters with underscores.
  conn$id <- make.names_(sid)

  ## Register the session to be closed when the connection env
  ## gets garbage-collected.
  reg.finalizer(conn, .gc.httpapi, onexit=TRUE)

  ## We don't need to return db or conn because the only object we have modified
  ## is the connection env which is pass-by-reference.
}

#' @seealso EnsureSession()
#' @noRd
EnsureSession.httpapi <- function(db, ...)
{
  conn <- .GetConnectionEnv(db)
  if (is.null(conn) || is.null(conn$session) || is.null(conn$links)
      || is.null(conn$location)) {
    NewSession.httpapi(db)
  }
}

#' @seealso Reauthenticate()
#' @noRd
Reauthenticate.httpapi <- function(db, password, defer=FALSE)
{
  conn <- .GetConnectionEnv(db)
  if (is.null(conn)) stop("no connection environment")
  if (!has.chars(password)) stop("no password provided")
  
  conn$password <- password
  
  if (!defer) {
    ## Run an arbitrary query to re-activate the connection
    ## with the new password
    iquery(db, "show('help()' , 'afl')", return=TRUE)
  }
}

#' @seealso Close()
#' @noRd
Close.httpapi <- function(db)
{
  .CloseSession.httpapi(attr(db, "connection"))
}

#' @seealso iquery()
#' @noRd
Query.httpapi <- function(db, query_or_scidb, 
                          `return`=FALSE,
                          binary=TRUE,
                          arrow=FALSE,
                          ...)
{
  if (!`return`) {
    Execute.httpapi(db, query_or_scidb, ...)
    return(invisible(NULL))
  }
  if (binary || arrow) {
    return(BinaryQuery.httpapi(db,
                               query_or_scidb,
                               arrow=arrow,
                               ...))
  }
  return(TextQuery.httpapi(db, query_or_scidb, ...))
}

#' @seealso Execute()
#' @noRd
Execute.httpapi <- function(db, query_or_scidb, ...)
{
  EnsureSession.httpapi(db, reconnect=FALSE)
  conn <- attr(db, "connection")
  query <- .GetQueryString(query_or_scidb)
  
  ## Start a new query
  ## Even though there is nothing to return, we still use the paged workflow
  ## because it allows us to cancel the query by issuing a DELETE request
  ## before it completes. This can change once SDB-7403 is addressed.
  timings <- c(start=proc.time()[["elapsed"]])
  http_query <- New.httpquery(conn, query)
  timings <- c(timings, prepare=proc.time()[["elapsed"]])
  
  on.exit(
    suspendInterrupts(
      tryCatch(Close.httpquery(http_query),
               error=function (err) {
                 warning("[SciDBR] Ignored error while closing query ", http_query$id, 
                         ": ", err)
               })),
    add=TRUE)
  
  if (http_query$is_selective) {
    stop("The AFL query returns data, so it should be executed using",
         " Query.httpapi() with return=TRUE. Query text: \"", query, "\"")
  }
  
  ## Fetch the first (and only) page from the query.
  ## We must do this even though the page won't contain any data, because
  ## any DDL action only gets committed if the query is fetched to completion.
  ## If there is an error, the on.exit() above will close the query,
  ## and the error will be propagated to the caller.
  empty_page <- Next.httpquery(http_query)
  timings <- c(timings, fetch=proc.time()[["elapsed"]])
  
  .ReportTimings(paste0("Timings for query ", http_query$id, ":"), 
                 timings)
  
  return(invisible(NULL))
}

#' Execute a query that outputs data in a text format. The data is returned
#' as an R data frame.
#' @param db scidb connection object from \code{\link{scidbconnect}}
#' @param query_or_scidb a SciDB query expression or scidb object
#' @param format (default: "csv+:l") a SciDB format string for a text format.
#'    This string must be an accepted option for the \code{save()}
#'    operator (if use_aio is omitted or FALSE) or the \code{aio_save()}
#'    operator (if use_aio is TRUE).
#' @param use_aio (logical; default: FALSE) if TRUE, use the
#'    \code{aio_save()} operator to construct the output. The 
#'    \code{accelerated_io_tools} library must be loaded in SciDB.
#'    NOTE: Due to SDB-7925, this parameter is currently ignored.
#' @param only_attributes (logical; default: FALSE) By default, the first N
#'    columns of the output are coordinates for the N dimensions of the 
#'    query's schema. If only_attributes is TRUE, these columns are not
#'    included.
#' @param ... extra arguments are ignored (allows some query functions to take
#'    optional arguments without causing errors in other query functions)
#' @return the query result as an R data frame, or NULL if the query
#'    did not produce a result
#' @keywords internal
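#' @examples
#' \dontrun{
#' ## A hedged sketch, assuming db comes from scidbconnect():
#' df <- TextQuery.httpapi(db, "build(<x:int64>[i=0:9], i)")
#' ## Equivalent to iquery(db, ..., return=TRUE, binary=FALSE)
#' }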
TextQuery.httpapi <- function(db, query_or_scidb, 
                              format=NULL,
                              use_aio=NULL,
                              only_attributes=NULL,
                              ...)
{
  ## It turns out that Shim hasn't been using AIO for text formats
  ## since maybe 15.12 or even earlier. To support AIO with text formats,
  ## we'd need to fix SDB-7925 and accelerated-io-tools issue #65.
  ## Once those are fixed, remove the FALSE line and uncomment the next line.
  use_aio <- FALSE
  # use_aio <- use_aio %||% getOption("scidb.aio", FALSE)
  
  only_attributes <- only_attributes %||% FALSE
  if (!has.chars(format)) {
    ## If using AIO: aio_save doesn't understand csv+:l so we can't use it.
    ## But it adds a header line to its output when given format "csv+",
    ## so aio_save(format:'csv+') is equivalent to save(format:'csv+:l').
    format <- if (use_aio) "csv+" else "csv+:l"
  }
  if (use_aio && !startsWith(format, "aio_")) {
    format <- paste0("aio_", format)
  }

  EnsureSession.httpapi(db, reconnect=FALSE)
  conn <- attr(db, "connection")
  query <- .GetQueryString(query_or_scidb)
  
  if (use_aio && !only_attributes) {
    ## The HTTP API doesn't let us pass the "atts_only" parameter to
    ## aio_save, so instead we'll use flatten() to turn dims into attrs.
    ## An added advantage of this is that flatten() returns dims followed
    ## by attributes, so we won't need to permute columns after reading.
    query <- sprintf("flatten(%s)", query)
  }
  
  ## Start a new query
  timings <- c(start=proc.time()[["elapsed"]])
  http_query <- New.httpquery(conn, query, options=list(format=format))
  timings <- c(timings, prepare=proc.time()[["elapsed"]])
  
  ## Since this function currently fetches all of its results in one page,
  ## we can close the query as soon as this function exits.
  on.exit(
    suspendInterrupts(
      tryCatch(Close.httpquery(http_query),
               error=function (err) {
                 warning("[SciDBR] Ignored error while closing query ", http_query$id, 
                         ": ", err)
               })),
    add=TRUE)

  ## If this stop() triggers, it means someone called Query(`return`=TRUE)
  ## with a non-selective query (using a DDL or updating operator).
  ## For now, we stop() so callers must use the `return` parameter correctly,
  ## but in the future we could disable this check - in which case the function
  ## would just return NULL if the query was non-selective.
  if (!http_query$is_selective) {
    first_operator <- strsplit(query, "(", fixed=TRUE)[[1]][1]
    stop("The AFL query '", first_operator, "(...)' does not return data, ",
         "so it should be executed using Query.httpapi() with return=FALSE")
  }
  
  ## Fetch the first (and only) page from the query.
  ## We must do this even for a DDL query, because any DDL action
  ## only gets committed if the query is fetched to completion.
  ## If there is an error, the on.exit() above will close the query,
  ## and the error will be propagated to the caller.
  raw_page <- Next.httpquery(http_query)
  timings <- c(timings, fetch=proc.time()[["elapsed"]])
  
  df <- NULL
  if (http_query$is_selective && !is.null(raw_page)) {
    ## Convert the text data to an R dataframe
    df <- .Text2df(rawToChar(raw_page), format=format)
    timings <- c(timings, parse_csv=proc.time()[["elapsed"]])
  }
  
  .ReportTimings(paste0("Timings for query ", http_query$id, ":"), 
                 timings)
  
  return(if (is.null(df)) invisible(NULL) else df)
}

#' Execute a query that outputs data in Apache Arrow format or 
#' SciDB binary format. The data is returned as an R data frame.
#' This function takes the same arguments as \code{\link{BinaryQuery}}
#' but with extra optional arguments.
#' @param db scidb connection object from \code{\link{scidbconnect}}
#' @param query_or_scidb a SciDB query expression or scidb object
#' @param arrow (logical; default: FALSE) if TRUE, output data in
#'    Apache Arrow format. This automatically sets use_aio to TRUE.
#' @param use_aio (logical; default: FALSE) if TRUE, use the
#'    \code{aio_save()} operator to construct the output. The 
#'    \code{accelerated_io_tools} library must be loaded in SciDB.
#' @param buffer_size (optional integer) Initial parse buffer size in bytes, 
#'    adaptively resized as needed. Larger buffers can be faster but consume
#'    more memory. Default size is determined by the connection implementation.
#' @param only_attributes (logical; default: FALSE) By default, the first N
#'    columns of the output are coordinates for the N dimensions of the 
#'    query's schema. If only_attributes is TRUE, these columns are not
#'    included.
#' @param schema (optional string) the SciDB schema that the query is
#'    expected to produce; if not supplied, use the actual schema returned
#'    by the query. In the HTTP API, there is no longer any advantage
#'    to supplying the schema ahead of time; this argument is only accepted
#'    for backward compatibility.
#' @param page_size (optional integer) maximum number of values to return per
#'    page of output. Normally this function reads all pages before returning,
#'    so this is treated similarly to buffer_size.
#'    NOTE: Due to SDB-7836, SciDB does not support paging for binary or Arrow
#'    data formats, so this argument is currently ignored.
#' @param npages (optional integer) maximum number of pages to return.
#'    NOTE: Due to SDB-7836, SciDB does not support paging for binary or Arrow
#'    data formats, so this argument is currently ignored.
#' @param ... extra arguments are ignored (allows some query functions to take
#'    optional arguments without causing errors in other query functions)
#' @return the query result as an R data frame, or NULL if the query
#'    did not produce a result
#' @seealso BinaryQuery()
#' @keywords internal
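#' @examples
#' \dontrun{
#' ## A hedged sketch, assuming db comes from scidbconnect() and an
#' ## array named my_array exists:
#' df <- BinaryQuery.httpapi(db, "scan(my_array)")
#' df_arrow <- BinaryQuery.httpapi(db, "scan(my_array)", arrow=TRUE)
#' }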
BinaryQuery.httpapi <- function(db,
                                query_or_scidb, 
                                arrow=FALSE,
                                use_aio=NULL,
                                buffer_size=NULL,
                                only_attributes=NULL, 
                                schema=NULL,
                                page_size=NULL, 
                                npages=NULL,
                                ...)
{
  ## In SciDBR <=3.1.0, the semantics of binary=TRUE queries are different from 
  ## those of binary=FALSE queries. To remain interchangeable with that version
  ## and keep the code as simple as possible, Query and BinaryQuery are
  ## implemented using separate functions rather than trying to combine the
  ## different behaviors into a single function.

  ## TODO: SciDB's binary output format does not support paging yet (SDB-7836).
  ## Once that issue is fixed, we can set page_size to a non-null default here.

  only_attributes <- only_attributes %||% FALSE
  use_aio <- arrow || (use_aio %||% getOption("scidb.aio", FALSE))

  EnsureSession.httpapi(db, reconnect=FALSE)
  conn <- attr(db, "connection")
  query <- .GetQueryString(query_or_scidb)
  
  use_int64 <- conn$int64
  result_size_limit_mb <- getOption("scidb.result_size_limit", 256)
  result_size_limit_bytes <- result_size_limit_mb * 1000000
  
  format <- if (arrow) {
              "arrow"
            } else if (only_attributes || use_aio) {
              "binary"
            } else {
              ## binary+ is not supported by aio_save; instead we use the
              ## "flatten" hack below with format="binary"
              "binary+"
            }
  if (use_aio && !startsWith(format, "aio_")) {
    format <- paste0("aio_", format)
  }
  
  if (use_aio && !only_attributes) {
    ## The HTTP API doesn't let us pass the "atts_only" parameter to
    ## aio_save, so instead we'll use flatten() to turn dims into attrs.
    ## An added advantage of this is that flatten() returns dims followed
    ## by attributes, so we won't need to permute columns after reading.
    query <- sprintf("flatten(%s)", query)
  }
  
  ## Start a new query
  timings <- c(start=proc.time()[["elapsed"]])
  http_query <- New.httpquery(conn, query, 
                              options=list(format=format, 
                                           pageSize=page_size))
  timings <- c(timings, prepare=proc.time()[["elapsed"]])
  
  ## Since this function fetches all the pages it's interested in
  ## before returning, we can close the query as soon as this function exits.
  on.exit(
    suspendInterrupts(
      tryCatch(Close.httpquery(http_query),
               error=function (err) {
                 warning("[SciDBR] Ignored error while closing query ", http_query$id, 
                         ": ", err)
               })),
    add=TRUE)
  
  ## These are the columns we expect to be encoded in the response:
  ## dimensions (if requested), followed by attributes
  dimensions <- NULL
  attributes <- NULL
  ## If the caller provided a schema, it overrides the actual schema
  schema <- schema %||% http_query$schema
  if (!is.null(schema)) {
    dimensions <- .dimsplitter(schema)
    attributes <- .attsplitter(schema)
    
    cols <- NULL
    ## length(dimensions) can be 0 if the query returned a SciDB dataframe
    if (length(dimensions) > 0 && !only_attributes) {
      ## Use stringsAsFactors=FALSE because we want the dataframe to contain
      ## actual strings (instead of dictionary-encoding them, which would turn
      ## every column of the dataframe into an integer vector)
      cols <- data.frame(name=dimensions$name, type="int64", nullable=FALSE, 
                         stringsAsFactors=FALSE)
    }
    cols <- rbind(cols, attributes)
  }
  
  ## a %<?% b: return a<b, or TRUE if either is NULL or NA
  `%<?%` <- function(a, b) {
    if (is.null(a) || is.na(a) || is.null(b) || is.na(b)) TRUE else a < b
  }
  
  ## Fetch one page at a time
  page_number <- 0
  total_nbytes <- 0
  ans <- NULL
  while (page_number %<?% npages 
         && total_nbytes %<?% result_size_limit_bytes) {
    ## Fetch the next page
    page_number <- page_number + 1
    raw_page <- Next.httpquery(http_query)
    timings[[paste0("fetch_page_", page_number)]] <- proc.time()[["elapsed"]]

    page_nbytes <- length(raw_page)
    total_nbytes <- total_nbytes + page_nbytes

    if (is.null(raw_page) || !http_query$is_selective || page_nbytes == 0
        || is.null(schema)) {
      ## We already got the last page, or the result is empty.
      msg.trace("Query page ", page_number, " returned empty result",
                "(page_bytes=", page_nbytes, ")")
      break
    }

    ## Convert the binary data to a dataframe, and add it to the answer
    if (arrow) {
      page_df <- .Arrow2df(raw_page)
    } else {
      page_df <- .Binary2df(raw_page, cols, buffer_size, use_int64)
    }

    ## Special legacy behavior for "binary" datatype: 
    ## if any column has type "binary", don't return a dataframe; 
    ## instead, return a list containing that binary value as the _first_ item,
    ## and the dimension (if present) as the _second_ item. 
    ## Don't fetch any more pages (in fact, don't fetch anything but 
    ## the first row).
    ## See issue #163 and the test for issues #163 in tests/a.R,
    ## as well as the test for issue #195 with the "binary" datatype in a.R.
    if (typeof(page_df) == "list" && "binary" %in% attributes$type) {
      page_list <- page_df  # (it isn't a dataframe)
      if (only_attributes) {
        return(page_list)
      } else {
        ## Move the dimensions to the last columns.
        ndims <- length(dimensions$name)
        ncols <- length(cols$name)
        result <- page_list[c((ndims+1):ncols, 1:ndims)]
        return(result)
      }
    }
    timings[[paste0("parse_page_", page_number)]] <- proc.time()[["elapsed"]]

    if (is.null(ans)) {
      ans <- page_df
    } else {
      ans <- rbind(ans, page_df)
    }
    timings[[paste0("append_page_", page_number)]] <- proc.time()[["elapsed"]]

    page_ncols <- length(page_df)
    page_nrows <- if (page_ncols > 0) length(page_df[[1]]) else 0
    total_ncols <- length(ans)
    total_nrows <- if (total_ncols > 0) length(ans[[1]]) else 0

    msg.debug("Query page ", page_number, " returned ",
              page_nbytes, " bytes, ", 
              page_ncols, " columns, ",
              page_nrows, " rows;",
              " total so far: ", 
              total_nbytes, " bytes, ",
              total_ncols, " columns, ",
              total_nrows, " rows")

    ## If the number of results we received is less than the page size, 
    ## or if we didn't set a page size (which returns all results in one page),
    ## we know this is the last page. We can skip the final call to Next, since
    ## it will only return an empty page.
    if (is.null(page_size) || page_nrows < page_size) {
      msg.debug("Query page ", page_number, " returned ", 
                page_nrows, " rows ",
                "(page_size=", if (is.null(page_size)) "NULL" else page_size,
                "); assuming this is the last page")
      break
    }
  }
  .ReportTimings(paste0("Timings for query ", http_query$id, ":"), 
                 timings)
  
  if (is.null(ans)) {
    ## The response had no data. 
    ## If there was a schema, return an empty dataframe with the correct columns
    if (is.null(schema)) {
      msg.trace("Query returned no data and no schema; returning NULL")
      return(invisible())
    } else {
      msg.trace("Query returned no data; constructing empty dataframe")
      return(.Schema2EmptyDf(attributes, 
                             if (only_attributes) NULL else dimensions))
    }
  }

  ## TODO: For compatibility, uniqueify attribute names if !only_attributes

  return(ans)
}

## UploadDesc is an S4 class that serves as a standard form for metadata
## that can be sent to any upload method. It improves on the original approach
## in shim.R which was a set of ad-hoc argument names which were sometimes
## different in different methods.
##
## Because so much old SciDBR code uses is.null() to check for missing values,
## it's important to be able to turn any missing value into NULL (instead of a
## non-null empty vector).
setClassUnion("NumericOrNull",members=c("numeric", "NULL"))
setClassUnion("CharOrNull", members=c("character", "NULL"))
setClass("UploadDesc",
         slots=c(
           ## Name of the SciDB array that will result from the upload
           array_name="CharOrNull",
           ## If TRUE, the SciDB array will be temporary (not versioned)
           temp="logical",
           ## Name of the file used for uploading
           filename="CharOrNull",
           ## Name of each attribute
           attr_names="CharOrNull",
           ## SciDB type of each attribute
           attr_types="CharOrNull",
           ## Name of each dimension
           dim_names="CharOrNull",
           ## Start coordinate of each dimension
           dim_start_coordinates="NumericOrNull",
           ## Chunk size of each dimension
           dim_chunk_sizes="NumericOrNull"
         ))
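
## A minimal construction sketch (all values hypothetical). Slots left unset
## default to NULL via the class unions above, which keeps the is.null()
## checks working:
##   desc <- new("UploadDesc")
##   desc@array_name <- "my_array"
##   desc@temp <- FALSE
##   desc@attr_names <- c("a", "b")
##   desc@attr_types <- c("int64", "string")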

#' Turn an UploadDesc into a named list.
#' This function exists to help with printing and debugging.
#' @param desc an UploadDesc
#' @return a list where each element is named after a slot in the
#'   original UploadDesc
#' @importFrom methods slot slotNames
#' @keywords internal
as.list.UploadDesc <- function(desc)
{
  return(mapply(function(x) {methods::slot(desc,x)},
                methods::slotNames("UploadDesc"),
                SIMPLIFY=FALSE))
}

#' Given an UploadDesc, return the attributes as a list of the form:
#'  list(list(name="attr_name_1", type="attr_type_1"),
#'       list(...), ...)
#' @param desc an UploadDesc
#' @param default_names an optional list of names to use as attribute names
#'   when the UploadDesc doesn't provide its own names
#' @param default_types an optional list of strings to use as attribute types
#'   when the UploadDesc doesn't provide its own types
#' @return a list of {name, type} lists
#' @keywords internal
.Attrs.UploadDesc <- function(desc, default_names=NULL, default_types=NULL)
{
  stopifnot(class(desc) == "UploadDesc")
  attrs <- mapply(
    function(name, type) {
      list(name=as.character(name), type=as.character(type))
    },
    desc@attr_names %||% default_names,
    desc@attr_types %||% default_types,
    SIMPLIFY=FALSE
  )
  return(attrs)
}

#' Given an UploadDesc, return the dimensions as a list of the form:
#'  list(list(name="dim_name", start=start, length=length, chunk=chunk),
#'       list(...), ...)
#' @param desc an UploadDesc
#' @param default_names an optional list of names to use as dimension names
#'   when the UploadDesc doesn't provide its own names
#' @param default_chunk_sizes an optional list of chunk sizes to use
#'   when the UploadDesc doesn't provide its own chunk sizes
#' @param lengths an optional list of chunk lengths; if not provided,
#'   each dimension has an unbounded length ("*")
#' @param make_unique if TRUE, dimension names are changed to make them
#'   unique from each other and from attribute names in the UploadDesc
#' @return a list of {name, start, end, chunk} lists
#' @keywords internal
.Dims.UploadDesc <- function(desc, 
                             default_names=NULL, 
                             default_chunk_sizes=NULL,
                             lengths=NULL,
                             make_unique=FALSE)
{
  dim_names <- desc@dim_names %||% default_names
  if (make_unique) {
    dim_names <- make.unique_(desc@attr_names, dim_names)
  }
  dims <- mapply(
    function(name, start, length, chunk) {
      list(name=as.character(name), 
           start=as.integer(start),
           end=(if (is.null(length)) NULL 
                else as.integer(start) + as.integer(length) - 1),
           chunk=as.integer(chunk))
    },
    dim_names,
    desc@dim_start_coordinates %||% replicate(length(dim_names), 0),
    lengths %||% replicate(length(dim_names), NULL),
    (desc@dim_chunk_sizes 
     %||% default_chunk_sizes 
     %||% replicate(length(dim_names), NULL)),
    SIMPLIFY=FALSE)
  return(dims)
}

#' Given an UploadDesc, return it as a SciDB schema string
#' @param db a scidb connection object from \code{\link{scidbconnect}}
#' @param desc an UploadDesc
#' @return (character) a SciDB schema string
#' @keywords internal
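#' @examples
#' \dontrun{
#' ## A hedged sketch (hypothetical names and sizes):
#' desc <- new("UploadDesc")
#' desc@@array_name <- "example_array"
#' desc@@attr_names <- "val"
#' desc@@attr_types <- "double"
#' desc@@dim_names <- "i"
#' desc@@dim_start_coordinates <- 0
#' desc@@dim_chunk_sizes <- 1000
#' ## Expected to yield something like
#' ## "example_array<val:double>[i=0:*:0:1000]"
#' .Desc2Schema(db, desc)
#' }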
.Desc2Schema <- function(db, desc)
{
  attrs <- .Attrs.UploadDesc(desc)
  dims <- .Dims.UploadDesc(desc)
  return(make.schema(db, attrs, dims, array_name=desc@array_name %||% ""))
}

#' Upload R data and store() it into a SciDB array.
#' Return a scidb object wrapping the array.
#' This function takes the same arguments as \code{\link{Upload}}
#' but with extra optional arguments.
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload an R data frame, raw value, Matrix, matrix, or vector object
#' @param name a SciDB array name to store the payload into, or NULL to 
#'    generate a unique name
#' @param gc (logical) if TRUE, the SciDB array will be removed when the 
#'    return value gets garbage-collected. Set to FALSE to disconnect the SciDB
#'    array from R's garbage collector, i.e. to persist the SciDB array 
#'    beyond the lifetime of this session.
#' @param temp (logical) make the SciDB array a temporary array
#'    (only lasting for the lifetime of the session)
#' @param attr_names names for the uploaded attributes; if not provided,
#'    the default names will vary depending on the type of object being
#'    uploaded. \code{attrs} and \code{attr} are allowable (but deprecated)
#'    synonyms for this parameter, for legacy compatibility.
#' @param attr_types types of the uploaded attributes; if not provided,
#'    the types will be derived from the object being uploaded.
#'    \code{types} and \code{type} are allowable (but deprecated) synonyms
#'    for this parameter, for legacy compatibility.
#' @param dim_names names for the dimensions in the uploaded array;
#'    if not provided, the dimension names will vary depending on the type of
#'    object being uploaded.
#' @param dim_start_coordinates starting coordinates for each dimension;
#'    if not provided, dimension coordinates start at zero.
#'    \code{start} is an allowable (but deprecated) synonym for this parameter,
#'    for legacy compatibility.
#' @param dim_chunk_sizes chunk sizes for each dimension; if not provided,
#'    the default chunk sizes will vary depending on the type of object
#'    being uploaded. \code{chunk_size} is an allowable (but deprecated)
#'    synonym for this parameter, for legacy compatibility.
#' @param ... other options, see implementations for each uploaded type
#' @return A \code{scidb} object wrapping the uploaded SciDB array
#' @seealso Upload(), as.scidb()
#' @keywords internal
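#' @examples
#' \dontrun{
#' ## A hedged sketch, assuming db comes from scidbconnect():
#' x <- Upload.httpapi(db, iris, name="iris_example")
#' ## gc=FALSE keeps the SciDB array alive after x is garbage-collected:
#' y <- Upload.httpapi(db, runif(100), name="rand_vec", gc=FALSE)
#' }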
Upload.httpapi <- function(db,
                           payload, 
                           name=NULL,
                           gc=TRUE,
                           temp=FALSE, 
                           attr_names=NULL,
                           attr_types=NULL,
                           dim_names=NULL,
                           dim_start_coordinates=NULL,
                           dim_chunk_sizes=NULL,
                           ## Legacy arguments (deprecated)
                           attrs=NULL,      # -> attr_names
                           attr=NULL,       # -> attr_names
                           types=NULL,      # -> attr_types
                           type=NULL,       # -> attr_types
                           start=NULL,      # -> dim_start_coordinates
                           chunk_size=NULL, # -> dim_chunk_sizes
                           ...)
{
  if (is.null(name)) {
    name <- tmpnam(db)
  }
  ## Sanitize the array name
  name <- gsub("[^[:alnum:]]", "_", name)

  EnsureSession.httpapi(db, reconnect=FALSE)
  conn <- attr(db, "connection")
  
  ## Collect "name" and implicit args into an upload descriptor
  desc <- new("UploadDesc")
  desc@array_name <- name
  desc@temp <- temp
  
  ## Generate an upload filename of the form:
  ##  upload.<connection-id>.<array-name>.<random-suffix>
  desc@filename <- basename(tempfile(sprintf("upload.%s.%s.", conn$id, name)))
  
  ## Convert legacy parameter names (deprecated) to the new names
  ## Because so much SciDBR code uses is.null() to check for missing values,
  ## it's important to set missing values to NULL (instead of a non-null
  ## empty vector).
  desc@attr_names <- as.character(attr_names %||% attrs %||% attr) %||% NULL
  desc@attr_types <- as.character(attr_types %||% types %||% type) %||% NULL
  desc@dim_names <- as.character(dim_names) %||% NULL
  desc@dim_start_coordinates <- as.numeric(dim_start_coordinates 
                                           %||% start) %||% NULL
  desc@dim_chunk_sizes <- as.numeric(dim_chunk_sizes %||% chunk_size) %||% NULL

  if (inherits(payload, "raw")) {
    .UploadRaw.httpapi(db, payload, desc, ...)
  } else if (inherits(payload, "data.frame")) {
    .UploadDf.httpapi(db, payload, desc, ...)
  } else if (inherits(payload, "dgCMatrix")) {
    .UploadSparseMatrix.httpapi(db, payload, desc, ...)
  } else if (is.null(dim(payload))) {
    .UploadVector.httpapi(db, payload, desc, ...)
  } else {
    .UploadDenseDimensioned.httpapi(db, payload, desc, ...)
  }
  
  ## Wrap the SciDB array with a "scidb" object.
  ## If gc==TRUE, the SciDB array will be removed from SciDB when this object
  ## gets garbage-collected.
  return(scidb(db, name, gc=gc))
}

#' Upload an R raw value to a special 1-element SciDB array
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload a single-element SciDB array containing a raw value
#' @param desc an UploadDesc with metadata about the upload schema
#' @seealso .raw2scidb.shim()
#' @keywords internal
.UploadRaw.httpapi <- function(db, payload, desc)
{
  timings <- c(start=proc.time()[["elapsed"]])
  if (!is.raw(payload)) stop("payload must be a raw value")
  if (!is.present(desc)) stop("No upload descriptor provided")
  if (!has.chars(only(desc@array_name))) stop("No array name provided")
  if (!has.chars(only(desc@filename))) stop("No upload filename provided")

  bytes <- .Call(C_scidb_raw, payload)
  timings <- c(timings, encode=proc.time()[["elapsed"]])

  schema <- sprintf("<%s:binary null>[%s=%d:%d:0:%d]",
                    first(desc@attr_names) %||% "val",
                    first(desc@dim_names) %||% "i",
                    first(desc@dim_start_coordinates) %||% 0,
                    first(desc@dim_start_coordinates) %||% 0,
                    first(desc@dim_chunk_sizes) %||% 1)
  
  query <- sprintf("store(input(%s, '%s', -2, '(binary null)'), %s)",
                   schema, 
                   only(desc@filename),
                   only(desc@array_name))
  
  content_type <- "application/octet-stream"
  
  .UploadWithQuery.httpapi(db, bytes, query, schema, content_type, desc)
  timings <- c(timings, upload=proc.time()[["elapsed"]])
  
  .ReportTimings(paste0("Timings for raw upload to array '", 
                        only(desc@array_name), "':"), 
                 timings)
}

#' Upload an R dataframe to SciDB
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload an R dataframe
#' @param desc an UploadDesc with metadata about the upload schema
#' @param use_aio_input if TRUE, use the \code{aio_input()} operator
#'   from the \code{accelerated_io_tools} library to process the upload
#' @seealso .df2scidb.shim()
#' @keywords internal
.UploadDf.httpapi <- function(db, payload, desc,
                              use_aio_input=FALSE)
{
  timings <- c(start=proc.time()[["elapsed"]])
  if (!is.data.frame(payload)) stop("payload must be a dataframe")
  if (!is.present(desc)) stop("No upload descriptor provided")
  if (!has.chars(only(desc@array_name))) stop("No array name provided")
  if (!has.chars(only(desc@filename))) stop("No upload filename provided")
  
  ## Preprocess the dataframe to prepare it for upload
  processed <- .PreprocessDfTypes(payload, desc@attr_types, use_aio_input)
  payload <- processed$df
  aio_apply_args <- processed$aio_apply_args
  desc@attr_names <- names(payload)
  desc@attr_types <- processed$attr_types
  
  timings <- c(timings, preprocess=proc.time()[["elapsed"]])
  
  ## Serialize the dataframe to TSV format
  data <- .TsvWrite(payload, file=.Primitive("return"))
  timings <- c(timings, encode=proc.time()[["elapsed"]])

  ## Construct the schema
  schema <- dfschema(desc@attr_names,
                     desc@attr_types,
                     nrow(payload),
                     chunk=desc@dim_chunk_sizes,
                     start=desc@dim_start_coordinates,
                     dim_name=desc@dim_names)

  ## Construct the AFL query used to process the data after uploading
  has_aio <- (length(grep("aio_input", names(db))) > 0)
  if (use_aio_input && has_aio) {
    ## Use aio_input() to load the data. 
    aioSettings <- list(num_attributes=ncol(payload))
    if (is.present(desc@dim_chunk_sizes) && only(desc@dim_chunk_sizes) > 0) {
      aioSettings[["chunk_size"]] <- only(desc@dim_chunk_sizes)
    }
    
    
    ## Wrap it in apply() to restore the attribute names, because aio_input()
    ## returns a schema with attributes named (a1, a2, ...)
    query <- sprintf("store(project(apply(aio_input('%s', %s), %s), %s), %s)",
                     only(desc@filename),
                     get_setting_items_str(db, aioSettings),
                     only(aio_apply_args),
                     paste(desc@attr_names, collapse=", "),
                     desc@array_name)
  } else {
    query <- sprintf("store(input(%s, '%s', -2, 'tsv'), %s)",
                     schema,
                     only(desc@filename),
                     desc@array_name)
  }

  content_type <- "text/tab-separated-values"
  
  .UploadWithQuery.httpapi(db, data, query, schema, content_type, desc)
  timings <- c(timings, upload=proc.time()[["elapsed"]])
  
  .ReportTimings(paste0("Timings for dataframe upload to array '", 
                        only(desc@array_name), "':"), 
                 timings)
}

#' Upload an R sparse matrix into SciDB
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload an R sparse matrix
#' @param desc an UploadDesc with metadata about the upload schema
#' @seealso .matrix2scidb.shim()
#' @keywords internal
.UploadSparseMatrix.httpapi <- function(db, payload, desc)
{
  timings <- c(start=proc.time()[["elapsed"]])
  if (!inherits(payload, "dgCMatrix")) stop("payload must be a dgCMatrix")
  if (!is.present(desc)) stop("No upload descriptor provided")
  if (!has.chars(only(desc@array_name))) stop("No array name provided")
  if (!has.chars(only(desc@filename))) stop("No upload filename provided")

  if (!is.present(desc@dim_start_coordinates)) {
    desc@dim_start_coordinates <- c(0,0)
  }
  desc@dim_start_coordinates <- as.integer(desc@dim_start_coordinates)
  
  type <- .scidbtypes[[typeof(payload@x)]]
  if (is.null(type) || type != "double") {
    stop("Sorry, the package only supports double-precision sparse matrices",
         " right now.")
  }

  desc@attr_types <- desc@attr_types %||% .scidbtypes[[typeof(payload@x)]]
  attrs <- .Attrs.UploadDesc(desc, default_names="val")
  if (length(attrs) > 1) stop("too many attributes")
  
  dims <- .Dims.UploadDesc(desc, default_names=c("i", "j"))
  if (length(dims) != 2) stop("matrix should have 2 dimensions")
  dims[[1]]$end <- nrow(payload)-1 + dims[[1]]$start
  dims[[2]]$end <- ncol(payload)-1 + dims[[2]]$start
  dims[[1]]$chunk <- min(nrow(payload), dims[[1]]$chunk)
  dims[[2]]$chunk <- min(ncol(payload), dims[[2]]$chunk)
  
  schema <- make.schema(db, attrs, dims)
  schema1d <- sprintf("<i:int64, j:int64, val:%s>[idx=0:*:0:100000]", 
                      desc@attr_types[[1]])

  col_binary_format <- sprintf("%s null", desc@attr_types[[1]])
  binary_format_str <- sprintf("(%s)", paste(rep(col_binary_format, 3), 
                                             collapse=", "))

  # Compute the indices and assemble message to SciDB in the form
  # (i:double, j:double, val:double) for indices (i, j) and data (val).
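  # For a dgCMatrix, @i holds the 0-based row index of each nonzero and @p
  # holds 0-based column pointers; diff(@p) counts the nonzeros per column,
  # and rep() expands that into a 0-based column index per nonzero.
  # E.g. for a 2x2 matrix whose only nonzeros fill column 2,
  # @p == c(0, 0, 2), so j == c(1, 1).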
  dp <- diff(payload@p)
  j <- rep(seq_along(dp), dp) - 1
  bytes <- .Call(C_scidb_raw, as.vector(
    t(matrix(c(payload@i + dims[[1]]$start, j + dims[[2]]$start, payload@x),
             length(payload@x)))))
  timings <- c(timings, encode=proc.time()[["elapsed"]])

  query <- sprintf(
    "store(redimension(input(%s, '%s', -2, '%s'), %s), %s)",
    only(schema1d), 
    only(desc@filename), 
    only(binary_format_str),
    only(schema), 
    only(desc@array_name))
  
  content_type <- "application/octet-stream"
  
  .UploadWithQuery.httpapi(db, bytes, query, schema, content_type, desc)
  timings <- c(timings, upload=proc.time()[["elapsed"]])
  
  .ReportTimings(paste0("Timings for matrix upload to array '", 
                        only(desc@array_name), "':"), 
                 timings)
}

#' Upload an R vector to SciDB
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload an R vector
#' @param desc an UploadDesc with metadata about the upload schema
#' @param max_byte_size the maximum amount of data to upload in a single page.
#'   If the data exceeds this amount, it is uploaded in multiple pages.
#'   Set options(scidb.max_byte_size) to override the default for all
#'   vector uploads. 
#' @seealso .matvec2scidb.shim()
#' @keywords internal
.UploadVector.httpapi <- function(db, payload, desc, 
                                  max_byte_size=NULL)
{
  timings <- c(start=proc.time()[["elapsed"]])

  processed <- .PreprocessArrayType(payload, type=desc@attr_types)
  payload <- processed$array
  load_type <- processed$load_type
  desc@attr_types <- processed$attr_type
  attrs <- .Attrs.UploadDesc(desc, default_names="val")
  if (length(attrs) != 1) stop("expected 1 attribute")

  ## dims_unbounded is [i=0:*:0:min(1000,n)]
  dims_unbounded <- .Dims.UploadDesc(desc,
                                     default_names="i",
                                     default_chunk_sizes=min(1000, 
                                                             length(payload)),
                                     make_unique=TRUE)
  if (length(dims_unbounded) != 1) stop("expected 1 dimension")
  
  ## dims is [i=0:n-1:0:min(1000,n)]
  dims <- dims_unbounded
  dims[[1]]$end <- dims[[1]]$start + length(payload) - 1

  ## schema is <val:type>[i=0:n-1:0:min(1000,n)]
  schema <- make.schema(db, attrs, dims)
  ## schema_unbounded: the same as `schema` but with "*" for the dimension end.
  ## Used for uploading a single page.
  schema_unbounded <- make.schema(db, attrs, dims_unbounded)
  
  page_size <- .get_multipart_post_load_block_size(
    data=payload,
    debug=is.debug(),
    max_byte_size=max_byte_size %||% getOption("scidb.max_byte_size",
                                               500*(10^6)))
  
  ## Upload page by page
  page_number <- 0
  npages <- ceiling(length(payload) / page_size)
  total_nbytes <- 0
  for (page_begin in seq(1, length(payload), page_size)) {
    page_number <- page_number + 1
    page_end <- min((page_begin + page_size - 1), length(payload))
    
    ## Convert data to raw
    page_payload <- as.matrix(payload[page_begin:page_end])
    page_bytes <- .Call(C_scidb_raw, as.vector(aperm(page_payload)))
    total_nbytes <- total_nbytes + length(page_bytes)
    timings[[paste0("encode_page_", page_number)]] <- proc.time()[["elapsed"]]

    msg.trace("Uploading page ", page_number, " of ", npages)
    if (page_number == 1) {
      query <- sprintf("store(input(%s, '%s', -2, '(%s null)'), %s)", 
                       only(schema), 
                       only(desc@filename),
                       only(load_type),
                       only(desc@array_name))
    } else {
      query <- sprintf("append(input(%s, '%s', -2, '(%s null)'), %s, i)", 
                       only(schema_unbounded), 
                       only(desc@filename),
                       only(load_type),
                       only(desc@array_name))
      ## If we created a temp array for the first page, don't create it again!
      desc@temp <- FALSE
    }
    
    content_type <- "application/octet-stream"
    .UploadWithQuery.httpapi(db, page_bytes, query, schema,
                             content_type, desc)
    timings[[paste0("upload_page_", page_number)]] <- proc.time()[["elapsed"]]
  }
  
  .ReportTimings(paste0("Timings for uploading R vector to array '",
                        only(desc@array_name), "' (", 
                        only(page_number), " pages, ",
                        only(total_nbytes), " bytes): "), 
                 timings)
}

#' Upload a dense n-dimensional array or matrix to SciDB
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param payload an R dense n-dimensional array or matrix
#' @param desc an UploadDesc with metadata about the upload schema
#' @param reshape (default: TRUE) by default, the SciDB \code{reshape()} 
#'   operator is called on the uploaded array to make the SciDB array match
#'   the shape of the R array. Set this to FALSE to disable this; the resulting
#'   SciDB array will be flat instead of N-dimensional.
#' @seealso .matvec2scidb.shim
#' @keywords internal
.UploadDenseDimensioned.httpapi <- function(db, payload, desc, 
                                            reshape=TRUE, ...)
{
  if (!is.array(payload)) stop("payload must be an array or matrix")
  timings <- c(start=proc.time()[["elapsed"]])

  processed <- .PreprocessArrayType(payload, type=desc@attr_types)
  payload <- processed$array
  load_type <- processed$load_type
  desc@attr_types <- processed$attr_type
  attrs <- .Attrs.UploadDesc(desc, default_names="val")
  if (length(attrs) != 1) stop("expected 1 attribute")
  
  ## For a matrix, dimension names are (i, j)
  ## For an n-dimensional array (n>2), dim names are (i1, i2, ...)
  dim_lengths <- dim(payload)
  ndims <- length(dim_lengths)
  dim_names <- if (ndims == 2) c("i", "j") else paste0("i", 1:ndims)
  dims <- .Dims.UploadDesc(desc,
                           default_names=dim_names,
                           default_chunk_sizes=floor(10e6 ^ (1/ndims)),
                           lengths=dim_lengths,
                           make_unique=TRUE)
  
  schema <- make.schema(db, attrs, dims)
  load_schema <- sprintf("<%s:%s>[__row=1:%.0f:0:1000000]", 
                         attrs[[1]]$name, 
                         attrs[[1]]$type,
                         length(payload))

  ## Convert data to raw
  bytes <- .Call(C_scidb_raw, as.vector(aperm(payload)))
  timings[["encode"]] <- proc.time()[["elapsed"]]
  
  if (as.logical(reshape)) {
    query <- sprintf("store(reshape(input(%s, '%s', -2, '(%s null)'), %s), %s)", 
                     load_schema,
                     desc@filename, 
                     load_type, 
                     schema, 
                     desc@array_name)
    action <- "upload_and_reshape"
  } else {
    query <- sprintf("store(input(%s, '%s', -2, '(%s null)'), %s)", 
                     load_schema, 
                     desc@filename, 
                     load_type, 
                     desc@array_name)
    action <- "upload"
  }
  
  content_type <- "application/octet-stream"
  .UploadWithQuery.httpapi(db, bytes, query, schema, content_type, desc)
  timings[[action]] <- proc.time()[["elapsed"]]
  
  .ReportTimings(paste0("Timings for uploading dense ", 
                        ndims, "-dimensional array '",
                        desc@array_name, "', ", 
                        nchar(bytes, type="bytes"), " bytes: "), 
                 timings)
}

#' Common function called by all upload methods.
#' Upload bytes or text to SciDB as an attachment to a query.  The
#' query must contain an \code{input()} or \code{aio_input()} operator
#' that reads data from the attached file (whose name matches
#' \code{desc@filename}) and does something with it; usually, the
#' query uses \code{store()} to write the data into a new array.
#' @param db scidb database connection returned from \code{\link{scidbconnect}}
#' @param data the data to upload, as text or raw bytes
#' @param query an AFL query to process the uploaded data (and, usually, to
#'    store it into an array). The query must contain an operator that reads
#'    from a file whose name is equal to \code{desc@filename}.
#' @param schema the schema of the array, only needed if \code{desc@temp}
#'    is TRUE
#' @param content_type the HTTP Content-Type header to use for the attachment
#' @param desc an UploadDesc which must have an \code{@array_name} and
#'    \code{@filename} set
#' @return if the query returns data, this function returns the raw data
#'    from the HTTP response. If the query doesn't return anything (e.g.
#'    if it's a \code{store()} query), this function returns NULL.
#'    If the server returns any status code outside of the 200 successful
#'    range, this function stops with a message that includes the
#'    content of the error page returned by the server.
#' @keywords internal
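#' @examples
#' \dontrun{
#' ## A hedged sketch: upload TSV text and store it into a new array.
#' ## All names here are hypothetical.
#' desc <- new("UploadDesc")
#' desc@@array_name <- "up_example"
#' desc@@temp <- FALSE
#' desc@@filename <- "upload.example.tsv"
#' schema <- "<a:string>[i=0:*:0:1000]"
#' query <- paste0("store(input(", schema, ", '", desc@@filename,
#'                 "', -2, 'tsv'), ", desc@@array_name, ")")
#' .UploadWithQuery.httpapi(db, "one\ntwo\n", query, schema,
#'                          "text/tab-separated-values", desc)
#' }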
.UploadWithQuery.httpapi <- function(db, data, query, 
                                     schema, content_type, desc)
{
  if (is.null(data)) stop("No attachment data provided")
  if (!has.chars(only(query))) stop("No query provided")
  if (!is.present(desc)) stop("No upload descriptor provided")
  if (!has.chars(only(desc@array_name))) stop("No array name provided")
  if (!has.chars(only(desc@filename))) stop("No upload filename provided")

  ## Pre-create the array if the client requested a temp array
  success <- FALSE
  
  if (desc@temp) {
    ## Caller asked for a temp array instead of a SciDB versioned array
    if (!has.chars(schema)) stop("Temp array requested, but no schema provided")
    create_temp_array(db, desc@array_name, schema)
    on.exit(if (!success) {
              Execute(db, sprintf("remove(%s)", desc@array_name))
            },
            add=TRUE)
  }

  ## Write the query JSON
  query_and_options <- list(query=query,
                            attachment=desc@filename)
  query_json <- jsonlite::toJSON(query_and_options, auto_unbox=TRUE)
  
  ## Prepare the attachments: a "query" attachment with the query JSON,
  ## and an "attachment" attachment with the uploaded data
  attachments <- list(
    query=list(data=query_json, content_type="application/json"),
    attachment=list(data=data, content_type=content_type)
  )
  
  ## Execute the POST, which uploads the data and executes the query
  conn <- attr(db, "connection")
  uri <- URI(conn, conn$links$execute)
  resp <- .HttpRequest(conn, "POST", uri, data=NULL, attachments=attachments)
  
  if (resp$status_code != 200 && resp$status_code != 204) {
    stop("[SciDBR] Upload request ", uri, 
         " responded with unexpected status ", resp$status_code,
         ": ", rawToChar(resp$content))
  }

  success <- TRUE  
  if (resp$status_code == 204) {
    ## 204 No Content means the query returned no data
    return(NULL)
  } 
  return(resp$content)
}


#' Start a query running asynchronously in SciDB. Return an env of class
#' "httpquery" to wrap the query while it executes.
#' @param conn the "connection" env attribute of a scidb database connection
#' @param afl the AFL query to execute
#' @param options a named list of extra options to include in the metadata
#'   for the query. See "Query Options" in the
#'   [SciDB HTTP API User Guide](https://paradigm4.atlassian.net/l/cp/4pUzXzzh)
#'   for a full list. Options include:
#'    * \code{format}: requested output format
#'    * \code{pageSize}: requested number of elements per page
#'    * \code{precision}: number of significant floating-point digits to output
#' @return an env of class "httpquery" to manage the query. Call
#'    \code{\link{Next.httpquery}} to get the first page of output.
#'    Call \code{\link{Close.httpquery}} to close the query.
#'    The query is closed automatically when the httpquery object
#'    goes out is garbage-collected.
#' @keywords internal
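#' @examples
#' \dontrun{
#' ## A hedged sketch, assuming an active session on
#' ## conn <- attr(db, "connection") and an array named my_array:
#' q <- New.httpquery(conn, "scan(my_array)",
#'                    options=list(format="csv+:l"))
#' page <- Next.httpquery(q)  # raw bytes of the first page
#' Close.httpquery(q)
#' }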
New.httpquery <- function(conn, afl, options=list())
{
  ## Remove options whose values are NULL
  options <- Filter(Negate(is.null), options)
  
  ## If the query only consists of an identifier (alphanumeric + "_"),
  ## or an identifier, single period, and identifier,
  ## assume it's an array name. Wrap it with scan().
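  ## E.g. "foo" becomes "scan(foo)" and "ns1.arr" becomes "scan(ns1.arr)".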
  if (grepl("^[[:alnum:]_]+([.][[:alnum:]_]+)?$", only(trimws(afl)))) {
    afl <- sprintf("scan(%s)", afl)
  }
  
  ## Prepare the JSON for posting
  query_and_options <- append(list(query=afl), options)
  query_json <- jsonlite::toJSON(query_and_options, auto_unbox=TRUE)
  
  ## Execute the POST
  resp <- .HttpRequest(conn, "POST", URI(conn, conn$links$queries),
                       data=query_json, content_type="application/json")
  stopifnot(resp$status_code == 201)  # 201 Created
  
  ## Read the headers and JSON body
  links <- .ParseLinkHeaders(resp$parsed_headers)
  json <- jsonlite::fromJSON(rawToChar(resp$content))
  
  ## Create an env for the result (use an env because it's always passed by
  ## reference, not copy-on-write, and it can be assigned a finalizer)
  query <- new.env()
  class(query) <- "httpquery"
  query$connection <- conn
  query$options <- options
  query$location <- resp$parsed_headers[["location"]]
  query$next_page_number <- 1
  query$next_page_url <- links[["first"]]
  query$id <- json[["queryId"]]
  query$is_selective <- json[["isSelective"]]
  query$schema <- json[["schema"]]
  
  ## When the query gets garbage-collected, call .gc.httpquery(query)
  reg.finalizer(query, .gc.httpquery, onexit=TRUE)
  
  return(query)
}

#' Close a query that was started with \code{\link{New.httpquery}}.
#' This is registered as a finalizer on httpquery objects returned by
#' \code{\link{New.httpquery}}.
#' @param query an env of class "httpquery" returned by
#'    \code{\link{New.httpquery}}.
#' @keywords internal
Close.httpquery <- function(query)
{
  suspendInterrupts({
    if (is.null(query$location) || is.null(query$connection)) {
      msg.trace("Query ", query$id, " already closed")
      return(invisible(NULL))
    }
    
    uri <- URI(query$connection, query$location)
    msg.trace("Closing SciDB query ", query$id, " url=", uri)
    tryCatch(
      {
        resp <- .HttpRequest(query$connection, "DELETE", uri)
        ## We usually expect status 204 (No Content),
        ## but we can get status 200 with a message if the query was
        ## already closed or if it was canceled. In those cases, we can ignore
        ## the message because the query got closed, which is what we wanted.
        if (resp$status_code != 204 && resp$status_code != 200) {
          stop("DELETE ", uri, " gave unexpected status ", resp$status_code,
               ": ", rawToChar(resp$content))
        }
      },
      error=function(err) {
        warning("[SciDBR] Ignored error while closing query ", query$id, 
                ": ", err)
      })
    
    ## Set the location to NULL so we don't double-delete the session.
    ## Because query is an env, this side effect should immediately take effect
    ## outside of the current function call.
    query$location <- NULL
    query$next_page_url <- NULL
    query$connection <- NULL
  })
}

#' Called when an httpquery object is garbage-collected.
#' Differences between this and an explicit call to Close():
#'  1. Suppress warnings
#'  2. Use TraceEnter/Exit to log the call to .gc.httpquery so we can
#'     distinguish it from an explicit Close()
#' @keywords internal
.gc.httpquery <- function(query)
{
  suspendInterrupts({
    trace <- .TraceEnter(".gc.httpquery", query$id)
    on.exit(.TraceExit(trace), add=TRUE)

    suppressWarnings({
      Close.httpquery(query)
    })
  })
}

#' Retrieve the next page of a query that was started with
#' \code{\link{New.httpquery}}. The data in the server's HTTP response
#' is returned in raw form. If the return value is NULL, the query has
#' reached its last page of results and there is no more data to fetch.
#' @param query an env of class "httpquery" returned by
#'    \code{\link{New.httpquery}}.
#' @return the raw data of the HTTP response, or NULL if there is no more
#'    data to fetch from this query.
#' @keywords internal
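#' @examples
#' \dontrun{
#' ## A hedged sketch: drain every page of a paged query
#' ## (conn and my_array are assumed to exist):
#' q <- New.httpquery(conn, "scan(my_array)",
#'                    options=list(format="csv+:l", pageSize=10000))
#' pages <- list()
#' repeat {
#'   raw_page <- Next.httpquery(q)
#'   if (is.null(raw_page)) break
#'   pages[[length(pages) + 1]] <- raw_page
#' }
#' Close.httpquery(q)
#' }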
Next.httpquery <- function(query)
{
  if (is.null(query$next_page_url) || is.null(query$location)) {
    msg.trace("Query ", query$id, " already finished")
    return(NULL)
  }
  if (is.null(query$connection)) {
    stop("Query ", query$id, " has no connection")
  }
  
  ## Get the next page of the query by following the query$next URL
  uri <- URI(query$connection, query$next_page_url)
  msg.trace("Fetching page ", query$next_page_number, 
            " of query ", query$id)
  resp <- .HttpRequest(query$connection, "GET", uri)
  
  if (resp$status_code == 204) {
    ## 204 No Content means the last page has already been fetched
    query$next_page_url <- NULL
    return(NULL)
  } 
  if (resp$status_code != 200) {
    stop("[SciDBR] GET request ", uri, 
         " responded with unexpected status ", resp$status_code,
         ": ", rawToChar(resp$content))
  }
  
  links <- .ParseLinkHeaders(resp$parsed_headers)
  
  ## The response header should contain a link to the next page,
  ## or NULL if there is no next page.
  query$next_page_url <- links[["next"]]
  query$next_page_number <- 1 + query$next_page_number
  
  return(resp$content)
}

#' Close the session.
#' This is registered as a finalizer on attr(db, "connection") so it closes 
#' the session when the connection gets garbage-collected; it can also be called
#' from Close.httpapi().
#' @param conn the connection environment, usually obtained from 
#'    attr(db, "connection")
#' @keywords internal
.CloseSession.httpapi <- function(conn)
{
  suspendInterrupts({
    if (is.null(conn$location)) {
      msg.trace("Session ", conn$session, " already closed")
      return(invisible(NULL))
    }
    
    uri <- URI(conn, conn$location)
    msg.trace("Closing SciDB session ", conn$session, " url=", uri)
    tryCatch(
      {
        resp <- .HttpRequest(conn, "DELETE", uri)
        if (resp$status_code != 204) {  # 204 No Content
          stop("DELETE ", uri, " gave unexpected status ", resp$status_code,
               ": ", rawToChar(resp$content))
        }
      },
      error=function(err) {
        warning("[SciDBR] Ignored error while closing SciDB session ", conn$session, 
                ": ", err)
      })
    
    ## Set the location to NULL so we don't double-delete the session.
    ## Because conn is an env, this side effect should immediately take effect
    ## outside of the current function call.
    conn$location <- NULL
    ## Set the links to NULL to make sure nobody can use the session anymore
    conn$links <- NULL
    ## conn$session does not get cleared, so it can be used for reporting about
    ## the old session
  })
}

#' Called when an httpapi connection is garbage-collected.
#' Differences between this and an explicit call to Close():
#'  1. Suppress warnings
#'  2. Use TraceEnter/Exit to log the call to .gc.httpapi so we can distinguish it
#'     from an explicit Close()
#' @keywords internal
.gc.httpapi <- function(conn)
{
  suspendInterrupts({
    trace <- .TraceEnter(".gc.httpapi", conn$session)
    on.exit(.TraceExit(trace), add=TRUE)

    suppressWarnings({
      .CloseSession.httpapi(conn)
    })
  })
}

#' Set default options on the given curl handle.
#' @param handle a curl handle returned by \code{curl::new_handle()}
#' @param conn the "connection" env attribute of a scidb database connection
#' @param method a string specifying the HTTP method: "GET", "POST", "DELETE"
#' @param data the data that will be sent via the curl handle
#' @keywords internal
.CurlSetOptions <- function(handle, conn, method, data)
{
  options <- switch(method, 
                    GET = list(httpget=TRUE),
                    POST = list(post=TRUE,
                                ## postfieldsize counts bytes; nchar()
                                ## defaults to characters, so be explicit
                                postfieldsize=(
                                  if (is.null(data)) 0
                                  else nchar(data, type="bytes")),
                                postfields=data),
                    DELETE = list(customrequest="DELETE"),
                    stop("unsupported HTTP method ", method))
  
  ## 2 is libcurl's CURL_HTTP_VERSION_1_1, i.e. use HTTP/1.1
  options$http_version <- 2
  options$ssl_verifyhost <- as.integer(getOption("scidb.verifyhost", 0))
  options$ssl_verifypeer <- as.integer(getOption("scidb.verifypeer", 0))
  
  if (!is.null(conn$username) && !is.null(conn$password)) {
    options$username <- conn$username
    options$password <- conn$password
    ## Do not store the password after this connection attempt!
    ## If successful, the server will return an authorization cookie; curl will
    ## automatically save it on the connection handle and reuse it in 
    ## future requests to continue the authenticated session.
    conn$password <- NULL
  }

  ## If there are extra curl_options configured, check if they are valid
  ## and if so, add them
  options_check <- check_curl_options()
  if (identical(TRUE, options_check)) {
    options <- c(
      options,
      getOption("scidb.curl_options")
    )
  }
  
  curl::handle_setopt(handle, .list=options)
}
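
## Illustrative sketch (not run): extra curl options are picked up from the
## "scidb.curl_options" R option checked above. For example, to turn on
## libcurl's verbose logging and a 120-second timeout:
if (FALSE) {
  options(scidb.curl_options=list(verbose=TRUE, timeout=120))
}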

#' Given a dataframe of cookies from a curl handle (obtained by
#' \code{curl::handle_cookies(h)}), return the cookies as a single TSV
#' string in the format that libcurl expects to parse.
#' @param cookies  a data.frame containing the cookie list from
#'   another curl handle, obtained by \code{curl::handle_cookies(h)}
#' @return a single-element character vector containing the cookies
#'   as a single TSV string
#' @keywords internal
.CookiesToTsv <- function(cookies)
{
  ## cookies[[5]] is the cookie expiration date, which in R is represented as
  ## a numeric with class=POSIXct. curl::handle_setopt(, cookielist=)
  ## expects to parse it as a stringified integer (Unix epoch time),
  ## not as a date string - so unclass it to make sure it gets printed as a
  ## numeric. See https://curl.se/libcurl/c/CURLOPT_COOKIELIST.html .
  cookies[[5]] <- unclass(cookies[[5]])

  cookie_list <- capture.output(
    write.table(cookies, sep = "\t",
                row.names = FALSE, col.names = FALSE,
                quote=FALSE))
  cookie_string <- paste0(cookie_list, collapse="\n")
  return(cookie_string)
}
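
## Illustrative sketch (not run): a one-row cookie table shaped like the
## output of curl::handle_cookies(), and the tab-separated line that
## .CookiesToTsv() produces for it. All values here are made up.
if (FALSE) {
  cookies <- data.frame(domain="db.example.com", flag=FALSE, path="/",
                        secure=FALSE,
                        expiration=as.POSIXct("2030-01-01", tz="UTC"),
                        name="session", value="abc123")
  .CookiesToTsv(cookies)
  ## -> "db.example.com\tFALSE\t/\tFALSE\t1893456000\tsession\tabc123"
}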

#' Add cookies to the curl handle.
#' @param handle a curl handle returned by \code{curl::new_handle()}
#' @param cookies a data.frame containing the cookie list from
#'   another curl handle, obtained with \code{curl::handle_cookies(h)}
#' @keywords internal
.CurlUseCookies <- function(handle, cookies)
{
  if (is.null(cookies) || nrow(cookies) == 0) {
    return()
  }
  ## Copy cookies to the handle.
  ## If we were reusing the same handle for all requests, we wouldn't need
  ## to do this because curl would track the cookies for us. But we use a 
  ## new handle for each request to allow concurrent HTTP requests to happen
  ## during garbage collection, so we need to copy the cookies ourselves.
  ##
  ## In a previous implementation of this function, we copied the cookies
  ## from the last response's Set-Cookie header instead of from the handle
  ## itself. That approach doesn't work with libcurl 7.61 and above because
  ## the newer libcurl won't send a cookie to a domain it didn't originate from,
  ## and copying from the Set-Cookie header didn't give libcurl the info it
  ## needed about the cookie's origin, so it ended up dropping the cookie.

  ## Turn the `cookies` data frame into a TSV string for parsing by libcurl;
  ## see https://curl.se/libcurl/c/CURLOPT_COOKIELIST.html
  cookielist_tsv <- .CookiesToTsv(cookies)
  curl::handle_setopt(handle, cookielist=cookielist_tsv)
}

#' Set the given headers, as well as default headers, on the curl handle.
#' @param handle a curl handle returned by \code{curl::new_handle()}
#' @param headers a named list of headers to add
#' @param content_type overrides \code{headers[['Content-Type']]}
#' @keywords internal
.CurlSetHeaders <- function(handle, headers, content_type)
{
  if (has.chars(content_type)) {
    headers[['Content-Type']] <- content_type
  }
  if (length(headers) > 0) {
    curl::handle_setheaders(handle, .list=headers)
  }
}

#' Add attachments to a curl handle using the multipart/form-data MIME type.
#' @param handle a curl handle returned by \code{curl::new_handle()}
#' @param method a string specifying the HTTP method (which must be "POST")
#' @param attachments a named list of attachments, where the name is the
#'   attachment's filename. Each attachment is a named list where \code{$data}
#'   is the data being attached, and \code{$content_type} is the Content-Type
#'   to use for the attachment.
#' @keywords internal
.CurlAddAttachments <- function(handle, method, attachments)
{
  if (length(attachments) == 0) {
    return()
  }
  if (method != "POST") { 
    stop("attachment not allowed for method ", method) 
  }

  setform_args <- list(handle)
  for (iatt in seq_along(attachments)) {
    name <- names(attachments)[[iatt]]
    att <- attachments[[iatt]]
    
    if (is.null(name) || !nzchar(only(name))) { 
      stop("no name given for attachment ", iatt) 
    }
    if (is.null(att$data)) { 
      stop("no data given for attachment ", name)
    }
    if (is.null(att$content_type)) { 
      stop("no content_type given for attachment ", name) 
    }

    setform_args[[name]] <- curl::form_data(att$data, type=att$content_type)
  }
  do.call(curl::handle_setform, setform_args)
}
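
## Illustrative sketch (not run): building an `attachments` list in the
## shape .CurlAddAttachments() expects. The filename and payload are
## made up.
if (FALSE) {
  attachments <- list(
    "upload.bin"=list(data=as.raw(c(0x01, 0x02, 0x03)),
                      content_type="application/octet-stream"))
  h <- curl::new_handle()
  .CurlAddAttachments(h, "POST", attachments)
}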

#' Save cookies from an HTTP response on a connection env object, to use
#' in a later request.
#' @param h a curl handle
#' @param conn the "connection" env attribute of a scidb database connection.
#'   The cookies are stored in this env's \code{$cookies} field.
#' @keywords internal
.SaveCookies <- function(h, conn)
{
  ## Store the cookies on the connection (but if there are no new cookies,
  ## take care not to erase the old ones!). handle_cookies() always returns
  ## a data frame, so test the row count rather than length().
  cookies <- curl::handle_cookies(h)
  if (nrow(cookies) > 0) {
    conn$cookies <- cookies
  }
}

#' Handle an HTTP response that returned an error code.
#' In most cases, the response will contain a JSON document with metadata
#' about the error. The JSON is turned into a string and added to the message.
#' This function stops execution and does not return.
#' @param method a string specifying the HTTP method ("GET", "POST", "DELETE")
#' @param uri the URI/URL that was requested which produced the error
#' @param resp the response object from \code{curl::curl_fetch_memory()}
#' @keywords internal
.HandleHttpError <- function(method, uri, resp)
{
  error_msg <- rawToChar(resp$content)
  if (startsWith(error_msg, "{")) {
    ## Treat it as a JSON error message
    json <- jsonlite::fromJSON(error_msg)
    ## Add potentially useful client-side information to the error JSON
    json[["http_method"]] <- method
    json[["http_uri"]] <- uri
    json[["http_status"]] <- resp$status_code
    ## Unparse the JSON back into a string and throw it as an exception
    stop(jsonlite::toJSON(json, auto_unbox=TRUE))
  } else {
    stop(sprintf("HTTP error %s\n%s", resp$status_code, error_msg))
  }
}
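
## Illustrative sketch (not run): a caller can recover the error metadata by
## parsing the condition message back into JSON. `conn` and `uri` stand for
## a connection env and a URI built elsewhere; the http_* fields are the
## ones added above, while other fields depend on the server's response.
if (FALSE) {
  tryCatch(
    .HttpRequest(conn, "GET", uri),
    error=function(err) {
      details <- jsonlite::fromJSON(conditionMessage(err))
      message("HTTP ", details$http_status, " from ", details$http_uri)
    })
}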

#' Issue an HTTP request.
#' This function allows multiple threads to issue requests concurrently,
#' so the GC thread can issue DELETE requests while the main thread
#' is in the middle of another request.
#' @param db_or_conn scidb database connection returned from
#'   \code{\link{scidbconnect}}, or its "connection" env attribute
#' @param method a string specifying the HTTP method ("GET", "POST", "DELETE")
#' @param uri the URI/URL to request
#' @param content_type the MIME type of any data sent to the server (for POST)
#' @param headers a named list of headers to include in the request
#' @param data data to include in the request (for POST)
#' @param attachments attachments to include in the request (for POST).
#'   This sets the Content-Type to "multipart/form-data" and each attachment
#'   is included in a separate multipart section.
#'   Each attachment is a named list where \code{$data} is the data being
#'   attached, and \code{$content_type} is the Content-Type
#'   to use for the attachment.
#' @return If the server returns a response with a status code in the
#'   "success" range (200-299), returns the result of the call to
#'   \code{curl::curl_fetch_memory}. If not, stops execution with an
#'   error message.
#' @keywords internal
.HttpRequest <- function(db_or_conn, method, uri, content_type=NULL,
                         headers=list(), data=NULL, attachments=NULL)
{
  conn <- .GetConnectionEnv(db_or_conn)
  if (is.null(conn)) stop("No connection environment")

  ## data must be NULL or a 1-element character vector
  stopifnot(is.null(data) || (is.character(data) && length(data) == 1))
  
  if (!inherits(uri, "URI")) {
    stop("String '", uri, "' is not of class URI;",
         " please use the URI() function to construct it.")
  }
  uri <- .SanitizeUri(uri)

  ## Create a new connection handle and populate it for this request.
  ## We create a new handle for each request so that multiple requests
  ## can execute concurrently without interfering with each other.
  h <- curl::new_handle()
  .CurlSetOptions(h, conn, method, data)
  .CurlUseCookies(h, conn$cookies)
  .CurlSetHeaders(h, headers, content_type)
  .CurlAddAttachments(h, method, attachments)
  
  .TraceHttpRequestSent(h, method, uri, headers, data, attachments)

  ## Make the HTTP request
  resp <- curl::curl_fetch_memory(uri, h)
  if (is.present(resp$headers)) {
    resp$parsed_headers <- curl::parse_headers_list(resp$headers)
  }
  
  ## If we received authorization cookies, store them on the connection
  .SaveCookies(h, conn)
  
  .TraceHttpResponseReceived(h, method, uri, resp)
  
  if (resp$status_code == 0 && conn$protocol == "http") {
    ## Attempted to access an https port using http
    ## If this "stop" message is changed, also change the code that handles it 
    ## in .Handshake() in utility.R
    stop("https required")
  }
  if (resp$status_code <= 199 || resp$status_code > 299) {
    ## The server responded with an error or unexpected status code
    .HandleHttpError(method, uri, resp)
  }
  
  return(resp)
}
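
## Usage sketch (not run): a plain GET through .HttpRequest(). Here `db`
## stands for a connected scidb database object, and "version" is assumed
## to be a valid endpoint path. Any non-2xx status raises via
## .HandleHttpError() above.
if (FALSE) {
  resp <- .HttpRequest(db, "GET", .EndpointUri(db, "version"))
  json <- jsonlite::fromJSON(rawToChar(resp$content))
}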

#' Given a list of HTTP response headers parsed by curl::parse_headers_list,
#' return the URLs of all "Link" headers indexed by link name.
#' For example, given these Link headers:
#'   <http://www.paradigm4.com/index.html>; rel="P4 Home"
#'   <http://www.github.com/Paradigm4>; rel="P4 GitHub Space"
#' The function would return a list like:
#'   list(
#'      "P4 Home" = "http://www.paradigm4.com/index.html",
#'      "P4 GitHub Space" = "http://www.github.com/Paradigm4"
#'   )
#' @param headers_list the result of curl::parse_headers_list() on some
#'   HTTP response from a server
#' @return a named list where each name is the link name from "rel"
#'   and the value is the URL of the link.
#' @keywords internal
.ParseLinkHeaders <- function(headers_list)
{
  ## Find all "Link" headers (parse_headers_list() lower-cases header names)
  link_headers <- headers_list[names(headers_list) == "link"]
  
  ## Each Link header looks like: '<URL>; rel="link_name"'
  ## Match the URL and the link name.
  m <- regexec("<([^>]*)>; rel=\"([^\"]+)\".*", link_headers, perl=TRUE) 
  matches <- regmatches(link_headers, m)
  
  ## First matched group (match number 2) of every match is the URL
  result <- lapply(matches, function(x) x[[2]])
  ## Second group (match number 3) is the link name; make it the element name
  names(result) <- lapply(matches, function(x) x[[3]])
  
  ## Voila, now we have an array with named indices where result[[name]] 
  ## is the link URL.
  return(result)
}

#' Given a relative path to an endpoint within httpapi,
#' return the full URL/URI for the endpoint, including server DNS and port.
#' @param db_or_conn scidb database connection object,
#'     _or_ its "connection" attribute
#' @param path the path to the endpoint, 
#'     relative to "http[s]://host/api/v{version}/". 
#'     It should not begin with a slash.
#' @param args optional key=value args to add to the URL.
#' @return the fully qualified URI/URL for \code{path} on the given connection
#' @keywords internal
.EndpointUri <- function(db_or_conn, path, args=list())
{
  conn <- .GetConnectionEnv(db_or_conn)
  if (substr(path, 1, 1) == "/") {
    stop("relative path expected, but got ", path)
  }
  
  api_version <- conn$httpapi_version
  if (is.null(api_version)) {
    return(URI(db_or_conn, sprintf("/api/%s", path), args))
  } else {
    ## We know what httpapi version to use, so add v{version} to the path
    path <- sprintf("/api/v%d/%s", api_version, path)
    return(URI(db_or_conn, path, args))
  }
}
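
## Illustrative sketch (not run): once an API version (say 1) has been
## negotiated, .EndpointUri(db, "version") names the path "/api/v1/version"
## on the connected host; before negotiation it falls back to "/api/version".
if (FALSE) {
  uri <- .EndpointUri(db, "version")
}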

#' Percent-encode a URI for transmission; additionally encode "+" as "%2B"
#' so the server does not decode it as a space.
#' @noRd
.SanitizeUri <- function(uri)
{
  uri <- oldURLencode(uri)
  uri <- gsub("\\+", "%2B", uri, perl=TRUE)
  return(uri)
}

#' Given a list of actions and when they happened, e.g. 
#' (start=time1, firstAction=time2, secondAction=time3, ...),
#' this function will output a message indicating that
#' firstAction took (time2-time1) seconds, secondAction took
#' (time3-time2) seconds, etc.
#' @param msg a message to print before the timings
#' @param timings a list of the form (start=time1, firstAction=time2, ...)
#'   where all timings were obtained from proc.time()[["elapsed"]].
#' @keywords internal
.ReportTimings <- function(msg, timings)
{
  if (!is.debug()) {
    return()
  }

  ## seq_along()[-1] is empty when there are fewer than two timings,
  ## unlike seq(2, length(timings)), which would count backwards from 2 to 1.
  for (ii in seq_along(timings)[-1]) {
    if (ii > 2) { 
      msg <- paste0(msg, ",") 
    }
    msg <- paste(msg, names(timings)[[ii]], 
                 timings[[ii]] - timings[[ii-1]])
  }
  msg.debug(msg)
}
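
## Usage sketch (not run): record elapsed-time checkpoints, then report the
## deltas. Output only appears when debug logging is enabled.
if (FALSE) {
  timings <- list(start=proc.time()[["elapsed"]])
  ## ... issue the request ...
  timings$request <- proc.time()[["elapsed"]]
  ## ... parse the response ...
  timings$parse <- proc.time()[["elapsed"]]
  .ReportTimings("query timings:", timings)
  ## prints something like: "query timings: request 0.42, parse 0.03"
}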