tests/testthat/test-dataset-stream.R

context("Dataset stream")
#
mock_stream_rows <- data.frame(
    birthyr = c(0.577530, 0.577530),
    gender = c(2, 1), # Female , Male
    subvar1 = c(2, 2),
    subvar2 = c(1, 1),
    subvar3 = c(1, 1),
    textVar = c("a", "b"),
    starttime = c("1955-12-28", "1955-12-29")
)

with_mock_crunch({
    ds <- loadDataset("streaming test ds") ## has 2 messages waiting, 4 rows received
    ds2 <- loadDataset("an archived dataset", kind = "archived") ## Has no streams mock
    ds3 <- loadDataset("streaming no messages") ## has 0 messages waiting, 0 rows received
    test_that("pendingStream gets pending messages", {
        expect_equal(pendingStream(ds), 2)
        expect_GET(pendingStream(ds2), "https://app.crunch.io/api/datasets/2/stream")
    })

    test_that("streaming attribute setting", {
        expect_equal(streaming(ds), "streaming")
        expect_PATCH(
            streaming(ds) <- "finished",
            "https://app.crunch.io/api/datasets/1streaming/",
            '{"streaming":"finished"}'
        )
        expect_error(
            streaming(ds) <- "foo bar",
            "'arg' should be one of .*no.*, .*streaming.*, .*finished.*"
        )
    })

    test_that("streamRows streams rows", {
        expect_equal(streamRows(ds, data = data.frame()), ds)
        expect_POST(
            streamRows(ds, data = mock_stream_rows),
            "https://app.crunch.io/api/datasets/1streaming/stream/",
            '{"birthyr":0.5775,"gender":2,"subvar1":2,"subvar2":1,"subvar3":1,',
            '"textVar":"a","starttime":"1955-12-28"}\n',
            '{"birthyr":0.5775,"gender":1,"subvar1":2,"subvar2":1,"subvar3":1,',
            '"textVar":"b","starttime":"1955-12-29"}'
        )
        expect_POST(
            streamRows(ds, data = mock_stream_rows[2, ]),
            "https://app.crunch.io/api/datasets/1streaming/stream/",
            '{"birthyr":0.5775,"gender":1,"subvar1":2,"subvar2":1,"subvar3":1,',
            '"textVar":"b","starttime":"1955-12-29"}'
        )
    })

    test_that("appendStream", {
        expect_POST(
            appendStream(ds),
            "https://app.crunch.io/api/datasets/1streaming/batches/",
            '{"element":"shoji:entity","body":{',
            '"type":"ldjson","stream":null}'
        )
        expect_message(appendStream(ds3), "There's no pending stream data to be appended.")
    })
})


stream_rows <- data.frame(
    v1 = c(0.75331105, 0.75331105),
    v2 = c("a", "b"),
    v3 = c(28, 29),
    v4 = c(1, 1),
    v5 = c("1955-11-25T00:00:00.000000+00:00", "1955-11-26T00:00:00.000000+00:00"),
    v6 = c(1, 1)
)
with_test_authentication({
    metadata <- fromJSON(datasetFixturePath("streaming.json"), simplifyVector = FALSE)
    ds <- suppressMessages(uploadMetadata(metadata))
    test_that("streamRows streams rows", {
        expect_equal(pendingStream(ds), 0)
        ds <- streamRows(ds, data = stream_rows)
        expect_equal(pendingStream(ds), 1)
    })

    test_that("appendStream appends all pending rows", {
        ds <- appendStream(ds)
        expect_equal(nrow(ds), 2)
        expect_equal(pendingStream(ds), 0)
    })
})

Try the crunch package in your browser

Any scripts or data that you put into this service are public.

crunch documentation built on May 29, 2024, 5:03 a.m.