knitr::opts_chunk$set(collapse = TRUE, comment = "#>") set.seed(1) has_biglm <- requireNamespace("biglm", quietly = TRUE) library(vectra)
This article works through one question: when can a computation run over data
that does not fit in memory, and at what cost. The answer turns out to be a
clean piece of algebra, and vectra exposes it through two pieces:
collect_chunked() for bounded folds, and offload() for spilling a query to
disk (or, with a key, splitting one into shards). Along the way the same
principle explains both what streams cheaply and what needs a sort.
There are two distinct settings, and conflating them causes most of the confusion about what is possible.
The streaming model assumes one pass over the data, working memory much smaller than the data, and no ability to store or re-read the input. In this setting some exact computations are provably impossible: computing the exact number of distinct values, or the exact median, requires memory proportional to the data. These are information-theoretic lower bounds, and no cleverness removes them. The escape is to approximate (sketches such as HyperLogLog and quantile sketches).
The external-memory model assumes bounded RAM and a disk to spill to, with cost measured in input/output volume. Here the picture changes completely. Any computable reduction over data that fits on disk is feasible; the lower bounds above do not bind, because they assumed the data could not be stored. The exact median is a sort followed by reading the middle element. The exact distinct count is a sort followed by a scan. What varies between reductions is the I/O cost, not whether they can be done. The only hard limit is data that exceeds the disk.
vectra lives in the second model. So the design question is never "is this
fittable" but "which cost tier does it land in," and the rest of this article is
about reading that tier off the structure of the reduction.
The cheapest tier is a single pass with a fixed-size accumulator. A reduction belongs here exactly when it is a fold whose accumulator is a bounded monoid:
reduce(data) = combine over chunks of ( accumulate over rows )
with three properties carrying their weight:
combine. combine(combine(a, b), c) equals
combine(a, combine(b, c)). This is what lets the data be split into chunks,
reduced independently, and merged. Algebraically the reduction descends from
the free monoid (sequences under concatenation): it does not depend on where
the chunk boundaries fall.combine, when present. The result does not depend on
the order of the chunks, so the reduction descends to the multiset and the
pieces may be processed in any order.collect_chunked() is the fold. The engine pulls one batch, applies
f(acc, chunk), frees the batch, and continues, so a result far larger than RAM
reduces to a small summary in bounded memory.
f <- tempfile(fileext = ".vtr") df <- data.frame(g = rep(c("a", "b", "c"), length.out = 6000), wt = rnorm(6000, 3, 1), hp = rnorm(6000, 150, 40)) df$mpg <- 30 - 5 * df$wt - 0.05 * df$hp + rnorm(6000, 0, 1) write_vtr(df, f)
A row count is the simplest monoid (the accumulator is an integer, combine is
addition):
collect_chunked(tbl(f), function(acc, chunk) acc + nrow(chunk), .init = 0L)
The normal-equation pieces X'X and X'y form a monoid too: each is a matrix,
and addition is the combine. Accumulating them in one pass gives an exact
least-squares fit without ever holding the design matrix.
acc <- collect_chunked( tbl(f) |> select(mpg, wt, hp), function(acc, chunk) { X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y)) }, .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1)) ) drop(solve(acc$XtX, acc$Xty)) coef(lm(mpg ~ wt + hp, data = df))
The two agree because summing per-batch cross-products is associative: the fit
is a monoid homomorphism from the data to the 3 x 3 accumulator.
A generalized linear model does not fit in one pass. Iteratively reweighted
least squares reweights every row by the current coefficient estimate, so the
data must be read once per iteration. biglm::bigglm() does exactly this, and
chunk_feeder() hands it a resettable stream. The catch is that re-reading a
query re-runs the whole pipeline (scan, joins, mutate) on every pass.
offload() removes that cost. It materializes a query once to a .vtr file and
returns a node that streams from the file. The returned node holds the same rows
as the input; the only change is where the bytes live and that replaying is now
a disk scan. In categorical terms it is an endofunctor on query streams that is
the identity on values and changes only the cost profile:
s <- offload(tbl(f) |> select(mpg, wt, hp)) identical(collect(s), collect(tbl(f) |> select(mpg, wt, hp))) s
Printing an offloaded node shows its cost grade, the honest label for what the
stream costs. A plain node reports a streaming scan; an offloaded node reports
the replay cache. explain() shows the same grade alongside the plan.
Feeding the replay cache to a fit is the point. chunk_feeder() accepts an
offloaded node directly, so each reweighted pass reads the prepared columns from
disk rather than rebuilding them:
s <- offload(tbl(f) |> select(mpg, wt, hp)) fit <- biglm::bigglm(mpg ~ wt + hp, data = chunk_feeder(s), family = gaussian()) coef(fit)
cat("> The out-of-core GLM example needs the **biglm** package, which is not", "installed, so it was not run here.\n")
Because the spill holds exactly the modelling columns, replay does no further work. Bake the selects and mutates into the query you offload, and the per-pass cost is one scan of the prepared table.
The one-pass tier covers reductions that decompose over independent rows. Many
models do not: a mixed model couples rows within a group, a Cox model couples
each event with everyone still at risk, a model with a scale() or spline term
couples through a quantity estimated from all the data. The move that rescues
them is to reorder so the coupling becomes local, then handle each local piece
on its own.
Two orderings cover most cases. Sorting by a key makes a cumulative coupling into a running aggregate; partitioning by a key makes a within-group coupling into independent blocks.
arrange() is already an external merge sort: it spills sorted runs to .vtr
and merges them with a bounded-memory heap, so sorting larger-than-RAM data is
built in.
tbl(f) |> arrange(desc(mpg)) |> head(3) |> collect()
offload(x, by = ...) splits a query into disjoint shards on disk in a single
streaming pass: one per key value for a discrete key, or per value range
(method = "range") or hash bucket (method = "hash") for a continuous or
high-cardinality key. The union of the shards reproduces the input, and row
totals are checked.
p <- offload(tbl(f), by = "g") p
Each shard is an ordinary node that fits in memory on its own, so a per-group
model is a function run over the shards. group_map() reads each shard into a
data.frame, hands it to the function with its key, and returns the results
keyed by shard:
fits <- group_map(p, function(d, g) coef(lm(mpg ~ wt + hp, data = d))) fits
When the per-shard result is itself a data.frame, group_modify() binds the
pieces into one table and restores the key as a column:
group_modify(p, function(d, g) data.frame(n = nrow(d), mean_mpg = mean(d$mpg)))
Partitioning also lets a single global reduction run as independent per-shard
folds whose results merge. This is where the algebra returns: merging partial
accumulators needs combine, and collect_chunked() takes it as an argument.
Supplying combine declares the reduction a monoid, with .init as its
identity.
accumulate <- function(acc, chunk) { X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y)) } combine <- function(a, b) list(XtX = a$XtX + b$XtX, Xty = a$Xty + b$Xty) acc <- collect_chunked( offload(tbl(f) |> select(g, mpg, wt, hp), by = "g"), accumulate, .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1)), combine = combine, commutative = TRUE ) drop(solve(acc$XtX, acc$Xty)) coef(lm(mpg ~ wt + hp, data = df))
The partitioned reduce reproduces the global fit, and that is the correctness
law made concrete. Offloading preserves the answer exactly when the reduction
factors through the quotient the spill plan quotients by. Associativity of
combine lets the chunk boundaries move (the reduction descends from the free
monoid). Commutativity lets the shards merge in any order (it descends to the
multiset). Here combine is matrix addition, which is both, so the partition
order is irrelevant and commutative = TRUE records that.
The declared algebra also fixes the cheapest legal plan:
| declared | what it permits | cheapest plan |
|---|---|---|
| semigroup (associative combine) | moving chunk boundaries | sequential spill and merge |
| monoid (identity .init too) | empty and parallel merges | partition, k-way merge |
| commutative monoid | reordering shards | shuffle, no stable sort |
Reading the structure of a reduction gives its tier directly:
| tier | I/O | peak memory | examples |
|---|---|---|---|
| monoid fold | O(n), one pass | O(1) | sum, count, mean, X'X |
| sort or partition localizable | O(n log n) | O(1) | median, distinct, per-group fits, Cox |
| all-to-all, no structure | O(n^2) or more | varies | dense kernel or Gaussian-process Gram |
Every tier is feasible with a disk; the tier sets the cost. The first is
collect_chunked() on a plain node. The second is arrange() or
offload(x, by = ...) ahead of a fold or a per-shard fit. The third still runs,
at quadratic I/O, and is usually attacked with approximation (inducing points,
low rank).
It is tempting to reach for heavier type theory here, since the correctness
condition is a statement about quotients. The lighter framing is the one that
fits the engine. The equality that matters, collect(offload(x)) equals
collect(x), is a plain proposition with no higher structure, and nothing about
it is checkable inside C or R at compile time. So the contract is a declared
monoid plus property tests: the test suite asserts the identity on values,
replay equivalence, and that a partitioned reduce equals the single-pass fold.
Those tests are the working proof of the quotient law.
collect_chunked().biglm::bigglm()): offload() the prepared
query once and feed it to chunk_feeder().offload(x, by = ...), then group_map() for a fit per shard,
group_modify() to recombine per-shard tables, or a monoidal reduce with
collect_chunked(..., combine = ).arrange(), which sorts externally.unlink(f)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.