Offloading: streaming, monoids, and out-of-core fits

knitr::opts_chunk$set(collapse = TRUE, comment = "#>")
set.seed(1)
has_biglm <- requireNamespace("biglm", quietly = TRUE)
library(vectra)

This article works through one question: when can a computation run over data that does not fit in memory, and at what cost. The answer turns out to be a clean piece of algebra, and vectra exposes it through two pieces: collect_chunked() for bounded folds, and offload() for spilling a query to disk (or, with a key, splitting one into shards). Along the way the same principle explains both what streams cheaply and what needs a sort.

Two cost models

There are two distinct settings, and conflating them causes most of the confusion about what is possible.

The streaming model assumes one pass over the data, working memory much smaller than the data, and no ability to store or re-read the input. In this setting some exact computations are provably impossible: computing the exact number of distinct values, or the exact median, requires memory proportional to the data. These are information-theoretic lower bounds, and no cleverness removes them. The escape is to approximate (sketches such as HyperLogLog and quantile sketches).

The external-memory model assumes bounded RAM and a disk to spill to, with cost measured in input/output volume. Here the picture changes completely. Any computable reduction over data that fits on disk is feasible; the lower bounds above do not bind, because they assumed the data could not be stored. The exact median is a sort followed by reading the middle element. The exact distinct count is a sort followed by a scan. What varies between reductions is the I/O cost, not whether they can be done. The only hard limit is data that exceeds the disk.

vectra lives in the second model. So the design question is never "is this fittable" but "which cost tier does it land in," and the rest of this article is about reading that tier off the structure of the reduction.

When a reduction streams in one pass

The cheapest tier is a single pass with a fixed-size accumulator. A reduction belongs here exactly when it is a fold whose accumulator is a bounded monoid:

reduce(data) = combine over chunks of ( accumulate over rows )

with three properties carrying their weight:

collect_chunked() is the fold. The engine pulls one batch, applies f(acc, chunk), frees the batch, and continues, so a result far larger than RAM reduces to a small summary in bounded memory.

f <- tempfile(fileext = ".vtr")
df <- data.frame(g = rep(c("a", "b", "c"), length.out = 6000),
                 wt = rnorm(6000, 3, 1), hp = rnorm(6000, 150, 40))
df$mpg <- 30 - 5 * df$wt - 0.05 * df$hp + rnorm(6000, 0, 1)
write_vtr(df, f)

A row count is the simplest monoid (the accumulator is an integer, combine is addition):

collect_chunked(tbl(f), function(acc, chunk) acc + nrow(chunk), .init = 0L)

The normal-equation pieces X'X and X'y form a monoid too: each is a matrix, and addition is the combine. Accumulating them in one pass gives an exact least-squares fit without ever holding the design matrix.

acc <- collect_chunked(
  tbl(f) |> select(mpg, wt, hp),
  function(acc, chunk) {
    X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
    list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
  },
  .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1))
)
drop(solve(acc$XtX, acc$Xty))
coef(lm(mpg ~ wt + hp, data = df))

The two agree because summing per-batch cross-products is associative: the fit is a monoid homomorphism from the data to the 3 x 3 accumulator.

The offload functor

A generalized linear model does not fit in one pass. Iteratively reweighted least squares reweights every row by the current coefficient estimate, so the data must be read once per iteration. biglm::bigglm() does exactly this, and chunk_feeder() hands it a resettable stream. The catch is that re-reading a query re-runs the whole pipeline (scan, joins, mutate) on every pass.

offload() removes that cost. It materializes a query once to a .vtr file and returns a node that streams from the file. The returned node holds the same rows as the input; the only change is where the bytes live and that replaying is now a disk scan. In categorical terms it is an endofunctor on query streams that is the identity on values and changes only the cost profile:

s <- offload(tbl(f) |> select(mpg, wt, hp))

identical(collect(s), collect(tbl(f) |> select(mpg, wt, hp)))
s

Printing an offloaded node shows its cost grade, the honest label for what the stream costs. A plain node reports a streaming scan; an offloaded node reports the replay cache. explain() shows the same grade alongside the plan.

Feeding the replay cache to a fit is the point. chunk_feeder() accepts an offloaded node directly, so each reweighted pass reads the prepared columns from disk rather than rebuilding them:

s <- offload(tbl(f) |> select(mpg, wt, hp))
fit <- biglm::bigglm(mpg ~ wt + hp, data = chunk_feeder(s),
                     family = gaussian())
coef(fit)
cat("> The out-of-core GLM example needs the **biglm** package, which is not",
    "installed, so it was not run here.\n")

Because the spill holds exactly the modelling columns, replay does no further work. Bake the selects and mutates into the query you offload, and the per-pass cost is one scan of the prepared table.

