R/endpoint.R

Defines functions copy_to.kusto_database_endpoint print.kusto_database_endpoint check_endpoint_properties find_endpoint_token find_type_from_connstring normalize_connstring_properties kusto_database_endpoint

Documented in copy_to.kusto_database_endpoint kusto_database_endpoint

#' Endpoints for communicating with a Kusto database
#'
#' @param ... Named arguments which are the properties for the endpoint object. See 'Details' below for the properties that AzureKusto recognises.
#' @param .connection_string An alternative way of specifying the properties, as a database connection string of the form `"name1=value1;name2=value2;..."`. Spaces around names and values are ignored, and values that look like numbers or logicals (eg `"123"`, `"true"`) are converted to those types. Properties supplied here override those in `...` if they overlap, even when the two are given under different alternate names.
#' @param .query_token Optionally, an Azure Active Directory (AAD) token to authenticate with. If this is supplied, it overrides other tokens specified in `...` or in the connection string.
#' @param .use_integer64 For `kusto_database_endpoint`, whether to convert columns with Kusto `long` datatype into 64-bit integers in R, using the bit64 package. If FALSE, represent them as numeric instead. If bit64 is not installed, a warning is issued and numeric is used.
#'
#' @details
#' This is a list of properties recognised by `kusto_database_endpoint`, and their alternate names. Property names are case-insensitive; unrecognised names will generate an error. Note that not all properties that are recognised are currently supported by AzureKusto.
#'
#' General properties:
#' - server: The URI of the server, usually of the form 'https://clustername.location.kusto.windows.net'. This property is required. If it does not begin with 'http://' or 'https://', it is treated as a cluster name: 'https://' is prepended, and '.kusto.windows.net' is appended if not already present.
#'   * addr, address, network address, datasource, host, cluster
#' - database: The database.
#'   * initialcatalog, dbname
#' - tenantid: The AAD tenant name or ID to authenticate with.
#'   * authority
#' - appclientid: The AAD app/service principal ID
#'   * applicationclientid
#' - traceclientversion: The client version for tracing.
#' - queryconsistency: The level of query consistency. Defaults to "weakconsistency".
#' - response_dynamic_serialization: How to serialize dynamic responses. Only "string" is currently supported; any other value generates a warning and is ignored.
#' - response_dynamic_serialization_2: How to serialize dynamic responses.
#'
#' User authentication properties:
#' - pwd: The user password.
#'   * password
#' - user: The user name.
#'   * uid, userid
#' - traceusername: The user name for tracing.
#' - usertoken: The AAD token for user authentication.
#'   * usrtoken
#' - fed: Logical, whether federated authentication is enabled. Currently unsupported; if this is TRUE, `kusto_database_endpoint` will print a warning and ignore it.
#'   * federated security, federated, aadfed, aadfederatedsecurity
#'
#' App authentication properties:
#' - appkey: The secret key for the app.
#'   * applicationkey
#' - traceappname: The AAD app for tracing.
#' - apptoken: The AAD token for app authentication.
#'   * applicationtoken
#'
#' Currently, AzureKusto only supports authentication via Azure Active Directory. Authenticating with DSTS is planned for the future.
#'
#' The way `kusto_database_endpoint` obtains an AAD token is as follows.
#' 1. If the `.query_token` argument is supplied, use it.
#' 2. Otherwise, if the `usertoken` property is supplied, use it.
#' 3. Otherwise, if the `apptoken` property is supplied, use it.
#' 4. Otherwise, if the `appclientid` property is supplied, use it to obtain a token:
#'    - With the `user` and `pwd` properties if available
#'    - Or with the `appkey` property if available
#'    - Otherwise do an interactive authentication and ask for the user credentials
#' 5. Otherwise, if no `appclientid` property is supplied, authenticate with the KustoClient app (the `appkey` property is ignored in this case):
#'    - With the `user` and `pwd` properties if available
#'    - Otherwise do an interactive authentication and ask for the user credentials using the authorization code flow
#'
#' @return
#' An object of class `kusto_database_endpoint`.
#'
#' @examples
#' \dontrun{
#'
#' kusto_database_endpoint(server="myclust.australiaeast.kusto.windows.net", database="db1")
#'
#' # supplying a token obtained previously
#' token <- get_kusto_token("myclust.australiaeast.kusto.windows.net")
#' kusto_database_endpoint(server="myclust.australiaeast.kusto.windows.net", database="db1",
#'                         .query_token=token)
#'
#' }
#' @seealso
#' [run_query], [az_kusto_database]
#' @rdname database_endpoint
#' @export
kusto_database_endpoint <- function(..., .connection_string=NULL, .query_token=NULL, .use_integer64=FALSE)
{
    props <- list(...)
    names(props) <- tolower(names(props))

    # map property names onto their canonical forms (and strip quotes from quoted values)
    # BEFORE merging, so that eg 'datasource' in the connection string correctly overrides
    # 'server' in `...` instead of producing two 'server' entries
    props <- normalize_connstring_properties(props)

    if(!is.null(.connection_string))
    {
        # simplified connection string handling: ignores quoting issues
        segments <- trimws(strsplit(.connection_string, ";", fixed=TRUE)[[1]])
        # tolerate empty segments, eg from ';;' or a trailing '; '
        segments <- segments[nzchar(segments)]

        # value = everything after the first '=' (a value may itself contain '=', eg a base64 key)
        conn_props <- lapply(sub("^[^=]+=[ ]*", "", segments), find_type_from_connstring)
        # name = everything before the first '='; '.*' so that "key=" (empty value) still parses
        names(conn_props) <- tolower(trimws(sub("=.*$", "", segments)))

        conn_props <- normalize_connstring_properties(conn_props)
        props <- utils::modifyList(props, conn_props)
    }

    # the server is required: it is needed to obtain a token and to address queries
    server <- props[["server"]]
    if(!is.character(server) || length(server) != 1 || is.na(server) || !nzchar(server))
        stop("No server specified: supply the 'server' property or one of its alternate names",
             call.=FALSE)

    # make a bare cluster name into an https URL; leave anything with an http(s) scheme alone
    if(!grepl("^https?://", server, ignore.case=TRUE))
    {
        server <- paste0("https://", server)
        if(!endsWith(server, ".kusto.windows.net"))
            server <- paste0(server, ".kusto.windows.net")
        props$server <- server
    }
    props$token <- find_endpoint_token(props, .query_token)

    props$use_integer64 <- .use_integer64
    props <- check_endpoint_properties(props)

    class(props) <- "kusto_database_endpoint"
    props
}


