tests/testthat/test-write-parquet-statistics.R

test_that("null_count is written", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- test_df(missing = TRUE)
  write_parquet(
    df, tmp,
    options = parquet_options(num_rows_per_row_group = 10)
  )
  expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
  expect_snapshot(
    as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]][
      , c("row_group", "column", "null_count")
    ])
  )
})

test_that("min/max for integers", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = c(
    sample(1:5),
    sample(c(1:3, -100L, 100L)),
    sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  ))

  as_int <- function(x) {
    sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int(mtd[["min_value"]]),
      as_int(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for DATEs", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)

  do <- function(...) {
    df <- data.frame(
      day = rep(as.Date("2024-09-16") - 10:1, each = 10),
      count = 1:100
    )
    df$day[c(1, 20, 25, 40)] <- as.Date(NA_character_)
    write_parquet(
      df, tmp,
      options = parquet_options(num_rows_per_row_group = 20),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    minv <- mtd[mtd$column == 0, "min_value"]
    maxv <- mtd[mtd$column == 0, "max_value"]
    list(
      as.data.frame(read_parquet_schema(tmp)[, -1]),
      as.Date(map_int(minv, readBin, what = "integer", n = 1), origin = "1970-01-01"),
      as.Date(map_int(maxv, readBin, what = "integer", n = 1), origin = "1970-01-01")
    )
  }

  expect_snapshot(do())
})

test_that("min/max for double -> signed integers", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = as.double(c(
    sample(1:5),
    sample(c(1:3, -100L, 100L)),
    sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  )))

  as_int <- function(x) {
    sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "INT32"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int(mtd[["min_value"]]),
      as_int(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for double -> unsigned integers", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = as.double(c(
    sample(1:5),
    sample(c(1:3, 1L, 100L)),
    sample(c(0L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  )))

  as_int <- function(x) {
    sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "UINT_32"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int(mtd[["min_value"]]),
      as_int(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("minmax for double -> INT32 TIME(MULLIS)", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  # IDK what's the point of signed TIME, but it seems to be allowed, the
  # sort order is signed
  df <- data.frame(x = hms::as_hms(c(
    sample(1:5),
    sample(c(1:3, -100L, 100L)),
    sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  )))

  as_int <- function(x) {
    sapply(x, function(xx) xx %&&% readBin(xx, what = "integer") %||% NA_integer_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "TIME_MILLIS"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int(mtd[["min_value"]]),
      as_int(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for DOUBLE", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = as.double(c(
    sample(1:5),
    sample(c(1:3, -100, 100)),
    sample(c(-1000, NA_real_, 1000, NA_real_, NA_real_)),
    rep(NA_real_, 3)
  )))

  as_dbl <- function(x) {
    sapply(x, function(xx) xx %&&% readBin(xx, what = "double") %||% NA_real_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_dbl(mtd[["min_value"]]),
      as_dbl(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for FLOAT", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = as.double(c(
    sample(1:5),
    sample(c(1:3, -100, 100)),
    sample(c(-1000, NA_real_, 1000, NA_real_, NA_real_)),
    rep(NA_real_, 3)
  )))

  as_flt <- function(x) {
    sapply(x, function(xx) xx %&&% .Call(read_float, xx) %||% NA_real_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "FLOAT"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(read_parquet(tmp)), as.data.frame(df))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_flt(mtd[["min_value"]]),
      as_flt(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for integer -> INT64", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = c(
    sample(1:5),
    sample(c(1:3, -100L, 100L)),
    sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  ))

  as_int64 <- function(x) {
    sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "INT64"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int64(mtd[["min_value"]]),
      as_int64(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for REALSXP -> INT64", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = as.double(c(
    sample(1:5),
    sample(c(1:3, -100L, 100L)),
    sample(c(-1000L, NA_integer_, 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  )))

  as_int64 <- function(x) {
    sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = "INT64"),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int64(mtd[["min_value"]]),
      as_int64(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

test_that("min/max for STRING", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  df <- data.frame(x = c(
    sample(letters[1:5]),
    sample(c(letters[1:3], "!!!", "~~~")),
    sample(c("!", NA_character_, "~", NA_character_, NA_character_)),
    rep(NA_character_, 3)
  ))

  as_str <- function(x) {
    sapply(x, function(xx) xx %&&% rawToChar(xx) %||% NA_character_)
  }

  do <- function(encoding = "PLAIN", type = "STRING", ...) {
    write_parquet(
      df, tmp,
      schema = parquet_schema(x = type),
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    expect_equal(read_parquet_schema(tmp)$logical_type[[2]]$type, type)
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_str(mtd[["min_value"]]),
      as_str(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))

  expect_snapshot(do(compression = "snappy", type = "JSON"))
  expect_snapshot(do(compression = "uncompressed", type = "JSON"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "JSON"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "JSON"))

  expect_snapshot(do(compression = "snappy", type = "BSON"))
  expect_snapshot(do(compression = "uncompressed", type = "BSON"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "BSON"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "BSON"))

  expect_snapshot(do(compression = "snappy", type = "ENUM"))
  expect_snapshot(do(compression = "uncompressed", type = "ENUM"))

  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy", type = "ENUM"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed", type = "ENUM"))
})

test_that("min/max for REALSXP -> TIMESTAMP (INT64)", {
  tmp <- tempfile(fileext = ".parquet")
  on.exit(unlink(tmp), add = TRUE)
  now <- 0L
  df <- data.frame(x = .POSIXct(as.double(c(
    sample(now + 1:5),
    sample(c(now + c(1:3, -100L, 100L))),
    sample(c(now - 1000L, NA_integer_, now + 1000L, NA_integer_, NA_integer_)),
    rep(NA_integer_, 3)
  )), tz = "UTC"))

  as_int64 <- function(x) {
    sapply(x, function(xx) xx %&&% .Call(read_int64, xx) %||% NA_real_)
  }

  do <- function(encoding = "PLAIN",...) {
    write_parquet(
      df, tmp,
      encoding = encoding,
      options = parquet_options(num_rows_per_row_group = 5),
      ...
    )
    expect_equal(as.data.frame(df), as.data.frame(read_parquet(tmp)))
    mtd <- as.data.frame(read_parquet_metadata(tmp)[["column_chunks"]])
    list(
      as_int64(mtd[["min_value"]]),
      as_int64(mtd[["max_value"]]),
      mtd[["is_min_value_exact"]],
      mtd[["is_max_value_exact"]]
    )
  }
  expect_snapshot(do(compression = "snappy"))
  expect_snapshot(do(compression = "uncompressed"))
return()
  # dictionary
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "snappy"))
  expect_snapshot(do(encoding = "RLE_DICTIONARY", compression = "uncompressed"))
})

Try the nanoparquet package in your browser

Any scripts or data that you put into this service are public.

nanoparquet documentation built on April 3, 2025, 11:26 p.m.