# tests/testthat/test04_ingest.R

#############################################################################################
# ingestion can have significant and unpredictable latencies; do not run these tests automatically
#############################################################################################


context("Ingesting")

# skip when running the automated test suite; run these tests manually
skip("Automated database ingestion tests skipped: run manually")

tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")

if(tenant == "" || app == "" || password == "" || subscription == "")
    skip("Tests skipped: ARM credentials not set")

# this will add AzureStor to the search path
if(!require("AzureStor"))
    skip("Database ingestion tests skipped: AzureStor package not found")

# use persistent testing resources
rgname <- Sys.getenv("AZ_TEST_KUSTO_SERVER_RG")
username <- Sys.getenv("AZ_TEST_KUSTO_USERNAME")
srvname <- Sys.getenv("AZ_TEST_KUSTO_SERVER")
srvloc <- Sys.getenv("AZ_TEST_KUSTO_SERVER_LOCATION")
blobacct <- Sys.getenv("AZ_TEST_KUSTO_BLOBACCT")
blobcont <- Sys.getenv("AZ_TEST_KUSTO_BLOBCONT")
adlsacct <- Sys.getenv("AZ_TEST_KUSTO_ADLSACCT")
adlscont <- Sys.getenv("AZ_TEST_KUSTO_ADLSCONT")

storage_app <- Sys.getenv("AZ_TEST_KUSTO_STORAGE_APP")
storage_pwd <- Sys.getenv("AZ_TEST_KUSTO_STORAGE_PWD")

if(rgname == "" || username == "" ||
   srvname == "" || srvloc == "" ||
   blobacct == "" || blobcont == "" ||
   adlsacct == "" || adlscont == "")
    skip("Database ingestion tests skipped: server info not set")

rg <- AzureRMR::az_rm$
    new(tenant=tenant, app=app, password=password)$
    get_subscription(subscription)$
    get_resource_group(rgname)

srv <- rg$get_kusto_cluster(srvname)

# generate a randomly named database for the ingestion tests
dbname <- paste0("ingest", paste0(sample(letters, 5), collapse=""))
dbres <- srv$create_database(dbname)

# add the test user as a database admin; "aaduser=" is Kusto's
# fully-qualified-name prefix for an Azure AD user principal
dbres$add_principals(name="user1", role="Admin", fqn=paste0("aaduser=", username))

# the database endpoint is what run_query() talks to, for both management
# commands (prefixed with ".") and regular KQL queries
db <- dbres$get_database_endpoint()


blobstor <- rg$get_storage_account(blobacct)
adlsstor <- rg$get_storage_account(adlsacct)


test_that("blob ingestion works",
{
    bloburl <- sprintf("https://%s.blob.core.windows.net/%s/iris.csv", blobacct, blobcont)

    # blob with key
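    # (ignoreFirstRecord=TRUE is a Kusto ingestion property that skips the CSV header row)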
    blobkey <- blobstor$list_keys()[1]
    run_query(db, ".create table irisblobkey (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_blob(db, bloburl, "irisblobkey", key=blobkey, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisblobkey | count")$Count, 150)

    # blob with sas
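    # get_account_sas() generates an account-level shared access signature for the storage account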
    blobsas <- blobstor$get_account_sas()
    run_query(db, ".create table irisblobsas (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_blob(db, bloburl, "irisblobsas", sas=blobsas, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisblobsas | count")$Count, 150)
})

test_that("ADLSgen2 ingestion works",
{
    adlsurl <- sprintf("https://%s.dfs.core.windows.net/%s/iris.csv", adlsacct, adlscont)

    # adls2 with key
    adlskey <- adlsstor$list_keys()[1]
    run_query(db, ".create table irisadlskey (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_adls2(db, adlsurl, "irisadlskey", key=adlskey, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisadlskey | count")$Count, 150)

    # adls2 with token
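    # the token is requested for the Azure Storage resource (https://storage.azure.com/),
    # using a separate service principal with access to the ADLSgen2 account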
    adlstok <- AzureAuth::get_azure_token("https://storage.azure.com/",
        tenant=tenant, app=storage_app, password=storage_pwd)
    run_query(db, ".create table irisadlstok (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_adls2(db, adlsurl, "irisadlstok", tok=adlstok, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisadlstok | count")$Count, 150)
})

test_that("local indirect ingestion works",
{
    bl <- blobstor$get_blob_endpoint()
    stagecont <- AzureStor::create_blob_container(bl, db$database)

    run_query(db, ".create table irisfileindirect (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileindirect", method="indirect",
                 staging_container=stagecont)
    expect_equal(run_query(db, "irisfileindirect | count")$Count, 150)

    run_query(db, ".create table irisdfindirect (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, iris, "irisdfindirect", method="indirect",
                 staging_container=stagecont)
    expect_equal(run_query(db, "irisdfindirect | count")$Count, 150)
})

test_that("local streaming ingestion works",
{
    run_query(db,
        sprintf(".alter database %s policy streamingingestion '{\"NumberOfRowStores\": 10}'", db$database))

    run_query(db, ".create table irisfilestream (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfilestream", method="streaming",
                 streamFormat="Csv")
    expect_equal(run_query(db, "irisfilestream | count")$Count, 150)

    run_query(db, ".create table irisdfstream (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, iris, "irisdfstream", method="streaming")
    expect_equal(run_query(db, "irisdfstream | count")$Count, 150)
})

test_that("local inline ingestion works",
{
    run_query(db, ".create table irisfileinline (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileinline", method="inline")
    expect_equal(run_query(db, "irisfileinline | count")$Count, 150)

    run_query(db, ".create table irisdfinline (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, iris, "irisdfinline", method="inline")
    expect_equal(run_query(db, "irisdfinline | count")$Count, 150)
})

test_that("ingestion via compute() verb works",
{
    run_query(db, ".create table irisfileinline2 (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileinline2", method="inline")
    irisfileinline2 <- tbl_kusto(db, "irisfileinline2")

    q <- irisfileinline2 %>%
        dplyr::group_by(species) %>%
        dplyr::summarize(max_sepal_length = max(sl))

    new_tbl <- dplyr::compute(q, "irismaxsepallength")
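    # compute() materializes the query result server-side into a new table and
    # returns a tbl_kusto pointing at it (hence the escaped table name)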
    expect_equal(new_tbl$src$table, "['irismaxsepallength']")
})

test_that("ingestion via copy_to() verb works",
{
    tbl_iris <- iris
    names(tbl_iris) <- c('SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species')
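    # copy_to() uploads the local data frame to the database and returns a
    # tbl_kusto referencing the newly created table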
    iris_copy_to <- dplyr::copy_to(db, tbl_iris, "iris_copy_to")
    expect_equal(iris_copy_to$src$table, "['iris_copy_to']")
})

test_that("copy_to uses compute() when dest and src are in same Kusto database",
{
    run_query(db, ".create table irisfileinline3 (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileinline3", method="inline")
    irisfileinline3 <- tbl_kusto(db, "irisfileinline3")

    q <- irisfileinline3 %>%
        dplyr::group_by(species) %>%
        dplyr::summarize(max_sepal_length = max(sl))

    irismaxsepallength2 <- dplyr::copy_to(db, q, 'irismaxsepallength2')
    expect_equal(irismaxsepallength2$src$table, "['irismaxsepallength2']")
})

# clean up: delete the test database and the blob container used for staging
srv$delete_database(dbname, confirm=FALSE)
delete_blob_container(blobstor$get_blob_endpoint(), dbname, confirm=FALSE)
