This package can be installed from the main git repository
remotes::install_gitlab("zero323/dlt")
or its GitHub mirror
remotes::install_github("zero323/dlt")
and requires the following R packages:
SparkR (>= 3.3.0)
magrittr
Additionally, you'll have to ensure that a compatible Delta Lake jar is available,
for example by adding delta-core
to spark.jars.packages:
spark.jars.packages io.delta:delta-core_2.12:2.1.0
This package provides:
dlt_read
and dlt_write
can be used to read and write data in Delta format.
library(dlt)
target %>%
printSchema()
# root
# |-- id: integer (nullable = true)
# |-- key: string (nullable = true)
# |-- val: integer (nullable = true)
# |-- ind: integer (nullable = true)
# |-- category: string (nullable = true)
# |-- lat: double (nullable = true)
# |-- long: double (nullable = true)
target %>%
dlt_write("/tmp/target")
dlt_read("/tmp/target") %>%
showDF(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| -1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| 1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| -1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| 1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| -1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
These also come with aliases following SparkR conventions - read.delta
and write.delta.
source %>%
printSchema()
# root
# |-- id: integer (nullable = true)
# |-- key: string (nullable = true)
# |-- val: integer (nullable = true)
# |-- ind: integer (nullable = true)
# |-- category: string (nullable = true)
# |-- lat: double (nullable = true)
# |-- long: double (nullable = true)
source %>%
write.delta("/tmp/source")
read.delta("/tmp/source") %>%
showDF(5)
# +---+---+---+---+--------+------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+------------------+-------------------+
# | 1| a| 1| 1| NTD| 72.72564971353859| 5.116242365911603|
# | 3| b| 5| 1| RSL|-65.03216980956495| -39.52675184234977|
# | 5| b| 1| -1| SYG| 88.00051120575517| 146.06572712771595|
# | 14| c| 9| -1| MYZ| 80.40028186049312|-19.090933883562684|
# | 16| d| 10| -1| DMO|-75.16123954206705| 120.96153359860182|
# +---+---+---+---+--------+------------------+-------------------+
DeltaTable
objects can be created for file system path:
dlt_for_path("/tmp/target/") %>%
dlt_to_df()
# SparkDataFrame[id:int, key:string, val:int, ind:int, category:string, lat:double, long:double]
or for the table name:
source %>%
saveAsTable("source", source="delta")
dlt_for_name("source") %>%
dlt_to_df()
# SparkDataFrame[id:int, key:string, val:int, ind:int, category:string, lat:double, long:double]
dlt_read_stream
and dlt_write_stream
can be used for streaming reads and writes respectively.
query <- dlt_read_stream("/tmp/target") %>%
dlt_write_stream(
path = "/tmp/target-stream", queryName = "test", trigger.once = TRUE,
checkpointLocation = "/tmp/target-stream/_checkpoints/test"
)
awaitTermination(query, 10000)
# [1] TRUE
dlt_detail
can be used to retrieve detailed information about the format, location and
other important properties of the table.
dlt_for_path("/tmp/target/") %>%
dlt_detail() %>%
drop(c("id", "location", "createdAt", "lastModified")) %>%
showDF()
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
# |format|name|description|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
# | delta|null| null| []| 1| 2290| {}| 1| 2|
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
Updates without
dlt_for_path("/tmp/target") %>%
dlt_update(list(ind = "-ind")) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| -1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| -1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
and with
dlt_for_path("/tmp/target") %>%
dlt_update(list(
lat = lit(39.08067389467202),
long = lit(-89.73335678516888)
), column("id") %in% c(2, 4)) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| -1| ROB| 39.08067389467202| -89.73335678516888|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| -1| ACP| 39.08067389467202| -89.73335678516888|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
a condition are supported.
Deletes with
dlt_for_path("/tmp/target") %>%
dlt_delete(column("category") %in% c("ROB", "ACP")) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# | 6| b| 2| -1| SMT| 80.79935231711715| -46.80488987825811|
# | 7| b| 9| 1| LBC| 51.65884342510253| 97.16074059717357|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
and without
dlt_for_path("/tmp/target") %>%
dlt_delete() %>%
dlt_to_df() %>%
count()
# [1] 0
a condition are supported.
dlt
supports a complete set of DeltaMergeBuilder
methods (dlt_when_matched_delete
, dlt_when_matched_update
, dlt_when_matched_update_all
, dlt_when_not_matched_insert
, dlt_when_not_matched_insert_all
).
target %>%
dlt_write("/tmp/target", mode="overwrite")
dlt_for_path("/tmp/target") %>%
dlt_alias("target") %>%
dlt_merge(alias(source, "source"), expr("source.id = target.id")) %>%
dlt_when_matched_update_all() %>%
dlt_when_not_matched_insert_all() %>%
dlt_execute() %>%
dlt_to_df() %>%
arrange(desc(column("id"))) %>%
showDF(10)
# +---+---+---+---+--------+------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+------------------+-------------------+
# | 16| d| 10| -1| DMO|-75.16123954206705| 120.96153359860182|
# | 14| c| 9| -1| MYZ| 80.40028186049312|-19.090933883562684|
# | 12| c| 10| 1| TBL| 6.229456798173487| 55.28501939959824|
# | 11| c| 5| -1| ZSH| 89.73377700895071| 61.67111137881875|
# | 10| c| 1| 1| GKP| 58.43853528611362| 100.64806896261871|
# | 9| c| 10| -1| LCN| 76.90145746339113| -138.4841349441558|
# | 8| b| 8| 1| BOB| 47.12074530310929| -91.43876885063946|
# | 7| b| 9| -1| LBC| 51.65884342510253| 97.16074059717357|
# | 6| b| 2| 1| SMT| 80.79935231711715| -46.80488987825811|
# | 5| b| 1| -1| SYG| 88.00051120575517| 146.06572712771595|
# +---+---+---+---+--------+------------------+-------------------+
# only showing top 10 rows
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
printSchema()
# root
# |-- version: long (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- userId: string (nullable = true)
# |-- userName: string (nullable = true)
# |-- operation: string (nullable = true)
# |-- operationParameters: map (nullable = true)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
# |-- job: struct (nullable = true)
# | |-- jobId: string (nullable = true)
# | |-- jobName: string (nullable = true)
# | |-- runId: string (nullable = true)
# | |-- jobOwnerId: string (nullable = true)
# | |-- triggerType: string (nullable = true)
# |-- notebook: struct (nullable = true)
# | |-- notebookId: string (nullable = true)
# |-- clusterId: string (nullable = true)
# |-- readVersion: long (nullable = true)
# |-- isolationLevel: string (nullable = true)
# |-- isBlindAppend: boolean (nullable = true)
# |-- operationMetrics: map (nullable = true)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
# |-- userMetadata: string (nullable = true)
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
select("version", "operation", "operationParameters") %>%
showDF(truncate = FALSE)
# +-------+----------------+-------------------------------------------------------------------------------------+
# |version|operation |operationParameters |
# +-------+----------------+-------------------------------------------------------------------------------------+
# |0 |STREAMING UPDATE|{outputMode -> Append, queryId -> bb30dba2-3327-44f1-b6b3-b4bb99fdf57b, epochId -> 0}|
# +-------+----------------+-------------------------------------------------------------------------------------+
dlt_for_path("/tmp/target-stream/") %>%
dlt_delete("id IN (1, 3, 5)")
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
select("version", "operation", "operationParameters") %>%
showDF(truncate = FALSE)
# +-------+----------------+-------------------------------------------------------------------------------------+
# |version|operation |operationParameters |
# +-------+----------------+-------------------------------------------------------------------------------------+
# |1 |DELETE |{predicate -> ["(id IN (1, 3, 5))"]} |
# |0 |STREAMING UPDATE|{outputMode -> Append, queryId -> bb30dba2-3327-44f1-b6b3-b4bb99fdf57b, epochId -> 0}|
# +-------+----------------+-------------------------------------------------------------------------------------+
Time travel is possible by setting versionAsOf
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 9
dlt_read("/tmp/target-stream/", versionAsOf=0) %>%
count()
# [1] 12
or timestampAsOf
options.
timestamps <- dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
where(column("version") %in% c(0, 1)) %>%
arrange("version") %>%
select(alias(date_format(column("timestamp") + expr("INTERVAL 1 second"), "yyyy-MM-dd HH:mm:ss"), "timestamp")) %>%
collect()
dlt_read("/tmp/target-stream/", timestampAsOf = timestamps$timestamp[1]) %>%
count()
# [1] 12
It is also possible to restore data to a specific version
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_version(0) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2295| 1| 1| 1| 2175| 2295|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 12
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_version(1) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2175| 1| 1| 1| 2295| 2175|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 9
or timestamp
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_timestamp(timestamp = timestamps$timestamp[1]) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2295| 1| 1| 1| 2175| 2295|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 12
New tables can be created (dlt_create
), created if not exists (dlt_create_if_not_exists
), replaced (dlt_replace
) and created or replaced (dlt_create_or_replace
). All DeltaTableBuilder
methods are supported.
dlt_create() %>%
dlt_location("/tmp/key-val") %>%
dlt_add_column("id", "integer", nullable = FALSE) %>%
dlt_add_columns(structType("key string, value double")) %>%
dlt_partitioned_by("key") %>%
dlt_comment("Key-value table") %>%
dlt_property("creation-time", as.character(Sys.time())) %>%
dlt_execute() %>%
dlt_to_df() %>%
printSchema()
# root
# |-- id: integer (nullable = false)
# |-- key: string (nullable = true)
# |-- value: double (nullable = true)
You can use dlt
to convert Parquet directories to DeltaTable
target %>%
write.parquet("/tmp/target-parquet")
dlt_is_delta_table("/tmp/target-parquet/")
# [1] FALSE
tbl <- dlt_convert_to_delta("parquet.`/tmp/target-parquet/`")
dlt_is_delta_table("/tmp/target-parquet/")
# [1] TRUE
tbl %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| -1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| 1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| -1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| 1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| -1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
vacuum DeltaTables:
dlt_for_path("/tmp/target") %>%
dlt_vacuum()
# Deleted 0 files and directories in a total of 1 directories.
generate manifests:
dlt_for_path("/tmp/target") %>%
dlt_generate_manifest("symlink_format_manifest")
and upgrade Delta protocols:
key_val_log_path <- "/tmp/key-val/_delta_log/*json"
read.json(key_val_log_path) %>%
select("metaData.id", "commitInfo.operation", "protocol") %>%
showDF()
# +--------------------+------------+--------+
# | id| operation|protocol|
# +--------------------+------------+--------+
# | null|CREATE TABLE| null|
# | null| null| {1, 2}|
# |72d4784b-e656-44a...| null| null|
# +--------------------+------------+--------+
dlt_for_path("/tmp/key-val") %>%
dlt_upgrade_table_protocol(1, 3)
read.json(key_val_log_path) %>%
select("metaData.id", "commitInfo.operation", "protocol") %>%
showDF()
# +--------------------+----------------+--------+
# | id| operation|protocol|
# +--------------------+----------------+--------+
# | null| CREATE TABLE| null|
# | null| null| {1, 2}|
# |72d4784b-e656-44a...| null| null|
# | null|UPGRADE PROTOCOL| null|
# | null| null| {1, 3}|
# +--------------------+----------------+--------+
This package also provides an interface for layout optimizations. One can compact with
target %>%
repartition(10) %>%
dlt_write("/tmp/target-optimize-compact", mode = "overwrite", partitionBy = "key")
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_where("key = 'a'") %>%
dlt_execute_compaction() %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +---------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +---------------------------------------------------------------------------------------------------------------------+
# |{1, 4, {1750, 1750, 1750.0, 1, 1750}, {1633, 1633, 1633.0, 4, 6532}, 1, null, 1, 4, 0, false, 0, 0, 1669291200256, 0}|
# +---------------------------------------------------------------------------------------------------------------------+
or without partition filter.
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_execute_compaction() %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +-----------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +-----------------------------------------------------------------------------------------------------------------------+
# |{2, 8, {1753, 1780, 1766.5, 2, 3533}, {1632, 1633, 1632.75, 8, 13062}, 2, null, 2, 9, 1, false, 0, 0, 1669291227707, 0}|
# +-----------------------------------------------------------------------------------------------------------------------+
Similarly, one can Z-Order the files with
target %>%
repartition(10) %>%
dlt_write("/tmp/target-optimize-zorderby", mode="overwrite", partitionBy="key")
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_where("key = 'a'") %>%
dlt_execute_z_order_by("lat", "long") %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |{1, 1, {1750, 1750, 1750.0, 1, 1750}, {1750, 1750, 1750.0, 1, 1750}, 1, {all, {0, 0}, {1, 1750}, 0, {1, 1750}, 1, null}, 1, 1, 0, false, 0, 0, 1669291309308, 0}|
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
or without partition filter
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_execute_z_order_by("lat", "long") %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |{3, 3, {1750, 1780, 1761.0, 3, 5283}, {1750, 1780, 1761.0, 3, 5283}, 3, {all, {0, 0}, {3, 5283}, 0, {3, 5283}, 3, null}, 3, 3, 0, false, 0, 0, 1669291365663, 0}|
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
Examples use source
and target
datasets as described in tests/testthat/data/README.md.
Logo based on Yellow wasp, m, left, Kruger National Park, South Africa by USGS Bee Inventory and Monitoring Lab.
Delta is a trademark of the LF Projects LLC. This project is not owned, endorsed or sponsored by the LF Projects LLC.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.