| collect_chunked | R Documentation |
Streams a lazy query through R in bounded pieces and reduces them with f,
instead of materializing the whole result the way collect() does. The
engine pulls one batch (a data.frame of up to a few hundred thousand rows)
at a time; f is called as f(acc, chunk) and its return value becomes the
accumulator for the next batch. Peak memory is one batch plus whatever the
accumulator holds, so a result far larger than RAM can be reduced to a small
summary in a single pass.
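The fold semantics above can be modeled in base R. This is a minimal sketch, assuming an in-memory data.frame split into fixed-size batches: fold_batches() is a hypothetical helper for illustration only. It is not part of the package and does not stream from disk. It shows only how f(acc, chunk) is threaded through the batches, left to right, starting from .init.

```r
# fold_batches() is a hypothetical, in-memory stand-in for illustration only;
# it is not part of the package and does not stream from disk.
fold_batches <- function(df, f, .init, batch_rows = 10L) {
  # Assign consecutive rows to batches of at most `batch_rows` rows
  batch_id <- ceiling(seq_len(nrow(df)) / batch_rows)
  batches <- split(df, batch_id)
  # Apply f(acc, chunk) left to right, seeding acc with .init
  Reduce(f, batches, .init)
}

# Row count: each call sees one batch and returns the updated accumulator
n <- fold_batches(mtcars, function(acc, chunk) acc + nrow(chunk), .init = 0L)
n  # 32
```

With 32 rows and batch_rows = 10, f runs four times (on batches of 10, 10, 10 and 2 rows), and only the running total is carried between calls.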
collect_chunked(x, f, .init = NULL, combine = NULL, commutative = FALSE)
## Default S3 method:
collect_chunked(x, f, .init = NULL, combine = NULL, commutative = FALSE)
## S3 method for class 'vectra_node'
collect_chunked(x, f, .init = NULL, combine = NULL, commutative = FALSE)
## S3 method for class 'vectra_partition'
collect_chunked(x, f, .init = NULL, combine = NULL, commutative = FALSE)
x | A |
f | A function of two arguments |
.init | Initial accumulator value. Passed to |
combine | Optional function |
commutative | Logical; declare that |
This is the streaming counterpart to a fold (Reduce()): use it when the
query returns more rows than fit in memory but the reduction is small. A
running count, per-group sufficient statistics, the cross-products X'X and
X'y behind a linear fit, an online mean, or a histogram: each accumulates in
bounded space across the stream. When the model-fitting consumer must drive
the iteration instead (for example, re-reading the data on every pass, as an
iteratively reweighted GLM does), use chunk_feeder().
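As an illustration of per-group sufficient statistics, the sketch below accumulates the count, sum and sum of squares of mpg by cyl, one batch at a time. It is a minimal base-R sketch: group_stats_step() is a hypothetical accumulator written for this example, and the stream is simulated with Reduce() over in-memory batches rather than with collect_chunked() itself. The accumulator is a small matrix with one row per group, so its size depends on the number of groups, not the number of rows.

```r
# Hypothetical accumulator: per-group n, sum and sum of squares of mpg by cyl.
# Its size grows with the number of groups, not the number of rows.
group_stats_step <- function(acc, chunk) {
  # rowsum() totals each column within groups; rows are named by group
  s <- rowsum(cbind(n = 1, sum = chunk$mpg, sumsq = chunk$mpg^2),
              group = chunk$cyl)
  if (is.null(acc)) return(s)
  # A group may appear in only some batches, so merge on the union of names
  keys <- union(rownames(acc), rownames(s))
  out <- matrix(0, length(keys), ncol(s), dimnames = list(keys, colnames(s)))
  out[rownames(acc), ] <- out[rownames(acc), , drop = FALSE] + acc
  out[rownames(s), ] <- out[rownames(s), , drop = FALSE] + s
  out
}

# Simulate the stream with in-memory batches of 10 rows
batches <- split(mtcars, ceiling(seq_len(nrow(mtcars)) / 10))
st <- Reduce(group_stats_step, batches, NULL)

# Finish: group means and sample variances from the sufficient statistics
means <- st[, "sum"] / st[, "n"]
vars <- (st[, "sumsq"] - st[, "n"] * means^2) / (st[, "n"] - 1)
```

The final step runs once, after the stream is exhausted. The sum-of-squares form of the variance is simple, but it can lose precision when the mean is large relative to the spread. A shifted or Welford-style update is the usual remedy in that case.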
The final accumulator. For a node: f applied left-to-right across
every batch, seeded with .init. For a partition: each shard folded with
f/.init, then those per-shard accumulators merged with combine.
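The partition rule can be modeled the same way. This is a minimal base-R sketch, assuming each shard is an in-memory list of batches. fold_partitioned(), ols_step() and ols_combine() are hypothetical names chosen for illustration and are not part of the package. Each shard is folded with f from .init, and the per-shard X'X and X'y accumulators are then merged by elementwise addition.

```r
# Hypothetical model of the partition rule: fold every shard with f/.init,
# then merge the per-shard accumulators with combine. Illustrative only.
fold_partitioned <- function(shards, f, .init, combine) {
  per_shard <- lapply(shards, function(batches) Reduce(f, batches, .init))
  Reduce(combine, per_shard)
}

# f: add one batch's X'X and X'y to the accumulator
ols_step <- function(acc, chunk) {
  X <- cbind(1, chunk$wt, chunk$hp)
  list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, chunk$mpg))
}
# combine: merging two partial sums is elementwise addition
ols_combine <- function(a, b) list(XtX = a$XtX + b$XtX, Xty = a$Xty + b$Xty)
init <- list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1))

# Three shards of interleaved rows, each split into batches of 5 rows
rows <- split(seq_len(nrow(mtcars)), rep(1:3, length.out = nrow(mtcars)))
shards <- lapply(rows, function(idx) {
  part <- mtcars[idx, ]
  split(part, ceiling(seq_len(nrow(part)) / 5))
})

acc <- fold_partitioned(shards, ols_step, init, ols_combine)
beta <- solve(acc$XtX, acc$Xty)
```

Every shard starts from .init, so under this model .init should be an identity for combine (zero matrices here). A nonzero seed would be counted once per shard. Addition is also associative and commutative, so the order in which shards are merged changes the result only by floating-point rounding.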
See chunk_feeder() for pull-based consumers such as biglm::bigglm(),
offload() for the replay cache and the partitioned monoidal reduce,
group_map() and group_modify() for per-shard application, and
collect() to materialize the full result.
path <- tempfile(fileext = ".vtr")
write_vtr(mtcars, path)
# Row count without materializing the result.
collect_chunked(tbl(path), function(acc, chunk) acc + nrow(chunk), .init = 0L)
# Accumulate the normal-equation pieces X'X and X'y for an exact OLS fit
# of mpg ~ wt + hp in one streaming pass.
acc <- collect_chunked(
  tbl(path) |> select(mpg, wt, hp),
  function(acc, chunk) {
    X <- cbind(1, chunk$wt, chunk$hp)  # design matrix with intercept column
    y <- chunk$mpg
    list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
  },
  .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1))
)
# Solve the normal equations; matches coef(lm(mpg ~ wt + hp, mtcars))
# up to floating-point rounding.
solve(acc$XtX, acc$Xty)
unlink(path)