# Map property names onto their canonical names and strip enclosing quotes from character values.
#
# properties: a named list whose names are (lower-case) property names or any of their aliases.
# Returns the same list, with canonical names and unquoted character values.
# Errors on any name that is not a recognised property or alias.
normalize_connstring_properties <- function(properties)
{
    # valid property names for a Kusto connection string: each element maps a canonical
    # name (the element name) to all the aliases accepted for it
    property_list <- list(
        # general properties
        traceclientversion="traceclientversion",
        server=c("server", "addr", "address", "network address", "datasource", "host", "cluster"),
        database=c("database", "initialcatalog", "dbname"),
        tenantid=c("tenantid", "authority"),
        queryconsistency="queryconsistency",
        response_dynamic_serialization="response_dynamic_serialization",
        response_dynamic_serialization_2="response_dynamic_serialization_2",

        # userauth properties -- DSTS not yet supported
        fed=c("fed", "federated security", "federated", "aadfed", "aadfederatedsecurity"),
        pwd=c("pwd", "password"),
        user=c("user", "uid", "userid"),
        traceusername="traceusername",
        usertoken=c("usertoken", "usrtoken"),

        # appauth properties -- cert, DSTS not yet supported
        appclientid=c("appclientid", "applicationclientid"),
        appkey=c("appkey", "applicationkey"),
        traceappname="traceappname",
        apptoken=c("apptoken", "applicationtoken")
    )

    # return the canonical name for a single property name, or fail
    normalize_name <- function(x)
    {
        for(i in seq_along(property_list))
        {
            if(x %in% property_list[[i]])
                return(names(property_list)[i])
        }
        stop("Invalid/unsupported property name: ", x, call.=FALSE)
    }

    # remove ONE pair of MATCHING enclosing quotes (single or double); the backreference
    # leaves mismatched quotes such as 'abc" untouched, and '.*' lets "" become an empty string
    strip_quotes <- function(x)
    {
        if(is.character(x))
            sub("^(['\"])(.*)\\1$", "\\2", x)
        else x
    }

    # vapply guarantees a character result even for an empty property list
    names(properties) <- vapply(names(properties), normalize_name, character(1), USE.NAMES=FALSE)
    lapply(properties, strip_quotes)
}


# Convert a connection-string value to a more specific type when it parses as one.
#
# The literal "NA" becomes a logical NA; anything that parses as a number becomes numeric;
# anything that parses as a logical ("TRUE", "true", "T", "False", ...) becomes logical;
# everything else is returned unchanged as character.
# NOTE(review): numeric conversion loses leading zeros (eg a password "007" becomes 7) -- confirm
# this is acceptable for string-valued properties such as pwd/database.
find_type_from_connstring <- function(string)
{
    if(identical(string, "NA"))
        return(NA)

    as_num <- suppressWarnings(as.numeric(string))
    if(!is.na(as_num))
        return(as_num)

    as_lgl <- as.logical(string)
    if(!is.na(as_lgl))
        return(as_lgl)

    # neither numeric nor logical: keep the original text
    string
}


