Nothing
# ------------------------------------------------------------------------------------------
# Ingestion latency is significant and unpredictable, so this file must NOT be run in an
# automated fashion. The unconditional skip below disables it; run it by hand instead.
# ------------------------------------------------------------------------------------------
context("Ingesting")

# remove this line to run the ingestion tests manually
skip("Automated database ingestion tests skipped: run manually")

# ARM service principal used to reach the test subscription
tenant <- Sys.getenv("AZ_TEST_TENANT_ID")
app <- Sys.getenv("AZ_TEST_APP_ID")
password <- Sys.getenv("AZ_TEST_PASSWORD")
subscription <- Sys.getenv("AZ_TEST_SUBSCRIPTION")
# Sys.getenv() returns "" for unset variables, so any empty value means "not configured"
if(!all(nzchar(c(tenant, app, password, subscription))))
    skip("Tests skipped: ARM credentials not set")

# require() (rather than requireNamespace) attaches AzureStor to the search path; the
# teardown at the end of this file calls delete_blob_container() unqualified, and the
# storage-account methods used below presumably come from AzureStor as well
if(!require("AzureStor"))
    skip("Database ingestion tests skipped: AzureStor package not found")
# Persistent (pre-provisioned) test resources: the Kusto server, its resource group, and
# the blob / ADLSgen2 storage accounts and containers that hold the source iris.csv
rgname <- Sys.getenv("AZ_TEST_KUSTO_SERVER_RG")
username <- Sys.getenv("AZ_TEST_KUSTO_USERNAME")
srvname <- Sys.getenv("AZ_TEST_KUSTO_SERVER")
srvloc <- Sys.getenv("AZ_TEST_KUSTO_SERVER_LOCATION")
blobacct <- Sys.getenv("AZ_TEST_KUSTO_BLOBACCT")
blobcont <- Sys.getenv("AZ_TEST_KUSTO_BLOBCONT")
adlsacct <- Sys.getenv("AZ_TEST_KUSTO_ADLSACCT")
adlscont <- Sys.getenv("AZ_TEST_KUSTO_ADLSCONT")
# service principal for the ADLSgen2 token test; intentionally NOT part of the check below
storage_app <- Sys.getenv("AZ_TEST_KUSTO_STORAGE_APP")
storage_pwd <- Sys.getenv("AZ_TEST_KUSTO_STORAGE_PWD")

server_info <- c(rgname, username, srvname, srvloc, blobacct, blobcont, adlsacct, adlscont)
if(!all(nzchar(server_info)))
    skip("Database ingestion tests skipped: server info not set")
# Log in to ARM and locate the resource group holding the persistent test resources
rg <- local({
    arm <- AzureRMR::az_rm$new(tenant=tenant, app=app, password=password)
    arm_sub <- arm$get_subscription(subscription)
    arm_sub$get_resource_group(rgname)
})
srv <- rg$get_kusto_cluster(srvname)

# Create a throwaway database with a random name ("ingest" + 5 distinct random letters),
# grant the test user Admin on it, and get an endpoint for running queries/ingestion
dbname <- paste(c("ingest", sample(letters, 5)), collapse="")
dbres <- srv$create_database(dbname)
dbres$add_principals(name="user1", role="Admin", fqn=paste0("aaduser=", username))
db <- dbres$get_database_endpoint()