Localizing coupling by sorting and partitioning

The one-pass tier covers reductions that decompose over independent rows. Many models do not: a mixed model couples rows within a group, a Cox model couples each event with everyone still at risk, a model with a scale() or spline term couples through a quantity estimated from all the data. The move that rescues them is to reorder so the coupling becomes local, then handle each local piece on its own.

Two orderings cover most cases. Sorting by a key makes a cumulative coupling into a running aggregate; partitioning by a key makes a within-group coupling into independent blocks.

arrange() is already an external merge sort: it spills sorted runs to .vtr and merges them with a bounded-memory heap, so sorting larger-than-RAM data is built in.

tbl(f) |> arrange(desc(mpg)) |> head(3) |> collect()

offload(x, by = ...) splits a query into disjoint shards on disk in a single streaming pass: one per key value for a discrete key, or per value range (method = "range") or hash bucket (method = "hash") for a continuous or high-cardinality key. The union of the shards reproduces the input, and row totals are checked.

p <- offload(tbl(f), by = "g")
p

Each shard is an ordinary node that fits in memory on its own, so a per-group model is a function run over the shards. group_map() reads each shard into a data.frame, hands it to the function with its key, and returns the results keyed by shard:

fits <- group_map(p, function(d, g) coef(lm(mpg ~ wt + hp, data = d)))
fits

When the per-shard result is itself a data.frame, group_modify() binds the pieces into one table and restores the key as a column:

group_modify(p, function(d, g)
  data.frame(n = nrow(d), mean_mpg = mean(d$mpg)))

The monoidal reduce, and the law it rests on

Partitioning also lets a single global reduction run as independent per-shard folds whose results merge. This is where the algebra returns: merging partial accumulators needs combine, and collect_chunked() takes it as an argument. Supplying combine declares the reduction a monoid, with .init as its identity.

accumulate <- function(acc, chunk) {
  X <- cbind(1, chunk$wt, chunk$hp); y <- chunk$mpg
  list(XtX = acc$XtX + crossprod(X), Xty = acc$Xty + crossprod(X, y))
}
combine <- function(a, b) list(XtX = a$XtX + b$XtX, Xty = a$Xty + b$Xty)

acc <- collect_chunked(
  offload(tbl(f) |> select(g, mpg, wt, hp), by = "g"),
  accumulate,
  .init = list(XtX = matrix(0, 3, 3), Xty = matrix(0, 3, 1)),
  combine = combine, commutative = TRUE
)
drop(solve(acc$XtX, acc$Xty))
coef(lm(mpg ~ wt + hp, data = df))

The partitioned reduce reproduces the global fit, and that is the correctness law made concrete. Offloading preserves the answer exactly when the reduction factors through the quotient the spill plan quotients by. Associativity of combine lets the chunk boundaries move (the reduction descends from the free monoid). Commutativity lets the shards merge in any order (it descends to the multiset). Here combine is matrix addition, which is both, so the partition order is irrelevant and commutative = TRUE records that.

The declared algebra also fixes the cheapest legal plan:

| declared | what it permits | cheapest plan | |---|---|---| | semigroup (associative combine) | moving chunk boundaries | sequential spill and merge | | monoid (identity .init too) | empty and parallel merges | partition, k-way merge | | commutative monoid | reordering shards | shuffle, no stable sort |

The cost tiers

Reading the structure of a reduction gives its tier directly:

| tier | I/O | peak memory | examples | |---|---|---|---| | monoid fold | O(n), one pass | O(1) | sum, count, mean, X'X | | sort or partition localizable | O(n log n) | O(1) | median, distinct, per-group fits, Cox | | all-to-all, no structure | O(n^2) or more | varies | dense kernel or Gaussian-process Gram |

Every tier is feasible with a disk; the tier sets the cost. The first is collect_chunked() on a plain node. The second is arrange() or offload(x, by = ...) ahead of a fold or a per-shard fit. The third still runs, at quadratic I/O, and is usually attacked with approximation (inducing points, low rank).

A note on the abstraction

It is tempting to reach for heavier type theory here, since the correctness condition is a statement about quotients. The lighter framing is the one that fits the engine. The equality that matters, collect(offload(x)) equals collect(x), is a plain proposition with no higher structure, and nothing about it is checkable inside C or R at compile time. So the contract is a declared monoid plus property tests: the test suite asserts the identity on values, replay equivalence, and that a partitioned reduce equals the single-pass fold. Those tests are the working proof of the quotient law.

Choosing a path

unlink(f)


Try the vectra package in your browser

Any scripts or data that you put into this service are public.

vectra documentation built on June 12, 2026, 1:06 a.m.