Nothing
# --- tf_warp / tf_align -------------------------------------------------------
quiet_expected_registration_warnings <- function(expr) {
expected_patterns <- c(
"evaluations were `NA`",
"All .* evaluations on `arg =",
"Returning irregular <tfd>\\.",
"Returning regular data <tfd_reg> on the reduced grid\\.",
"argument-wise `mean` were `NA`",
"Attempting <tfd_irreg> - <tfd_irreg> for different argument values",
"interpolate = FALSE.*no values present",
"`NA`s created\\.",
"collapsing to unique 'x' values"
)
withCallingHandlers(
expr,
warning = function(w) {
msg <- conditionMessage(w)
is_expected <- any(vapply(
expected_patterns,
\(pat) grepl(pat, msg, perl = TRUE),
logical(1)
))
if (is_expected) {
invokeRestart("muffleWarning")
}
stop(w)
}
)
}
test_that("tf_warp and tf_align are inverses", {
withr::local_seed(1234)
# Two representative domains: one non-standard, one wide
min_t <- c(-2, 1)
max_t <- c(3, 10)
walk2(min_t, max_t, function(min_t, max_t) {
t <- seq(min_t, max_t, length.out = 101)
f0 <- tfd(sin((t - min_t) / (max_t - min_t) * 2 * pi), t)
x <- rep(f0, 10)
w <- tf_rgp(10, t) |> exp() |> tf_integrate(definite = FALSE)
w_min <- tf_fmin(w)
w_max <- tf_fmax(w)
w <- (w - w_min) / (w_max - w_min) * (max_t - min_t) + min_t
# Warp: class, length, domain
unreg <- tf_warp(x, w)
expect_s3_class(unreg, "tfd_reg")
expect_length(unreg, length(x))
expect_identical(tf_domain(unreg), tf_domain(x))
# keep_new_arg = TRUE returns tfd_irreg
expect_s3_class(tf_warp(x, w, keep_new_arg = TRUE), "tfd_irreg")
expect_s3_class(tf_align(x, w, keep_new_arg = TRUE), "tfd_irreg")
# Both inverse directions
expect_equal(tf_align(tf_warp(x, w), w), x, tolerance = 0.1)
expect_equal(tf_warp(tf_align(x, w), w), x, tolerance = 0.1)
})
})
test_that("tf_warp/tf_align identity roundtrip", {
t <- seq(-1, 2, length.out = 50)
f0 <- tfd(cos(t), t)
x <- rep(f0, 5)
w <- rep(tfd(t, arg = t), 5)
# Identity warp: both directions exact
expect_identical(tf_align(tf_warp(x, w), w), x)
expect_identical(tf_warp(tf_align(x, w), w), x)
# Also with keep_new_arg
expect_identical(tf_align(tf_warp(x, w, keep_new_arg = TRUE), w), x)
expect_identical(tf_warp(tf_align(x, w, keep_new_arg = TRUE), w), x)
})
test_that("tf_warp and tf_align accept tfb warps", {
t <- seq(-10, 10, length.out = 101)
x <- tfd(
t(sapply(c(-0.2, 0, 0.15), \(s) sin((t - s) * 0.3))),
arg = t
)
warp_tfd <- rep(tfd(t, arg = t), length(x))
warp_tfb <- suppressMessages(tfb(warp_tfd, k = 9))
expect_no_error(x_unreg <- tf_warp(x, warp_tfb))
expect_no_error(x_reg <- tf_align(x_unreg, warp_tfb))
expect_s3_class(x_unreg, "tfd")
expect_s3_class(x_reg, "tfd")
})
test_that("tf_warp/tf_align reset keep_new_arg for tfb and warn", {
t <- seq(0, 1, length.out = 51)
x_tfb <- suppressMessages(tfb(
matrix(sin(2 * pi * t), nrow = 1),
arg = t,
k = 9
))
warp <- rep(tfd(t, arg = t), length(x_tfb))
expect_warning(
warped <- tf_warp(x_tfb, warp, keep_new_arg = TRUE),
"reset to FALSE"
)
expect_s3_class(warped, "tfb")
expect_warning(
unwarped <- tf_align(x_tfb, warp, keep_new_arg = TRUE),
"reset to FALSE"
)
expect_s3_class(unwarped, "tfb")
})
test_that("tf_align handles irregular grids with non-domain-preserving warps", {
arg_x <- list(
seq(0, 1, length.out = 41),
seq(0, 1, length.out = 73)
)
x <- tfd(
list(
sin(2 * pi * arg_x[[1]]),
cos(2 * pi * arg_x[[2]])
),
arg = arg_x
)
warp_arg <- seq(0, 1, length.out = 101)
warp_tfd <- tfd(
rbind(
warp_arg + 0.15,
warp_arg - 0.12
),
arg = warp_arg
)
warp_tfb <- suppressMessages(tfb(warp_tfd, k = 9))
ret_irreg <- tf_align(x, warp_tfd, keep_new_arg = TRUE)
expect_s3_class(ret_irreg, "tfd_irreg")
expect_length(ret_irreg, length(x))
expect_identical(tf_domain(ret_irreg), tf_domain(x))
ret_irreg_tfb <- tf_align(x, warp_tfb, keep_new_arg = TRUE)
expect_s3_class(ret_irreg_tfb, "tfd_irreg")
expect_length(ret_irreg_tfb, length(x))
expect_identical(tf_domain(ret_irreg_tfb), tf_domain(x))
ret_grid <- quiet_expected_registration_warnings(
tf_align(x, warp_tfd, keep_new_arg = FALSE)
)
expect_length(ret_grid, length(x))
expect_identical(tf_domain(ret_grid), tf_domain(x))
})
# --- tf_registration class ----------------------------------------------------
test_that("tf_register returns tf_registration object", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift")
)
expect_s3_class(reg, "tf_registration")
expect_length(reg, length(x))
# Accessors work
expect_s3_class(tf_aligned(reg), "tfd")
expect_s3_class(tf_inv_warps(reg), "tfd")
expect_s3_class(tf_template(reg), "tfd")
expect_length(tf_aligned(reg), length(x))
expect_length(tf_inv_warps(reg), length(x))
expect_length(tf_template(reg), 1)
# Direct slot access
expect_identical(reg$registered, tf_aligned(reg))
expect_identical(reg$inv_warps, tf_inv_warps(reg))
expect_identical(reg$template, tf_template(reg))
expect_true(is.call(reg$call))
expect_identical(reg$x, x)
})
test_that("tf_register store_x = FALSE omits original data", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift", store_x = FALSE)
)
expect_s3_class(reg, "tf_registration")
expect_null(reg$x)
})
test_that("tf_registration print works", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift")
)
expect_no_error(print(reg))
})
test_that("tf_registration summary works", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift")
)
s <- summary(reg)
expect_s3_class(s, "summary.tf_registration")
expect_true(is.call(s$call))
expect_equal(s$n, 3)
expect_true(is.finite(s$amp_var_reduction))
# Warp deviation quantiles (relative to domain length)
expect_length(s$inv_warp_dev_quantiles, 7)
expect_true(all(is.finite(s$inv_warp_dev_quantiles)))
expect_true(all(s$inv_warp_dev_quantiles >= 0))
# Domain loss quantiles
expect_length(s$domain_loss_quantiles, 7)
expect_true(all(is.finite(s$domain_loss_quantiles)))
# Warp slope stats
expect_length(s$inv_warp_slope_range, 2)
expect_true(s$inv_warp_slope_range["min"] > 0) # monotone warps
expect_length(s$inv_warp_min_slopes, 7)
expect_length(s$inv_warp_max_slopes, 7)
expect_true(s$has_original)
expect_no_error(print(s))
# summary with store_x = FALSE
reg2 <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift", store_x = FALSE)
)
s2 <- summary(reg2)
expect_true(is.na(s2$amp_var_reduction))
expect_false(s2$has_original)
})
test_that("tf_registration plot works", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift")
)
pdf(tempfile(fileext = ".pdf"))
expect_no_error(plot(reg))
dev.off()
# Also with store_x = FALSE (2 panels instead of 3)
reg2 <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift", store_x = FALSE)
)
pdf(tempfile(fileext = ".pdf"))
expect_no_error(plot(reg2))
dev.off()
})
test_that("tf_registration subsetting works", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
reg <- quiet_expected_registration_warnings(
tf_register(x, method = "affine", type = "shift")
)
sub <- reg[1:2]
expect_s3_class(sub, "tf_registration")
expect_length(sub, 2)
expect_length(tf_aligned(sub), 2)
expect_length(tf_inv_warps(sub), 2)
expect_length(tf_template(sub), 1) # template unchanged
expect_length(sub$x, 2)
})
# --- tf_estimate_warps --------------------------------------------------------
test_that("tf_estimate_warps works for SRVF and CC methods", {
skip_if_not_installed("fdasrvf")
withr::local_seed(1234)
n_curves <- 3
t <- seq(0, 2 * pi, length.out = 61)
data <- sapply(seq_len(n_curves), \(i) {
phase_shift <- runif(1, -pi / 2, pi / 2)
amplitude <- runif(1, 0.8, 1.2)
amplitude * sin(t + phase_shift) + rnorm(length(t), 0, 0.1)
})
check_warp <- function(warp, x, label = "") {
domain <- tf_domain(x)
expect_s3_class(warp, "tfd")
expect_length(warp, length(x))
expect_identical(tf_domain(warp), domain)
expect_true(all(tf_fmin(warp) >= domain[1]), label = label)
expect_true(all(tf_fmax(warp) <= domain[2]), label = label)
}
x <- tfd(t(data))
# SRVF: default (Karcher mean) and explicit template
check_warp(tf_estimate_warps(x), x, "SRVF default")
check_warp(tf_estimate_warps(x, template = mean(x)), x, "SRVF template")
# CC: default and non-default crit (tests ... passthrough)
check_warp(
quiet_expected_registration_warnings(tf_estimate_warps(
x,
method = "cc",
max_iter = 1,
iterlim = 1
)),
x,
"CC smoke"
)
# tfd_irreg: SRVF/CC are currently unsupported
expect_error(
tf_estimate_warps(tfd(t(data)) |> tf_sparsify(.2), method = "srvf"),
"only `affine` and `landmark` registration are currently supported"
)
expect_error(
tf_estimate_warps(tfd(t(data)) |> tf_sparsify(.2), method = "cc"),
"only `affine` and `landmark` registration are currently supported"
)
})
test_that("tf_estimate_warps returns tfd (not tf_registration)", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
warp <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", type = "shift")
)
expect_s3_class(warp, "tfd")
expect_false(inherits(warp, "tf_registration"))
expect_true(!is.null(attr(warp, "template")))
})
test_that("tf_estimate_warps supports affine registration for irregular tfd", {
withr::local_seed(99)
t <- seq(0, 1, length.out = 101)
shifts <- c(-0.09, 0, 0.08)
x_vals <- lapply(
shifts,
\(s) sin(2 * pi * (t + s))
)
arg_list <- list(
sort(c(seq(0, 1, length.out = 45), runif(8))),
seq(0, 1, length.out = 63),
seq(0, 1, length.out = 87)
)
x_irreg <- tfd(
lapply(seq_along(arg_list), \(i) {
approx(t, x_vals[[i]], xout = arg_list[[i]], rule = 2)$y
}),
arg = arg_list
)
template <- tfd(
sin(2 * pi * seq(0, 1, length.out = 51)),
arg = seq(0, 1, length.out = 51)
)
warp <- quiet_expected_registration_warnings(tf_estimate_warps(
x_irreg,
method = "affine",
type = "shift",
template = template,
max_iter = 2L
))
expect_s3_class(warp, "tfd_irreg")
expect_length(warp, length(x_irreg))
expect_identical(tf_domain(warp), tf_domain(x_irreg))
expect_identical(tf_arg(warp), tf_arg(x_irreg))
warp_vals <- tf_evaluations(warp)
for (i in seq_along(warp_vals)) {
expect_true(all(diff(warp_vals[[i]]) > 0))
expect_equal(warp_vals[[i]][1], tf_domain(x_irreg)[1], tolerance = 0.2)
expect_equal(
warp_vals[[i]][length(warp_vals[[i]])],
tf_domain(x_irreg)[2],
tolerance = 0.2
)
}
})
test_that("tf_estimate_warps supports landmark registration for irregular tfd", {
withr::local_seed(321)
peaks <- c(0.3, 0.5, 0.7)
arg_list <- lapply(c(51, 71, 93), \(n) sort(c(0, 1, runif(n - 2))))
x_irreg <- tfd(
lapply(seq_along(peaks), \(i) dnorm(arg_list[[i]], peaks[i], 0.08)),
arg = arg_list
)
landmarks <- matrix(peaks, ncol = 1)
warp <- tf_estimate_warps(
x_irreg,
method = "landmark",
landmarks = landmarks
)
expect_s3_class(warp, "tfd_irreg")
expect_length(warp, length(x_irreg))
expect_identical(tf_arg(warp), tf_arg(x_irreg))
expect_identical(tf_domain(warp), tf_domain(x_irreg))
target_peak <- mean(peaks)
warp_at_target <- as.numeric(tf_evaluate(warp, arg = target_peak))
expect_equal(warp_at_target, peaks, tolerance = 0.02)
})
test_that("tf_estimate_warps accepts CC warp basis controls", {
withr::local_seed(123)
t <- seq(0, 1, length.out = 61)
x <- tfd(
t(sapply(c(-0.07, 0, 0.06), \(s) sin(2 * pi * (t + s)))),
arg = t
)
w_default <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "cc",
max_iter = 1,
iterlim = 1,
nbasis = 6L,
lambda = 0,
crit = 2
)
)
expect_s3_class(w_default, "tfd")
expect_length(w_default, length(x))
expect_identical(tf_domain(w_default), tf_domain(x))
expect_no_error(
w_tuned <- quiet_expected_registration_warnings(tf_estimate_warps(
x,
method = "cc",
max_iter = 1,
iterlim = 1,
crit = 1,
nbasis = 8L,
lambda = 1e-3
))
)
expect_s3_class(w_tuned, "tfd")
expect_length(w_tuned, length(x))
expect_identical(tf_domain(w_tuned), tf_domain(x))
expect_error(
tf_estimate_warps(x, method = "cc", nbasis = 1L),
">= 2"
)
expect_error(
tf_estimate_warps(x, method = "cc", lambda = -1),
">= 0"
)
})
test_that("tf_estimate_warps dispatches correctly for tfb subclasses", {
withr::local_seed(1234)
t <- seq(0, 2 * pi, length.out = 100)
data <- sapply(seq_len(5), \(i) {
sin(t + runif(1, -0.5, 0.5))
})
# Use cheap affine method (not SRVF/CC) to test class dispatch
x_tfb <- suppressMessages(tfb(t(data), k = 20))
warp_tfb <- quiet_expected_registration_warnings(
tf_estimate_warps(x_tfb, method = "affine", type = "shift")
)
expect_s3_class(warp_tfb, "tfd")
expect_length(warp_tfb, length(x_tfb))
expect_identical(tf_domain(warp_tfb), tf_domain(x_tfb))
x_fpc <- tfb_fpc(t(data), pve = .95)
warp_fpc <- quiet_expected_registration_warnings(
tf_estimate_warps(x_fpc, method = "affine", type = "shift")
)
expect_s3_class(warp_fpc, "tfd")
expect_length(warp_fpc, length(x_fpc))
expect_identical(tf_domain(warp_fpc), tf_domain(x_fpc))
})
test_that("tf_estimate_warps SRVF/CC works for tfb subclasses", {
skip_if_not_installed("fdasrvf")
withr::local_seed(1234)
t <- seq(0, 2 * pi, length.out = 41)
data <- sapply(seq_len(3), \(i) sin(t + runif(1, -0.4, 0.4)))
x_tfb <- suppressMessages(tfb(t(data), k = 12))
x_fpc <- tfb_fpc(t(data), pve = .95)
warp_srvf <- tf_estimate_warps(x_tfb, method = "srvf")
expect_s3_class(warp_srvf, "tfd")
expect_length(warp_srvf, length(x_tfb))
expect_identical(tf_domain(warp_srvf), tf_domain(x_tfb))
warp_cc <- quiet_expected_registration_warnings(
tf_estimate_warps(x_fpc, method = "cc", max_iter = 1, iterlim = 1)
)
expect_s3_class(warp_cc, "tfd")
expect_length(warp_cc, length(x_fpc))
expect_identical(tf_domain(warp_cc), tf_domain(x_fpc))
})
test_that("tf_estimate_warps validates SRVF/CC templates", {
t <- seq(0, 1, length.out = 51)
x <- tfd(t(cbind(sin(2 * pi * t), cos(2 * pi * t))), arg = t)
template_bad_len <- tfd(
t(cbind(sin(2 * pi * t), cos(2 * pi * t), sin(4 * pi * t))),
arg = t
)
expect_error(
tf_estimate_warps(x, method = "srvf", template = template_bad_len),
"length 1 or the same length"
)
expect_error(
tf_estimate_warps(x, method = "cc", template = template_bad_len),
"length 1 or the same length"
)
template_bad_domain <- tfd(
sin(pi * seq(0, 2, length.out = 51)),
arg = seq(0, 2, length.out = 51)
)
expect_error(
tf_estimate_warps(x, method = "srvf", template = template_bad_domain),
"same domain"
)
expect_error(
tf_estimate_warps(x, method = "cc", template = template_bad_domain),
"same domain"
)
t2 <- seq(0, 1, length.out = 61)
template_bad_grid <- tfd(sin(2 * pi * t2), arg = t2)
expect_error(
tf_estimate_warps(x, method = "srvf", template = template_bad_grid),
"same grid"
)
expect_error(
tf_estimate_warps(x, method = "cc", template = template_bad_grid),
"same grid"
)
})
test_that("tf_estimate_warps SRVF/CC reject unknown method-specific arguments", {
skip_if_not_installed("fdasrvf")
t <- seq(0, 1, length.out = 51)
x <- tfd(t(cbind(sin(2 * pi * t), sin(2 * pi * (t + 0.05)))), arg = t)
expect_error(
tf_estimate_warps(x, method = "srvf", unknown_arg = 1),
"unused argument|unused arguments|formal argument"
)
expect_error(
tf_estimate_warps(x, method = "cc", unknown_arg = 1),
"unused argument|unused arguments|formal argument"
)
})
test_that("tf_estimate_warps CC explicit template path is exercised", {
withr::local_seed(4321)
t <- seq(0, 1, length.out = 61)
x <- tfd(
t(sapply(c(-0.08, 0, 0.08), \(s) sin(2 * pi * (t + s)))),
arg = t
)
template <- mean(x)
w_default <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "cc", max_iter = 1, iterlim = 1)
)
w_template <- quiet_expected_registration_warnings(tf_estimate_warps(
x,
method = "cc",
template = template,
max_iter = 1,
iterlim = 1
))
expect_equal(
as.matrix(w_default),
as.matrix(w_template),
tolerance = 0.15
)
})
# --- Landmark detection -------------------------------------------------------
test_that("tf_landmarks_extrema finds correct extrema and zero crossings", {
# Single-period sin: quantitative accuracy with smooth = TRUE (default path)
t1 <- seq(0, 2 * pi, length.out = 101)
phase_shifts <- c(0, 0.5, -0.3)
x1 <- tfd(t(cbind(sin(t1), sin(t1 + 0.5), sin(t1 - 0.3))), arg = t1)
expected_max <- pi / 2 - phase_shifts
maxima <- tf_landmarks_extrema(x1, "max")
expect_true(is.matrix(maxima))
expect_equal(dim(maxima), c(3, 1))
expect_equal(as.numeric(maxima), expected_max, tolerance = 0.1)
expected_min <- 3 * pi / 2 - phase_shifts
minima <- tf_landmarks_extrema(x1, "min")
expect_equal(dim(minima), c(3, 1))
expect_equal(as.numeric(minima), expected_min, tolerance = 0.1)
both <- tf_landmarks_extrema(x1, "both")
expect_equal(dim(both), c(3, 2))
expect_equal(both[, 1], maxima[, 1])
expect_equal(both[, 2], minima[, 1])
expect_equal(attr(both, "feature_types"), c("max", "min"))
# 2-period sin: structural checks + all types
t2 <- seq(0, 2, length.out = 201)
template <- sin(2 * pi * t2)
x2 <- tfd(
t(sapply(c(0, 0.03, -0.03), \(s) {
approx(t2, template, xout = t2 + s, rule = 2)$y
})),
arg = t2
)
lm_all <- tf_landmarks_extrema(x2, "all")
types <- attr(lm_all, "feature_types")
expect_true(ncol(lm_all) >= 5)
expect_true("max" %in% types)
expect_true("min" %in% types)
expect_true("zero" %in% types)
# Each row sorted
for (i in seq_len(nrow(lm_all))) {
non_na <- lm_all[i, !is.na(lm_all[i, ])]
expect_true(all(diff(non_na) > 0))
}
})
test_that("tf_landmarks_extrema drops boundary features", {
t <- seq(0, 1, length.out = 101)
vals <- pnorm(t, 0.2, 0.03) + 0.3 * dnorm(t, 0.7, 0.08) - 0.5
x <- tfd(
t(sapply(c(0, 0.02, -0.02), \(s) {
approx(t, vals, xout = t + s, rule = 2)$y
})),
arg = t
)
lm <- tf_landmarks_extrema(x, "both")
expect_equal(ncol(lm), 1)
expect_equal(attr(lm, "feature_types"), "max")
expect_true(all(abs(lm[, 1] - 0.7) < 0.05, na.rm = TRUE))
})
test_that("tf_landmarks_extrema detects landmarks on irregular tfd", {
set.seed(123)
peak_locs <- c(0.4, 0.5, 0.6, 0.45)
arg_list <- lapply(1:4, \(i) sort(c(0, 1, runif(80))))
vals_list <- lapply(seq_along(peak_locs), \(i) {
dnorm(arg_list[[i]], mean = peak_locs[i], sd = 0.1)
})
x_irr <- tfd(vals_list, arg = arg_list)
expect_true(inherits(x_irr, "tfd_irreg"))
lm <- tf_landmarks_extrema(x_irr, "max")
expect_true(is.matrix(lm))
expect_equal(nrow(lm), 4)
expect_equal(ncol(lm), 1)
expect_equal(as.numeric(lm), peak_locs, tolerance = 0.05)
lm_both <- tf_landmarks_extrema(x_irr, "both")
expect_equal(ncol(lm_both), 1)
expect_equal(attr(lm_both, "feature_types"), "max")
# Full chain: detection -> registration -> align
x_reg <- tfd(x_irr, arg = seq(0, 1, length.out = 101))
warp <- tf_estimate_warps(x_reg, method = "landmark", landmarks = lm)
aligned <- tf_align(x_reg, warp)
aligned_peaks <- tf_where(aligned, value == max(value), "first")
expect_equal(aligned_peaks, rep(mean(peak_locs), 4), tolerance = 0.03)
})
test_that("tf_landmarks_extrema threshold parameter filters rare features", {
set.seed(101)
t <- seq(0, 1, length.out = 201)
make_curve <- function(main_peak, add_secondary = FALSE) {
y <- dnorm(t, main_peak, 0.06)
if (add_secondary) {
y <- y + 0.8 * dnorm(t, 0.8, 0.04)
}
y
}
vals <- rbind(
make_curve(0.38, add_secondary = TRUE),
make_curve(0.40),
make_curve(0.42),
make_curve(0.39),
make_curve(0.41)
)
x <- tfd(vals, arg = t)
# threshold = 0.5 -> only main peak
lm_strict <- tf_landmarks_extrema(x, "max", threshold = 0.5)
expect_true(is.matrix(lm_strict))
expect_equal(ncol(lm_strict), 1)
expect_true(all(abs(lm_strict[, 1] - 0.4) < 0.1, na.rm = TRUE))
# threshold = 0.2 -> both peaks, warns about missing landmarks
expect_warning(
lm_loose <- tf_landmarks_extrema(x, "max", threshold = 0.2),
"missing landmark"
)
expect_true(is.matrix(lm_loose))
expect_equal(ncol(lm_loose), 2)
expect_true(all(abs(lm_loose[, 1] - 0.4) < 0.1, na.rm = TRUE))
expect_true(any(abs(lm_loose[, 2] - 0.8) < 0.1, na.rm = TRUE))
})
test_that("tf_landmarks_extrema warns when no stable landmarks found", {
set.seed(202)
t <- seq(0, 1, length.out = 101)
x_const <- tfd(rbind(rep(1, 101), rep(2, 101), rep(3, 101)), arg = t)
expect_warning(
lm <- tf_landmarks_extrema(x_const, "max"),
"No stable landmarks"
)
expect_true(is.matrix(lm))
expect_equal(ncol(lm), 0)
expect_equal(nrow(lm), 3)
expect_equal(attr(lm, "feature_types"), character(0))
expect_warning(
lm_both <- tf_landmarks_extrema(x_const, "both"),
"No stable landmarks"
)
expect_equal(ncol(lm_both), 0)
expect_equal(nrow(lm_both), 3)
})
# --- Landmark registration ----------------------------------------------------
test_that("landmark registration aligns to default and custom template", {
t <- seq(0, 1, length.out = 101)
peak_locs <- c(0.3, 0.5, 0.7)
x <- tfd(
t(sapply(peak_locs, \(s) dnorm(t, mean = s, sd = 0.1))),
arg = t
)
landmarks <- matrix(peak_locs, ncol = 1)
# Default template (mean of landmarks = 0.5)
warp <- tf_estimate_warps(x, method = "landmark", landmarks = landmarks)
expect_s3_class(warp, "tfd")
expect_length(warp, length(x))
expect_identical(tf_domain(warp), tf_domain(x))
template_landmark <- mean(peak_locs)
warp_at_template <- as.numeric(warp[, template_landmark])
expect_equal(warp_at_template, peak_locs, tolerance = 1e-10)
# Custom template
custom_template <- 0.4
warp2 <- tf_estimate_warps(
x,
method = "landmark",
landmarks = landmarks,
template_landmarks = custom_template
)
warp_at_custom <- as.numeric(warp2[, custom_template])
expect_equal(warp_at_custom, peak_locs, tolerance = 1e-10)
# After aligning with custom template, peaks aligned near template
x_aligned <- tf_align(x, warp2)
aligned_peaks <- tf_where(x_aligned, value == max(value), "first")
expect_equal(aligned_peaks, rep(custom_template, 3), tolerance = 0.02)
})
test_that("register_landmark handles NA landmarks correctly", {
set.seed(42)
t <- seq(0, 1, length.out = 101)
peak_locs <- c(0.4, 0.45, 0.5, 0.55, 0.6)
x <- tfd(t(sapply(peak_locs, \(p) dnorm(t, p, 0.08))), arg = t)
lm_mat <- matrix(NA_real_, nrow = 5, ncol = 2)
lm_mat[, 1] <- peak_locs
lm_mat[1:3, 2] <- c(0.75, 0.78, 0.80)
warp <- tf_estimate_warps(x, method = "landmark", landmarks = lm_mat)
expect_s3_class(warp, "tfd")
expect_length(warp, 5)
# All warps strictly increasing with correct endpoints
warp_evals <- tf_evaluations(warp)
domain <- tf_domain(x)
for (i in seq_len(5)) {
expect_true(
all(diff(warp_evals[[i]]) > 0),
info = paste("Warp", i, "is not strictly increasing")
)
vals <- warp_evals[[i]]
expect_equal(vals[1], domain[1])
expect_equal(vals[length(vals)], domain[2])
}
})
test_that("tf_estimate_warps landmark method validates input", {
t <- seq(0, 1, length.out = 51)
x <- tfd(t(cbind(sin(t * pi), sin(t * pi + 0.1))), arg = t)
# Wrong number of rows
expect_error(
tf_estimate_warps(
x,
method = "landmark",
landmarks = matrix(c(0.3, 0.5, 0.7), ncol = 1)
),
"rows"
)
# Non-increasing landmarks
expect_error(
tf_estimate_warps(
x,
method = "landmark",
landmarks = matrix(c(0.5, 0.3, 0.4, 0.2), ncol = 2, byrow = TRUE)
),
"strictly increasing"
)
# Landmarks outside domain
expect_error(
tf_estimate_warps(
x,
method = "landmark",
landmarks = matrix(c(-0.1, 1.1), ncol = 1)
),
"strictly inside the domain"
)
# Template landmarks wrong length
expect_error(
tf_estimate_warps(
x,
method = "landmark",
landmarks = matrix(c(0.3, 0.5), ncol = 1),
template_landmarks = c(0.4, 0.5)
),
"len"
)
})
# --- Affine registration ------------------------------------------------------
test_that("affine registration produces linear warps for all types", {
# Shift data on [0, 2*pi]
t1 <- seq(0, 2 * pi, length.out = 101)
template1 <- tfd(sin(t1), arg = t1)
x_shift <- tfd(
t(sapply(c(-0.3, -0.1, 0, 0.1, 0.3), \(s) sin(t1 + s))),
arg = t1
)
# Scale data on [0, 1]
t2 <- seq(0, 1, length.out = 101)
x_scale <- tfd(
t(cbind(sin(t2 * pi), sin(t2 * pi * 0.9), sin(t2 * pi * 1.1))),
arg = t2
)
# Shift_scale data on [0, 2*pi]
x_ss <- tfd(
t(cbind(sin(t1), sin(t1 * 0.95 + 0.2), sin(t1 * 1.05 - 0.1))),
arg = t1
)
check_linear <- function(warp, type) {
warp_evals <- tf_evaluations(warp)
for (i in seq_along(warp_evals)) {
expect_true(
all(diff(warp_evals[[i]]) > 0),
label = paste(type, "function", i, "monotonicity")
)
second_diff <- diff(diff(warp_evals[[i]]))
expect_lt(
max(abs(second_diff)),
1e-10,
label = paste(type, "function", i, "linearity")
)
}
}
check_linear(
quiet_expected_registration_warnings(
tf_estimate_warps(
x_shift,
method = "affine",
template = template1,
type = "shift"
)
),
"shift"
)
check_linear(
quiet_expected_registration_warnings(
tf_estimate_warps(x_scale, method = "affine", type = "scale")
),
"scale"
)
check_linear(
quiet_expected_registration_warnings(
tf_estimate_warps(x_ss, method = "affine", type = "shift_scale")
),
"shift_scale"
)
# Scale centering: warp(midpoint) ~ midpoint
warp_scale <- quiet_expected_registration_warnings(
tf_estimate_warps(x_scale, method = "affine", type = "scale")
)
center <- 0.5
warp_at_center <- as.numeric(warp_scale[, center])
expect_equal(warp_at_center, rep(center, 3), tolerance = 0.1)
})
test_that("affine shift produces warps that recover shift direction", {
t <- seq(0, 2 * pi, length.out = 101)
template <- tfd(sin(t), arg = t)
x_shifted_left <- tfd(sin(t - 0.3), arg = t)
x_shifted_right <- tfd(sin(t + 0.3), arg = t)
warp_left <- quiet_expected_registration_warnings(
tf_estimate_warps(
x_shifted_left,
method = "affine",
template = template,
type = "shift"
)
)
mean_shift_left <- mean(tf_evaluations(warp_left)[[1]] - t)
warp_right <- quiet_expected_registration_warnings(
tf_estimate_warps(
x_shifted_right,
method = "affine",
template = template,
type = "shift"
)
)
mean_shift_right <- mean(tf_evaluations(warp_right)[[1]] - t)
expect_true(mean_shift_left > mean_shift_right)
})
test_that("affine registration aligns functions to template", {
set.seed(4321)
arg <- seq(0, 1, length.out = 101)
template <- tfd(sin(2 * pi * arg), arg = arg)
template_vec <- tf_evaluations(template)[[1]]
# Shift: shifted functions
true_shifts <- c(-0.2, -0.1, 0.1, 0.15)
x_shift <- do.call(
c,
lapply(true_shifts, \(s) {
tfd(sin(2 * pi * (arg - s)), arg = arg)
})
)
# Scale: scaled functions
scale_factors <- c(0.7, 0.85, 1.15, 1.3)
x_scale <- do.call(
c,
lapply(scale_factors, \(a) {
inv_arg <- arg / a + 0.5 * (1 - 1 / a)
tfd(sin(2 * pi * inv_arg), arg = arg)
})
)
# Shift_scale: combined-transform functions
params <- list(
c(a = 0.9, b = 0.05),
c(a = 1.1, b = -0.05),
c(a = 0.95, b = 0.02)
)
x_ss <- do.call(
c,
lapply(params, \(p) {
tfd(sin(2 * pi * ((arg - p["b"]) / p["a"])), arg = arg)
})
)
check_alignment <- function(x, type) {
warps <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", template = template, type = type)
)
x_reg <- quiet_expected_registration_warnings(tf_align(x, warps))
# Domain preserved (replaces former standalone smoke test)
expect_identical(
tf_domain(x_reg),
tf_domain(x),
label = paste(type, "domain preservation")
)
for (i in seq_along(x_reg)) {
reg_on_grid <- tf_evaluate(x_reg[i], arg = arg)[[1]]
valid <- !is.na(reg_on_grid)
expect_gt(
sum(valid),
20,
label = paste(type, "function", i, "valid points")
)
max_diff <- max(abs(reg_on_grid[valid] - template_vec[valid]))
expect_lt(
max_diff,
0.01,
label = paste(type, "function", i, "alignment")
)
}
}
check_alignment(x_shift, "shift")
check_alignment(x_scale, "scale")
check_alignment(x_ss, "shift_scale")
})
test_that("affine registered functions may have NA at boundaries", {
t <- seq(0, 2 * pi, length.out = 101)
true_shifts <- c(-0.8, 0.8)
x <- tfd(t(sapply(true_shifts, \(s) sin(t + s))), arg = t)
template <- tfd(sin(t), arg = t)
warp <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", template = template, type = "shift")
)
x_aligned <- quiet_expected_registration_warnings(tf_align(x, warp))
evals <- tf_evaluations(x_aligned)
expect_length(evals, length(true_shifts))
# Warp values should extend outside domain for large shifts
warp_evals <- tf_evaluations(warp)
warp_ranges <- vapply(warp_evals, range, numeric(2))
expect_true(
any(warp_ranges[1, ] < tf_domain(x)[1]) ||
any(warp_ranges[2, ] > tf_domain(x)[2])
)
})
test_that("tf_estimate_warps affine method validates input", {
t <- seq(0, 1, length.out = 51)
x <- tfd(t(cbind(sin(t * pi), sin(t * pi + 0.1))), arg = t)
expect_error(
tf_estimate_warps(x, method = "affine", template = x),
"length 1"
)
x2 <- tfd(
sin(seq(0, 2, length.out = 51) * pi),
arg = seq(0, 2, length.out = 51)
)
expect_error(
tf_estimate_warps(x, method = "affine", template = x2),
"same domain"
)
expect_error(
tf_estimate_warps(x, method = "affine", shift_range = c(0.5))
)
expect_error(
tf_estimate_warps(x, method = "affine", shift_range = c(0.5, 0.3))
)
expect_error(
tf_estimate_warps(x, method = "affine", scale_range = c(-1, 2))
)
expect_error(
tf_estimate_warps(x, method = "affine", scale_range = c(0.5, NA))
)
})
test_that("tf_estimate_warps affine method respects custom bounds", {
arg <- seq(0, 1, length.out = 101)
template <- tfd(sin(2 * pi * arg), arg = arg)
# Function with shift of 0.4
x <- tfd(sin(2 * pi * (arg - 0.4)), arg = arg)
# Default bounds find the shift
warp_default <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "affine",
template = template,
type = "shift"
)
)
b_default <- tf_evaluations(warp_default)[[1]][51] - 0.5
expect_equal(b_default, 0.4, tolerance = 0.01)
# Restrictive bounds hit boundary
warp_restricted <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "affine",
template = template,
type = "shift",
shift_range = c(-0.1, 0.1)
)
)
b_restricted <- tf_evaluations(warp_restricted)[[1]][51] - 0.5
expect_equal(b_restricted, 0.1, tolerance = 0.01)
# Custom scale bounds
x_scaled <- tfd(sin(2 * pi * (arg / 0.6 + 0.5 * (1 - 1 / 0.6))), arg = arg)
warp_scale <- quiet_expected_registration_warnings(
tf_estimate_warps(
x_scaled,
method = "affine",
template = template,
type = "scale",
scale_range = c(0.4, 0.8)
)
)
a_scale <- tf_evaluations(warp_scale)[[1]][101] -
tf_evaluations(warp_scale)[[1]][1]
expect_equal(a_scale, 0.6, tolerance = 0.01)
})
# --- Procrustes Iteration (max_iter / tol) ------------------------------------
test_that("tf_estimate_warps rejects invalid max_iter and tol", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
expect_error(
tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 0L)
)
expect_error(tf_estimate_warps(
x,
method = "affine",
type = "shift",
max_iter = -1L
))
expect_error(
tf_estimate_warps(x, method = "affine", type = "shift", tol = -1)
)
})
test_that("template forces single-pass regardless of max_iter", {
withr::local_seed(123)
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
template <- tfd(matrix(sin(t), nrow = 1), arg = t)
# Without template: explicit max_iter=3 matches default
w_default <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", type = "shift")
)
w_explicit <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 3L)
)
expect_equal(as.matrix(w_default), as.matrix(w_explicit))
# With template: max_iter=1 and max_iter=5 are identical
w1 <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "affine",
type = "shift",
template = template,
max_iter = 1L
)
)
w5 <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "affine",
type = "shift",
template = template,
max_iter = 5L
)
)
expect_equal(as.matrix(w1), as.matrix(w5))
})
test_that("tf_estimate_warps Procrustes iteration runs and improves for affine", {
withr::local_seed(42)
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
w1 <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 1L)
)
w3 <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 3L)
)
expect_s3_class(w3, "tfd_reg")
expect_length(w3, 3)
# Procrustes iteration should not increase residual variance.
# Interpolate to a common regular grid to avoid irregular-grid arithmetic warnings.
aligned_1 <- quiet_expected_registration_warnings(tf_align(x, w1))
aligned_3 <- quiet_expected_registration_warnings(tf_align(x, w3))
arg <- tf_arg(x)
aligned_1_mat <- quiet_expected_registration_warnings(
as.matrix(tf_interpolate(aligned_1, arg = arg))
)
aligned_3_mat <- quiet_expected_registration_warnings(
as.matrix(tf_interpolate(aligned_3, arg = arg))
)
center_1 <- matrix(
colMeans(aligned_1_mat, na.rm = TRUE),
nrow = nrow(aligned_1_mat),
ncol = ncol(aligned_1_mat),
byrow = TRUE
)
center_3 <- matrix(
colMeans(aligned_3_mat, na.rm = TRUE),
nrow = nrow(aligned_3_mat),
ncol = ncol(aligned_3_mat),
byrow = TRUE
)
var_1 <- mean((aligned_1_mat - center_1)^2, na.rm = TRUE)
var_3 <- mean((aligned_3_mat - center_3)^2, na.rm = TRUE)
expect_lte(var_3, var_1 + 1e-6)
})
test_that("tf_estimate_warps Procrustes iteration handles irregular affine updates", {
t <- seq(0, 2 * pi, length.out = 101)
x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t)
expect_no_error(
w <- quiet_expected_registration_warnings(
tf_estimate_warps(
x,
method = "affine",
type = "shift",
max_iter = 2L,
shift_range = c(0.05, 0.2)
)
)
)
expect_s3_class(w, "tfd_reg")
expect_length(w, length(x))
})
test_that("tf_estimate_warps Procrustes iteration runs for CC", {
skip_on_cran()
withr::local_seed(42)
t <- seq(0, 1, length.out = 61)
x <- tfd(
t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))),
arg = t
)
w <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "cc", max_iter = 2L, iterlim = 5)
)
expect_s3_class(w, "tfd_reg")
expect_length(w, 3)
})
test_that("tf_estimate_warps CC outer loop improves the CC criterion", {
skip_on_cran()
withr::local_seed(42)
t <- seq(0, 1, length.out = 61)
x <- tfd(
t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))),
arg = t
)
w_2 <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "cc", max_iter = 2L, iterlim = 5)
)
w_5 <- quiet_expected_registration_warnings(
tf_estimate_warps(x, method = "cc", max_iter = 5L, iterlim = 5)
)
aligned_2 <- suppressWarnings(tf_align(x, w_2))
aligned_5 <- suppressWarnings(tf_align(x, w_5))
obj_2 <- tf:::registration_objective_cc(
aligned = aligned_2,
template = attr(w_2, "template")
)
obj_5 <- tf:::registration_objective_cc(
aligned = aligned_5,
template = attr(w_5, "template")
)
expect_lte(obj_5, obj_2 + 1e-8)
})
test_that("CC registration handles pinch warps with boundary ties", {
skip_on_cran()
pinch_small <- pinch[1:10]
reg <- quiet_expected_registration_warnings(
tf_register(
pinch_small,
method = "cc",
max_iter = 10L,
nbasis = 10L
)
)
expect_s3_class(reg, "tf_registration")
expect_no_error(tf_invert(tf_inv_warps(reg)))
expect_true(all(vapply(
tf_evaluations(tf_inv_warps(reg)),
\(x) all(diff(x) > 0),
logical(1)
)))
})
test_that("SRVF with template=NULL gives same result regardless of max_iter", {
skip_if_not_installed("fdasrvf")
withr::local_seed(42)
t <- seq(0, 1, length.out = 101)
x <- tfd(
t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))),
arg = t
)
w1 <- tf_estimate_warps(x, method = "srvf", max_iter = 1L)
w5 <- tf_estimate_warps(x, method = "srvf", max_iter = 5L)
expect_equal(as.matrix(w1), as.matrix(w5))
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.