# Choose or obtain the AAD token for an endpoint.
#
# Precedence: `.query_token`, then the usertoken property, then the apptoken property.
# Failing those, a new token is requested via get_kusto_token() for properties$server:
# - with user + pwd (resource owner flow) if both are present;
# - else with appkey (client credentials flow) if present and a non-KustoClient app ID is given;
# - else interactively. When no app ID is supplied, the KustoClient app ID is used with the
#   authorization code flow; otherwise the flow is left for get_kusto_token to choose.
find_endpoint_token <- function(properties, .query_token)
{
    # an explicitly supplied token always wins
    if(!is.null(.query_token))
        return(.query_token)

    # next, a pre-acquired user token, then a pre-acquired app token
    for(token_prop in c("usertoken", "apptoken"))
    {
        if(!is_empty(properties[[token_prop]]))
            return(properties[[token_prop]])
    }

    app_id <- properties$appclientid
    if(is_empty(app_id))
    {
        message("No app ID supplied; using KustoClient app")
        app_id <- .kusto_app_id
        flow <- "authorization_code"
    }
    else flow <- NULL  # custom app: let get_kusto_token pick the flow

    username <- NULL
    secret <- NULL
    have_user_creds <- !is_empty(properties$user) && !is_empty(properties$pwd)
    if(have_user_creds)
    {
        username <- properties$user
        secret <- properties$pwd
        flow <- "resource_owner"
    }
    else if(!is_empty(properties$appkey) && app_id != .kusto_app_id)
    {
        # an app key is only meaningful for a custom app, not KustoClient
        secret <- properties$appkey
        flow <- "client_credentials"
    }

    get_kusto_token(properties$server, tenant=properties$tenantid,
        app=app_id, password=secret, username=username, auth_type=flow)
}


# Internal sanity checks on endpoint properties; returns the (possibly adjusted) property list.
# - use_integer64 is turned off, with a warning, if the bit64 package is unavailable.
# - response_dynamic_serialization is dropped, with a warning, unless it is "string"
#   (compared case-insensitively).
check_endpoint_properties <- function(props)
{
    wants_int64 <- isTRUE(props$use_integer64)
    if(wants_int64 && !requireNamespace("bit64", quietly=TRUE))
    {
        warning("bit64 package not installed, cannot use 64-bit integers")
        props$use_integer64 <- FALSE
    }

    dyn_serial <- props[["response_dynamic_serialization"]]
    if(!is_empty(dyn_serial) && tolower(dyn_serial) != "string")
    {
        warning("Serialization of dynamic response to JSON is not yet supported")
        props[["response_dynamic_serialization"]] <- NULL
    }

    props
}


#' @export
print.kusto_database_endpoint <- function(x, ...)
{
    # display the server URL with the database name as its path
    parsed <- httr::parse_url(x$server)
    parsed$path <- x$database
    endpoint_url <- httr::build_url(parsed)
    cat(paste0("<Kusto database endpoint '", endpoint_url, "'>\n"))
    invisible(x)
}

#' Copy a data frame to a Kusto database
#'
#' This function uploads a local data frame into a remote data source, creating the table definition as needed.
#' If a table called `name` already exists, it is removed before writing when `overwrite=TRUE`; otherwise an error is thrown.
#' If `df` is a remote `tbl_kusto` on the same server as `dest`, the copy is done server-side via `compute()` rather than by downloading the data first.
#' @export
#' @param dest remote data source
#' @param df local data frame, or a remote `tbl_kusto`
#' @param name Name for new remote table
#' @param overwrite If `TRUE`, will overwrite an existing table with
#'   name `name`. If `FALSE`, will throw an error if `name` already
#'   exists. Only used when the data is uploaded from R (ie not for a same-server `tbl_kusto`).
#' @param method For local ingestion, the method to use. "inline", "streaming", or "indirect".
#' @param ... For a same-server remote `tbl_kusto`, other parameters passed to `compute()`; otherwise ignored.
#' @return Invisibly, the result of `compute()` for a same-server remote `df`; otherwise `tbl_kusto(dest, name)` referring to the new table.
#' @seealso [collect()] for the opposite action; downloading remote data into a local tbl.
copy_to.kusto_database_endpoint <- function(dest, df, name=deparse(substitute(df)), overwrite = FALSE, method = "inline", ...)
{
    # evaluate the default name NOW: once `df` is reassigned below, substitute(df) would
    # return the data itself and the table name would become a deparsed data frame
    force(name)

    if (!is.data.frame(df) && !inherits(df, "tbl_kusto"))
        stop("`df` must be a local dataframe or a remote tbl_kusto", call. = FALSE)

    # same-server remote source: copy server-side without downloading
    # (identical() avoids a zero-length condition if either server is NULL)
    if (inherits(df, "tbl_kusto") && identical(dest$server, df$src$server))
        return(invisible(compute(df, name = name, ...)))

    df <- collect(df)
    class(df) <- "data.frame" # avoid S4 dispatch problem in dbSendPreparedQuery

    # initialize DBI connection
    cnxn <- new("AzureKustoConnection", endpoint=dest)
    if (DBI::dbExistsTable(cnxn, name))
    {
        if (!overwrite)
            stop("table ", name, " already exists. If you wish to overwrite it, specify overwrite=TRUE",
                 call. = FALSE)
        DBI::dbRemoveTable(cnxn, name)
    }
    DBI::dbWriteTable(cnxn, name, df, method=method)

    invisible(tbl_kusto(dest, name))
}
Azure/AzureKusto documentation built on Oct. 16, 2023, 7:04 p.m.