context("Dataset stream")
#
mock_stream_rows <- data.frame(
birthyr = c(0.577530, 0.577530),
gender = c(2, 1), # Female , Male
subvar1 = c(2, 2),
subvar2 = c(1, 1),
subvar3 = c(1, 1),
textVar = c("a", "b"),
starttime = c("1955-12-28", "1955-12-29")
)
with_mock_crunch({
ds <- loadDataset("1streaming", project = NULL) ## has 2 messages waiting, 4 rows received
ds2 <- loadDataset("2", "archived", project = NULL) ## Has no streams mock
ds3 <- loadDataset("streaming-no-msg", project = NULL) ## has 0 messages waiting, 0 rows received
test_that("pendingStream gets pending messages", {
expect_equal(pendingStream(ds), 2)
expect_GET(pendingStream(ds2), "https://app.crunch.io/api/datasets/2/stream")
})
test_that("streaming attribute setting", {
expect_equal(streaming(ds), "streaming")
expect_PATCH(
streaming(ds) <- "finished",
"https://app.crunch.io/api/datasets/1streaming/",
'{"streaming":"finished"}'
)
expect_error(
streaming(ds) <- "foo bar",
"'arg' should be one of .*no.*, .*streaming.*, .*finished.*"
)
})
test_that("streamRows streams rows", {
expect_equal(streamRows(ds, data = data.frame()), ds)
expect_POST(
streamRows(ds, data = mock_stream_rows),
"https://app.crunch.io/api/datasets/1streaming/stream/",
'{"birthyr":0.5775,"gender":2,"subvar1":2,"subvar2":1,"subvar3":1,',
'"textVar":"a","starttime":"1955-12-28"}\n',
'{"birthyr":0.5775,"gender":1,"subvar1":2,"subvar2":1,"subvar3":1,',
'"textVar":"b","starttime":"1955-12-29"}'
)
expect_POST(
streamRows(ds, data = mock_stream_rows[2, ]),
"https://app.crunch.io/api/datasets/1streaming/stream/",
'{"birthyr":0.5775,"gender":1,"subvar1":2,"subvar2":1,"subvar3":1,',
'"textVar":"b","starttime":"1955-12-29"}'
)
})
test_that("appendStream", {
expect_POST(
appendStream(ds),
"https://app.crunch.io/api/datasets/1streaming/batches/",
'{"element":"shoji:entity","body":{',
'"type":"ldjson","stream":null}'
)
expect_message(appendStream(ds3), "There's no pending stream data to be appended.")
})
})
stream_rows <- data.frame(
v1 = c(0.75331105, 0.75331105),
v2 = c("a", "b"),
v3 = c(28, 29),
v4 = c(1, 1),
v5 = c("1955-11-25T00:00:00.000000+00:00", "1955-11-26T00:00:00.000000+00:00"),
v6 = c(1, 1)
)
with_test_authentication({
metadata <- fromJSON(datasetFixturePath("streaming.json"), simplifyVector = FALSE)
ds <- suppressMessages(uploadMetadata(metadata))
test_that("streamRows streams rows", {
expect_equal(pendingStream(ds), 0)
ds <- streamRows(ds, data = stream_rows)
expect_equal(pendingStream(ds), 1)
})
test_that("appendStream appends all pending rows", {
ds <- appendStream(ds)
expect_equal(nrow(ds), 2)
expect_equal(pendingStream(ds), 0)
})
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.