Nothing
# Exercises the streaming helpers (stream_count(), stream_reduce(),
# stream_filter()) on a dataset finalized with materialize = "never".
# collect() is only used to inspect the already-filtered output.
test_that("stream_* helpers work without collect()", {
  skip_on_cran()
  pool_stop()
  # Registered before any work so the worker pool is stopped even if a later
  # call errors part-way through the test.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(id = int32(), msg = string_col(), val = float64())
  s <- shards(10, block_size = 3, workers = 2)
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_stream_", shard:::unique_id())
  )
  # Cleanup via on.exit() (as the other tests in this file do) so the temp
  # directory is removed even when an error aborts the test early.
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    s,
    out = list(sink = sink),
    fun = function(shard, sink) {
      # One row per shard index, written under the shard's partition id.
      df <- data.frame(
        id = as.integer(shard$idx),
        msg = paste0("x", shard$idx),
        val = as.double(shard$idx),
        stringsAsFactors = FALSE
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")
  expect_s3_class(ds, "shard_dataset")
  expect_equal(stream_count(ds), 10L)

  # Per-chunk sums of `val`, folded together with `+` starting from 0.
  ssum <- stream_reduce(
    ds,
    f = function(chunk) sum(chunk$val),
    init = 0,
    combine = `+`
  )
  expect_equal(ssum, sum(as.double(1:10)))

  filtered_path <- file.path(
    tempdir(),
    paste0("shard_test_stream_filt_", shard:::unique_id())
  )
  on.exit(unlink(filtered_path, recursive = TRUE, force = TRUE), add = TRUE)
  # Keep only even ids; `path` is the output location passed to stream_filter().
  ds2 <- stream_filter(
    ds,
    predicate = function(chunk) chunk$id %% 2L == 0L,
    path = filtered_path
  )
  expect_s3_class(ds2, "shard_dataset")
  df2 <- as.data.frame(collect(ds2), stringsAsFactors = FALSE)
  expect_equal(df2$id, as.integer(seq(2, 10, by = 2)))
})
# stream_map() calls `f` on each partition and returns the per-partition
# results as a list; summing per-partition row counts must give the 6 rows
# written by the shards below.
test_that("stream_map applies function to each partition", {
  skip_on_cran()
  pool_stop()
  # Stop the pool even if a later call errors.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(id = int32(), val = float64())
  s <- shards(6, block_size = 2, workers = 2)
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_stream_map_", shard:::unique_id())
  )
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    s,
    out = list(sink = sink),
    fun = function(shard, sink) {
      df <- data.frame(
        id = as.integer(shard$idx),
        val = as.double(shard$idx) * 2
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  # Report a failed write directly instead of only as a row-count mismatch.
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")
  # stream_map returns list of per-partition results
  results <- stream_map(ds, f = function(chunk) nrow(chunk))
  expect_true(is.list(results))
  expect_equal(sum(unlist(results)), 6L)
})
# stream_sum() over a float64 column must equal the in-memory sum of the
# values written (1..10).
test_that("stream_sum computes correct sum", {
  skip_on_cran()
  pool_stop()
  # Stop the pool even if a later call errors.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(id = int32(), val = float64())
  s <- shards(10, block_size = 2, workers = 2)
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_stream_sum_", shard:::unique_id())
  )
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    s,
    out = list(sink = sink),
    fun = function(shard, sink) {
      df <- data.frame(
        id = as.integer(shard$idx),
        val = as.double(shard$idx)
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  # Report a failed write directly instead of only as a wrong total.
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")
  # Sum of 1:10 = 55; compared as a double because `val` is float64.
  total <- stream_sum(ds, col = "val")
  expect_equal(total, sum(as.double(1:10)))
})
# stream_sum() must reject a string column and a column name that is not in
# the schema, each with a specific error message.
test_that("stream_sum validates inputs", {
  skip_on_cran()
  pool_stop()

  tbl_schema <- schema(id = int32(), msg = string_col())
  sink_dir <- file.path(
    tempdir(),
    paste0("shard_test_stream_sum_err_", shard:::unique_id())
  )
  on.exit(unlink(sink_dir, recursive = TRUE, force = TRUE), add = TRUE)
  tbl_sink <- table_sink(tbl_schema, mode = "partitioned", path = sink_dir)

  # Writes one row per shard, keyed by the shard id.
  write_partition <- function(shard, sink) {
    table_write(
      sink,
      shard$id,
      data.frame(id = shard$id, msg = paste0("x", shard$id))
    )
    NULL
  }
  run <- shard_map(
    shards(2, block_size = 1),
    out = list(sink = tbl_sink),
    fun = write_partition,
    workers = 2
  )
  dataset <- table_finalize(tbl_sink, materialize = "never")

  # A string column is not summable.
  expect_error(stream_sum(dataset, col = "msg"), "requires a numeric column")
  # A name absent from the schema is reported as unknown.
  expect_error(stream_sum(dataset, col = "nonexistent"), "Unknown column")

  pool_stop()
})
# stream_reduce() must reject non-function values for both the per-chunk
# function `f` and the `combine` function.
test_that("stream_reduce validates f and combine", {
  skip_on_cran()
  pool_stop()

  sink_dir <- file.path(
    tempdir(),
    paste0("shard_test_reduce_err_", shard:::unique_id())
  )
  on.exit(unlink(sink_dir, recursive = TRUE, force = TRUE), add = TRUE)
  tbl_sink <- table_sink(
    schema(id = int32()),
    mode = "partitioned",
    path = sink_dir
  )

  # Writes a single-column row per shard, keyed by the shard id.
  write_ids <- function(shard, sink) {
    table_write(sink, shard$id, data.frame(id = shard$id))
    NULL
  }
  plan <- shards(2, block_size = 1)
  run <- shard_map(
    plan,
    out = list(sink = tbl_sink),
    fun = write_ids,
    workers = 2
  )
  dataset <- table_finalize(tbl_sink, materialize = "never")

  expect_error(
    stream_reduce(dataset, f = "not_a_function", init = 0, combine = `+`),
    "f must be a function"
  )
  expect_error(
    stream_reduce(dataset, f = nrow, init = 0, combine = "not_a_function"),
    "combine must be a function"
  )

  pool_stop()
})
# stream_filter() must reject a `predicate` that is not a function.
test_that("stream_filter validates predicate", {
  skip_on_cran()
  pool_stop()

  sink_dir <- file.path(
    tempdir(),
    paste0("shard_test_filter_err_", shard:::unique_id())
  )
  on.exit(unlink(sink_dir, recursive = TRUE, force = TRUE), add = TRUE)
  tbl_sink <- table_sink(
    schema(id = int32()),
    mode = "partitioned",
    path = sink_dir
  )

  # Writes a single-column row per shard, keyed by the shard id.
  write_ids <- function(shard, sink) {
    table_write(sink, shard$id, data.frame(id = shard$id))
    NULL
  }
  run <- shard_map(
    shards(2, block_size = 1),
    out = list(sink = tbl_sink),
    fun = write_ids,
    workers = 2
  )
  dataset <- table_finalize(tbl_sink, materialize = "never")

  expect_error(
    stream_filter(dataset, predicate = "not_a_function"),
    "predicate must be a function"
  )

  pool_stop()
})
# stream_map() must reject an `f` that is not a function.
test_that("stream_map validates f", {
  skip_on_cran()
  pool_stop()

  sink_dir <- file.path(
    tempdir(),
    paste0("shard_test_map_err_", shard:::unique_id())
  )
  on.exit(unlink(sink_dir, recursive = TRUE, force = TRUE), add = TRUE)
  tbl_sink <- table_sink(
    schema(id = int32()),
    mode = "partitioned",
    path = sink_dir
  )

  # Writes a single-column row per shard, keyed by the shard id.
  write_ids <- function(shard, sink) {
    table_write(sink, shard$id, data.frame(id = shard$id))
    NULL
  }
  run <- shard_map(
    shards(2, block_size = 1),
    out = list(sink = tbl_sink),
    fun = write_ids,
    workers = 2
  )
  dataset <- table_finalize(tbl_sink, materialize = "never")

  expect_error(
    stream_map(dataset, f = "not_a_function"),
    "f must be a function"
  )

  pool_stop()
})
# stream_top_k() returns the k rows with the largest (decreasing = TRUE) or
# smallest (decreasing = FALSE) values of `col`, in that order. Scores equal
# ids here, so the expected ids follow directly.
test_that("stream_top_k finds top k rows", {
  skip_on_cran()
  pool_stop()
  # Stop the pool even if a later call errors.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(id = int32(), score = float64())
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_topk_", shard:::unique_id())
  )
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    shards(10, block_size = 2),
    out = list(sink = sink),
    fun = function(shard, sink) {
      df <- data.frame(
        id = as.integer(shard$idx),
        score = as.double(shard$idx)
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  # Report a failed write directly instead of only as wrong top-k ids.
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")

  # Top 3 by score (decreasing). expect_equal() shows the actual row count
  # on failure, unlike expect_true(nrow(...) == 3).
  top3 <- stream_top_k(ds, col = "score", k = 3, decreasing = TRUE)
  expect_equal(nrow(top3), 3L)
  expect_equal(as.integer(top3$id), c(10L, 9L, 8L))

  # Bottom 3 (ascending)
  bottom3 <- stream_top_k(ds, col = "score", k = 3, decreasing = FALSE)
  expect_equal(nrow(bottom3), 3L)
  expect_equal(as.integer(bottom3$id), c(1L, 2L, 3L))
})
# stream_top_k() must reject k < 1, a column not in the schema, and a column
# whose type is neither int32() nor float64().
test_that("stream_top_k validates inputs", {
  skip_on_cran()
  pool_stop()

  tbl_schema <- schema(id = int32(), msg = string_col())
  sink_dir <- file.path(
    tempdir(),
    paste0("shard_test_topk_err_", shard:::unique_id())
  )
  on.exit(unlink(sink_dir, recursive = TRUE, force = TRUE), add = TRUE)
  tbl_sink <- table_sink(tbl_schema, mode = "partitioned", path = sink_dir)

  # Writes one (id, msg) row per shard, keyed by the shard id.
  write_partition <- function(shard, sink) {
    rows <- data.frame(id = shard$id, msg = paste0("x", shard$id))
    table_write(sink, shard$id, rows)
    NULL
  }
  run <- shard_map(
    shards(2, block_size = 1),
    out = list(sink = tbl_sink),
    fun = write_partition,
    workers = 2
  )
  dataset <- table_finalize(tbl_sink, materialize = "never")

  # k must be >= 1
  expect_error(stream_top_k(dataset, col = "id", k = 0), "k must be >= 1")
  # Unknown column
  expect_error(
    stream_top_k(dataset, col = "nonexistent", k = 3),
    "Unknown column"
  )
  # String column not supported
  expect_error(
    stream_top_k(dataset, col = "msg", k = 3),
    "requires an int32\\(\\) or float64\\(\\)"
  )

  pool_stop()
})
# stream_group_sum() returns a data frame with `group` and `sum` columns
# holding the per-level total of `value`, accumulated across partitions.
test_that("stream_group_sum computes grouped sums", {
  skip_on_cran()
  pool_stop()
  # Stop the pool even if a later call errors.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(grp = factor_col(c("A", "B", "C")), val = float64())
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_grpsum_", shard:::unique_id())
  )
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    shards(9, block_size = 3),
    out = list(sink = sink),
    fun = function(shard, sink) {
      # Create pattern: A=1, B=2, C=3, A=4, B=5, C=6, A=7, B=8, C=9
      grps <- c("A", "B", "C")[(shard$idx - 1L) %% 3L + 1L]
      df <- data.frame(
        grp = factor(grps, levels = c("A", "B", "C")),
        val = as.double(shard$idx)
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  # Report a failed write directly instead of only as wrong group totals.
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")
  result <- stream_group_sum(ds, group = "grp", value = "val")
  expect_true(is.data.frame(result))
  expect_true("group" %in% names(result))
  expect_true("sum" %in% names(result))
  # A: 1+4+7=12, B: 2+5+8=15, C: 3+6+9=18
  expect_equal(result$sum[result$group == "A"], 12)
  expect_equal(result$sum[result$group == "B"], 15)
  expect_equal(result$sum[result$group == "C"], 18)
})
# stream_group_count() returns a data frame with `group` and `n` columns
# holding the number of rows per factor level, accumulated across partitions.
test_that("stream_group_count counts rows per group", {
  skip_on_cran()
  pool_stop()
  # Stop the pool even if a later call errors.
  on.exit(pool_stop(), add = TRUE)

  sch <- schema(grp = factor_col(c("X", "Y")), id = int32())
  out_path <- file.path(
    tempdir(),
    paste0("shard_test_grpcnt_", shard:::unique_id())
  )
  on.exit(unlink(out_path, recursive = TRUE, force = TRUE), add = TRUE)
  sink <- table_sink(sch, mode = "partitioned", path = out_path)

  res <- shard_map(
    shards(6, block_size = 2),
    out = list(sink = sink),
    fun = function(shard, sink) {
      # Alternating X, Y
      grps <- c("X", "Y")[(shard$idx - 1L) %% 2L + 1L]
      df <- data.frame(
        grp = factor(grps, levels = c("X", "Y")),
        id = as.integer(shard$idx)
      )
      table_write(sink, shard$id, df)
      NULL
    },
    workers = 2
  )
  # Report a failed write directly instead of only as wrong group counts.
  expect_true(succeeded(res))

  ds <- table_finalize(sink, materialize = "never")
  result <- stream_group_count(ds, group = "grp")
  expect_true(is.data.frame(result))
  expect_true("group" %in% names(result))
  expect_true("n" %in% names(result))
  # X: 1,3,5 (3 items), Y: 2,4,6 (3 items)
  expect_equal(result$n[result$group == "X"], 3L)
  expect_equal(result$n[result$group == "Y"], 3L)
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.