# storage accounts that serve as ingestion sources / staging areas
blobstor <- rg$get_storage_account(blobacct)
adlsstor <- rg$get_storage_account(adlsacct)
test_that("blob ingestion works",
{
    src_url <- sprintf("https://%s.blob.core.windows.net/%s/iris.csv", blobacct, blobcont)
    iris_cols <- "(sl:real, sw:real, pl:real, pw:real, species:string)"
    # number of rows currently in a table, via a KQL "<table> | count" query
    row_count <- function(tbl) run_query(db, paste(tbl, "| count"))$Count

    # authenticate with the first storage account access key
    acct_key <- blobstor$list_keys()[1]
    run_query(db, paste(".create table irisblobkey", iris_cols))
    ingest_blob(db, src_url, "irisblobkey", key=acct_key, ignoreFirstRecord=TRUE)
    expect_equal(row_count("irisblobkey"), 150)

    # authenticate with an account-level SAS
    acct_sas <- blobstor$get_account_sas()
    run_query(db, paste(".create table irisblobsas", iris_cols))
    ingest_blob(db, src_url, "irisblobsas", sas=acct_sas, ignoreFirstRecord=TRUE)
    expect_equal(row_count("irisblobsas"), 150)
})
# ADLSgen2 ingestion of iris.csv, authenticating first with a storage account key and then
# with an Azure AD token. The token half needs the AZ_TEST_KUSTO_STORAGE_APP / _PWD service
# principal, which the server-info check earlier in the file does not validate. It is
# therefore skipped explicitly when those are unset, rather than attempting token
# acquisition with an empty app ID / password.
test_that("ADLSgen2 ingestion works",
{
    adlsurl <- sprintf("https://%s.dfs.core.windows.net/%s/iris.csv", adlsacct, adlscont)

    # adls2 with key
    adlskey <- adlsstor$list_keys()[1]
    run_query(db, ".create table irisadlskey (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_adls2(db, adlsurl, "irisadlskey", key=adlskey, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisadlskey | count")$Count, 150)

    # adls2 with token: skip only this half (the key-based half has already run above)
    if(storage_app == "" || storage_pwd == "")
        skip("ADLSgen2 token ingestion test skipped: storage app credentials not set")
    adlstok <- AzureAuth::get_azure_token("https://storage.azure.com/",
        tenant=tenant, app=storage_app, password=storage_pwd)
    run_query(db, ".create table irisadlstok (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_adls2(db, adlsurl, "irisadlstok", tok=adlstok, ignoreFirstRecord=TRUE)
    expect_equal(run_query(db, "irisadlstok | count")$Count, 150)
})
test_that("local indirect ingestion works",
{
    iris_cols <- "(sl:real, sw:real, pl:real, pw:real, species:string)"
    row_count <- function(tbl) run_query(db, paste(tbl, "| count"))$Count

    # indirect ingestion uploads to a staging container first; name it after the database
    staging <- AzureStor::create_blob_container(blobstor$get_blob_endpoint(), db$database)

    # source: csv file on disk
    run_query(db, paste(".create table irisfileindirect", iris_cols))
    ingest_local(db, "../resources/iris.csv", "irisfileindirect", method="indirect",
        staging_container=staging)
    expect_equal(row_count("irisfileindirect"), 150)

    # source: in-memory data frame
    run_query(db, paste(".create table irisdfindirect", iris_cols))
    ingest_local(db, iris, "irisdfindirect", method="indirect",
        staging_container=staging)
    expect_equal(row_count("irisdfindirect"), 150)
})
test_that("local streaming ingestion works",
{
    iris_cols <- "(sl:real, sw:real, pl:real, pw:real, species:string)"
    row_count <- function(tbl) run_query(db, paste(tbl, "| count"))$Count

    # set the database's streaming ingestion policy before streaming any data into it
    policy_cmd <- sprintf(".alter database %s policy streamingingestion '{\"NumberOfRowStores\": 10}'",
        db$database)
    run_query(db, policy_cmd)

    # source: csv file on disk (format given explicitly)
    run_query(db, paste(".create table irisfilestream", iris_cols))
    ingest_local(db, "../resources/iris.csv", "irisfilestream", method="streaming",
        streamFormat="Csv")
    expect_equal(row_count("irisfilestream"), 150)

    # source: in-memory data frame
    run_query(db, paste(".create table irisdfstream", iris_cols))
    ingest_local(db, iris, "irisdfstream", method="streaming")
    expect_equal(row_count("irisdfstream"), 150)
})
test_that("local inline ingestion works",
{
    iris_cols <- "(sl:real, sw:real, pl:real, pw:real, species:string)"
    row_count <- function(tbl) run_query(db, paste(tbl, "| count"))$Count

    # source: csv file on disk
    run_query(db, paste(".create table irisfileinline", iris_cols))
    ingest_local(db, "../resources/iris.csv", "irisfileinline", method="inline")
    expect_equal(row_count("irisfileinline"), 150)

    # source: in-memory data frame
    run_query(db, paste(".create table irisdfinline", iris_cols))
    ingest_local(db, iris, "irisdfinline", method="inline")
    expect_equal(row_count("irisdfinline"), 150)
})
test_that("ingestion via compute() verb works",
{
    run_query(db, ".create table irisfileinline2 (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileinline2", method="inline")

    # per-species maximum sepal length, built lazily against the Kusto table
    by_species <- dplyr::group_by(tbl_kusto(db, "irisfileinline2"), species)
    max_sl <- dplyr::summarize(by_species, max_sepal_length = max(sl))

    # compute() should materialise the query into a new table and point the result at it
    new_tbl <- dplyr::compute(max_sl, "irismaxsepallength")
    expect_equal(new_tbl$src$table, "['irismaxsepallength']")
})
test_that("ingestion via copy_to() verb works",
{
    # iris with column names free of dots (presumably to keep them valid Kusto identifiers)
    dotless_iris <- setNames(iris,
        c('SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'))

    copied <- dplyr::copy_to(db, dotless_iris, "iris_copy_to")
    expect_equal(copied$src$table, "['iris_copy_to']")
})
test_that("copy_to uses compute() when dest and src are in same Kusto database",
{
    run_query(db, ".create table irisfileinline3 (sl:real, sw:real, pl:real, pw:real, species:string)")
    ingest_local(db, "../resources/iris.csv", "irisfileinline3", method="inline")

    # a lazy query whose source lives in the same database as the copy_to() destination
    by_species <- dplyr::group_by(tbl_kusto(db, "irisfileinline3"), species)
    max_sl <- dplyr::summarize(by_species, max_sepal_length = max(sl))

    copied <- dplyr::copy_to(db, max_sl, 'irismaxsepallength2')
    expect_equal(copied$src$table, "['irismaxsepallength2']")
})
# Teardown: remove the per-run database and the staging blob container named after it
# (created by the indirect-ingestion test). Each step is attempted independently, and a
# failure is reported as a warning. This way a missing container (e.g. the indirect test
# failed before creating it) does not prevent the database from being removed, and
# vice versa.
tryCatch(srv$delete_database(dbname, confirm=FALSE),
    error=function(e)
        warning("Could not delete test database '", dbname, "': ", conditionMessage(e),
            call.=FALSE))
tryCatch(delete_blob_container(blobstor$get_blob_endpoint(), dbname, confirm=FALSE),
    error=function(e)
        warning("Could not delete staging container '", dbname, "': ", conditionMessage(e),
            call.=FALSE))
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.