R/AzureDataLake.R

# ADLS Metadata APIs ----

#' List the statuses of the files/directories in the given path.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake Store account.
#' @param relativePath Relative path of a file/directory.
#' @param verbose Print tracing information (default FALSE).
#' @return Returns a FileStatuses data frame with one row per directory entry.
#'         Each row has 11 columns (12 in the case of a file):
#'         FileStatuses.FileStatus.(length, pathSuffix, type, blockSize, accessTime, modificationTime,
#'         replication, permission, owner, group, aclBit, plus msExpirationTime for a file)
#' @details Exceptions - FileNotFoundException, IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#list-folders}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#List_a_Directory}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#List_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatus-org.apache.hadoop.fs.Path-}
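#' @examples
#' \dontrun{
#' # Usage sketch: the account name and path are hypothetical, and
#' # azureActiveContext is assumed to be an authenticated context created
#' # elsewhere (e.g. with createAzureContext()).
#' ls <- adls.ls(azureActiveContext, "myadlsaccount", "/tempdir")
#' ls$FileStatuses.FileStatus.pathSuffix  # names of the entries
#' ls$FileStatuses.FileStatus.type        # "FILE" or "DIRECTORY"
#' }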
adls.ls <- function(azureActiveContext, azureDataLakeAccount, relativePath, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=LISTSTATUS",
    getAzureDataLakeApiVersion()
    )
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
  resJsonObj <- jsonlite::fromJSON(resJsonStr)
  if (length(resJsonObj$FileStatuses$FileStatus) == 0) {
    # Return empty data frame in case of an empty directory
    #
    # LISTSTATUS on a ROOT (api-version=2018-05-01):
    # {\"FileStatuses\":
    # {\"FileStatus\":[
    # {\"length\":0,\"pathSuffix\":\"tempfolder1?1文件夹1\",\"type\":\"DIRECTORY\",\"blockSize\":0,\"accessTime\":1529649111352,\"modificationTime\":1529649111352,\"replication\":0,\"permission\":\"770\",\"owner\":\"c1717280-f74e-4e59-9efc-12432daa51e9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"aclBit\":false}
    # ,
    # {\"length\":0,\"pathSuffix\":\"testfolder\",\"type\":\"DIRECTORY\",\"blockSize\":0,\"accessTime\":1489143030207,\"modificationTime\":1489143030207,\"replication\":0,\"permission\":\"770\",\"owner\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"aclBit\":true}
    # ]}}
    #
    # LISTSTATUS on a FOLDER (api-version=2018-05-01):
    # {\"FileStatuses\":
    # {\"FileStatus\":[
    # {\"length\":4,\"pathSuffix\":\"tempfile01.txt\",\"type\":\"FILE\",\"blockSize\":268435456,\"accessTime\":1529649112231,\"modificationTime\":1529649112363,\"replication\":1,\"permission\":\"755\",\"owner\":\"c1717280-f74e-4e59-9efc-12432daa51e9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"msExpirationTime\":0,\"aclBit\":false}
    # ,
    # {\"length\":4,\"pathSuffix\":\"tempfile02.txt\",\"type\":\"FILE\",\"blockSize\":268435456,\"accessTime\":1529649113270,\"modificationTime\":1529649113348,\"replication\":1,\"permission\":\"755\",\"owner\":\"c1717280-f74e-4e59-9efc-12432daa51e9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"msExpirationTime\":0,\"aclBit\":false}
    # ]}}
    #
    # LISTSTATUS on a FILE (api-version=2018-05-01):
    # {\"FileStatuses\":
    # {\"FileStatus\":[
    # {\"length\":4,\"pathSuffix\":\"\",\"type\":\"FILE\",\"blockSize\":268435456,\"accessTime\":1529649112231,\"modificationTime\":1529649112363,\"replication\":1,\"permission\":\"755\",\"owner\":\"c1717280-f74e-4e59-9efc-12432daa51e9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"msExpirationTime\":0,\"aclBit\":false}
    # ]}}
    return(
      data.frame(
        FileStatuses.FileStatus.length = character(0),
        FileStatuses.FileStatus.pathSuffix = character(0),
        FileStatuses.FileStatus.type = character(0),
        FileStatuses.FileStatus.blockSize = character(0),
        FileStatuses.FileStatus.accessTime = character(0),
        FileStatuses.FileStatus.modificationTime = character(0),
        FileStatuses.FileStatus.replication = character(0),
        FileStatuses.FileStatus.permission = character(0),
        FileStatuses.FileStatus.owner = character(0),
        FileStatuses.FileStatus.group = character(0),
        # msExpirationTime is present only for "type":"FILE"
        #FileStatuses.FileStatus.msExpirationTime = character(0),
        FileStatuses.FileStatus.aclBit = character(0)
      )
    )
  }
  resDf <- as.data.frame(resJsonObj)
  return(resDf)
}

#' Get the status of a file/directory in the given path.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param verbose Print tracing information (default FALSE).
#' @return Returns a FileStatus data frame with a single row describing the file or directory.
#'         The row has 11 columns (12 in the case of a file):
#'         FileStatus.(length, pathSuffix, type, blockSize, accessTime, modificationTime,
#'         replication, permission, owner, group, aclBit, plus msExpirationTime for a file)
#' @details Exceptions - FileNotFoundException, IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Status_of_a_File.2FDirectory}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#getFileStatus-org.apache.hadoop.fs.Path-}
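#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path; azureActiveContext is an
#' # authenticated context created elsewhere.
#' fi <- adls.file.info(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt")
#' fi$FileStatus.type    # "FILE" or "DIRECTORY"
#' fi$FileStatus.length  # size in bytes
#' }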
adls.file.info <- function(azureActiveContext, azureDataLakeAccount, relativePath, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=GETFILESTATUS",
    getAzureDataLakeApiVersion()
  )
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
  resJsonObj <- jsonlite::fromJSON(resJsonStr)
  if (length(resJsonObj$FileStatus) == 0) {
    # Return an empty data frame if the response contains no FileStatus element
    #
    # GETFILESTATUS on a FOLDER (api-version=2018-05-01):
    # {\"FileStatus\":
    # {\"length\":0,\"pathSuffix\":\"\",\"type\":\"DIRECTORY\",\"blockSize\":0,\"accessTime\":1489142850467,\"modificationTime\":1529652246462,\"replication\":0,\"permission\":\"770\",\"owner\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"aclBit\":true}
    # }
    #
    # GETFILESTATUS on a FILE (api-version=2018-05-01):
    # {\"FileStatus\":
    # {\"length\":4,\"pathSuffix\":\"\",\"type\":\"FILE\",\"blockSize\":268435456,\"accessTime\":1529652247451,\"modificationTime\":1529652247542,\"replication\":1,\"permission\":\"755\",\"owner\":\"c1717280-f74e-4e59-9efc-12432daa51e9\",\"group\":\"b7c7db8c-3117-43cc-870d-946f3add73c9\",\"msExpirationTime\":0,\"aclBit\":false}
    # }
    return(
      data.frame(
        FileStatus.length = character(0),
        FileStatus.pathSuffix = character(0),
        FileStatus.type = character(0),
        FileStatus.blockSize = character(0),
        FileStatus.accessTime = character(0),
        FileStatus.modificationTime = character(0),
        FileStatus.replication = character(0),
        FileStatus.permission = character(0),
        FileStatus.owner = character(0),
        FileStatus.group = character(0),
        # msExpirationTime is present only for "type":"FILE"
        #FileStatus.msExpirationTime = character(0),
        FileStatus.aclBit = character(0)
      )
    )
  }
  resDf <- as.data.frame(resJsonObj)
  return(resDf)
}

#' Create a directory with the provided permission.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param permission Permission to be set for the directory (default is 755).
#' @param verbose Print tracing information (default FALSE).
#' @return Returns TRUE if the directory creation succeeds, FALSE otherwise.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Make_a_Directory}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Permission}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#mkdirs-org.apache.hadoop.fs.FileSystem-org.apache.hadoop.fs.Path-org.apache.hadoop.fs.permission.FsPermission-}
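#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path; permission is an octal string.
#' adls.mkdir(azureActiveContext, "myadlsaccount", "/tempdir/newdir", permission = "750")
#' }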
adls.mkdir <- function(azureActiveContext, azureDataLakeAccount, relativePath, permission, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(permission) && !is.null(permission)) assert_that(is_permission(permission))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=MKDIRS",
    getAzureDataLakeApiVersion()
  )
  if (!missing(permission) && !is.null(permission)) URL <- paste0(URL, "&permission=", permission)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "PUT",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
  resJsonObj <- jsonlite::fromJSON(resJsonStr)
  resDf <- as.data.frame(resJsonObj)
  return(resDf$boolean)
}

#' Create and write to a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file.
#' @param permission Permission to be set for file (default 644).
#' @param overwrite Whether to overwrite existing files with same name (default FALSE).
#' @param bufferSize Size of the buffer to be used.
#' @param replication Required block replication for a file.
#' @param blockSize Block size of the file.
#' @param contents raw contents to be written to the newly created file (default raw(0)).
#' @param verbose Print tracing information (default FALSE).
#' @return NULL (void)
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Create_and_Write_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Overwrite}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Block_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Replication}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Permission}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#create-org.apache.hadoop.fs.Path-org.apache.hadoop.fs.permission.FsPermission-boolean-int-short-long-org.apache.hadoop.util.Progressable-}
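#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path. `contents` must be a raw vector.
#' adls.create(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt",
#'             permission = "644", overwrite = TRUE,
#'             contents = charToRaw("hello adls"))
#' }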
adls.create <- function(azureActiveContext, azureDataLakeAccount, relativePath,
                                permission, overwrite = FALSE,
                                bufferSize, replication, blockSize,
                                contents = raw(0), verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(permission) && !is.null(permission)) assert_that(is_permission(permission))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  if (!missing(replication) && !is.null(replication)) assert_that(is_replication(replication))
  if (!missing(blockSize) && !is.null(blockSize)) assert_that(is_blockSize(blockSize))
  if (!missing(contents) && !is.null(contents)) assert_that(is_content(contents))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=CREATE", "&write=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(permission) && !is.null(permission)) URL <- paste0(URL, "&permission=", permission)
  if (!missing(overwrite) && !is.null(overwrite)) URL <- paste0(URL, "&overwrite=", overwrite)
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  if (!missing(replication) && !is.null(replication)) URL <- paste0(URL, "&replication=", replication)
  if (!missing(blockSize) && !is.null(blockSize)) URL <- paste0(URL, "&blocksize=", blockSize)
  retryPolicy <- NULL
  if(overwrite) {
    retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  } else {
    retryPolicy <- createAdlRetryPolicy(azureActiveContext, retryPolicyEnum$NONIDEMPOTENT, verbose = verbose)
  }
  resHttp <- callAzureDataLakeApi(URL, verb = "PUT",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  content = contents,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  # return a NULL (void)
  return(NULL)
}

#' Delete a file/directory.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param recursive If path is a directory, recursively delete contents and directory (default FALSE).
#' @param verbose Print tracing information (default FALSE).
#' @return Returns TRUE if the delete succeeds, FALSE otherwise.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#delete-a-file}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Recursive}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#delete-org.apache.hadoop.fs.Path-boolean-}
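#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and paths.
#' adls.delete(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt")
#' # Recursively delete a directory and everything under it.
#' adls.delete(azureActiveContext, "myadlsaccount", "/tempdir", recursive = TRUE)
#' }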
adls.delete <- function(azureActiveContext, azureDataLakeAccount, relativePath, recursive = FALSE, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=DELETE",
    getAzureDataLakeApiVersion()
  )
  if (!missing(recursive)  && !is.null(recursive)) URL <- paste0(URL, "&recursive=", recursive)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "DELETE",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
  resJsonObj <- jsonlite::fromJSON(resJsonStr)
  resDf <- as.data.frame(resJsonObj)
  return(resDf$boolean)
}

#' Rename a file/folder.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of source file/folder.
#' @param destinationRelativePath Relative path of destination file/folder.
#' @param overwrite Whether to overwrite existing files with same name (default FALSE).
#' @param verbose Print tracing information (default FALSE).
#' @return TRUE if successful, FALSE otherwise
#' @details Exceptions - FileNotFoundException, FileAlreadyExistsException, ParentNotDirectoryException, IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#rename-a-file}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Rename_a_FileDirectory}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Destination}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#rename(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Options.Rename...)}
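#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and paths.
#' adls.rename(azureActiveContext, "myadlsaccount",
#'             "/tempdir/tempfile.txt", "/tempdir/renamedfile.txt",
#'             overwrite = TRUE)
#' }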
adls.rename <- function(azureActiveContext, azureDataLakeAccount, relativePath, destinationRelativePath, overwrite = FALSE, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_destinationRelativePath(destinationRelativePath))
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=RENAME",
    getAzureDataLakeApiVersion()
  )
  URL <- paste0(URL, "&destination=", getAzureDataLakeURLEncodedString(destinationRelativePath))
  if (overwrite) URL <- paste0(URL, "&renameoptions=", "OVERWRITE")
  # TODO: Shouldn't we use NONIDEMPOTENTRETRYPOLICY like CREATE API?
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "PUT",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  resJsonStr <- content(resHttp, "text", encoding = "UTF-8")
  resJsonObj <- jsonlite::fromJSON(resJsonStr)
  resDf <- as.data.frame(resJsonObj)
  return(resDf$boolean)
}

#' Concatenate existing files into a destination file (MSCONCAT).
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a destination file.
#' @param sourceRelativePaths Simple vector of relative paths of source files to be concatenated.
#' @param verbose Print tracing information (default FALSE).
#' @return NULL (void)
#' @details Exceptions - IOException, UnsupportedOperationException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Concat_Files}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Sources}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#concat\(org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.Path[]\)}
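#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and paths. The source files are
#' # concatenated, in the given order, into the destination file.
#' adls.concat(azureActiveContext, "myadlsaccount", "/tempdir/merged.txt",
#'             c("/tempdir/part-001.txt", "/tempdir/part-002.txt"))
#' }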
adls.concat <- function(azureActiveContext, azureDataLakeAccount, relativePath, sourceRelativePaths, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_sourceRelativePaths(sourceRelativePaths))
  # Verify each source path and ensure it starts with "/".
  # Do NOT URL-encode the source paths; they are sent as-is in the request body.
  for (i in seq_along(sourceRelativePaths)) {
    sourceRelativePath <- sourceRelativePaths[i]
    assert_that(is_sourceRelativePath(sourceRelativePath))
    if (substr(sourceRelativePath, 1, 1) != "/") sourceRelativePath <- paste0("/", sourceRelativePath)
    sourceRelativePaths[i] <- sourceRelativePath
  }
  # json format the source paths
  sourceRelativePaths <- paste0("{\"sources\":", jsonlite::toJSON(sourceRelativePaths), "}")
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=MSCONCAT", # use MSCONCAT instead of CONCAT
    getAzureDataLakeApiVersionForConcat()
  )
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL, verb = "POST",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  content = charToRaw(sourceRelativePaths),
                                  # MUST specify content type for latest implementation of MSCONCAT
                                  contenttype = "application/json",
                                  verbose = verbose)
  stopWithAzureError(resHttp)
  # return a NULL (void)
  return(NULL)
}

# ADLS Ingress APIs ----

#' Append to an existing file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file.
#' @param bufferSize Size of the buffer to be used.
#' @param contents raw contents to be written to the file.
#' @param contentSize size of `contents` to be written to the file.
#' @param verbose Print tracing information (default FALSE).
#' @return NULL (void)
#' @details Exceptions - IOException
#' 
#' @family Azure Data Lake Store functions
#' @export
#' 
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
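#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path. Appends raw bytes to an
#' # existing file in a single call; 4194304 (4 MB) is the buffer size this
#' # package also uses internally.
#' adls.append.direct(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt",
#'                    bufferSize = 4194304L, contents = charToRaw(" more data"))
#' }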
adls.append.direct <- function(azureActiveContext, azureDataLakeAccount, relativePath, 
                                bufferSize, contents, contentSize = -1L, verbose = FALSE) {
  resHttp <- adls.append.core(azureActiveContext, NULL, azureDataLakeAccount, relativePath,
                                     bufferSize, contents, contentSize, verbose = verbose)
  stopWithAzureError(resHttp)
  # return a NULL (void)
  return(NULL)
}

#' Append to an existing file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file.
#' @param verbose Print tracing information (default FALSE).
#' @return adlFileOutputStream object.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
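#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path. Open an output stream on an
#' # existing file, write raw bytes through it (1-based offset), then close,
#' # which flushes any buffered data.
#' adlFOS <- adls.append(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt")
#' data <- charToRaw("appended via output stream")
#' adls.fileoutputstream.write(adlFOS, data, 1, length(data))
#' adls.fileoutputstream.close(adlFOS)
#' }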
adls.append <- function(azureActiveContext, azureDataLakeAccount, relativePath, verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  adlFOS <- adls.fileoutputstream.create(azureActiveContext, azureDataLakeAccount, relativePath, verbose)
  return(adlFOS)
}

# ADLS Egress APIs ----

#' Open and read a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param offset Provide the offset to read from.
#' @param length Provide length of data to read.
#' @param bufferSize Size of the buffer to be used. (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return raw contents of the file.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
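#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path. Read the first 10 bytes of a
#' # file (offset/length as in the WebHDFS OPEN operation) and decode them.
#' res <- adls.read.direct(azureActiveContext, "myadlsaccount",
#'                         "/tempdir/tempfile.txt", offset = 0, length = 10)
#' rawToChar(res)
#' }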
adls.read.direct <- function(azureActiveContext, 
                              azureDataLakeAccount, relativePath, 
                              offset, length, bufferSize, 
                              verbose = FALSE) {
  resHttp <- adls.read.core(azureActiveContext, 
                                   azureDataLakeAccount, relativePath, 
                                   offset, length, bufferSize, 
                                   verbose)
  stopWithAzureError(resHttp)
  resRaw <- content(resHttp, "raw", encoding = "UTF-8")
  return(resRaw)
}

#' Open a file and return an adlFileInputStream
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param bufferSize Size of the buffer to be used. (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return an object of adlFileInputStream.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
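#' @examples
#' \dontrun{
#' # Usage sketch: hypothetical account and path. Open an input stream and do a
#' # buffered read into a pre-allocated raw buffer (1-based buffer offset). The
#' # result is a list: number of bytes read, then the filled buffer.
#' adlFIS <- adls.read(azureActiveContext, "myadlsaccount", "/tempdir/tempfile.txt")
#' buf <- raw(1024)
#' res <- adls.fileinputstream.read(adlFIS, buf, 1, 1024)
#' bytesRead <- res[[1]]
#' buf <- res[[2]]
#' adls.fileinputstream.close(adlFIS)
#' }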
adls.read <- function(azureActiveContext, azureDataLakeAccount, 
                                 relativePath, bufferSize, 
                                 verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  adlFIS <- adls.fileinputstream.create(azureActiveContext, azureDataLakeAccount, relativePath, verbose)

  return(adlFIS)
}

# ADLS Ingress - AdlFileOutputStream ----

#' Write to an adlFileOutputStream.
#'
#' @param adlFileOutputStream adlFileOutputStream of the file
#' @param contents contents to write to the file
#' @param off the start offset in the data (1-based; must be >= 1)
#' @param len the number of bytes to write
#' @param verbose Print tracing information (default FALSE)
#' @return NULL (void)
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileoutputstream.write <- function(adlFileOutputStream, contents, off, len, 
                                     verbose = FALSE) {
  if (!missing(adlFileOutputStream) && !is.null(adlFileOutputStream)) {
    assert_that(is.adlFileOutputStream(adlFileOutputStream))
    adlFileOutputStreamCheck(adlFileOutputStream)
  }
  if (!is.null(adlFileOutputStream$azureActiveContext)) {
    assert_that(is.azureActiveContext(adlFileOutputStream$azureActiveContext))
    azureCheckToken(adlFileOutputStream$azureActiveContext)
  }
  assert_that(is_content(contents))
  contentlen <- getContentSize(contents)
  if (off < 1 || off > contentlen || len < 0 
      || (off + len) > contentlen + 1
      || (off + len) < 1) {
    stop("IndexOutOfBoundsException: specify valid offset and length")
  }
  if (off > contentlen + 1 || len > (contentlen + 1 - off)) {
    stop("IllegalArgumentException: array offset and length are > array size")
  }
  if (len == 0) {
    return(NULL)
  }

  # TODO: need to implement a buffer manager ?
  adlFileOutputStream$buffer <- raw(adlFileOutputStream$blockSize)

  # if len is larger than the block size (4 MB), force-break the write into blockSize-sized chunks
  while (len > adlFileOutputStream$blockSize) {
    adls.fileoutputstream.flush(adlFileOutputStream, syncFlagEnum$DATA, verbose) # flush first, because we want to preserve record boundary of last append
    adls.fileoutputstream.addtobuffer(adlFileOutputStream, contents, off, adlFileOutputStream$blockSize)
    off <- off + adlFileOutputStream$blockSize
    len <- len - adlFileOutputStream$blockSize
  }
  # now len == the remaining length

  # if adding this to buffer would overflow buffer, then flush buffer first
  if (len > getContentSize(adlFileOutputStream$buffer) - (adlFileOutputStream$cursor - 1)) {
    adls.fileoutputstream.flush(adlFileOutputStream, syncFlagEnum$DATA, verbose)
  }
  # now we know the remaining contents will fit in the buffer, so just add them in
  adls.fileoutputstream.addtobuffer(adlFileOutputStream, contents, off, len)

  return(NULL)
}

#' Flush an adlFileOutputStream.
#'
#' @param adlFileOutputStream adlFileOutputStream of the file
#' @param syncFlag type of sync (DATA, METADATA, CLOSE)
#' @param verbose Print tracing information (default FALSE)
#' @return NULL (void)
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileoutputstream.flush <- function(adlFileOutputStream, syncFlag = syncFlagEnum$METADATA,
                                     verbose = FALSE) {
  # Ignoring this, because HBase actually calls flush after close() <sigh>
  if (adlFileOutputStream$streamClosed) {
    return(NULL)
  }
  # nothing to flush
  if (adlFileOutputStream$cursor == 1 
      && (syncFlag == syncFlagEnum$DATA || syncFlag == syncFlagEnum$PIPELINE)) {
    return(NULL)
  }
  # do not send a flush if the last flush updated metadata and there is no data
  if ((adlFileOutputStream$cursor == 1)
      && adlFileOutputStream$lastFlushUpdatedMetadata
      && (syncFlag == syncFlagEnum$METADATA)) {
    return(NULL)
  }
  resHttp <- adls.append.core(adlFileOutputStream$azureActiveContext, adlFileOutputStream,
                             adlFileOutputStream$accountName, adlFileOutputStream$relativePath, 4194304L, 
                             adlFileOutputStream$buffer, as.integer(adlFileOutputStream$cursor - 1), 
                             adlFileOutputStream$leaseId, adlFileOutputStream$leaseId, syncFlag, 
                             adlFileOutputStream$remoteCursor,
                             verbose)
  if(!isSuccessfulResponse(resHttp)) {
    errMessage <- content(resHttp, "text", encoding = "UTF-8")
    if (adlFileOutputStream$numRetries > 0 && status_code(resHttp) == 400
        && grepl("BadOffsetException", errMessage, ignore.case = TRUE)) {
      # if this was a retry and we get bad offset, then this might be because we got a transient
      # failure on first try, but request succeeded on back-end. In that case, the retry would fail
      # with bad offset. To detect that, we check if there was a retry done, and if the current error we
      # have is bad offset.
      # If so, do a zero-length append at the current expected Offset, and if that succeeds,
      # then the file length must be good - swallow the error. If this append fails, then the last append
      # did not succeed and we have some other offset on server - bubble up the error.
      # cursor is 1-based, so (cursor - 1) bytes are buffered locally
      expectedRemoteLength <- (adlFileOutputStream$remoteCursor + (adlFileOutputStream$cursor - 1))
      append0Succeeded <- adls.fileoutputstream.dozerolengthappend(adlFileOutputStream, 
                                             adlFileOutputStream$accountName, 
                                             adlFileOutputStream$relativePath, 
                                             expectedRemoteLength)
      if (append0Succeeded) {
        printADLSMessage("AzureDataLake.R", "adls.fileoutputstream.flush", 
                         paste0("zero-length append succeeded at expected offset (", expectedRemoteLength, "), ",
                                " ignoring BadOffsetException for session: ", adlFileOutputStream$leaseId,
                                ", file: ", adlFileOutputStream$relativePath))
        adlFileOutputStream$remoteCursor <- (adlFileOutputStream$remoteCursor + (adlFileOutputStream$cursor - 1))
        adlFileOutputStream$cursor <- 1L
        adlFileOutputStream$lastFlushUpdatedMetadata <- FALSE
        return(NULL)
      } else {
        printADLSMessage("AzureDataLake.R", "adls.fileoutputstream.flush", 
                         paste0("Append failed at expected offset(", expectedRemoteLength,
                                "). Bubbling exception up for session: ", adlFileOutputStream$leaseId,
                                ", file: ", adlFileOutputStream$relativePath))
      }
    }
  }
  stopWithAzureError(resHttp)
  adlFileOutputStream$remoteCursor <- (adlFileOutputStream$remoteCursor + (adlFileOutputStream$cursor - 1))
  adlFileOutputStream$cursor <- 1L
  adlFileOutputStream$lastFlushUpdatedMetadata <- (syncFlag != syncFlagEnum$DATA)
  return(NULL)
}

#' Close an adlFileOutputStream.
#'
#' @param adlFileOutputStream adlFileOutputStream of the file
#' @param verbose Print tracing information (default FALSE)
#' @return NULL (void)
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileoutputstream.close <- function(adlFileOutputStream,
                                     verbose = FALSE) {
  if(adlFileOutputStream$streamClosed) return(NULL) # Return silently upon multiple closes
  adls.fileoutputstream.flush(adlFileOutputStream, syncFlagEnum$CLOSE, verbose)
  adlFileOutputStream$streamClosed <- TRUE
  adlFileOutputStream$buffer <- raw(0) # release byte buffer so it can be GC'ed even if app continues to hold reference to stream
  return(NULL)
}

# ADLS Egress - AdlFileInputStream ----

#' Read an adlFileInputStream.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @param position position in file to read from (starts from zero)
#' @param buffer raw buffer to read into
#' @param offset offset into the byte buffer at which to read the data into
#' @param length number of bytes to read
#' @param verbose Print tracing information (default FALSE)
#' @return list that contains number of bytes read and the `buffer`
#'
#' @family Azure Data Lake Store functions
#' @export
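#' @examples
#' \dontrun{
#' # Usage sketch: positional read from a stream obtained with adls.read().
#' # `position` in the file starts at 0, while `offset` into the raw buffer is
#' # 1-based; the result is a list of the bytes read and the filled buffer.
#' buf <- raw(64)
#' res <- adls.fileinputstream.readfully(adlFIS, position = 0, buffer = buf,
#'                                       offset = 1, length = 64)
#' bytesRead <- res[[1]]
#' buf <- res[[2]]
#' }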
adls.fileinputstream.readfully <- function(adlFileInputStream, 
                                   position, buffer, offset, length, 
                                   verbose = FALSE) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }
  assert_that(is_position(position))
  assert_that(is_content(buffer))
  assert_that(is_offset(offset))
  assert_that(is_length(length))

  if (position < 0) {
    stop("IllegalArgumentException: attempting to read from negative offset")
  }
  if (position >= adlFileInputStream$directoryEntry$FileStatus.length) {
    return(-1)  # Hadoop prefers -1 to EOFException
  }
  if (is.null(buffer)) {
    stop("IllegalArgumentException: null byte array passed in to read() method")
  }
  if (offset < 1) {
    stop("IllegalArgumentException: offset less than 1")
  }
  if (offset >= getContentSize(buffer)) {
    stop("IllegalArgumentException: offset greater than length of array")
  }
  if (length < 0) {
    stop("IllegalArgumentException: requested read length is less than zero")
  }
  if (length > (getContentSize(buffer) - (offset - 1))) {
    stop("IllegalArgumentException: requested read length is more than will fit after requested offset in buffer")
  }

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext, 
                    adlFileInputStream$accountName, adlFileInputStream$relativePath, 
                    position, length, 
                    verbose = verbose)
  stopWithAzureError(resHttp)
  resRaw <- content(resHttp, "raw", encoding = "UTF-8")
  resRawLen <- getContentSize(resRaw)
  buffer[offset:(offset + length - 1)] <- resRaw[1:resRawLen]
  res <- list(resRawLen, buffer)
  return(res)
}

#' Buffered read an adlFileInputStream.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @param buffer raw buffer to read into
#' @param offset offset into the byte buffer at which to read the data into
#' @param length number of bytes to read
#' @param verbose Print tracing information (default FALSE)
#' @return list that contains number of bytes read and the `buffer`
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.read <- function(adlFileInputStream, 
                                   buffer, offset, length, 
                                   verbose = FALSE) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }
  if (!is.null(adlFileInputStream$azureActiveContext)) {
    assert_that(is.azureActiveContext(adlFileInputStream$azureActiveContext))
    azureCheckToken(adlFileInputStream$azureActiveContext)
  }
  assert_that(is_content(buffer))
  assert_that(is_offset(offset))
  assert_that(is_length(length))

  if (offset < 1L || length < 0L || length > (getContentSize(buffer) - (offset - 1L))) {
    stop("IndexOutOfBoundsException")
  }
  if (length == 0) {
    res <- list(0, buffer)
    return(res)
  }

  #If buffer is empty, then fill the buffer. If EOF, then return -1
  if (adlFileInputStream$bCursor == adlFileInputStream$limit) {
    if (adls.fileinputstream.readfromservice(adlFileInputStream) < 0) {
      res <- list(-1, buffer)
      return(res)
    }
  }

  bytesRemaining <- (adlFileInputStream$limit - adlFileInputStream$bCursor)
  bytesToRead <- min(length, bytesRemaining)
  buffer[offset:(offset - 1 + bytesToRead)] <- 
    adlFileInputStream$buffer[adlFileInputStream$bCursor:(adlFileInputStream$bCursor - 1 + bytesToRead)]
  adlFileInputStream$bCursor <- adlFileInputStream$bCursor + bytesToRead
  res <- list(bytesToRead, buffer)
  return(res)
}

#' Seek to given position in stream.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @param n position to seek to
#' @return NULL (void)
#' @details Exceptions, EOFException
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.seek <- function(adlFileInputStream, n) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if (adlFileInputStream$streamClosed) stop("IOException: attempting to seek into a closed stream")
  if (n<0) stop("EOFException: Cannot seek to before the beginning of file")
  if (n>adlFileInputStream$directoryEntry$FileStatus.length) stop("EOFExceptionCannot: seek past end of file")

  if (n>=adlFileInputStream$fCursor-adlFileInputStream$limit && n<=adlFileInputStream$fCursor) { # within buffer
    adlFileInputStream$bCursor <- (n-(adlFileInputStream$fCursor-adlFileInputStream$limit));
    return;
  }

  # next read will read from here
  adlFileInputStream$fCursor = n

  #invalidate buffer
  adlFileInputStream$limit = 0;
  adlFileInputStream$bCursor = 0;
  
  return(NULL)
}

#' Skip over and discard n bytes of data from the stream.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @param n number of bytes to skip
#' @return The actual number of bytes skipped. The skip method may, for a variety of
#'     reasons, end up skipping over a smaller number of bytes, possibly 0.
#' @details Exceptions - IOException, EOFException
#'
#' @family Azure Data Lake Store functions
#' @export
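#' @examples
#' \dontrun{
#' # Usage sketch: skip ahead in a stream obtained with adls.read() and check
#' # the new position; fewer than n bytes may be skipped near the end of file.
#' skipped <- adls.fileinputstream.skip(adlFIS, 100)
#' adls.fileinputstream.getpos(adlFIS)
#' }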
adls.fileinputstream.skip <- function(adlFileInputStream, n) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if (adlFileInputStream$streamClosed) stop("IOException: attempting to seek into a closed stream")
  currentPos <- adls.fileinputstream.getpos(adlFileInputStream)
  newPos <- (currentPos + n)
  if (newPos < 0) {
    newPos <- 0
    n <- (newPos - currentPos)
  }
  if (newPos > adlFileInputStream$directoryEntry$FileStatus.length) {
    newPos <- adlFileInputStream$directoryEntry$FileStatus.length
    n <- newPos - currentPos
  }
  adls.fileinputstream.seek(adlFileInputStream, newPos)
  return(n)
}

#' Returns the number of bytes remaining in the buffer that can be read without
#' calling the server.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @return the number of bytes available
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.available <- function(adlFileInputStream) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if (adlFileInputStream$streamClosed) stop("IOException: attempting to call available() on a closed stream")
  return(adlFileInputStream$limit - adlFileInputStream$bCursor)
}

#' Returns the length of the file that this stream refers to. Note that the length returned is the length
#' as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
#' they won't be reflected in the returned length.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @return length of the file.
#' @details Exceptions - IOException if the stream is closed
#' 
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.length <- function(adlFileInputStream) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if (adlFileInputStream$streamClosed) stop("IOException: attempting to call length() on a closed stream")
  return(adlFileInputStream$directoryEntry$FileStatus.length)
}

#' Gets the position of the cursor within the file.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @return position of the cursor
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.getpos <- function(adlFileInputStream) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if(adlFileInputStream$streamClosed) {
    stop("IOException: attempting to call getPos() on a closed stream")
  }
  return(adlFileInputStream$fCursor - adlFileInputStream$limit + adlFileInputStream$bCursor)
}

#' Close an adlFileInputStream.
#'
#' @param adlFileInputStream adlFileInputStream of the file
#' @param verbose Print tracing information (default FALSE)
#' @return NULL (void)
#'
#' @family Azure Data Lake Store functions
#' @export
adls.fileinputstream.close <- function(adlFileInputStream,
                                     verbose = FALSE) {
  if (!missing(adlFileInputStream) && !is.null(adlFileInputStream)) {
    assert_that(is.adlFileInputStream(adlFileInputStream))
    adlFileInputStreamCheck(adlFileInputStream)
  }

  if(adlFileInputStream$streamClosed) return(NULL) # Return silently upon multiple closes
  adlFileInputStream$streamClosed <- TRUE
  adlFileInputStream$buffer <- raw(0) # release byte buffer so it can be GC'ed even if app continues to hold reference to stream
  return(NULL)
}